Whamcloud - gitweb
land b_ost_amd onto HEAD.
authorphil <phil>
Mon, 14 Jun 2004 05:10:57 +0000 (05:10 +0000)
committerphil <phil>
Mon, 14 Jun 2004 05:10:57 +0000 (05:10 +0000)
I haven't re-tested the AMD functionality since landing, but I did
test iozone and dbench to ensure sanity.

60 files changed:
lnet/include/linux/libcfs.h
lnet/utils/debug.c
lustre/include/linux/lustre_cfg.h
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_log.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/include/lustre/lustre_user.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/liblustre/llite_lib.c
lustre/liblustre/super.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/lov/lov_internal.h
lustre/lov/lov_log.c
lustre/lov/lov_obd.c
lustre/lov/lov_pack.c
lustre/lvfs/llog.c
lustre/lvfs/llog_cat.c
lustre/lvfs/llog_lvfs.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/lproc_mds.c
lustre/mds/mds_internal.h
lustre/mds/mds_lov.c
lustre/mds/mds_open.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/obd_config.c
lustre/osc/osc_request.c
lustre/portals/include/linux/libcfs.h
lustre/portals/utils/debug.c
lustre/ptlrpc/import.c
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/tests/test-framework.sh
lustre/utils/Lustre/__init__.py
lustre/utils/Lustre/lustredb.py
lustre/utils/lconf
lustre/utils/lctl.c
lustre/utils/lmc
lustre/utils/lustre_cfg.c
lustre/utils/obd.c
lustre/utils/obdctl.h
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index a205163..dfea810 100644 (file)
@@ -111,6 +111,7 @@ extern unsigned int portal_cerror;
 #define D_RPCTRACE    0x00100000 /* for distributed debugging */
 #define D_VFSTRACE    0x00200000
 #define D_READA       0x00400000 /* read-ahead */
+#define D_CONFIG      0x00800000
 
 #ifdef __KERNEL__
 # include <linux/sched.h> /* THREAD_SIZE */
index 66c3807..67d8edc 100644 (file)
@@ -72,7 +72,7 @@ static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
          "page", "dlmtrace", "error", "emerg", "ha", "rpctrace", "vfstrace",
-         "reada", NULL};
+         "reada", "config", NULL};
 
 struct debug_daemon_cmd {
         char *cmd;
index c426cfc..fcc9a56 100644 (file)
@@ -35,7 +35,9 @@ enum lcfg_command_type {
         LCFG_MOUNTOPT       = 0x00cf007,
         LCFG_DEL_MOUNTOPT   = 0x00cf008,
         LCFG_SET_TIMEOUT    = 0x00cf009,
-        LCFG_SET_UPCALL     = 0x00cf010,
+        LCFG_SET_UPCALL     = 0x00cf00a,
+        LCFG_LOV_ADD_OBD    = 0x00cf00b,
+        LCFG_LOV_DEL_OBD    = 0x00cf00c,
 };
 
 struct lustre_cfg {
index 687d400..da00920 100644 (file)
@@ -67,6 +67,7 @@ typedef enum {
 #define LDLM_FL_LOCAL          0x004000 /* local lock (ie, no srv/cli split) */
 #define LDLM_FL_WARN           0x008000 /* see ldlm_cli_cancel_unused */
 #define LDLM_FL_DISCARD_DATA   0x010000 /* discard (no writeback) on cancel */
+#define LDLM_FL_CONFIG_CHANGE  0x020000 /* see ldlm_cli_cancel_unused */
 
 #define LDLM_FL_NO_TIMEOUT     0x020000 /* Blocked by group lock - wait
                                          * indefinitely */
@@ -182,7 +183,8 @@ struct ldlm_namespace {
         __u64                  ns_resources;
         ldlm_res_policy        ns_policy;
         struct ldlm_valblock_ops *ns_lvbo;
-        void                    *ns_lvbp;
+        void                  *ns_lvbp;
+        wait_queue_head_t      ns_waitq;
 };
 
 /*
index fe83b7d..739a875 100644 (file)
@@ -19,6 +19,7 @@ void ptlrpc_free_committed(struct obd_import *imp);
 void ptlrpc_wake_delayed(struct obd_import *imp);
 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid);
 int ptlrpc_set_import_active(struct obd_import *imp, int active);
+void ptlrpc_activate_import(struct obd_import *imp);
 void ptlrpc_deactivate_import(struct obd_import *imp);
 void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc);
 void ptlrpc_fail_import(struct obd_import *imp, int generation);
index 85e6132..4ab5ce8 100644 (file)
@@ -1019,7 +1019,7 @@ struct llog_log_hdr {
         __u32                   llh_cat_idx;
         /* for a catalog the first plain slot is next to it */
         struct obd_uuid         llh_tgtuuid;
-        __u32                   llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23];
+        __u32                 llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23];
         __u32                   llh_bitmap[LLOG_BITMAP_BYTES/sizeof(__u32)];
         struct llog_rec_tail    llh_tail;
 } __attribute__((packed));
@@ -1034,7 +1034,7 @@ struct llog_cookie {
 
 /* llog protocol */
 enum llogd_rpc_ops {
-        LLOG_ORIGIN_HANDLE_CREATE       = 501,
+        LLOG_ORIGIN_HANDLE_OPEN         = 501,
         LLOG_ORIGIN_HANDLE_NEXT_BLOCK   = 502,
         LLOG_ORIGIN_HANDLE_READ_HEADER  = 503,
         LLOG_ORIGIN_HANDLE_WRITE_REC    = 504,
index e6a5c14..cc6d910 100644 (file)
@@ -201,8 +201,8 @@ struct llog_operations {
                          struct obd_device *, int, struct llog_logid *);
 
         int (*lop_cleanup)(struct llog_ctxt *ctxt);
-        int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
-                          struct llog_logid *logid, char *name);
+        int (*lop_open)(struct llog_ctxt *ctxt, struct llog_handle **,
+                        struct llog_logid *logid, char *name, int flags);
         int (*lop_destroy)(struct llog_handle *handle);
         int (*lop_close)(struct llog_handle *handle);
 
@@ -311,8 +311,8 @@ static inline struct llog_ctxt *llog_get_context(struct obd_llogs *llogs,
         return llogs->llog_ctxt[index];
 }
 
-static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                              struct llog_logid *logid, char *name)
+static inline int llog_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                            struct llog_logid *logid, char *name, int flags)
 {
         struct llog_operations *lop;
         int rc;
@@ -321,10 +321,10 @@ static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
         rc = llog_ctxt2ops(ctxt, &lop);
         if (rc)
                 RETURN(rc);
-        if (lop->lop_create == NULL)
+        if (lop->lop_open == NULL)
                 RETURN(-EOPNOTSUPP);
 
-        rc = lop->lop_create(ctxt, res, logid, name);
+        rc = lop->lop_open(ctxt, res, logid, name, flags);
         RETURN(rc);
 }
 
index 09fd52e..81a652e 100644 (file)
@@ -729,7 +729,7 @@ void ptlrpc_lprocfs_unregister_obd(struct obd_device *obddev);
 #endif
 
 /* ptlrpc/llog_server.c */
-int llog_origin_handle_create(struct ptlrpc_request *req);
+int llog_origin_handle_open(struct ptlrpc_request *req);
 int llog_origin_handle_prev_block(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
 int llog_origin_handle_read_header(struct ptlrpc_request *req);
index 417cce0..fd3f196 100644 (file)
@@ -333,6 +333,7 @@ struct mds_obd {
           FIXME later will be totally fixed by b_cmd*/
         int                              mds_num;
         atomic_t                         mds_open_count;
+        int                              mds_config_version;
 
         char                            *mds_lmv_name;
         struct obd_device               *mds_lmv_obd; /* XXX lmv_obd */
@@ -406,6 +407,7 @@ struct cache_obd {
 
 struct lov_tgt_desc {
         struct obd_uuid          uuid;
+        __u32                    ltd_gen;
         struct obd_export       *ltd_exp;
         int                      active; /* is this target up for requests */
 };
@@ -602,6 +604,7 @@ struct obd_device {
 #define OBD_OPT_REAL_CLIENT     0x0004
 
 #define OBD_LLOG_FL_SENDNOW     0x0001
+#define OBD_LLOG_FL_CREATE      0x0002
 
 struct mdc_op_data;
 
@@ -618,6 +621,8 @@ struct obd_ops {
         int (*o_setup) (struct obd_device *dev, obd_count len, void *data);
         int (*o_precleanup)(struct obd_device *dev, int flags);
         int (*o_cleanup)(struct obd_device *dev, int flags);
+        int (*o_process_config)(struct obd_device *dev, obd_count len,
+                                void *data);
         int (*o_postrecov)(struct obd_device *dev);
         int (*o_connect)(struct lustre_handle *conn, struct obd_device *src,
                          struct obd_uuid *cluuid);
@@ -629,6 +634,9 @@ struct obd_ops {
                         struct lov_stripe_md *mem_src);
         int (*o_unpackmd)(struct obd_export *exp,struct lov_stripe_md **mem_tgt,
                           struct lov_mds_md *disk_src, int disk_len);
+        int (*o_revalidate_md)(struct obd_export *exp,  struct obdo *oa,
+                               struct lov_stripe_md *ea,
+                               struct obd_trans_info *oti);
         int (*o_preallocate)(struct lustre_handle *, obd_count *req,
                              obd_id *ids);
         int (*o_create)(struct obd_export *exp,  struct obdo *oa,
index 086e9b9..d78158a 100644 (file)
@@ -42,7 +42,7 @@
 #include <linux/lustre_lib.h>
 #include <linux/lustre_idl.h>
 #include <linux/lprocfs_status.h>
-#include <linux/lustre_mds.h>
+#include <linux/lustre_log.h>
 
 /* OBD Device Declarations */
 #define MAX_OBD_DEVICES 256
@@ -78,21 +78,18 @@ void oig_complete_one(struct obd_io_group *oig,
 void oig_release(struct obd_io_group *oig);
 int oig_wait(struct obd_io_group *oig);
 
-/* config.c */
+/* obd_config.c */
 int class_process_config(struct lustre_cfg *lcfg);
-int class_attach(struct lustre_cfg *lcfg);
-int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg);
-int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg);
-int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg);
 
-/* Passed as data param to class_config_parse_llog */
+/* Passed as data param to class_config_parse_handler() */
 struct config_llog_instance {
         char * cfg_instance;
         struct obd_uuid cfg_uuid;
-        ptl_nid_t  cfg_local_nid;
+        ptl_nid_t cfg_local_nid;
 };
-int class_config_parse_llog(struct llog_ctxt *ctxt, char *name,
-                            struct config_llog_instance *cfg);
+
+int class_config_process_llog(struct llog_ctxt *ctxt, char *name,
+                              struct config_llog_instance *cfg);
 int class_config_dump_llog(struct llog_ctxt *ctxt, char *name,
                            struct config_llog_instance *cfg);
 
@@ -430,6 +427,19 @@ static inline int obd_cleanup(struct obd_device *obd, int flags)
         RETURN(rc);
 }
 
+static inline int
+obd_process_config(struct obd_device *obd, int datalen, void *data)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(obd, process_config, -EOPNOTSUPP);
+        OBD_COUNTER_INCREMENT(obd, process_config);
+
+        rc = OBP(obd, process_config)(obd, datalen, data);
+        RETURN(rc);
+}
+
 /* Pack an in-memory MD struct for storage on disk.
  * Returns +ve size of packed MD (0 for free), or -ve error.
  *
@@ -513,6 +523,20 @@ static inline int obd_free_memmd(struct obd_export *exp,
         return obd_unpackmd(exp, mem_tgt, NULL, 0);
 }
 
+static inline int obd_revalidate_md(struct obd_export *exp, struct obdo *obdo,
+                                    struct lov_stripe_md *ea,
+                                    struct obd_trans_info *oti)
+{
+        int rc;
+        ENTRY;
+
+        EXP_CHECK_OP(exp, revalidate_md);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, revalidate_md);
+
+        rc = OBP(exp->exp_obd, revalidate_md)(exp, obdo, ea, oti);
+        RETURN(rc);
+}
+
 static inline int obd_create(struct obd_export *exp, struct obdo *obdo,
                              struct lov_stripe_md **ea,
                              struct obd_trans_info *oti)
index 2ad7215..16ebe76 100644 (file)
@@ -105,6 +105,11 @@ static inline int obd_uuid_equals(struct obd_uuid *u1, struct obd_uuid *u2)
         return strcmp(u1->uuid, u2->uuid) == 0;
 }
 
+static inline int obd_uuid_empty(struct obd_uuid *uuid)
+{
+        return uuid->uuid[0] == '\0';
+}
+
 static inline void obd_str2uuid(struct obd_uuid *uuid, char *tmp)
 {
         strncpy(uuid->uuid, tmp, sizeof(*uuid));
index 1da5314..93fad5e 100644 (file)
@@ -1000,9 +1000,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 rc = llog_origin_handle_cancel(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        case LLOG_ORIGIN_HANDLE_CREATE:
+        case LLOG_ORIGIN_HANDLE_OPEN:
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-                rc = llog_origin_handle_create(req);
+                rc = llog_origin_handle_open(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
index a1d7fb3..0625389 100644 (file)
@@ -671,15 +671,14 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                 if (opaque != NULL && lock->l_ast_data != opaque) {
                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
                                    lock->l_ast_data, opaque);
-                        //LBUG();
                         continue;
                 }
 
                 if (lock->l_readers || lock->l_writers) {
-                        if (flags & LDLM_FL_WARN) {
+                        if (flags & LDLM_FL_CONFIG_CHANGE)
+                                lock->l_flags |= LDLM_FL_CBPENDING;
+                        else if (flags & LDLM_FL_WARN)
                                 LDLM_ERROR(lock, "lock in use");
-                                //LBUG();
-                        }
                         continue;
                 }
 
@@ -718,16 +717,32 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
         RETURN(0);
 }
 
+static inline int have_no_nsresource(struct ldlm_namespace *ns)
+{
+        int no_resource = 0;
+
+        spin_lock(&ns->ns_counter_lock);
+        if (ns->ns_resources == 0)
+                no_resource = 1;
+        spin_unlock(&ns->ns_counter_lock);
+
+        RETURN(no_resource);
+}
+
 /* Cancel all locks on a namespace (or a specific resource, if given)
  * that have 0 readers/writers.
  *
  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
  * to notify the server.
- * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
+ * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
+ * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. 
+ * If flags & LDLM_FL_CONFIG_CHANGE, mark all locks as having a pending callback
+ */
 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                            struct ldlm_res_id *res_id, int flags, void *opaque)
 {
         int i;
+        struct l_wait_info lwi = { 0 };
         ENTRY;
 
         if (ns == NULL)
@@ -739,23 +754,28 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
 
         l_lock(&ns->ns_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
-                struct list_head *tmp, *pos;
-                list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
+                struct list_head *tmp, *next;
+                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
                         int rc;
                         struct ldlm_resource *res;
                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
                         ldlm_resource_getref(res);
+                        l_unlock(&ns->ns_lock);
 
                         rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
                                                              flags, opaque);
-
                         if (rc)
                                 CERROR("cancel_unused_res ("LPU64"): %d\n",
                                        res->lr_name.name[0], rc);
+
+                        l_lock(&ns->ns_lock);
+                        next = tmp->next;
                         ldlm_resource_putref(res);
                 }
         }
         l_unlock(&ns->ns_lock);
+        if (flags & LDLM_FL_CONFIG_CHANGE)
+                l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi);
 
         RETURN(ELDLM_OK);
 }
index 787d921..0aefd79 100644 (file)
@@ -237,6 +237,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
         spin_lock_init(&ns->ns_counter_lock);
         ns->ns_locks = 0;
         ns->ns_resources = 0;
+        init_waitqueue_head(&ns->ns_waitq);
 
         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
              bucket--)
@@ -597,6 +598,8 @@ int ldlm_resource_putref(struct ldlm_resource *res)
 
                 spin_lock(&ns->ns_counter_lock);
                 ns->ns_resources--;
+                if (ns->ns_resources == 0)
+                        wake_up(&ns->ns_waitq);
                 spin_unlock(&ns->ns_counter_lock);
 
                 rc = 1;
index f5b2ba5..62704f0 100644 (file)
@@ -434,10 +434,9 @@ int liblustre_process_log(struct config_llog_instance *cfg, int allow_recov)
         exp = class_conn2export(&mdc_conn);
         
         ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
-        rc = class_config_parse_llog(ctxt, g_zconf_profile, cfg);
-        if (rc) {
-                CERROR("class_config_parse_llog failed: rc = %d\n", rc);
-        }
+        rc = class_config_process_llog(ctxt, g_zconf_profile, NULL, lcfg);
+        if (rc)
+                CERROR("class_config_process_llog failed: rc = %d\n", rc);
 
         err = obd_disconnect(exp, 0);
 
index 2adf126..558885a 100644 (file)
@@ -142,7 +142,7 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid)
 
         if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME))
                 CDEBUG(D_INODE, "valid %x, cur time %lu/%lu, new %lu/%lu\n",
-                       src->o_valid, 
+                       src->o_valid,
                        LTIME_S(lli->lli_st_mtime), LTIME_S(lli->lli_st_ctime),
                        (long)src->o_mtime, (long)src->o_ctime);
 
@@ -182,7 +182,7 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid)
 
         if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME))
                 CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n",
-                       valid, LTIME_S(lli->lli_st_mtime), 
+                       valid, LTIME_S(lli->lli_st_mtime),
                        LTIME_S(lli->lli_st_ctime));
 
         if (valid & OBD_MD_FLATIME) {
@@ -273,7 +273,7 @@ int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm)
         if (rc)
                 RETURN(rc);
 
-        refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | 
+        refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
                         OBD_MD_FLCTIME | OBD_MD_FLSIZE;
 
         /* We set this flag in commit write as we extend the file size.  When
@@ -289,7 +289,7 @@ int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm)
         if (test_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags)) {
                 if (oa.o_size < lli->lli_st_size)
                         refresh_valid &= ~OBD_MD_FLSIZE;
-                else 
+                else
                         clear_bit(LLI_F_PREFER_EXTENDED_SIZE, &lli->lli_flags);
         }
 
@@ -840,7 +840,7 @@ static int llu_readlink_internal(struct inode *inode,
                 CERROR ("OBD_MD_LINKNAME not set on reply\n");
                 GOTO (failed, rc = -EPROTO);
         }
-        
+
         LASSERT (symlen != 0);
         if (body->eadatasize != symlen) {
                 CERROR ("inode %lu: symlink length %d not expected %d\n",
@@ -1246,7 +1246,7 @@ static int llu_put_grouplock(struct inode *inode, unsigned long arg)
         memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
 
         RETURN(0);
-}       
+}
 
 static int llu_iop_ioctl(struct inode *ino, unsigned long int request,
                          va_list ap)
@@ -1308,9 +1308,9 @@ struct inode *llu_iget(struct filesys *fs, struct lustre_md *md)
                 struct llu_inode_info *lli = llu_i2info(inode);
 
                 if (lli->lli_stale_flag ||
-                    lli->lli_st_generation != md->body->generation)
+                    lli->lli_st_generation != md->body->generation) {
                         I_RELE(inode);
-                else {
+                else {
                         llu_update_inode(inode, md->body, md->lsm);
                         return inode;
                 }
@@ -1319,7 +1319,7 @@ struct inode *llu_iget(struct filesys *fs, struct lustre_md *md)
         inode = llu_new_inode(fs, &fid);
         if (inode)
                 llu_update_inode(inode, md->body, md->lsm);
-        
+
         return inode;
 }
 
@@ -1372,9 +1372,9 @@ llu_fsswop_mount(const char *source,
 
                 /* generate a string unique to this super, let's try
                  the address of the super itself.*/
-                len = (sizeof(sbi) * 2) + 1; 
+                len = (sizeof(sbi) * 2) + 1;
                 OBD_ALLOC(sbi->ll_instance, len);
-                if (sbi->ll_instance == NULL) 
+                if (sbi->ll_instance == NULL)
                         GOTO(out_free, err = -ENOMEM);
                 sprintf(sbi->ll_instance, "%p", sbi);
 
@@ -1385,6 +1385,7 @@ llu_fsswop_mount(const char *source,
                         CERROR("Unable to process log: %s\n", g_zconf_profile);
 
                         GOTO(out_free, err);
+
                 }
 
                 lprof = class_get_profile(g_zconf_profile);
@@ -1394,13 +1395,13 @@ llu_fsswop_mount(const char *source,
                 }
                 if (osc)
                         OBD_FREE(osc, strlen(osc) + 1);
-                OBD_ALLOC(osc, strlen(lprof->lp_osc) + 
+                OBD_ALLOC(osc, strlen(lprof->lp_osc) +
                           strlen(sbi->ll_instance) + 2);
                 sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance);
 
                 if (mdc)
                         OBD_FREE(mdc, strlen(mdc) + 1);
-                OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + 
+                OBD_ALLOC(mdc, strlen(lprof->lp_mdc) +
                           strlen(sbi->ll_instance) + 2);
                 sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance);
         } else {
index a1793cf..2890551 100644 (file)
@@ -31,6 +31,7 @@
 #include <linux/lustre_compat25.h>
 #endif
 #include "llite_internal.h"
+#include <linux/obd_lov.h>
 
 int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
                  struct file *file)
@@ -853,7 +854,7 @@ static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
         if (!capable (CAP_SYS_ADMIN))
                 RETURN(-EPERM);
 
-        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg, 
+        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
                             sizeof(struct ll_recreate_obj));
         if (rc) {
                 RETURN(-EFAULT);
@@ -880,7 +881,7 @@ static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                 RETURN(-ENOMEM);
         }
 
-        oa->o_id = ucreatp.lrc_id; 
+        oa->o_id = ucreatp.lrc_id;
         oa->o_nlink = ucreatp.lrc_ost_idx;
         oa->o_gr = ucreatp.lrc_group;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
index 7c8927b..25878b9 100644 (file)
@@ -41,6 +41,8 @@ struct ll_sb_info {
         unsigned long             ll_read_ahead_pages;
         unsigned long             ll_max_read_ahead_pages;
 
+        int                       ll_config_version; /* last-applied update */
+
         /* list of GNS mounts; protected by the dcache_lock */
         struct list_head          ll_mnt_list;
 
@@ -154,6 +156,7 @@ void ll_truncate(struct inode *inode);
 extern struct file_operations ll_file_operations;
 extern struct inode_operations ll_file_inode_operations;
 extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *);
+int ll_refresh_lsm(struct inode *inode, struct lov_stripe_md *lsm);
 int ll_extent_lock(struct ll_file_data *, struct inode *,
                    struct lov_stripe_md *, int mode, ldlm_policy_data_t *,
                    struct lustre_handle *, int ast_flags);
@@ -222,6 +225,7 @@ struct dentry *ll_fh_to_dentry(struct super_block *sb, __u32 *data, int len,
                                int fhtype, int parent);
 int ll_dentry_to_fh(struct dentry *, __u32 *datap, int *lenp, int need_parent);
 int null_if_equal(struct ldlm_lock *lock, void *data);
+int ll_process_config_update(struct ll_sb_info *sbi, int clean);
 
 /* llite/special.c */
 extern struct inode_operations ll_special_inode_operations;
index 2e5e41f..26dec71 100644 (file)
@@ -407,8 +407,8 @@ out:
         RETURN(err);
 } /* ll_read_super */
 
-int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
-                       struct config_llog_instance *cfg, int allow_recov)
+static int lustre_process_log(struct lustre_mount_data *lmd, char *profile,
+                              struct config_llog_instance *cfg, int allow_recov)
 {
         struct lustre_cfg lcfg;
         struct portals_cfg pcfg;
@@ -498,18 +498,10 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
 
         exp = class_conn2export(&mdc_conn);
 
-        ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_CONFIG_REPL_CTXT);
-#if 1
-        rc = class_config_parse_llog(ctxt, profile, cfg);
-#else
-        /*
-         * For debugging, it's useful to just dump the log
-         */
-        rc = class_config_dump_llog(ctxt, profile, cfg);
-#endif
-        if (rc) {
-                CERROR("class_config_parse_llog failed: rc = %d\n", rc);
-        }
+        ctxt = llog_get_context(&exp->exp_obd->obd_llogs,LLOG_CONFIG_REPL_CTXT);
+        rc = class_config_process_llog(ctxt, profile, cfg);
+        if (rc)
+                CERROR("class_config_process_llog failed: rc = %d\n", rc);
 
         err = obd_disconnect(exp, 0);
 
@@ -597,7 +589,6 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent)
                 err = lustre_process_log(lmd, lmd->lmd_profile, &cfg, 0);
                 if (err < 0) {
                         CERROR("Unable to process log: %s\n", lmd->lmd_profile);
-
                         GOTO(out_free, err);
                 }
 
@@ -648,6 +639,7 @@ out_free:
                 int err;
 
                 if (sbi->ll_instance != NULL) {
+                        struct lustre_mount_data *lmd = sbi->ll_lmd;
                         char * cln_prof;
                         struct config_llog_instance cfg;
 
@@ -655,10 +647,9 @@ out_free:
                         cfg.cfg_uuid = sbi->ll_sb_uuid;
 
                         OBD_ALLOC(cln_prof, len);
-                        sprintf(cln_prof, "%s-clean", sbi->ll_lmd->lmd_profile);
+                        sprintf(cln_prof, "%s-clean", lmd->lmd_profile);
 
-                        err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg,
-                                                 0);
+                        err = lustre_process_log(lmd, cln_prof, &cfg, 0);
                         if (err < 0)
                                 CERROR("Unable to process log: %s\n", cln_prof);
                         OBD_FREE(cln_prof, len);
@@ -751,6 +742,75 @@ void lustre_put_super(struct super_block *sb)
         EXIT;
 } /* lustre_put_super */
 
+int ll_process_config_update(struct ll_sb_info *sbi, int clean)
+{
+        struct obd_export *mdc_exp = sbi->ll_mdc_exp;
+        struct lustre_mount_data *lmd = sbi->ll_lmd;
+        struct llog_ctxt *ctxt;
+        struct config_llog_instance cfg;
+        char *profile = lmd->lmd_profile, *name = NULL;
+        int rc, namelen =  0, version;
+        ENTRY;
+
+        if (profile == NULL)
+                RETURN(0);
+        if (lmd == NULL) {
+                CERROR("Client not mounted with zero-conf; cannot process "
+                       "update log.\n");
+                RETURN(0);
+        }
+
+        rc = ldlm_cli_cancel_unused(mdc_exp->exp_obd->obd_namespace, NULL,
+                                    LDLM_FL_CONFIG_CHANGE, NULL);
+        if (rc != 0)
+                CWARN("ldlm_cli_cancel_unused(mdc): %d\n", rc);
+
+        rc = obd_cancel_unused(sbi->ll_osc_exp, NULL, LDLM_FL_CONFIG_CHANGE,
+                               NULL);
+        if (rc != 0)
+                CWARN("obd_cancel_unused(lov): %d\n", rc);
+
+        cfg.cfg_instance = sbi->ll_instance;
+        cfg.cfg_uuid = sbi->ll_sb_uuid;
+        cfg.cfg_local_nid = lmd->lmd_local_nid;
+
+        namelen = strlen(profile) + 20; /* -clean-######### */
+        OBD_ALLOC(name, namelen);
+        if (name == NULL)
+                RETURN(-ENOMEM);
+
+        if (clean) {
+                version = sbi->ll_config_version - 1;
+                sprintf(name, "%s-clean-%d", profile, version);
+        } else {
+                version = sbi->ll_config_version + 1;
+                sprintf(name, "%s-%d", profile, version);
+        }
+
+        CWARN("Applying configuration log %s\n", name);
+
+        ctxt = llog_get_context(&mdc_exp->exp_obd->obd_llogs,
+                                LLOG_CONFIG_REPL_CTXT);
+        rc = class_config_process_llog(ctxt, name, &cfg);
+        if (rc == 0)
+                sbi->ll_config_version = version;
+        CWARN("Finished applying configuration log %s: %d\n", name, rc);
+
+        if (rc == 0 && clean == 0) {
+                struct lov_desc desc;
+                int rc, valsize;
+                valsize = sizeof(desc);
+                rc = obd_get_info(sbi->ll_osc_exp, strlen("lovdesc") + 1,
+                                  "lovdesc", &valsize, &desc);
+
+                rc = obd_init_ea_size(mdc_exp,
+                                      obd_size_diskmd(sbi->ll_osc_exp, NULL),
+                                      (desc.ld_tgt_count *
+                                       sizeof(struct llog_cookie)));
+        }
+        OBD_FREE(name, namelen);
+        RETURN(rc);
+}
 
 struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
 {
@@ -1102,6 +1162,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                         if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES)
                                 lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
                 } else {
+                        int i;
                         if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) {
                                 CERROR("lsm mismatch for inode %ld\n",
                                        inode->i_ino);
@@ -1111,6 +1172,18 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                                 dump_lsm(D_ERROR, lsm);
                                 LBUG();
                         }
+                        /* XXX FIXME -- We should decide on a safer (atomic) and
+                         * more elegant way to update the lsm */
+                        for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                                lli->lli_smd->lsm_oinfo[i].loi_id =
+                                        lsm->lsm_oinfo[i].loi_id;
+                                lli->lli_smd->lsm_oinfo[i].loi_gr =
+                                        lsm->lsm_oinfo[i].loi_gr;
+                                lli->lli_smd->lsm_oinfo[i].loi_ost_idx =
+                                        lsm->lsm_oinfo[i].loi_ost_idx;
+                                lli->lli_smd->lsm_oinfo[i].loi_ost_gen =
+                                        lsm->lsm_oinfo[i].loi_ost_gen;
+                        }
                 }
                 /* bug 2844 - limit i_blksize for broken user-space apps */
                 LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize);
index 35676f2..ccf05f0 100644 (file)
@@ -204,7 +204,7 @@ static int ll_wr_read_ahead(struct file *file, const char *buffer,
         ENTRY;
 
         if (1 != sscanf(buffer, "%d", &readahead))
-                RETURN(-EINVAL);        
+                RETURN(-EINVAL);
 
         if (readahead)
                 sbi->ll_flags |= LL_SBI_READAHEAD;
@@ -214,7 +214,17 @@ static int ll_wr_read_ahead(struct file *file, const char *buffer,
         RETURN(count);
 }
 
-static int ll_rd_max_read_ahead_mb(char *page, char **start, off_t off, 
+static int ll_wr_config_update(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct super_block *sb = (struct super_block*)data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+        ENTRY;
+
+        RETURN(ll_process_config_update(sbi, 0));
+}
+
+static int ll_rd_max_read_ahead_mb(char *page, char **start, off_t off,
                                    int count, int *eof, void *data)
 {
         struct super_block *sb = data;
@@ -261,8 +271,9 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "filesfree",    ll_rd_filesfree,        0, 0 },
         //{ "filegroups",   lprocfs_rd_filegroups,  0, 0 },
         { "read_ahead",   ll_rd_read_ahead, ll_wr_read_ahead, 0 },
-        { "max_read_ahead_mb",   ll_rd_max_read_ahead_mb, 
-                                 ll_wr_max_read_ahead_mb, 0 },
+        { "config_update", 0, ll_wr_config_update, 0 },
+        { "max_read_ahead_mb", ll_rd_max_read_ahead_mb,
+                               ll_wr_max_read_ahead_mb, 0 },
         { 0 }
 };
 
@@ -499,8 +510,8 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
                 int has_flags = 0;
                 struct page *page = llap->llap_page;
 
-                seq_printf(seq, "%lu | %p %p | %p %p %lu [", 
-                                sbi->ll_pglist_gen, 
+                seq_printf(seq, "%lu | %p %p | %p %p %lu [",
+                                sbi->ll_pglist_gen,
                                 llap, llap->llap_cookie,
                                 page, page->mapping->host, page->index);
                 seq_page_flag(seq, page, locked, has_flags);
@@ -511,7 +522,7 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
                 seq_page_flag(seq, page, highmem, has_flags);
                 if (!has_flags)
                         seq_puts(seq, "-]\n");
-                else 
+                else
                         seq_puts(seq, "]\n");
         }
 
@@ -530,7 +541,7 @@ static void *llite_dump_pgcache_seq_start(struct seq_file *seq, loff_t *pos)
         return (void *)1;
 }
 
-static void *llite_dump_pgcache_seq_next(struct seq_file *seq, void *v, 
+static void *llite_dump_pgcache_seq_next(struct seq_file *seq, void *v,
                                          loff_t *pos)
 {
         struct ll_async_page *llap, *dummy_llap = seq->private;
index 13d143a..5e0d326 100644 (file)
 
 #define LAP_MAGIC 8200
 
+static inline int lov_tgt_changed(struct lov_obd *lov, struct lov_oinfo *loi)
+{
+        return lov->tgts[loi->loi_ost_idx].ltd_gen != loi->loi_ost_gen;
+}
+
 struct lov_async_page {
         int                             lap_magic;
         int                             lap_stripe;
index 6ac2f39..3320032 100644 (file)
@@ -77,15 +77,16 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
         LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count);
 
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                struct obd_device *child = lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; 
+                struct obd_device *child =
+                        lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; 
                 struct llog_ctxt *cctxt;
                 cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
+
                 lur->lur_oid = loi->loi_id;
                 lur->lur_ogen = loi->loi_gr;
                 LASSERT(lsm->lsm_object_gr == loi->loi_gr);
                 rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc,
-                                numcookies - rc, NULL);
-
+                               numcookies - rc, NULL);
         }
         OBD_FREE(lur, sizeof(*lur));
 
@@ -93,20 +94,24 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
 }
 
 static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
-                                   struct llog_logid *logid, 
-                                   struct llog_gen *gen,
-                                   struct obd_uuid *uuid)
+                                   struct llog_logid *logid,
+                                   struct llog_gen *gen, struct obd_uuid *uuid)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *tgt;
         int i, rc = 0;
         ENTRY;
 
         LASSERT(lov->desc.ld_tgt_count  == count);
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
+        for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                struct obd_device *child;
                 struct llog_ctxt *cctxt;
 
+                if (!tgt->active)
+                        continue;
+                child = tgt->ltd_exp->exp_obd;
+
                 cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
                 if (uuid && !obd_uuid_equals(uuid, &lov->tgts[i].uuid))
                         continue;
@@ -139,7 +144,8 @@ static int lov_llog_repl_cancel(struct llog_ctxt *ctxt, int count,
         loi = lsm->lsm_oinfo;
         lov = &obd->u.lov;
         for (i = 0; i < count; i++, cookies++, loi++) {
-                struct obd_device *child = lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; 
+                struct obd_device *child =
+                        lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; 
                 struct llog_ctxt *cctxt;
                 int err;
 
@@ -169,6 +175,7 @@ int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                   struct obd_device *tgt, int count, struct llog_catid *logid)
 {
         struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *ctgt;
         int i, rc = 0;
         ENTRY;
         
@@ -183,8 +190,12 @@ int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                 RETURN(rc);
 
         LASSERT(lov->desc.ld_tgt_count  == count);
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
+        for (i = 0, ctgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, ctgt++) {
+                struct obd_device *child;
+
+                if (!ctgt->active)
+                        continue;
+                child = ctgt->ltd_exp->exp_obd;
                 rc = obd_llog_init(child, &child->obd_llogs, tgt, 1, logid + i);
                 if (rc) {
                         CERROR("error osc_llog_init %d\n", i);
@@ -197,9 +208,10 @@ int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
 int lov_llog_finish(struct obd_device *obd, struct obd_llogs *llogs, int count)
 {
         struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *tgt;
         int i, rc = 0;
         ENTRY;
-        
+
         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_ORIG_CTXT));
         if (rc)
                 RETURN(rc);
@@ -208,12 +220,21 @@ int lov_llog_finish(struct obd_device *obd, struct obd_llogs *llogs, int count)
         if (rc)
                 RETURN(rc);
 
-        LASSERT(lov->desc.ld_tgt_count  == count);
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
+        if (lov->desc.ld_tgt_count != count) {
+                CERROR("LOV tgt count != passed tgt count (%d != %d)\n",
+                       lov->desc.ld_tgt_count, count);
+                count = MIN(lov->desc.ld_tgt_count, count);
+        }
+        for (i = 0, tgt = lov->tgts; i < count; i++, tgt++) {
+                struct obd_device *child;
+
+                if (!tgt->active)
+                        continue;
+                child = tgt->ltd_exp->exp_obd;
                 rc = obd_llog_finish(child, &child->obd_llogs, 1);
                 if (rc) {
-                        CERROR("error osc_llog_finish %d\n", i);
+                        CERROR("osc_llog_finish error; index=%d; rc=%d\n",
+                               i, rc);
                         break;
                 }
         }
index 494661a..14935b7 100644 (file)
@@ -114,13 +114,101 @@ static void lov_llh_destroy(struct lov_lock_handles *llh)
 }
 
 /* obd methods */
+#define MAX_STRING_SIZE 128
+static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
+                           int activate)
+{
+        struct lov_obd *lov = &obd->u.lov;
+        struct obd_uuid *tgt_uuid = &tgt->uuid;
+        struct obd_device *tgt_obd;
+        struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
+        struct lustre_handle conn = {0, };
+        struct proc_dir_entry *lov_proc_dir;
+        int rc;
+        ENTRY;
+
+        tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
+                                        &obd->obd_uuid);
+
+        if (!tgt_obd) {
+                CERROR("Target %s not attached\n", tgt_uuid->uuid);
+                RETURN(-EINVAL);
+        }
+
+        if (!tgt_obd->obd_set_up) {
+                CERROR("Target %s not set up\n", tgt_uuid->uuid);
+                RETURN(-EINVAL);
+        }
+
+        if (activate) {
+                tgt_obd->obd_no_recov = 0;
+                ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
+        }
+
+        if (tgt_obd->u.cli.cl_import->imp_invalid) {
+                CERROR("not connecting OSC %s; administratively "
+                       "disabled\n", tgt_uuid->uuid);
+                rc = obd_register_observer(tgt_obd, obd);
+                if (rc) {
+                        CERROR("Target %s register_observer error %d; "
+                               "will not be able to reactivate\n",
+                               tgt_uuid->uuid, rc);
+                }
+                RETURN(0);
+        }
+
+        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid);
+        if (rc) {
+                CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
+                RETURN(rc);
+        }
+        tgt->ltd_exp = class_conn2export(&conn);
+
+        rc = obd_register_observer(tgt_obd, obd);
+        if (rc) {
+                CERROR("Target %s register_observer error %d\n",
+                       tgt_uuid->uuid, rc);
+                obd_disconnect(tgt->ltd_exp, 0);
+                tgt->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
+        tgt->active = 1;
+        lov->desc.ld_active_tgt_count++;
+
+        lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
+        if (lov_proc_dir) {
+                struct obd_device *osc_obd = class_conn2obd(&conn);
+                struct proc_dir_entry *osc_symlink;
+                char name[MAX_STRING_SIZE + 1];
+
+                LASSERT(osc_obd != NULL);
+                LASSERT(osc_obd->obd_type != NULL);
+                LASSERT(osc_obd->obd_type->typ_name != NULL);
+                name[MAX_STRING_SIZE] = '\0';
+                snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
+                         osc_obd->obd_type->typ_name,
+                         osc_obd->obd_name);
+                osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
+                                           name);
+                if (osc_symlink == NULL) {
+                        CERROR("could not register LOV target "
+                               "/proc/fs/lustre/%s/%s/target_obds/%s.",
+                               obd->obd_type->typ_name, obd->obd_name,
+                               osc_obd->obd_name);
+                        lprocfs_remove(lov_proc_dir);
+                        lov_proc_dir = NULL;
+                }
+        }
+
+        RETURN(0);
+}
+
 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                        struct obd_uuid *cluuid)
 {
-        struct ptlrpc_request *req = NULL;
         struct lov_obd *lov = &obd->u.lov;
-        struct lov_desc *desc = &lov->desc;
-        struct lov_tgt_desc *tgts;
+        struct lov_tgt_desc *tgt;
         struct obd_export *exp;
         int rc, rc2, i;
         ENTRY;
@@ -139,72 +227,26 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 RETURN(0);
         }
 
-        for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
-                struct obd_uuid *tgt_uuid = &tgts->uuid;
-                struct obd_device *tgt_obd;
-                struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
-                struct lustre_handle conn = {0, };
-
-                LASSERT( tgt_uuid != NULL);
-
-                tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
-                                                &obd->obd_uuid);
-
-                if (!tgt_obd) {
-                        CERROR("Target %s not attached\n", tgt_uuid->uuid);
-                        GOTO(out_disc, rc = -EINVAL);
-                }
-
-                if (!tgt_obd->obd_set_up) {
-                        CERROR("Target %s not set up\n", tgt_uuid->uuid);
-                        GOTO(out_disc, rc = -EINVAL);
-                }
-
-                if (tgt_obd->u.cli.cl_import->imp_invalid) {
-                        CERROR("not connecting OSC %s; administratively "
-                               "disabled\n", tgt_uuid->uuid);
-                        rc = obd_register_observer(tgt_obd, obd);
-                        if (rc) {
-                                CERROR("Target %s register_observer error %d; "
-                                       "will not be able to reactivate\n",
-                                       tgt_uuid->uuid, rc);
-                        }
+        for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                if (obd_uuid_empty(&tgt->uuid))
                         continue;
-                }
-
-                rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid);
-                if (rc) {
-                        CERROR("Target %s connect error %d\n", tgt_uuid->uuid,
-                               rc);
-                        GOTO(out_disc, rc);
-                }
-                tgts->ltd_exp = class_conn2export(&conn);
-
-                rc = obd_register_observer(tgt_obd, obd);
-                if (rc) {
-                        CERROR("Target %s register_observer error %d\n",
-                               tgt_uuid->uuid, rc);
-                        obd_disconnect(tgts->ltd_exp, 0);
+                rc = lov_connect_obd(obd, tgt, 0);
+                if (rc)
                         GOTO(out_disc, rc);
-                }
-
-                desc->ld_active_tgt_count++;
-                tgts->active = 1;
         }
 
-        ptlrpc_req_finished(req);
         class_export_put(exp);
         RETURN (0);
 
  out_disc:
         while (i-- > 0) {
                 struct obd_uuid uuid;
-                --tgts;
-                --desc->ld_active_tgt_count;
-                tgts->active = 0;
+                --tgt;
+                --lov->desc.ld_active_tgt_count;
+                tgt->active = 0;
                 /* save for CERROR below; (we know it's terminated) */
-                uuid = tgts->uuid;
-                rc2 = obd_disconnect(tgts->ltd_exp, 0);
+                uuid = tgt->uuid;
+                rc2 = obd_disconnect(tgt->ltd_exp, 0);
                 if (rc2)
                         CERROR("error: LOV target %s disconnect on OST idx %d: "
                                "rc = %d\n", uuid.uuid, i, rc2);
@@ -213,10 +255,63 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         RETURN (rc);
 }
 
+static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
+                              int flags)
+{
+        struct proc_dir_entry *lov_proc_dir;
+        struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
+        struct lov_obd *lov = &obd->u.lov;
+        int rc;
+        ENTRY;
+
+        lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
+        if (lov_proc_dir) {
+                struct proc_dir_entry *osc_symlink;
+
+                osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
+                if (osc_symlink) {
+                        lprocfs_remove(osc_symlink);
+                } else {
+                        CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing.",
+                               obd->obd_type->typ_name, obd->obd_name,
+                               osc_obd->obd_name);
+                }
+        }
+
+        if (obd->obd_no_recov) {
+                /* Pass it on to our clients.
+                 * XXX This should be an argument to disconnect,
+                 * XXX not a back-door flag on the OBD.  Ah well.
+                 */
+                if (osc_obd)
+                        osc_obd->obd_no_recov = 1;
+        }
+
+        obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
+
+        rc = obd_disconnect(tgt->ltd_exp, flags);
+        if (rc) {
+                if (tgt->active) {
+                        CERROR("Target %s disconnect error %d\n",
+                               tgt->uuid.uuid, rc);
+                }
+                rc = 0;
+        }
+
+        if (tgt->active) {
+                tgt->active = 0;
+                lov->desc.ld_active_tgt_count--;
+        }
+
+        tgt->ltd_exp = NULL;
+        RETURN(0);
+}
+
 static int lov_disconnect(struct obd_export *exp, int flags)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *tgt;
         int rc, i;
         ENTRY;
 
@@ -228,36 +323,9 @@ static int lov_disconnect(struct obd_export *exp, int flags)
         if (lov->refcount != 0)
                 goto out_local;
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (lov->tgts[i].ltd_exp == NULL)
-                        continue;
-
-                if (obd->obd_no_recov) {
-                        /* Pass it on to our clients.
-                         * XXX This should be an argument to disconnect,
-                         * XXX not a back-door flag on the OBD.  Ah well.
-                         */
-                        struct obd_device *osc_obd;
-                        osc_obd = class_exp2obd(lov->tgts[i].ltd_exp);
-                        if (osc_obd)
-                                osc_obd->obd_no_recov = 1;
-                }
-
-                obd_register_observer(lov->tgts[i].ltd_exp->exp_obd, NULL);
-
-                rc = obd_disconnect(lov->tgts[i].ltd_exp, flags);
-                if (rc) {
-                        if (lov->tgts[i].active) {
-                                CERROR("Target %s disconnect error %d\n",
-                                       lov->tgts[i].uuid.uuid, rc);
-                        }
-                        rc = 0;
-                }
-                if (lov->tgts[i].active) {
-                        lov->desc.ld_active_tgt_count--;
-                        lov->tgts[i].active = 0;
-                }
-                lov->tgts[i].ltd_exp = NULL;
+        for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                if (tgt->ltd_exp)
+                        lov_disconnect_obd(obd, tgt, flags);
         }
 
  out_local:
@@ -389,9 +457,6 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         struct lustre_cfg *lcfg = buf;
         struct lov_desc *desc;
         struct lov_obd *lov = &obd->u.lov;
-        struct obd_uuid *uuids;
-        struct lov_tgt_desc *tgts;
-        int i;
         int count;
         ENTRY;
 
@@ -400,11 +465,6 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
-        if (lcfg->lcfg_inllen2 < 1) {
-                CERROR("LOV setup requires an OST UUID list\n");
-                RETURN(-EINVAL);
-        }
-
         desc = (struct lov_desc *)lcfg->lcfg_inlbuf1;
         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
                 CERROR("descriptor size wrong: %d > %d\n",
@@ -412,45 +472,35 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
-        count = desc->ld_tgt_count;
-        uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
-        if (sizeof(*uuids) * count != lcfg->lcfg_inllen2) {
-                CERROR("UUID array size wrong: %u * %u != %u\n",
-                       (int)sizeof(*uuids), count, lcfg->lcfg_inllen2);
-                RETURN(-EINVAL);
-        }
-
         /* Because of 64-bit divide/mod operations only work with a 32-bit
          * divisor in a 32-bit kernel, we cannot support a stripe width
          * of 4GB or larger on 32-bit CPUs.
          */
-        if ((desc->ld_default_stripe_count ?
-             desc->ld_default_stripe_count : desc->ld_tgt_count) *
-             desc->ld_default_stripe_size > ~0UL) {
+
+        count = desc->ld_default_stripe_count;
+        if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
-                       desc->ld_default_stripe_size,
-                       desc->ld_default_stripe_count ?
-                       desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
+                       desc->ld_default_stripe_size, count, ~0UL);
                 RETURN(-EINVAL);
         }
 
-        lov->bufsize = sizeof(struct lov_tgt_desc) * count;
-        OBD_ALLOC(lov->tgts, lov->bufsize);
-        if (lov->tgts == NULL) {
-                CERROR("Out of memory\n");
-                RETURN(-EINVAL);
+        if (desc->ld_tgt_count) {
+                lov->bufsize = desc->ld_tgt_count * sizeof(struct lov_tgt_desc);
+                OBD_ALLOC(lov->tgts, lov->bufsize);
+                if (lov->tgts == NULL) {
+                        lov->bufsize = 0;
+                        CERROR("couldn't allocate %d bytes for target table.\n",
+                               lov->bufsize);
+                        RETURN(-EINVAL);
+                }
+                desc->ld_tgt_count = 0;
         }
 
+        desc->ld_active_tgt_count = 0;
+        CDEBUG(D_CONFIG, "tgts: %p bufsize: 0x%x\n", lov->tgts, lov->bufsize);
         lov->desc = *desc;
         spin_lock_init(&lov->lov_lock);
 
-        for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
-                struct obd_uuid *uuid = &tgts->uuid;
-
-                /* NULL termination already checked */
-                *uuid = uuids[i];
-        }
-
         RETURN(0);
 }
 
@@ -462,6 +512,209 @@ static int lov_cleanup(struct obd_device *obd, int flags)
         RETURN(0);
 }
 
+static int
+lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
+{
+        struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *tgt;
+        struct obd_export *exp_observer;
+        __u32 bufsize;
+        __u32 size = 2;
+        obd_id params[2];
+        char name[32] = "CATLIST";
+        int rc, old_count;
+        ENTRY;
+
+        CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
+               uuidp->uuid, index, gen);
+
+        if (index < 0) {
+                CERROR("request to add OBD %s at invalid index: %d\n",
+                       uuidp->uuid, index);
+                RETURN(-EINVAL);
+        }
+
+        if (gen <= 0) {
+                CERROR("request to add OBD %s with invalid generation: %d\n",
+                       uuidp->uuid, gen);
+                RETURN(-EINVAL);
+        }
+
+        bufsize = sizeof(struct lov_tgt_desc) * (index + 1);
+        if (bufsize > lov->bufsize) {
+                OBD_ALLOC(tgt, bufsize);
+                if (tgt == NULL) {
+                        CERROR("couldn't allocate %d bytes for new table.\n",
+                               bufsize);
+                        RETURN(-ENOMEM);
+                }
+
+                memset(tgt, 0, bufsize);
+                if (lov->tgts) {
+                        memcpy(tgt, lov->tgts, lov->bufsize);
+                        OBD_FREE(lov->tgts, lov->bufsize);
+                }
+
+                lov->tgts = tgt;
+                lov->bufsize = bufsize;
+                CDEBUG(D_CONFIG, "tgts: %p bufsize: %d\n",
+                       lov->tgts, lov->bufsize);
+        }
+
+        tgt = lov->tgts + index;
+        if (!obd_uuid_empty(&tgt->uuid)) {
+                CERROR("OBD already assigned at LOV target index %d\n",
+                       index);
+                RETURN(-EEXIST);
+        }
+
+        tgt->uuid = *uuidp;
+        /* XXX - add a sanity check on the generation number. */
+        tgt->ltd_gen = gen;
+
+        old_count = lov->desc.ld_tgt_count;
+        if (index >= lov->desc.ld_tgt_count)
+                lov->desc.ld_tgt_count = index + 1;
+
+        CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
+                index, tgt->ltd_gen, lov->desc.ld_tgt_count);
+
+        if (lov->refcount == 0)
+                RETURN(0);
+
+        if (tgt->ltd_exp) {
+                struct obd_device *osc_obd;
+
+                osc_obd = class_exp2obd(tgt->ltd_exp);
+                if (osc_obd)
+                        osc_obd->obd_no_recov = 0;
+        }
+
+        rc = lov_connect_obd(obd, tgt, 1);
+        if (rc || !obd->obd_observer)
+                RETURN(rc);
+
+        /* tell the mds_lov about the new target */
+        obd_llog_finish(obd->obd_observer, &obd->obd_observer->obd_llogs,
+                        old_count);
+        obd_llog_cat_initialize(obd->obd_observer,
+                                &obd->obd_observer->obd_llogs,
+                                lov->desc.ld_tgt_count, name);
+
+        params[0] = index;
+        rc = obd_get_info(tgt->ltd_exp, strlen("last_id"), "last_id", &size,
+                          &params[1]);
+        if (rc)
+                GOTO(out, rc);
+
+        exp_observer = obd->obd_observer->obd_self_export;
+        rc = obd_set_info(exp_observer, strlen("next_id"),"next_id", 2, params);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = lov_notify(obd, tgt->ltd_exp->exp_obd, 1);
+        GOTO(out, rc);
+ out:
+        if (rc && tgt->ltd_exp != NULL)
+                lov_disconnect_obd(obd, tgt, 0);
+        return rc;
+}
+
+static int
+lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
+{
+        struct lov_obd *lov = &obd->u.lov;
+        struct lov_tgt_desc *tgt;
+        int count = lov->desc.ld_tgt_count;
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
+               uuidp->uuid, index, gen);
+
+        if (index >= count) {
+                CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
+                       index, count);
+                RETURN(-EINVAL);
+        }
+
+        tgt = lov->tgts + index;
+
+        if (obd_uuid_empty(&tgt->uuid)) {
+                CERROR("LOV target at index %d is not setup.\n", index);
+                RETURN(-EINVAL);
+        }
+
+        if (strncmp(uuidp->uuid, tgt->uuid.uuid, sizeof uuidp->uuid) != 0) {
+                CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
+                       tgt->uuid.uuid, index, uuidp->uuid);
+                RETURN(-EINVAL);
+        }
+
+        if (tgt->ltd_exp) {
+                struct obd_device *osc_obd;
+
+                osc_obd = class_exp2obd(tgt->ltd_exp);
+                if (osc_obd) {
+                        osc_obd->obd_no_recov = 1;
+                        rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
+                        if (rc)
+                                CERROR("osc_llog_finish error: %d\n", rc);
+                }
+                lov_disconnect_obd(obd, tgt, 0);
+        }
+
+        /* XXX - right now there is a dependency on ld_tgt_count being the
+         * maximum tgt index for computing the mds_max_easize. So we can't
+         * shrink it. */
+
+        /* lt_gen = 0 will mean it will not match the gen of any valid loi */
+        memset(tgt, 0, sizeof(*tgt));
+
+        CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
+               tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
+
+        RETURN(rc);
+}
+
+static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct lustre_cfg *lcfg = buf;
+        struct obd_uuid obd_uuid;
+        int cmd;
+        int index;
+        int gen;
+        int rc = 0;
+        ENTRY;
+
+        switch(cmd = lcfg->lcfg_command) {
+        case LCFG_LOV_ADD_OBD:
+        case LCFG_LOV_DEL_OBD: {
+                if (lcfg->lcfg_inllen1 > sizeof(obd_uuid.uuid))
+                        GOTO(out, rc = -EINVAL);
+
+                obd_str2uuid(&obd_uuid, lcfg->lcfg_inlbuf1);
+
+                if (sscanf(lcfg->lcfg_inlbuf2, "%d", &index) != 1)
+                        GOTO(out, rc = -EINVAL);
+                if (sscanf(lcfg->lcfg_inlbuf3, "%d", &gen) != 1)
+                        GOTO(out, rc = -EINVAL);
+                if (cmd == LCFG_LOV_ADD_OBD)
+                        rc = lov_add_obd(obd, &obd_uuid, index, gen);
+                else
+                        rc = lov_del_obd(obd, &obd_uuid, index, gen);
+                GOTO(out, rc);
+        }
+        default: {
+                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+                GOTO(out, rc = -EINVAL);
+
+        }
+        }
+out:
+        RETURN(rc);
+}
+
 /* compute object size given "stripeno" and the ost size */
 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
                                 int stripeno)
@@ -719,7 +972,8 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
 
                 ++ost_start_idx;
                 if (lov->tgts[ost_idx].active == 0) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
+                        if (!obd_uuid_empty(&lov->tgts[ost_idx].uuid))
+                                CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
                         continue;
                 }
 
@@ -765,9 +1019,10 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
                 loi->loi_id = tmp_oa->o_id;
                 loi->loi_gr = tmp_oa->o_gr;
                 loi->loi_ost_idx = ost_idx;
-                CDEBUG(D_INODE,
-                       "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
-                       lsm->lsm_object_id, loi->loi_id, loi->loi_id, ost_idx);
+                loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
+                CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at "
+                      "idx %d gen %d\n", lsm->lsm_object_id, loi->loi_id,
+                       ost_idx, loi->loi_ost_gen);
 
                 lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
                                 obj_alloc, &set);
@@ -888,6 +1143,30 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         goto out_tmp;
 }
 
+static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
+{
+        static int last_idx = 0;
+        struct lov_tgt_desc *tgt;
+        int i, count;
+
+        /* XXX - we should do something clever and take lsm
+         * into account but just do round robin for now. */
+
+        /* last_idx must always be less that count because
+         * ld_tgt_count currently cannot shrink. */
+        count = lov->desc.ld_tgt_count;
+
+        for (i = last_idx, tgt = lov->tgts + i; i < count; i++, tgt++)
+                if (tgt->active)
+                        RETURN(last_idx = i);
+
+        for (i = 0, tgt = lov->tgts; i < last_idx; i++, tgt++)
+                if (tgt->active)
+                        RETURN(last_idx = i);
+
+        RETURN(-EIO);
+}
+
 #define lsm_bad_magic(LSMP)                                     \
 ({                                                              \
         struct lov_stripe_md *_lsm__ = (LSMP);                  \
@@ -919,8 +1198,10 @@ static int lov_destroy(struct obd_export *exp, struct obdo *oa,
                 RETURN(-ENODEV);
 
         lov = &exp->exp_obd->u.lov;
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
                 int err;
+
                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         /* Orphan clean up will (someday) fix this up. */
@@ -1011,8 +1292,8 @@ static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
         if (rc == 0) {
                 /* NB all stripe requests succeeded to get here */
 
-                for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
-                     i++, loi++) {
+                loi = lsm->lsm_oinfo;
+                for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
                         if (obdos[i].o_valid == 0)      /* inactive stripe */
                                 continue;
 
@@ -1064,8 +1345,10 @@ static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
                         continue;
                 }
 
-                CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
+                CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at "
+                       "idx %u gen %d\n", oa->o_id, i, loi->loi_id,
+                       loi->loi_ost_idx, loi->loi_ost_gen);
+
                 /* create data objects with "parent" OA */
                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
                 obdos[i].o_id = loi->loi_id;
@@ -1370,11 +1653,75 @@ static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
         unsigned long swidth = ssize * lsm->lsm_stripe_count;
         unsigned long stripe_off;
 
+        if (lsm->lsm_pattern == LOV_PATTERN_CMOBD)
+                return 0;
+
         stripe_off = do_div(lov_off, swidth);
 
         return stripe_off / ssize;
 }
 
+static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
+                             struct lov_stripe_md *ea,
+                             struct obd_trans_info *oti)
+{
+        struct obd_export *osc_exp;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_stripe_md *lsm = ea;
+        struct lov_stripe_md obj_md;
+        struct lov_stripe_md *obj_mdp = &obj_md;
+        struct lov_oinfo *loi;
+        struct obdo *tmp_oa;
+        int ost_idx, updates = 0, i;
+        ENTRY;
+
+        tmp_oa = obdo_alloc();
+        if (tmp_oa == NULL)
+                RETURN(-ENOMEM);
+
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                int rc;
+                if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
+                        continue;
+
+                ost_idx = lov_revalidate_policy(lov, lsm);
+                if (ost_idx < 0) {
+                        /* FIXME: punt for now. */
+                        CERROR("lov_revalidate_policy failed; no active "
+                               "OSCs?\n");
+                        continue;
+                }
+
+                /* create a new object */
+                memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
+                osc_exp = lov->tgts[ost_idx].ltd_exp;
+                rc = obd_create(osc_exp, tmp_oa, &obj_mdp, oti);
+                if (rc) {
+                        CERROR("error creating new subobj at idx %d; "
+                               "rc = %d\n", ost_idx, rc);
+                        continue;
+                }
+                if (oti->oti_objid)
+                        oti->oti_objid[ost_idx] = tmp_oa->o_id;
+                loi->loi_id = tmp_oa->o_id;
+                loi->loi_ost_idx = ost_idx;
+                loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
+                CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
+                       " with idx %d gen %d.\n", lsm->lsm_object_id,
+                       loi->loi_id, ost_idx, loi->loi_ost_gen);
+                updates = 1;
+        }
+
+        /* If we got an error revalidating an entry there's no need to
+         * cleanup up objects we allocated here because the bad entry
+         * still points to a deleted OST. */
+
+        obdo_free(tmp_oa);
+        RETURN(updates);
+}
+
 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
  * we can send this 'punch' to just the authoritative node and the nodes
  * that the punch will affect. */
@@ -1485,6 +1832,7 @@ static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
                          obd_count oa_bufs, struct brw_page *pga)
 {
         int i, rc = 0;
+        ENTRY;
 
         /* The caller just wants to know if there's a chance that this
          * I/O can succeed */
@@ -1500,14 +1848,14 @@ static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
 
                 if (lov->tgts[ost].active == 0) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
-                        return -EIO;
+                        RETURN(-EIO);
                 }
-                rc = obd_brw(OBD_BRW_CHECK, lov->tgts[stripe].ltd_exp, oa,
+                rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
                              NULL, 1, &pga[i], NULL);
                 if (rc)
                         break;
         }
-        return rc;
+        RETURN(rc);
 }
 
 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
@@ -1845,20 +2193,34 @@ static struct obd_async_page_ops lov_async_page_ops = {
         .ap_completion =        lov_ap_completion,
 };
 
-int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                           struct lov_oinfo *loi, struct page *page,
-                           obd_off offset, struct obd_async_page_ops *ops,
-                           void *data, void **res)
+static int lov_prep_async_page(struct obd_export *exp,
+                               struct lov_stripe_md *lsm,
+                               struct lov_oinfo *loi, struct page *page,
+                               obd_off offset, struct obd_async_page_ops *ops,
+                               void *data, void **res)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_async_page *lap;
-        int rc;
+        int rc, stripe;
         ENTRY;
 
         if (lsm_bad_magic(lsm))
                 RETURN(-EINVAL);
         LASSERT(loi == NULL);
 
+        stripe = lov_stripe_number(lsm, offset);
+        loi = &lsm->lsm_oinfo[stripe];
+
+        if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
+                RETURN(-EIO);
+        if (lov->tgts[loi->loi_ost_idx].active == 0)
+                RETURN(-EIO);
+        if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
+                CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
+                       "deleted or inactive.\n", loi->loi_ost_idx);
+                RETURN(-EIO);
+        }
+
         OBD_ALLOC(lap, sizeof(*lap));
         if (lap == NULL)
                 RETURN(-ENOMEM);
@@ -1868,20 +2230,18 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         lap->lap_caller_data = data;
 
         /* FIXME handle multiple oscs after landing b_raid1 */
+        lap->lap_stripe = stripe;
         switch (lsm->lsm_pattern) {
                 case LOV_PATTERN_RAID0:
-                        lap->lap_stripe = lov_stripe_number(lsm, offset);
                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
                                           &lap->lap_sub_offset);
                         break;
                 case LOV_PATTERN_CMOBD:
-                        lap->lap_stripe = 0;
                         lap->lap_sub_offset = offset;
                         break;
                 default:
                         LBUG();
         }
-        loi = &lsm->lsm_oinfo[lap->lap_stripe];
 
         /* so the callback doesn't need the lsm */
         lap->lap_loi_id = loi->loi_id;
@@ -1920,6 +2280,7 @@ static int lov_queue_async_io(struct obd_export *exp,
                 RETURN(PTR_ERR(lap));
 
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+
         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
                                 loi, lap->lap_sub_cookie, cmd, off, count,
                                 brw_flags, async_flags);
@@ -1945,6 +2306,7 @@ static int lov_set_async_flags(struct obd_export *exp,
                 RETURN(PTR_ERR(lap));
 
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+
         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
                                  lsm, loi, lap->lap_sub_cookie, async_flags);
         RETURN(rc);
@@ -1971,6 +2333,7 @@ static int lov_queue_group_io(struct obd_export *exp,
                 RETURN(PTR_ERR(lap));
 
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+
         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
                                 oig, lap->lap_sub_cookie, cmd, off, count,
                                 brw_flags, async_flags);
@@ -1993,8 +2356,13 @@ static int lov_trigger_group_io(struct obd_export *exp,
         if (lsm_bad_magic(lsm))
                 RETURN(-EINVAL);
 
-        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
-             i++, loi++) {
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
                                            lsm, loi, oig);
                 if (rc == 0 && err != 0)
@@ -2021,6 +2389,7 @@ static int lov_teardown_async_page(struct obd_export *exp,
                 RETURN(PTR_ERR(lap));
 
         loi = &lsm->lsm_oinfo[lap->lap_stripe];
+
         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
                                      lsm, loi, lap->lap_sub_cookie);
         if (rc) {
@@ -2229,6 +2598,10 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                 sub_ext.l_extent.start = start;
                 sub_ext.l_extent.end = end;
 
+                if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid)) {
+                        CDEBUG(D_HA, "lov idx %d deleted\n", loi->loi_ost_idx);
+                        continue;
+                }
                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         rc = -EIO;
@@ -2303,8 +2676,10 @@ static int lov_change_cbdata(struct obd_export *exp,
         lov = &exp->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct lov_stripe_md submd;
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
 
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_object_gr = lsm->lsm_object_gr;
@@ -2391,6 +2766,17 @@ static int lov_cancel_unused(struct obd_export *exp,
         int rc = 0, i;
         ENTRY;
 
+        lov = &exp->exp_obd->u.lov;
+        if (lsm == NULL) {
+                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                        int err = obd_cancel_unused(lov->tgts[i].ltd_exp, NULL,
+                                                    flags, opaque);
+                        if (!rc)
+                                rc = err;
+                }
+                RETURN(rc);
+        }
+
         if (lsm_bad_magic(lsm))
                 RETURN(-EINVAL);
 
@@ -2399,7 +2785,6 @@ static int lov_cancel_unused(struct obd_export *exp,
 
         LASSERT(lsm->lsm_object_gr > 0);
 
-        lov = &exp->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct lov_stripe_md submd;
                 int err;
@@ -2446,7 +2831,6 @@ static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         /* We only get block data from the OBD */
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 int err;
-
                 if (!lov->tgts[i].active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
@@ -2503,10 +2887,8 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 {
         struct obd_device *obddev = class_exp2obd(exp);
         struct lov_obd *lov = &obddev->u.lov;
-        int i, count = lov->desc.ld_tgt_count;
+        int i, rc, count = lov->desc.ld_tgt_count;
         struct obd_uuid *uuidp;
-        int rc;
-
         ENTRY;
 
         switch (cmd) {
@@ -2515,6 +2897,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 struct lov_tgt_desc *tgtdesc;
                 struct lov_desc *desc;
                 char *buf = NULL;
+                __u32 *genp;
 
                 buf = NULL;
                 len = 0;
@@ -2533,13 +2916,22 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         RETURN(-EINVAL);
                 }
 
+                if (sizeof(__u32) * count > data->ioc_inllen3) {
+                        OBD_FREE(buf, len);
+                        RETURN(-EINVAL);
+                }
+
                 desc = (struct lov_desc *)data->ioc_inlbuf1;
                 memcpy(desc, &(lov->desc), sizeof(*desc));
 
                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
+                genp = (__u32 *)data->ioc_inlbuf3;
                 tgtdesc = lov->tgts;
-                for (i = 0; i < count; i++, uuidp++, tgtdesc++)
+                /* the uuid will be empty for deleted OSTs */
+                for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
+                        *genp = tgtdesc->ltd_gen;
+                }
 
                 rc = copy_to_user((void *)uarg, buf, len);
                 if (rc)
@@ -2564,6 +2956,10 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 for (i = 0; i < count; i++) {
                         int err;
 
+                        /* OST was deleted */
+                        if (obd_uuid_empty(&lov->tgts[i].uuid))
+                                continue;
+
                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
                                             len, karg, uarg);
                         if (err) {
@@ -2613,6 +3009,9 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
                 /* XXX This is another one of those bits that will need to
                  * change if we ever actually support nested LOVs.  It uses
                  * the lock's export to find out which stripe it is. */
+                /* XXX - it's assumed all the locks for deleted OSTs have
+                 * been cancelled. Also, the export for deleted OSTs will
+                 * be NULL and won't match the lock's export. */
                 for (i = 0, loi = data->lsm->lsm_oinfo;
                      i < data->lsm->lsm_stripe_count;
                      i++, loi++) {
@@ -2676,6 +3075,10 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                         int er;
 
+                        /* OST was deleted */
+                        if (obd_uuid_empty(&lov->tgts[i].uuid))
+                                continue;
+
                         /* initialize all OSCs, even inactive ones */
 
                         er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key,
@@ -2701,6 +3104,10 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 int er;
 
+                /* OST was deleted */
+                if (obd_uuid_empty(&lov->tgts[i].uuid))
+                        continue;
+
                 if (!val && !lov->tgts[i].active)
                         continue;
 
@@ -2888,11 +3295,13 @@ struct obd_ops lov_obd_ops = {
         .o_detach              = lov_detach,
         .o_setup               = lov_setup,
         .o_cleanup             = lov_cleanup,
+        o_process_config: lov_process_config,
         .o_connect             = lov_connect,
         .o_disconnect          = lov_disconnect,
         .o_statfs              = lov_statfs,
         .o_packmd              = lov_packmd,
         .o_unpackmd            = lov_unpackmd,
+        o_revalidate_md: lov_revalidate_md,
         .o_create              = lov_create,
         .o_destroy             = lov_destroy,
         .o_getattr             = lov_getattr,
@@ -2925,6 +3334,7 @@ int __init lov_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc;
+        ENTRY;
 
         lprocfs_init_vars(lov, &lvars);
         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
index 2c1f4b4..2594559 100644 (file)
@@ -368,7 +368,7 @@ int lov_unpackmd_v1(struct lov_obd *lov, struct lov_stripe_md *lsm,
                 loi->loi_gr = le64_to_cpu(lmm->lmm_objects[i].l_object_gr);
                 loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
                 loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
-                if (loi->loi_ost_idx > lov->desc.ld_tgt_count) {
+                if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
                         CERROR("OST index %d more than OST count %d\n",
                                loi->loi_ost_idx, lov->desc.ld_tgt_count);
                         lov_dump_lmm_v1(D_WARNING, lmm);
index 0c58d13..ae06d64 100644 (file)
@@ -243,7 +243,7 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
                 /* process records in buffer, starting where we found one */
                 while ((void *)rec < buf + LLOG_CHUNK_SIZE) {
                         if (rec->lrh_index == 0)
-                                GOTO(out, 0); /* no more records */
+                                GOTO(out, rc = 0); /* no more records */
 
                         /* if set, process the callback on this record */
                         if (ext2_test_bit(index, llh->llh_bitmap)) {
index c05774f..307713a 100644 (file)
@@ -76,7 +76,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                 llh->llh_tail.lrt_index = cpu_to_le32(index);
         }
 
-        rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
+        rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
+                       OBD_LLOG_FL_CREATE);
         if (rc) {
                 CERROR("cannot create new log, error = %d\n", rc);
                 RETURN(ERR_PTR(rc));
@@ -148,7 +149,7 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                 }
         }
 
-        rc = llog_create(cathandle->lgh_ctxt, &loghandle, logid, NULL);
+        rc = llog_open(cathandle->lgh_ctxt, &loghandle, logid, NULL, 0);
         if (rc) {
                 CERROR("error opening log id "LPX64":%x: rc %d\n",
                        logid->lgl_oid, logid->lgl_ogen, rc);
@@ -549,9 +550,10 @@ int llog_catalog_setup(struct llog_ctxt **res, char *name,
                 RETURN(rc);
         }
         if (catid.lci_logid.lgl_oid)
-                rc = llog_create(ctxt, &handle, &catid.lci_logid, 0);
+                rc = llog_open(ctxt, &handle, &catid.lci_logid, NULL,
+                               OBD_LLOG_FL_CREATE);
         else {
-                rc = llog_create(ctxt, &handle, NULL, NULL);
+                rc = llog_open(ctxt, &handle, NULL, NULL, OBD_LLOG_FL_CREATE);
                 if (!rc)
                         catid.lci_logid = handle->lgh_id;
         }
@@ -579,7 +581,7 @@ int llog_catalog_cleanup(struct llog_ctxt *ctxt)
         struct llog_log_hdr *llh;
         int rc, index;
         ENTRY;
-                                                                                                                             
+
         if (!ctxt)
                 return 0;
 
@@ -599,7 +601,7 @@ int llog_catalog_cleanup(struct llog_ctxt *ctxt)
                                 LASSERT(rc == 0);
                                 index = loghandle->u.phd.phd_cookie.lgc_index;
                                 llog_free_handle(loghandle);
-                                                                                                                             
+
                                 LASSERT(index);
                                 llog_cat_set_first_idx(cathandle, index);
                                 rc = llog_cancel_rec(cathandle, index);
@@ -620,7 +622,7 @@ int llog_cat_half_bottom(struct llog_cookie *cookie, struct llog_handle *handle)
         struct llog_handle *loghandle;
         struct llog_logid *lgl = &cookie->lgc_lgl;
         int rc;
-                                                                                                                             
+
         down_read(&handle->lgh_lock);
         rc = llog_cat_id2handle(handle, &loghandle, lgl);
         if (rc)
index 6b70cf1..6a4c74f 100644 (file)
@@ -204,7 +204,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 loff_t saved_offset;
 
                 /* no header: only allowed to insert record 1 */
-                if (idx != 1 && !file->f_dentry->d_inode->i_size) {
+                if (idx > 1 && !file->f_dentry->d_inode->i_size) {
                         CERROR("idx != -1 in empty log\n");
                         LBUG();
                 }
@@ -268,12 +268,14 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         if (rc)
                 RETURN(rc);
 
+        CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
+               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
+               file->f_pos);
+
         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
         if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
-               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
         if (rc == 0 && reccookie) {
                 reccookie->lgc_lgl = loghandle->lgh_id;
                 reccookie->lgc_index = index;
@@ -318,7 +320,6 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                                 int len)
 {
         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
-        int rc;
         ENTRY;
 
         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
@@ -331,6 +332,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                 struct llog_rec_hdr *rec;
                 struct llog_rec_tail *tail;
                 loff_t ppos;
+                int nbytes, rc;
 
                 llog_skip_over(curr_offset, *curr_idx, next_idx);
 
@@ -347,21 +349,20 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                         RETURN(rc);
                 }
 
-                /* put number of bytes read into rc to make code simpler */
-                rc = ppos - *curr_offset;
+                nbytes = ppos - *curr_offset;
                 *curr_offset = ppos;
 
-                if (rc == 0) /* end of file, nothing to do */
+                if (nbytes == 0) /* end of file, nothing to do */
                         RETURN(0);
 
-                if (rc < sizeof(*tail)) {
+                if (nbytes < sizeof(*tail)) {
                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
                                LPU64"\n", loghandle->lgh_id.lgl_oid,
                                loghandle->lgh_id.lgl_ogen, *curr_offset);
                         RETURN(-EINVAL);
                 }
 
-                tail = buf + rc - sizeof(struct llog_rec_tail);
+                tail = buf + nbytes - sizeof(struct llog_rec_tail);
                 *curr_idx = le32_to_cpu(tail->lrt_index);
 
                 /* this shouldn't happen */
@@ -476,7 +477,8 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
         } else {
                 filp = l_filp_open(logname, flags, mode);
                 if (IS_ERR(filp)) {
-                        CERROR("logfile creation %s: %ld\n", logname,
+                        CERROR("logfile %s(%s): %ld\n",
+                               flags & O_CREAT ? "create" : "open", logname,
                                PTR_ERR(filp));
                 }
         }
@@ -664,15 +666,18 @@ out:
         RETURN(rc);
 }
 
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                          struct llog_logid *logid, char *name, int flags)
 {
         struct llog_handle *handle;
         struct lvfs_run_ctxt saved;
         int rc = 0;
-        int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+        int open_flags = O_RDWR | O_LARGEFILE;
         ENTRY;
-        
+
+        if (flags & OBD_LLOG_FL_CREATE)
+                open_flags |= O_CREAT;
+
         handle = llog_alloc_handle();
         if (handle == NULL)
                 RETURN(-ENOMEM);
@@ -937,7 +942,7 @@ int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
 EXPORT_SYMBOL(llog_put_cat_list);
 
 struct llog_operations llog_lvfs_ops = {
-        lop_create:      llog_lvfs_create,
+        lop_open:        llog_lvfs_open,
         lop_destroy:     llog_lvfs_destroy,
         lop_close:       llog_lvfs_close,
         lop_read_header: llog_lvfs_read_header,
@@ -964,8 +969,8 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         return 0;
 }
 
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                          struct llog_logid *logid, char *name, int flags)
 {
         LBUG();
         return 0;
@@ -1014,7 +1019,7 @@ int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
 }
 
 struct llog_operations llog_lvfs_ops = {
-        lop_create:      llog_lvfs_create,
+        lop_open:        llog_lvfs_open,
         lop_destroy:     llog_lvfs_destroy,
         lop_close:       llog_lvfs_close,
         lop_read_header: llog_lvfs_read_header,
index 335d475..4adccbe 100644 (file)
@@ -672,7 +672,7 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_PARSE: {
                 ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
                                         LLOG_CONFIG_REPL_CTXT);
-                rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
+                rc = class_config_process_llog(ctxt, data->ioc_inlbuf1, NULL);
                 GOTO(out, rc);
         }
 #ifdef __KERNEL__
@@ -985,7 +985,7 @@ err_rpc_lock:
         RETURN(rc);
 }
 
-int mdc_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
+static int mdc_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
 {
         struct obd_device *obd = exp->exp_obd;
         struct client_obd *cli = &obd->u.cli;
index 02dfa13..8fc0cdc 100644 (file)
@@ -1494,9 +1494,11 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
                 RETURN(-EINVAL);
         }
 
+#define KEY_IS(str) \
+        (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
+
         mds = &obd->u.mds;
-        if (keylen == strlen("mds_num") &&
-            memcmp(key, "mds_num", keylen) == 0) {
+        if (KEY_IS("mds_num")) {
                 int valsize;
                 __u32 group;
                 CDEBUG(D_IOCTL, "set mds num %d\n", *(int*)val);
@@ -1508,8 +1510,7 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
                 rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
                           valsize, &group);
                 RETURN(rc);
-        } else if (keylen == strlen("client") &&
-                   memcmp(key, "client", keylen) == 0) {
+        } else if (KEY_IS("client")) {
                 if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
                         atomic_inc(&mds->mds_real_clients);
                         CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",
@@ -1784,10 +1785,10 @@ int mds_handle(struct ptlrpc_request *req)
                 LBUG();
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                 break;
-        case LLOG_ORIGIN_HANDLE_CREATE:
+        case LLOG_ORIGIN_HANDLE_OPEN:
                 DEBUG_REQ(D_INODE, req, "llog_init");
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-                rc = llog_origin_handle_create(req);
+                rc = llog_origin_handle_open(req);
                 break;
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                 DEBUG_REQ(D_INODE, req, "llog next block");
@@ -2085,7 +2086,7 @@ static int mds_postsetup(struct obd_device *obd)
                 cfg.cfg_instance = NULL;
                 cfg.cfg_uuid = mds->mds_lov_uuid;
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                rc = class_config_parse_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
+                rc = class_config_process_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
                                              mds->mds_profile, &cfg);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 if (rc)
@@ -2182,8 +2183,8 @@ int mds_lov_clean(struct obd_device *obd)
                 cfg.cfg_uuid = mds->mds_lov_uuid;
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                class_config_parse_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
-                                        cln_prof, &cfg);
+                class_config_process_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
+                                          cln_prof, &cfg);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 OBD_FREE(cln_prof, len);
index a3e3417..8b2426b 100644 (file)
@@ -28,6 +28,7 @@
 #include <linux/obd.h>
 #include <linux/obd_class.h>
 #include <linux/lprocfs_status.h>
+#include "mds_internal.h"
 
 #ifndef LPROCFS
 struct lprocfs_vars lprocfs_mds_obd_vars[]  = { {0} };
@@ -153,6 +154,15 @@ static int lprocfs_mds_wr_evict_client(struct file *file, const char *buffer,
         return count;
 }
 
+static int lprocfs_mds_wr_config_update(struct file *file, const char *buffer,
+                                        unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        ENTRY;
+
+        RETURN(mds_lov_update_config(obd, 0));
+}
+
 struct lprocfs_vars lprocfs_mds_obd_vars[] = {
         { "uuid",         lprocfs_rd_uuid,        0, 0 },
         { "blocksize",    lprocfs_rd_blksize,     0, 0 },
@@ -166,6 +176,7 @@ struct lprocfs_vars lprocfs_mds_obd_vars[] = {
         { "mntdev",       lprocfs_mds_rd_mntdev,  0, 0 },
         { "recovery_status", lprocfs_mds_rd_recovery_status, 0, 0 },
         { "evict_client", 0, lprocfs_mds_wr_evict_client, 0 },
+        { "config_update", 0, lprocfs_mds_wr_config_update, 0 },
         { "num_exports",  lprocfs_rd_num_exports, 0, 0 },
         { 0 }
 };
index 0751a8e..d02429d 100644 (file)
@@ -80,8 +80,11 @@ int mds_lov_set_nextid(struct obd_device *obd);
 int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
 int mds_post_mds_lovconf(struct obd_device *obd);
 int mds_notify(struct obd_device *obd, struct obd_device *watched, int active);
+int mds_lov_update_config(struct obd_device *obd, int transno);
 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
                        struct lov_mds_md *lmm, int lmm_size);
+int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode,
+                          struct lustre_msg *msg, int offset);
 
 /* mds/mds_open.c */
 int mds_query_write_access(struct inode *inode);
index 64e7041..73a4629 100644 (file)
@@ -172,12 +172,32 @@ int mds_lov_set_growth(struct mds_obd *mds, int count)
         RETURN(rc);
 }
 
+static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        int valsize = sizeof(mds->mds_lov_desc), rc, i;
+        ENTRY;
+
+        rc = obd_get_info(lov, strlen("lovdesc") + 1, "lovdesc", &valsize,
+                          &mds->mds_lov_desc);
+        if (rc)
+                RETURN(rc);
+
+        i = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
+        if (i > mds->mds_max_mdsize)
+                mds->mds_max_mdsize = i;
+        mds->mds_max_cookiesize = mds->mds_lov_desc.ld_tgt_count *
+                sizeof(struct llog_cookie);
+        mds->mds_has_lov_desc = 1;
+        RETURN(0);
+}
+
 int mds_lov_connect(struct obd_device *obd, char * lov_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0,};
         char name[32] = "CATLIST";
-        int valsize, rc, i;
+        int rc, i, valsize;
         __u32 group;
         ENTRY;
 
@@ -195,6 +215,9 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 RETURN(-ENOTCONN);
         }
 
+        CDEBUG(D_HA, "obd: %s osc: %s lov_name: %s\n",
+               obd->obd_name, mds->mds_osc_obd->obd_name, lov_name);
+
         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid);
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n",
@@ -211,18 +234,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 GOTO(err_discon, rc);
         }
 
-        valsize = sizeof(mds->mds_lov_desc);
-        rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc",
-                          &valsize, &mds->mds_lov_desc);
+        rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
         if (rc)
                 GOTO(err_reg, rc);
 
-        i = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
-        if (i > mds->mds_max_mdsize)
-                mds->mds_max_mdsize = i;
-        mds->mds_max_cookiesize = mds->mds_lov_desc.ld_tgt_count*
-                sizeof(struct llog_cookie);
-        mds->mds_has_lov_desc = 1;
         rc = mds_lov_read_objids(obd);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
@@ -317,6 +332,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         struct obd_ioctl_data *data = karg;
         struct lvfs_run_ctxt saved;
         int rc = 0;
+        ENTRY;
 
         switch (cmd) {
         case OBD_IOC_RECORD: {
@@ -325,9 +341,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         RETURN(-EBUSY);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                rc = llog_create(llog_get_context(&obd->obd_llogs, 
-                                 LLOG_CONFIG_ORIG_CTXT),
-                                 &mds->mds_cfg_llh, NULL, name);
+                rc = llog_open(llog_get_context(&obd->obd_llogs, 
+                                                LLOG_CONFIG_ORIG_CTXT),
+                               &mds->mds_cfg_llh, NULL, name,
+                               OBD_LLOG_FL_CREATE);
                 if (rc == 0)
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          &cfg_uuid);
@@ -356,18 +373,19 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         RETURN(-EBUSY);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                rc = llog_create(llog_get_context(&obd->obd_llogs, 
-                                 LLOG_CONFIG_ORIG_CTXT),
-                                 &mds->mds_cfg_llh, NULL, name);
+                rc = llog_open(llog_get_context(&obd->obd_llogs, 
+                                                LLOG_CONFIG_ORIG_CTXT),
+                               &mds->mds_cfg_llh, NULL, name,
+                               OBD_LLOG_FL_CREATE);
                 if (rc == 0) {
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          NULL);
-                                                                                                                             
+
                         rc = llog_destroy(mds->mds_cfg_llh);
                         llog_free_handle(mds->mds_cfg_llh);
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                                                                                                                             
+
                 mds->mds_cfg_llh = NULL;
                 RETURN(rc);
         }
@@ -378,6 +396,8 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 if (!mds->mds_cfg_llh)
                         RETURN(-EBADF);
 
+                /* XXX - this probably should be a parameter to this ioctl.
+                 * For now, just use llh_max_transno for expediency. */
                 rec.lrh_len = llog_data_len(data->ioc_plen1);
 
                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
@@ -411,7 +431,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 struct llog_ctxt *ctxt =
                         llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
+                rc = class_config_process_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 if (rc)
                         RETURN(rc);
@@ -425,8 +445,6 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                if (rc)
-                        RETURN(rc);
 
                 RETURN(rc);
         }
@@ -608,6 +626,107 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
         RETURN(rc);
 }
 
+int mds_set_info(struct obd_export *exp, obd_count keylen,
+                 void *key, obd_count vallen, void *val)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct mds_obd *mds = &obd->u.mds;
+        ENTRY;
+
+#define KEY_IS(str) \
+        (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
+
+        if (KEY_IS("next_id")) {
+                obd_id *id = (obd_id *)val;
+                int idx, rc;
+
+                /* XXX - this really should be vallen != (2 * sizeof(*id)) *
+                 * Just following the precedent set by lov_set_info.       */
+                if (vallen != 2)
+                        RETURN(-EINVAL);
+
+                if ((idx = *id) != *id)
+                        RETURN(-EINVAL);
+
+                CDEBUG(D_CONFIG, "idx: %d id: %llu\n", idx, *(id + 1));
+
+                /* The size of the LOV target table may have increased. */
+                if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+                        obd_id *ids;
+                        int oldsize, size;
+
+                        oldsize = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
+                        rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+                        if (rc)
+                                RETURN(rc);
+                        if (idx >= mds->mds_lov_desc.ld_tgt_count)
+                                RETURN(-EINVAL);
+                        size = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
+                        OBD_ALLOC(ids, size);
+                        if (ids == NULL)
+                                RETURN(-ENOMEM);
+                        memset(ids, 0, size);
+                        if (mds->mds_lov_objids != NULL) {
+                                memcpy(ids, mds->mds_lov_objids, oldsize);
+                                OBD_FREE(mds->mds_lov_objids, oldsize);
+                        }
+                        mds->mds_lov_objids = ids;
+                }
+
+                mds->mds_lov_objids[idx] = *++id;
+                CDEBUG(D_CONFIG, "objid: %d: %lld\n", idx, *id);
+                /* XXX - should we be writing this out here ? */
+                RETURN(mds_lov_write_objids(obd));
+        }
+
+        RETURN(-EINVAL);
+}
+
+int mds_lov_update_config(struct obd_device *obd, int clean)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lvfs_run_ctxt saved;
+        struct config_llog_instance cfg;
+        struct llog_ctxt *ctxt;
+        char *profile = mds->mds_profile, *name;
+        int rc, version, namelen;
+        ENTRY;
+
+        if (profile == NULL)
+                RETURN(0);
+
+        cfg.cfg_instance = NULL;
+        cfg.cfg_uuid = mds->mds_lov_uuid;
+
+        namelen = strlen(profile) + 20; /* -clean-######### */
+        OBD_ALLOC(name, namelen);
+        if (name == NULL)
+                RETURN(-ENOMEM);
+
+        if (clean) {
+                version = mds->mds_config_version - 1;
+                sprintf(name, "%s-clean-%d", profile, version);
+        } else {
+                version = mds->mds_config_version + 1;
+                sprintf(name, "%s-%d", profile, version);
+        }
+
+        CWARN("Applying configuration log %s\n", name);
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        ctxt = llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
+        rc = class_config_process_llog(ctxt, name, &cfg);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (rc == 0) {
+                mds->mds_config_version = version;
+                rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+        }
+        CWARN("Finished applying configuration log %s: %d\n", name, rc);
+
+        OBD_FREE(name, namelen);
+        RETURN(rc);
+}
+
 /* Convert the on-disk LOV EA structre.
  * We always try to convert from an old LOV EA format to the common in-memory
  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
@@ -659,3 +778,86 @@ conv_free:
 conv_end:
         return rc;
 }
+
+/* Must be called with i_sem held */
+int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode,
+                          struct lustre_msg *msg, int offset)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct obd_export *osc_exp = mds->mds_osc_exp;
+        struct lov_mds_md *lmm= NULL;
+        struct lov_stripe_md *lsm = NULL;
+        struct obdo *oa;
+        struct obd_trans_info oti = {0};
+        unsigned valid = 0;
+        int lmm_size = 0, lsm_size = 0, err, rc;
+        void *handle;
+        ENTRY;
+
+        LASSERT(down_trylock(&inode->i_sem) != 0);
+
+        /* XXX - add way to know if EA is already up to date & return
+         * without doing anything. Easy to do since we get notified of
+         * LOV updates. */
+
+        lmm = lustre_msg_buf(msg, offset, 0);
+        if (lmm == NULL) {
+                CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
+                       inode->i_ino);
+                RETURN(0);
+        }
+        lmm_size = msg->buflens[offset];
+
+        rc = obd_unpackmd(osc_exp, &lsm, lmm, lmm_size);
+        if (rc < 0)
+                RETURN(0);
+
+        lsm_size = rc;
+
+        LASSERT(lsm->lsm_magic == LOV_MAGIC);
+
+        oa = obdo_alloc();
+        if (oa == NULL)
+                GOTO(out_lsm, rc = -ENOMEM);
+        oa->o_mode = S_IFREG | 0600;
+        oa->o_id = inode->i_ino;
+        oa->o_generation = inode->i_generation;
+        oa->o_uid = 0; /* must have 0 uid / gid on OST */
+        oa->o_gid = 0;
+
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLTYPE |
+                      OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
+        valid = OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME |
+                OBD_MD_FLCTIME;
+        obdo_from_inode(oa, inode, valid);
+
+        rc = obd_revalidate_md(osc_exp, oa, lsm, &oti);
+        if (rc == 0)
+                GOTO(out_oa, rc);
+        if (rc < 0) {
+                CERROR("Error validating LOV EA on %lu/%u: %d\n",
+                       inode->i_ino, inode->i_generation, rc);
+                GOTO(out_oa, rc);
+        }
+
+        rc = obd_packmd(osc_exp, &lmm, lsm);
+        if (rc < 0)
+                GOTO(out_oa, rc);
+        lmm_size = rc;
+
+        handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                GOTO(out_oa, rc);
+        }
+
+        rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size);
+        err = fsfilt_commit(obd, inode->i_sb, inode, handle, 0);
+        if (!rc)
+                rc = err;
+out_oa:
+        obdo_free(oa);
+out_lsm:
+        obd_free_memmd(osc_exp, &lsm);
+        RETURN(rc);
+}
index 14d740b..3112bc0 100644 (file)
@@ -652,8 +652,20 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                         up(&dchild->d_inode->i_sem);
                         RETURN(rc);
                 }
+                if (S_ISREG(dchild->d_inode->i_mode) &&
+                    (body->valid & OBD_MD_FLEASIZE)) {
+                        rc = mds_revalidate_lov_ea(obd, dchild->d_inode,
+                                                   req->rq_repmsg, 2);
+                        if (!rc)
+                                rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
+                                                 dchild->d_inode, 0);
+                        if (rc) {
+                                up(&dchild->d_inode->i_sem);
+                                RETURN(rc);
+                        }
+                }
         }
-        /* If the inode has EA data, then OSTs hold size, mtime */
+        /* If the inode has no EA data, then MDSs hold size, mtime */
         if (S_ISREG(dchild->d_inode->i_mode) &&
             !(body->valid & OBD_MD_FLEASIZE)) {
                 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
index e244759..7ca260f 100644 (file)
@@ -433,12 +433,8 @@ EXPORT_SYMBOL(class_handle2object);
 EXPORT_SYMBOL(class_get_profile);
 EXPORT_SYMBOL(class_del_profile);
 EXPORT_SYMBOL(class_process_config);
-EXPORT_SYMBOL(class_config_parse_llog);
+EXPORT_SYMBOL(class_config_process_llog);
 EXPORT_SYMBOL(class_config_dump_llog);
-EXPORT_SYMBOL(class_attach);
-EXPORT_SYMBOL(class_setup);
-EXPORT_SYMBOL(class_cleanup);
-EXPORT_SYMBOL(class_detach);
 
 /* mea.c */
 EXPORT_SYMBOL(mea_name2idx);
index 52091d4..b3f43ba 100644 (file)
@@ -438,6 +438,11 @@ void __class_export_put(struct obd_export *exp)
                 if (obd->obd_set_up) {
                         atomic_dec(&obd->obd_refcount);
                         wake_up(&obd->obd_refcount_waitq);
+                } else {
+                        CERROR("removing export %p from obd %s (%p) -- OBD "
+                               "not set up (refcount = %d)\n", exp,
+                               obd->obd_name, obd,
+                               atomic_read(&obd->obd_refcount));
                 }
         }
 }
index 621b94f..d036c2d 100644 (file)
@@ -252,12 +252,12 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                 err = str2logid(&logid, data->ioc_inlbuf1, data->ioc_inllen1);
                 if (err)
                         GOTO(out, err);
-                err = llog_create(ctxt, &handle, &logid, NULL);
+                err = llog_open(ctxt, &handle, &logid, NULL, 0);
                 if (err)
                         GOTO(out, err);
         } else if (*data->ioc_inlbuf1 == '$') {
                 char *name = data->ioc_inlbuf1 + 1;
-                err = llog_create(ctxt, &handle, NULL, name);
+                err = llog_open(ctxt, &handle, NULL, name, 0);
                 if (err)
                         GOTO(out, err);
         } else {
index 80894ce..b63d55c 100644 (file)
@@ -21,6 +21,7 @@
 
 #include <linux/obd_class.h>
 #include <linux/lustre_log.h>
+#include <linux/lustre_mds.h>
 #include <portals/list.h>
 
 /* helper functions for calling the llog obd methods */
@@ -78,7 +79,8 @@ int obd_llog_cleanup(struct llog_ctxt *ctxt)
         int rc = 0;
         ENTRY;
 
-        LASSERT(ctxt);
+        if (ctxt == NULL)
+                RETURN(0);
 
         if (CTXTP(ctxt, cleanup))  
                 rc = CTXTP(ctxt, cleanup)(ctxt);
@@ -162,10 +164,10 @@ int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
         LASSERT(ctxt);
         llog_gen_init(ctxt);
 
-        if (logid->lgl_oid)
-                rc = llog_create(ctxt, &handle, logid, NULL);
-        else {
-                rc = llog_create(ctxt, &handle, NULL, NULL);
+        if (logid->lgl_oid) {
+                rc = llog_open(ctxt, &handle, logid, NULL, 0);
+        else {
+                rc = llog_open(ctxt, &handle, NULL, NULL, 0);
                 if (!rc)
                         *logid = handle->lgh_id;
         }
index f7d8b5f..994208b 100644 (file)
@@ -90,9 +90,9 @@ static int llog_test_1(struct obd_device *obd, char *name)
         ctxt = llog_get_context(&obd->obd_llogs, LLOG_TEST_ORIG_CTXT);
         LASSERT(ctxt);
 
-        rc = llog_create(ctxt, &llh, NULL, name);
+        rc = llog_open(ctxt, &llh, NULL, name, OBD_LLOG_FL_CREATE);
         if (rc) {
-                CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
+                CERROR("1a: llog_open with name %s failed: %d\n", name, rc);
                 RETURN(rc);
         }
         llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
@@ -123,7 +123,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
 
         ctxt = llog_get_context(&obd->obd_llogs, LLOG_TEST_ORIG_CTXT);
         CWARN("2a: re-open a log with name: %s\n", name);
-        rc = llog_create(ctxt, llh, NULL, name);
+        rc = llog_open(ctxt, llh, NULL, name, 0);
         if (rc) {
                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
                 RETURN(rc);
@@ -134,7 +134,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
                 RETURN(rc);
 
         CWARN("2b: create a log without specified NAME & LOGID\n");
-        rc = llog_create(ctxt, &loghandle, NULL, NULL);
+        rc = llog_open(ctxt, &loghandle, NULL, NULL, OBD_LLOG_FL_CREATE);
         if (rc) {
                 CERROR("2b: create log failed\n");
                 RETURN(rc);
@@ -144,7 +144,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         llog_close(loghandle);
 
         CWARN("2b: re-open the log by LOGID\n");
-        rc = llog_create(ctxt, &loghandle, &logid, NULL);
+        rc = llog_open(ctxt, &loghandle, &logid, NULL, 0);
         if (rc) {
                 CERROR("2b: re-open log by LOGID failed\n");
                 RETURN(rc);
@@ -244,9 +244,9 @@ static int llog_test_4(struct obd_device *obd)
 
         sprintf(name, "%x", llog_test_rand+1);
         CWARN("4a: create a catalog log with name: %s\n", name);
-        rc = llog_create(ctxt, &cath, NULL, name);
+        rc = llog_open(ctxt, &cath, NULL, name, OBD_LLOG_FL_CREATE);
         if (rc) {
-                CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
+                CERROR("1a: llog_open with name %s failed: %d\n", name, rc);
                 GOTO(out, rc);
         }
         llog_init_handle(cath, LLOG_F_IS_CAT, &uuid);
@@ -381,9 +381,9 @@ static int llog_test_5(struct obd_device *obd)
         lmr.lmr_hdr.lrh_type = 0xf00f00;
 
         CWARN("5a: re-open catalog by id\n");
-        rc = llog_create(ctxt, &llh, &cat_logid, NULL);
+        rc = llog_open(ctxt, &llh, &cat_logid, NULL, 0);
         if (rc) {
-                CERROR("5a: llog_create with logid failed: %d\n", rc);
+                CERROR("5a: llog_open with logid failed: %d\n", rc);
                 GOTO(out, rc);
         }
         llog_init_handle(llh, LLOG_F_IS_CAT, &uuid);
@@ -473,9 +473,9 @@ static int llog_test_6(struct obd_device *obd, char *name)
         exp = class_conn2export(&exph);
 
         nctxt = llog_get_context(&mdc_obd->obd_llogs, LLOG_CONFIG_REPL_CTXT);
-        rc = llog_create(nctxt, &llh, NULL, name);
+        rc = llog_open(nctxt, &llh, NULL, name, 0);
         if (rc) {
-                CERROR("6: llog_create failed %d\n", rc);
+                CERROR("6: llog_open failed %d\n", rc);
                 RETURN(rc);
         }
 
@@ -518,9 +518,9 @@ static int llog_test_7(struct obd_device *obd)
         CWARN("7: create a log with name: %s\n", name);
         LASSERT(ctxt);
 
-        rc = llog_create(ctxt, &llh, NULL, name);
+        rc = llog_open(ctxt, &llh, NULL, name, OBD_LLOG_FL_CREATE);
         if (rc) {
-                CERROR("7: llog_create with name %s failed: %d\n", name, rc);
+                CERROR("7: llog_open with name %s failed: %d\n", name, rc);
                 RETURN(rc);
         }
         llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
index a5693d2..7879648 100644 (file)
@@ -611,12 +611,14 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, setup);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, precleanup);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, cleanup);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, process_config);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, postrecov);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, connect);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, disconnect);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, statfs);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, packmd);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, unpackmd);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, revalidate_md);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, preallocate);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, create);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy);
index 2390d47..d686ab3 100644 (file)
@@ -43,7 +43,7 @@
 /* Create a new device and set the type, name and uuid.  If
  * successful, the new device can be accessed by either name or uuid.
  */
-int class_attach(struct lustre_cfg *lcfg)
+static int class_attach(struct lustre_cfg *lcfg)
 {
         struct obd_type *type;
         struct obd_device *obd;
@@ -103,7 +103,11 @@ int class_attach(struct lustre_cfg *lcfg)
                 GOTO(out, rc = -EINVAL);
 
         /* have we attached a type to this device */
-        if (obd->obd_attached || obd->obd_type) {
+        if (obd->obd_attached) {
+                CERROR("OBD: Device %d already attached.\n", obd->obd_minor);
+                GOTO(out, rc = -EBUSY);
+        }
+        if (obd->obd_type != NULL) {
                 CERROR("OBD: Device %d already typed as %s.\n",
                        obd->obd_minor, MKSTR(obd->obd_type->typ_name));
                 GOTO(out, rc = -EBUSY);
@@ -165,13 +169,13 @@ int class_attach(struct lustre_cfg *lcfg)
         case 2:
                 OBD_FREE(obd->obd_name, strlen(obd->obd_name) + 1);
         case 1:
-                class_put_type(obd->obd_type);
+                class_put_type(type);
                 obd->obd_type = NULL;
         }
         return rc;
 }
 
-int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+static int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         int err = 0;
         struct obd_export *exp;
@@ -217,7 +221,7 @@ err_exp:
         RETURN(err);
 }
 
-int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg)
+static int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         int minor;
         int err = 0;
@@ -275,7 +279,7 @@ static void dump_exports(struct obd_device *obd)
         }
 }
 
-int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
+static int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         int flags = 0;
         int err = 0;
@@ -356,10 +360,12 @@ out:
                 obd->obd_set_up = obd->obd_stopping = 0;
                 obd->obd_type->typ_refcnt--;
                 /* XXX this should be an LASSERT */
-                if (atomic_read(&obd->obd_refcount) > 0)
-                        CERROR("%s still has refcount %d after "
-                               "cleanup.\n", obd->obd_name,
+                if (atomic_read(&obd->obd_refcount) > 0) {
+                        CERROR("%s (%p) still has refcount %d after "
+                               "cleanup.\n", obd->obd_name, obd,
                                atomic_read(&obd->obd_refcount));
+                        dump_exports(obd);
+                }
         }
 
         RETURN(err);
@@ -437,6 +443,7 @@ int class_process_config(struct lustre_cfg *lcfg)
         struct obd_device *obd;
         char str[PTL_NALFMT_SIZE];
         int err;
+        ENTRY;
 
         LASSERT(lcfg && !IS_ERR(lcfg));
 
@@ -527,8 +534,8 @@ int class_process_config(struct lustre_cfg *lcfg)
                 GOTO(out, err = 0);
         }
         default: {
-                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
-                GOTO(out, err = -EINVAL);
+                err = obd_process_config(obd, sizeof(*lcfg), lcfg);
+                GOTO(out, err);
 
         }
         }
@@ -536,14 +543,15 @@ out:
         RETURN(err);
 }
 
-static int class_config_llog_handler(struct llog_handle * handle,
-                                     struct llog_rec_hdr *rec, void *data)
+static int class_config_parse_handler(struct llog_handle * handle,
+                                      struct llog_rec_hdr *rec, void *data)
 {
         struct config_llog_instance *cfg = data;
         int cfg_len = rec->lrh_len;
         char *cfg_buf = (char*) (rec + 1);
         int rc = 0;
         ENTRY;
+
         if (rec->lrh_type == OBD_CFG_REC) {
                 char *buf;
                 struct lustre_cfg *lcfg;
@@ -597,40 +605,39 @@ static int class_config_llog_handler(struct llog_handle * handle,
                 lustre_cfg_freedata(buf, cfg_len);
         } else if (rec->lrh_type == PTL_CFG_REC) {
                 struct portals_cfg *pcfg = (struct portals_cfg *)cfg_buf;
-                if (pcfg->pcfg_command ==NAL_CMD_REGISTER_MYNID &&
+                if (pcfg->pcfg_command == NAL_CMD_REGISTER_MYNID &&
                     cfg->cfg_local_nid != PTL_NID_ANY) {
                         pcfg->pcfg_nid = cfg->cfg_local_nid;
                 }
 
                 rc = libcfs_nal_cmd(pcfg);
+        } else {
+                CERROR("unrecognized record type: 0x%x\n", rec->lrh_type);
         }
 out:
         RETURN(rc);
 }
 
-int class_config_parse_llog(struct llog_ctxt *ctxt, char *name,
-                            struct config_llog_instance *cfg)
+int class_config_process_llog(struct llog_ctxt *ctxt, char *name,
+                              struct config_llog_instance *cfg)
 {
         struct llog_handle *llh;
         int rc, rc2;
         ENTRY;
 
-        rc = llog_create(ctxt, &llh, NULL, name);
+        rc = llog_open(ctxt, &llh, NULL, name, 0);
         if (rc)
                 RETURN(rc);
 
         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(parse_out, rc);
+        if (rc == 0)
+                rc = llog_process(llh, class_config_parse_handler, cfg, NULL);
 
-        rc = llog_process(llh, class_config_llog_handler, cfg, NULL);
-parse_out:
         rc2 = llog_close(llh);
         if (rc == 0)
                 rc = rc2;
 
         RETURN(rc);
-
 }
 
 static int class_config_dump_handler(struct llog_handle * handle,
@@ -640,6 +647,7 @@ static int class_config_dump_handler(struct llog_handle * handle,
         char *cfg_buf = (char*) (rec + 1);
         int rc = 0;
         ENTRY;
+
         if (rec->lrh_type == OBD_CFG_REC) {
                 char *buf;
                 struct lustre_cfg *lcfg;
@@ -649,12 +657,12 @@ static int class_config_dump_handler(struct llog_handle * handle,
                         GOTO(out, rc);
                 lcfg = (struct lustre_cfg* ) buf;
 
-                CDEBUG(D_INFO, "lcfg command: %x\n", lcfg->lcfg_command);
+                CDEBUG(D_INFO, "lcfg command: 0x%x\n", lcfg->lcfg_command);
                 if (lcfg->lcfg_dev_name)
                         CDEBUG(D_INFO, "     devname: %s\n",
                                lcfg->lcfg_dev_name);
                 if (lcfg->lcfg_flags)
-                        CDEBUG(D_INFO, "       flags: %x\n", lcfg->lcfg_flags);
+                        CDEBUG(D_INFO, "       flags: 0x%x\n", lcfg->lcfg_flags);
                 if (lcfg->lcfg_nid)
                         CDEBUG(D_INFO, "         nid: "LPX64"\n",
                                lcfg->lcfg_nid);
@@ -665,7 +673,7 @@ static int class_config_dump_handler(struct llog_handle * handle,
                 if (lcfg->lcfg_inlbuf1)
                         CDEBUG(D_INFO, "     inlbuf1: %s\n",lcfg->lcfg_inlbuf1);
                 if (lcfg->lcfg_inlbuf2)
-                        CDEBUG(D_INFO, "     inlbuf3: %s\n",lcfg->lcfg_inlbuf2);
+                        CDEBUG(D_INFO, "     inlbuf2: %s\n",lcfg->lcfg_inlbuf2);
                 if (lcfg->lcfg_inlbuf3)
                         CDEBUG(D_INFO, "     inlbuf3: %s\n",lcfg->lcfg_inlbuf3);
                 if (lcfg->lcfg_inlbuf4)
@@ -679,7 +687,7 @@ static int class_config_dump_handler(struct llog_handle * handle,
         } else if (rec->lrh_type == PTL_CFG_REC) {
                 struct portals_cfg *pcfg = (struct portals_cfg *)cfg_buf;
 
-                CDEBUG(D_INFO, "pcfg command: %d\n", pcfg->pcfg_command);
+                CDEBUG(D_INFO, "pcfg command: 0x%x\n", pcfg->pcfg_command);
                 if (pcfg->pcfg_nal)
                         CDEBUG(D_INFO, "         nal: %d\n",
                                pcfg->pcfg_nal);
@@ -690,19 +698,19 @@ static int class_config_dump_handler(struct llog_handle * handle,
                         CDEBUG(D_INFO, "         nid: "LPX64"\n",
                                pcfg->pcfg_nid);
                 if (pcfg->pcfg_nid2)
-                        CDEBUG(D_INFO, "         nid: "LPX64"\n",
+                        CDEBUG(D_INFO, "        nid2: "LPX64"\n",
                                pcfg->pcfg_nid2);
                 if (pcfg->pcfg_nid3)
-                        CDEBUG(D_INFO, "         nid: "LPX64"\n",
+                        CDEBUG(D_INFO, "        nid3: "LPX64"\n",
                                pcfg->pcfg_nid3);
                 if (pcfg->pcfg_misc)
-                        CDEBUG(D_INFO, "         nid: %d\n",
+                        CDEBUG(D_INFO, "        misc: %d\n",
                                pcfg->pcfg_misc);
                 if (pcfg->pcfg_id)
-                        CDEBUG(D_INFO, "          id: %x\n",
+                        CDEBUG(D_INFO, "          id: 0x%x\n",
                                pcfg->pcfg_id);
                 if (pcfg->pcfg_flags)
-                        CDEBUG(D_INFO, "       flags: %x\n",
+                        CDEBUG(D_INFO, "       flags: 0x%x\n",
                                pcfg->pcfg_flags);
         } else {
                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
@@ -719,20 +727,17 @@ int class_config_dump_llog(struct llog_ctxt *ctxt, char *name,
         int rc, rc2;
         ENTRY;
 
-        rc = llog_create(ctxt, &llh, NULL, name);
+        rc = llog_open(ctxt, &llh, NULL, name, 0);
         if (rc)
                 RETURN(rc);
 
         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
-        if (rc)
-                GOTO(parse_out, rc);
+        if (rc == 0)
+                rc = llog_process(llh, class_config_dump_handler, cfg, NULL);
 
-        rc = llog_process(llh, class_config_dump_handler, cfg, NULL);
-parse_out:
         rc2 = llog_close(llh);
         if (rc == 0)
                 rc = rc2;
 
         RETURN(rc);
-
 }
index 6010c07..67e4393 100644 (file)
@@ -2522,13 +2522,15 @@ static int osc_cancel_unused(struct obd_export *exp,
                              struct lov_stripe_md *lsm, int flags, void *opaque)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id = { .name = {0} };
+        struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
 
-        res_id.name[0] = lsm->lsm_object_id;
-        res_id.name[2] = lsm->lsm_object_gr;
+        if (lsm != NULL) {
+                res_id.name[0] = lsm->lsm_object_id;
+                res_id.name[2] = lsm->lsm_object_gr;
+                resp = &res_id;
+        }
 
-        return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
-                                      opaque);
+        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
 }
 
 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
@@ -2658,6 +2660,11 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         GOTO(out, err = -EINVAL);
                 }
 
+                if (data->ioc_inllen3 < sizeof(__u32)) {
+                        OBD_FREE(buf, len);
+                        GOTO(out, err = -EINVAL);
+                }
+
                 desc = (struct lov_desc *)data->ioc_inlbuf1;
                 desc->ld_tgt_count = 1;
                 desc->ld_active_tgt_count = 1;
@@ -2666,8 +2673,8 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 desc->ld_default_stripe_offset = 0;
                 desc->ld_pattern = 0;
                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
-
                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
+                *((__u32 *)data->ioc_inlbuf3) = 1;
 
                 err = copy_to_user((void *)uarg, buf, len);
                 if (err)
@@ -2983,7 +2990,7 @@ static int osc_detach(struct obd_device *dev)
         return lprocfs_obd_detach(dev);
 }
 
-int osc_setup(struct obd_device *obd, obd_count len, void *buf)
+static int osc_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         int rc;
         ENTRY;
@@ -3000,10 +3007,15 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
         RETURN(rc);
 }
 
-int osc_cleanup(struct obd_device *obd, int flags)
+static int osc_cleanup(struct obd_device *obd, int flags)
 {
         int rc;
 
+        rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
+                                    LDLM_FL_CONFIG_CHANGE, NULL);
+        if (rc)
+                RETURN(rc);
+
         rc = client_obd_cleanup(obd, flags);
         ptlrpcd_decref();
         RETURN(rc);
index a205163..dfea810 100644 (file)
@@ -111,6 +111,7 @@ extern unsigned int portal_cerror;
 #define D_RPCTRACE    0x00100000 /* for distributed debugging */
 #define D_VFSTRACE    0x00200000
 #define D_READA       0x00400000 /* read-ahead */
+#define D_CONFIG      0x00800000
 
 #ifdef __KERNEL__
 # include <linux/sched.h> /* THREAD_SIZE */
index 66c3807..67d8edc 100644 (file)
@@ -72,7 +72,7 @@ static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
          "page", "dlmtrace", "error", "emerg", "ha", "rpctrace", "vfstrace",
-         "reada", NULL};
+         "reada", "config", NULL};
 
 struct debug_daemon_cmd {
         char *cmd;
index 99b2264..045896f 100644 (file)
@@ -180,7 +180,7 @@ void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
 }
 
-static void ptlrpc_activate_import(struct obd_import *imp)
+void ptlrpc_activate_import(struct obd_import *imp)
 {
         struct obd_device *obd = imp->imp_obd;
         unsigned long flags;
index 4fcaf37..b562c0c 100644 (file)
@@ -42,8 +42,8 @@
 
 /* This is a callback from the llog_* functions.
  * Assumes caller has already pushed us into the kernel context. */
-static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_client_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                            struct llog_logid *logid, char *name, int flags)
 {
         struct obd_import *imp;
         struct llogd_body req_body;
@@ -52,9 +52,7 @@ static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res,
         struct ptlrpc_request *req = NULL;
         int size[2] = {sizeof(req_body)};
         char *tmp[2] = {(char*) &req_body};
-        int bufcount = 1;
-        int repsize[] = {sizeof (req_body)};
-        int rc;
+        int bufcount = 1, repsize[] = {sizeof (req_body)}, rc;
         ENTRY;
 
         LASSERT(ctxt->loc_imp);
@@ -69,6 +67,7 @@ static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res,
         if (logid)
                 req_body.lgd_logid = *logid;
         req_body.lgd_ctxt_idx = ctxt->loc_idx - 1;
+        req_body.lgd_llh_flags = flags;
 
         if (name) {
                 size[bufcount] = strlen(name) + 1;
@@ -76,7 +75,7 @@ static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                 bufcount++;
         }
 
-        req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_CREATE,bufcount,size,tmp);
+        req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_OPEN, bufcount, size,tmp);
         if (!req)
                 GOTO(err_free, rc = -ENOMEM);
 
@@ -279,6 +278,6 @@ struct llog_operations llog_client_ops = {
         lop_prev_block:  llog_client_prev_block,
         lop_next_block:  llog_client_next_block,
         lop_read_header: llog_client_read_header,
-        lop_create:      llog_client_create,
+        lop_open:        llog_client_open,
         lop_close:       llog_client_close,
 };
index 86f9c30..8d40bc9 100644 (file)
@@ -43,7 +43,7 @@
 
 #ifdef __KERNEL__
 
-int llog_origin_handle_create(struct ptlrpc_request *req)
+int llog_origin_handle_open(struct ptlrpc_request *req)
 {
         struct obd_export *exp = req->rq_export;
         struct obd_device *obd = exp->exp_obd;
@@ -52,13 +52,12 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
         struct lvfs_run_ctxt saved;
         struct llog_logid *logid = NULL;
         struct llog_ctxt *ctxt;
-        char * name = NULL;
-        int size = sizeof (*body);
-        int rc, rc2;
+        char *name = NULL;
+        int rc, rc2, size = sizeof (*body);
         ENTRY;
 
         body = lustre_swab_reqbuf(req, 0, sizeof(*body),
-                                 lustre_swab_llogd_body);
+                                  lustre_swab_llogd_body);
         if (body == NULL) {
                 CERROR ("Can't unpack llogd_body\n");
                 GOTO(out, rc =-EFAULT);
@@ -79,7 +78,7 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
         LASSERT(ctxt != NULL);
         push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
 
-        rc = llog_create(ctxt, &loghandle, logid, name);
+        rc = llog_open(ctxt, &loghandle, logid, name, body->lgd_llh_flags);
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -87,7 +86,7 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
         if (rc)
                 GOTO(out_close, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
         body->lgd_logid = loghandle->lgh_id;
 
 out_close:
@@ -129,7 +128,7 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
         LASSERT(ctxt != NULL);
         push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
 
-        rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
+        rc = llog_open(ctxt, &loghandle, &body->lgd_logid, NULL, 0);
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -197,7 +196,7 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
         LASSERT(ctxt != NULL);
         push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
 
-        rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
+        rc = llog_open(ctxt, &loghandle, &body->lgd_logid, NULL, 0);
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -261,7 +260,7 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
         LASSERT(ctxt != NULL);
         push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
 
-        rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
+        rc = llog_open(ctxt, &loghandle, &body->lgd_logid, NULL, 0);
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -382,7 +381,7 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
 
         for (i = 0; i < 4; i++) {
                 int index, uncanceled = 0;
-                rc = llog_create(ctxt, &handle, NULL, name[i]);
+                rc = llog_open(ctxt, &handle, NULL, name[i], 0);
                 if (rc)
                         GOTO(out_pop, rc);
                 rc = llog_init_handle(handle, 0, NULL);
@@ -444,7 +443,7 @@ static int llog_catinfo_cb(struct llog_handle *cat,
 
         lir = (struct llog_logid_rec *)rec;
         logid = &lir->lid_id;
-        rc = llog_create(ctxt, &handle, logid, NULL);
+        rc = llog_open(ctxt, &handle, logid, NULL, 0);
         if (rc)
                 RETURN(-EINVAL);
         rc = llog_init_handle(handle, 0, NULL);
@@ -515,7 +514,7 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
                 int l, index, uncanceled = 0;
 
                 id = &idarray[i].lci_logid;
-                rc = llog_create(ctxt, &handle, id, NULL);
+                rc = llog_open(ctxt, &handle, id, NULL, 0);
                 if (rc)
                         GOTO(out_pop, rc);
                 rc = llog_init_handle(handle, 0, NULL);
@@ -589,7 +588,7 @@ out_free:
 }
 
 #else /* !__KERNEL__ */
-int llog_origin_handle_create(struct ptlrpc_request *req)
+int llog_origin_handle_open(struct ptlrpc_request *req)
 {
         LBUG();
         return 0;
index a2a1633..56fb946 100644 (file)
@@ -2229,10 +2229,6 @@ void lustre_assert_wire_constants(void)
                  (long long)offsetof(struct llog_log_hdr, llh_tgtuuid));
         LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, " found %lld\n",
                  (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid));
-        LASSERTF(offsetof(struct llog_log_hdr, llh_reserved) == 84, " found %lld\n",
-                 (long long)offsetof(struct llog_log_hdr, llh_reserved));
-        LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, " found %lld\n",
-                 (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved));
         LASSERTF(offsetof(struct llog_log_hdr, llh_bitmap) == 88, " found %lld\n",
                  (long long)offsetof(struct llog_log_hdr, llh_bitmap));
         LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, " found %lld\n",
@@ -2289,8 +2285,8 @@ void lustre_assert_wire_constants(void)
                  (long long)offsetof(struct llogd_body, lgd_cur_offset));
         LASSERTF((int)sizeof(((struct llogd_body *)0)->lgd_cur_offset) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct llogd_body *)0)->lgd_cur_offset));
-        LASSERTF(LLOG_ORIGIN_HANDLE_CREATE == 501, " found %lld\n",
-                 (long long)LLOG_ORIGIN_HANDLE_CREATE);
+        LASSERTF(LLOG_ORIGIN_HANDLE_OPEN == 501, " found %lld\n",
+                 (long long)LLOG_ORIGIN_HANDLE_OPEN);
         LASSERTF(LLOG_ORIGIN_HANDLE_NEXT_BLOCK == 502, " found %lld\n",
                  (long long)LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
         LASSERTF(LLOG_ORIGIN_HANDLE_READ_HEADER == 503, " found %lld\n",
index 26ae032..1097379 100644 (file)
@@ -125,6 +125,13 @@ EXPORT_SYMBOL(ptlrpc_expired_set);
 EXPORT_SYMBOL(ptlrpc_interrupted_set);
 EXPORT_SYMBOL(ptlrpc_mark_interrupted);
 
+/* import.c */
+EXPORT_SYMBOL(ptlrpc_deactivate_import);
+EXPORT_SYMBOL(ptlrpc_invalidate_import);
+EXPORT_SYMBOL(ptlrpc_activate_import);
+EXPORT_SYMBOL(ptlrpc_fail_import);
+EXPORT_SYMBOL(ptlrpc_disconnect_import);
+
 /* service.c */
 EXPORT_SYMBOL(ptlrpc_save_lock);
 EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply);
@@ -184,13 +191,9 @@ EXPORT_SYMBOL(mdc_rename_pack);
 /* recover.c */
 EXPORT_SYMBOL(ptlrpc_run_recovery_over_upcall);
 EXPORT_SYMBOL(ptlrpc_run_failed_import_upcall);
-EXPORT_SYMBOL(ptlrpc_disconnect_import);
 EXPORT_SYMBOL(ptlrpc_resend);
 EXPORT_SYMBOL(ptlrpc_wake_delayed);
 EXPORT_SYMBOL(ptlrpc_set_import_active);
-EXPORT_SYMBOL(ptlrpc_deactivate_import);
-EXPORT_SYMBOL(ptlrpc_invalidate_import);
-EXPORT_SYMBOL(ptlrpc_fail_import);
 EXPORT_SYMBOL(ptlrpc_fail_export);
 EXPORT_SYMBOL(ptlrpc_recover_import);
 
@@ -210,7 +213,7 @@ EXPORT_SYMBOL(ptlrpc_lprocfs_register_obd);
 EXPORT_SYMBOL(ptlrpc_lprocfs_unregister_obd);
 
 /* llogd.c */
-EXPORT_SYMBOL(llog_origin_handle_create);
+EXPORT_SYMBOL(llog_origin_handle_open);
 EXPORT_SYMBOL(llog_origin_handle_prev_block);
 EXPORT_SYMBOL(llog_origin_handle_next_block);
 EXPORT_SYMBOL(llog_origin_handle_read_header);
index 9534c37..1dc989b 100644 (file)
@@ -514,7 +514,7 @@ static int log_process_thread(void *args)
         SIGNAL_MASK_UNLOCK(current, flags);
         unlock_kernel();
 
-        rc = llog_create(ctxt, &llh, &logid, NULL);
+        rc = llog_open(ctxt, &llh, &logid, NULL, 0);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
                 RETURN(rc);
index b17eea6..b74c998 100644 (file)
@@ -326,6 +326,18 @@ add_ost() {
     do_lmc --add ost --node ${facet}_facet --ost ${facet}_svc --fstype $FSTYPE $*
 }
 
+del_ost() {
+    facet=$1
+    shift
+    do_lmc --delete ost --node ${facet}_facet --ost ${facet}_svc $*
+}
+
+deactivate_ost() {
+    facet=$1
+    shift
+    do_lmc --deactivate ost --node ${facet}_facet --ost ${facet}_svc $*
+}
+
 add_ostfailover() {
     facet=$1
     shift
@@ -360,6 +372,9 @@ add_client() {
     do_lmc --add mtpt --node ${facet}_facet $*
 }
 
+config_commit() {
+    do_lmc --commit
+}
 
 ####### 
 # General functions
index 7a21df3..50287d4 100644 (file)
@@ -4,4 +4,4 @@ from lustredb import LustreDB, LustreDB_XML, LustreDB_LDAP
 from error import LconfError, OptionError
 from cmdline import Options
 
-CONFIG_VERSION="2003070801"
+CONFIG_VERSION="2004050202"
index eda5779..d359e28 100644 (file)
@@ -57,6 +57,11 @@ class LustreDB:
         uuids = self._get_all_refs()
         return uuids
 
+    def get_lov_tgts(self, tag):
+        """ Returns list of lovtgts. """
+        tgts = self._get_lov_tgts(tag)
+        return tgts
+
     def nid2server(self, nid, net_type, cluster_id):
         netlist = self.lookup_class('network')
         for net_db in netlist:
@@ -183,6 +188,18 @@ class LustreDB_XML(LustreDB):
             uuids.append(self.xml_get_ref(r))
         return uuids
 
+    def _get_lov_tgts(self, tag):
+        """ Get all the refs of type TAG.  Returns list of lov_tgts. """
+        tgts = []
+        tgtlist = self.dom_node.getElementsByTagName(tag)
+        for tgt in tgtlist:
+            uuidref = tgt.getAttribute('uuidref')
+            index = tgt.getAttribute('index')
+            generation = tgt.getAttribute('generation')
+            active = int(tgt.getAttribute('active'))
+            tgts.append((uuidref, index, generation, active))
+        return tgts
+
     def xmllookup_by_uuid(self, dom_node, uuid):
         for n in dom_node.childNodes:
             if n.nodeType == n.ELEMENT_NODE:
index 787321f..6c2ea4d 100755 (executable)
@@ -23,7 +23,7 @@
 # lconf is the main driver script for starting and stopping
 # lustre filesystem services.
 #
-# Based in part on the XML obdctl modifications done by Brian Behlendorf 
+# Based in part on the XML obdctl modifications done by Brian Behlendorf
 
 import sys, getopt, types
 import string, os, stat, popen2, socket, time, random, fcntl, select
@@ -61,10 +61,10 @@ MAX_LOOP_DEVICES = 256
 PORTALS_DIR = 'portals'
 
 # Needed to call lconf --record
-CONFIG_FILE = "" 
+CONFIG_FILE = ""
 
 # Please keep these in sync with the values in portals/kp30.h
-ptldebug_names = { 
+ptldebug_names = {
     "trace" :     (1 << 0),
     "inode" :     (1 << 1),
     "super" :     (1 << 2),
@@ -88,6 +88,7 @@ ptldebug_names = {
     "rpctrace" :  (1 << 20),
     "vfstrace" :  (1 << 21),
     "reada" :     (1 << 22),
+    "config" :    (1 << 23),
     }
 
 subsystem_names = {
@@ -123,11 +124,11 @@ def cleanup_error(rc):
     if not first_cleanup_error:
         first_cleanup_error = rc
 
-# ============================================================ 
+# ============================================================
 # debugging and error funcs
 
 def fixme(msg = "this feature"):
-    raise Lustre.LconfError, msg + ' not implmemented yet.'
+    raise Lustre.LconfError, msg + ' not implemented yet.'
 
 def panic(*args):
     msg = string.join(map(str,args))
@@ -240,7 +241,7 @@ class DaemonHandler:
             return pid
         except IOError:
             return 0
-        
+
     def clean_pidfile(self):
         """ Remove a stale pidfile """
         log("removing stale pidfile:", self.pidfile())
@@ -248,7 +249,7 @@ class DaemonHandler:
             os.unlink(self.pidfile())
         except OSError, e:
             log(self.pidfile(), e)
-            
+
 class AcceptorHandler(DaemonHandler):
     def __init__(self, port, net_type, send_mem, recv_mem, irq_aff):
         DaemonHandler.__init__(self, "acceptor")
@@ -265,7 +266,7 @@ class AcceptorHandler(DaemonHandler):
 
     def command_line(self):
         return string.join(map(str,('-s', self.send_mem, '-r', self.recv_mem, self.flags, self.port)))
-    
+
 acceptors = {}
 
 # start the acceptors
@@ -285,14 +286,14 @@ def run_one_acceptor(port):
         if not daemon.running():
             daemon.start()
     else:
-         panic("run_one_acceptor: No acceptor defined for port:", port)   
-        
+         panic("run_one_acceptor: No acceptor defined for port:", port)
+
 def stop_acceptor(port):
     if acceptors.has_key(port):
         daemon = acceptors[port]
         if daemon.running():
             daemon.stop()
-        
+
 
 # ============================================================
 # handle lctl interface
@@ -317,7 +318,7 @@ class LCTLInterface:
 
     def use_save_file(self, file):
         self.save_file = file
-        
+
     def record(self, dev_name, logname):
         log("Recording log", logname, "on", dev_name)
         self.record_device = dev_name
@@ -349,7 +350,7 @@ class LCTLInterface:
     device $%s
     record %s
     %s""" % (self.record_device, self.record_log, cmds)
-            
+
         debug("+", cmd_line, cmds)
         if config.noexec: return (0, [])
 
@@ -401,7 +402,7 @@ class LCTLInterface:
             raise CommandError(self.lctl, out, rc)
         return rc, out
 
-            
+
     def clear_log(self, dev, log):
         """ clear an existing log """
         cmds =  """
@@ -437,7 +438,7 @@ class LCTLInterface:
              recv_mem,
              nid, hostaddr, port, flags )
             self.run(cmds)
-    
+
     def connect(self, srv):
         self.add_uuid(srv.net_type, srv.nid_uuid, srv.nid)
         if srv.net_type  in ('tcp',) and not config.lctl_dump:
@@ -453,7 +454,7 @@ class LCTLInterface:
     device $%s
     recover %s""" %(dev_name, new_conn)
         self.run(cmds)
-                
+
     # add a route to a range
     def add_route(self, net, gw, lo, hi):
         cmds =  """
@@ -466,7 +467,7 @@ class LCTLInterface:
         except CommandError, e:
             log ("ignore: ")
             e.dump()
-                
+
     def del_route(self, net, gw, lo, hi):
         cmds =  """
   ignore_errors
@@ -509,7 +510,7 @@ class LCTLInterface:
   quit""" % (net_type,
              nid, hostaddr)
                 self.run(cmds)
-        
+
     # disconnect one connection
     def disconnect(self, srv):
         self.del_uuid(srv.nid_uuid)
@@ -544,7 +545,7 @@ class LCTLInterface:
   setup %s
   quit""" % (name, setup)
         self.run(cmds)
-        
+
 
     # create a new device with lctl
     def newdev(self, type, name, uuid, setup = ""):
@@ -554,7 +555,7 @@ class LCTLInterface:
         except CommandError, e:
             self.cleanup(name, uuid, 0)
             raise e
-        
+
 
     # cleanup a device
     def cleanup(self, name, uuid, force, failover = 0):
@@ -570,14 +571,20 @@ class LCTLInterface:
 
     # create an lov
     def lov_setup(self, name, uuid, desc_uuid, mdsuuid, stripe_cnt,
-                  stripe_sz, stripe_off,
-                      pattern, devlist):
+                  stripe_sz, stripe_off, pattern):
         cmds = """
   attach lov %s %s
-  lov_setup %s %d %d %d %s %s
-  quit""" % (name, uuid, desc_uuid, stripe_cnt, stripe_sz, stripe_off,
-             pattern, devlist)
+  lov_setup %s %d %d %d %s
+  quit""" % (name, uuid, desc_uuid, stripe_cnt, stripe_sz, stripe_off, pattern)
         self.run(cmds)
+
+    # add an OBD to a LOV
+    def lov_add_obd(self, name, uuid, obd_uuid, index, gen):
+        cmds = """
+  lov_modify_tgts add %s %s %s %s
+  quit""" % (name, obd_uuid, index, gen)
+        self.run(cmds)
+
     # create an lmv
     def lmv_setup(self, name, uuid, desc_uuid, devlist):
         cmds = """
@@ -585,13 +592,20 @@ class LCTLInterface:
   lmv_setup %s %s
   quit""" % (name, uuid, desc_uuid, devlist)
         self.run(cmds)
-    # create an lov
-    def lov_setconfig(self, uuid, mdsuuid, stripe_cnt, stripe_sz, stripe_off,
-                      pattern, devlist):
+
+    # delete an OBD from a LOV
+    def lov_del_obd(self, name, uuid, obd_uuid, index, gen):
         cmds = """
-  cfg_device $%s
-  lov_setconfig %s %d %d %d %s %s
-  quit""" % (mdsuuid, uuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist)
+  lov_modify_tgts del %s %s %s %s
+  quit""" % (name, obd_uuid, index, gen)
+        self.run(cmds)
+
+    # deactivate an OBD
+    def deactivate(self, name):
+        cmds = """
+  device $%s
+  deactivate
+  quit""" % (name)
         self.run(cmds)
 
     # dump the log file
@@ -639,7 +653,6 @@ class LCTLInterface:
   quit""" % (timeout,)
         self.run(cmds)
 
-    # delete mount options
     def set_lustre_upcall(self, upcall):
         cmds = """
   set_lustre_upcall %s
@@ -711,7 +724,7 @@ def find_module(src_dir, dev_dir, modname):
     modbase = src_dir +'/'+ dev_dir +'/'+ modname
     for modext in '.ko', '.o':
         module = modbase + modext
-        try: 
+        try:
             if os.access(module, os.R_OK):
                 return module
         except OSError:
@@ -753,7 +766,7 @@ def mkfs(dev, devsize, fstype, jsize, isize, mkfsoptions, isblock=1):
             if devsize > 1024 * 1024:
                 jsize = ((devsize / 102400) * 4)
             if jsize > 400:
-                jsize = 400        
+                jsize = 400
         if jsize:  jopt = "-J size=%d" %(jsize,)
         if isize:  iopt = "-I %d" %(isize,)
         mkfs = 'mkfs.ext2 -j -b 4096 '
@@ -860,7 +873,7 @@ def clean_loop(file):
 
 # determine if dev is formatted as a <fstype> filesystem
 def need_format(fstype, dev):
-    # FIXME don't know how to implement this    
+    # FIXME don't know how to implement this
     return 0
 
 # initialize a block device if needed
@@ -879,7 +892,7 @@ def block_dev(dev, size, fstype, reformat, autoformat, journal_size,
 #        panic("device:", dev,
 #              "not prepared, and autoformat is not set.\n",
 #              "Rerun with --reformat option to format ALL filesystems")
-        
+
     return dev
 
 def if2addr(iface):
@@ -917,7 +930,7 @@ def sys_get_local_nid(net_type, wildcard, cluster_id):
     else:
         local = sys_get_local_address(net_type, wildcard, cluster_id)
     return local
-        
+
 def sys_get_local_address(net_type, wildcard, cluster_id):
     """Return the local address for the network type."""
     local = ""
@@ -945,7 +958,7 @@ def sys_get_local_address(net_type, wildcard, cluster_id):
                     elan_id = a[1]
                     break
             try:
-                nid = my_int(cluster_id) + my_int(elan_id) 
+                nid = my_int(cluster_id) + my_int(elan_id)
                 local = "%d" % (nid)
             except ValueError, e:
                 local = elan_id
@@ -1022,7 +1035,7 @@ def fs_is_mounted(path):
     except IOError, e:
         log(e)
     return 0
-        
+
 
 class kmod:
     """Manage kernel modules"""
@@ -1088,7 +1101,7 @@ class Module:
         self._server = None
         self._connected = 0
         self.kmod = kmod(config.lustre, config.portals)
-        
+
     def info(self, *args):
         msg = string.join(map(str,args))
         print self.module_name + ":", self.name, self.uuid, msg
@@ -1102,7 +1115,7 @@ class Module:
             log(self.module_name, "cleanup failed: ", self.name)
             e.dump()
             cleanup_error(e.rc)
-            
+
     def add_portals_module(self, dev_dir, modname):
         """Append a module to list of modules to load."""
         self.kmod.add_portals_module(dev_dir, modname)
@@ -1114,7 +1127,7 @@ class Module:
     def load_module(self):
         """Load all the modules in the list in the order they appear."""
         self.kmod.load_module()
-            
+
     def cleanup_module(self):
         """Unload the modules in the list in reverse order."""
         if self.safe_to_clean():
@@ -1122,10 +1135,10 @@ class Module:
 
     def safe_to_clean(self):
         return 1
-        
+
     def safe_to_clean_modules(self):
         return self.safe_to_clean()
-        
+
 class Network(Module):
     def __init__(self,db):
         Module.__init__(self, 'NETWORK', db)
@@ -1170,7 +1183,7 @@ class Network(Module):
         return "NID_%s_UUID" %(nid,)
 
     def prepare(self):
-        if is_network_prepared():
+        if not config.record and is_network_prepared():
             return
         self.info(self.net_type, self.nid, self.port)
         if not (config.record and self.generic_nid):
@@ -1251,9 +1264,9 @@ class RouteTable(Module):
             return None
 
         return Network(srvdb)
-        
+
     def prepare(self):
-        if is_network_prepared():
+        if not config.record and is_network_prepared():
             return
         self.info()
         for net_type, gw, gw_cluster_id, tgt_cluster_id, lo, hi in self.db.get_route_tbl():
@@ -1295,7 +1308,7 @@ class Management(Module):
         self.add_lustre_module('mgmt', 'mgmt_svc')
 
     def prepare(self):
-        if is_prepared(self.name):
+        if not config.record and is_prepared(self.name):
             return
         self.info()
         lctl.newdev("mgmt", self.name, self.uuid)
@@ -1339,7 +1352,7 @@ class LOV(Module):
         self.stripe_sz = self.db.get_val_int('stripesize', 1048576)
         self.stripe_off = self.db.get_val_int('stripeoffset', 0)
         self.pattern = self.db.get_val_int('stripepattern', 0)
-        self.devlist = self.db.get_refs('obd')
+        self.devlist = self.db.get_lov_tgts('lov_tgt')
         self.stripe_cnt = self.db.get_val_int('stripecount', len(self.devlist))
         self.osclist = []
         self.desc_uuid = self.uuid
@@ -1349,13 +1362,15 @@ class LOV(Module):
             self.config_only = 1
             return
         self.config_only = None
-        mds= self.db.lookup(self.mds_uuid)
+        mds = self.db.lookup(self.mds_uuid)
         self.mds_name = mds.getName()
-        for obd_uuid in self.devlist:
+        for (obd_uuid, index, gen, active) in self.devlist:
+            if obd_uuid == '':
+                continue
             obd = self.db.lookup(obd_uuid)
             osc = get_osc(obd, self.uuid, fs_name)
             if osc:
-                self.osclist.append(osc)
+                self.osclist.append((osc, index, gen, active))
             else:
                 panic('osc not found:', obd_uuid)
     def get_uuid(self):
@@ -1363,38 +1378,39 @@ class LOV(Module):
     def get_name(self):
         return self.name
     def prepare(self):
-        if is_prepared(self.name):
+        if not config.record and is_prepared(self.name):
             return
-        if self.config_only:
-            panic("Can't prepare config_only LOV ", self.name)
-            
-        for osc in self.osclist:
+        self.info(self.mds_uuid, self.stripe_cnt, self.stripe_sz,
+                  self.stripe_off, self.pattern, self.devlist,
+                  self.mds_name)
+        lctl.lov_setup(self.name, self.uuid,
+                       self.desc_uuid, self.mds_name, self.stripe_cnt,
+                       self.stripe_sz, self.stripe_off, self.pattern)
+        for (osc, index, gen, active) in self.osclist:
+            target_uuid = osc.target_uuid
             try:
                 # Only ignore connect failures with --force, which
                 # isn't implemented here yet.
+                osc.active = active
                 osc.prepare(ignore_connect_failure=0)
             except CommandError, e:
                 print "Error preparing OSC %s\n" % osc.uuid
                 raise e
-        self.info(self.mds_uuid, self.stripe_cnt, self.stripe_sz,
-                  self.stripe_off, self.pattern, self.devlist, self.mds_name)
-        lctl.lov_setup(self.name, self.uuid,
-                       self.desc_uuid, self.mds_name, self.stripe_cnt,
-                       self.stripe_sz, self.stripe_off, self.pattern,
-                       string.join(self.devlist))
+            lctl.lov_add_obd(self.name, self.uuid, target_uuid, index, gen)
 
     def cleanup(self):
+        for (osc, index, gen, active) in self.osclist:
+            target_uuid = osc.target_uuid
+            osc.cleanup()
         if is_prepared(self.name):
             Module.cleanup(self)
         if self.config_only:
             panic("Can't clean up config_only LOV ", self.name)
-        for osc in self.osclist:
-            osc.cleanup()
 
     def load_module(self):
         if self.config_only:
             panic("Can't load modules for config_only LOV ", self.name)
-        for osc in self.osclist:
+        for (osc, index, gen, active) in self.osclist:
             osc.load_module()
             break
         Module.load_module(self)
@@ -1403,8 +1419,9 @@ class LOV(Module):
         if self.config_only:
             panic("Can't cleanup modules for config_only LOV ", self.name)
         Module.cleanup_module(self)
-        for osc in self.osclist:
-            osc.cleanup_module()
+        for (osc, index, gen, active) in self.osclist:
+            if active:
+                osc.cleanup_module()
             break
 
     def correct_level(self, level, op=None):
@@ -1588,9 +1605,9 @@ class MDSDEV(Module):
     def load_module(self):
         if self.active:
             Module.load_module(self)
-            
+
     def prepare(self):
-        if is_prepared(self.name):
+        if not config.record and is_prepared(self.name):
             return
         if not self.active:
             debug(self.uuid, "not active")
@@ -1658,59 +1675,61 @@ class MDSDEV(Module):
                 raise e
 
     def write_conf(self):
-        if is_prepared(self.name):
-            return
-        self.info(self.devpath, self.fstype, self.format)
+        do_cleanup = 0
+        if not is_prepared(self.name):
+            self.info(self.devpath, self.fstype, self.format)
 
-        blkdev = block_dev(self.devpath, self.size, self.fstype,
-                           config.reformat, self.format, self.journal_size,
-                           self.inode_size, self.mkfsoptions, self.backfstype,
-                           self.backdevpath)
+            blkdev = block_dev(self.devpath, self.size, self.fstype,
+                               config.reformat, self.format, self.journal_size,
+                               self.inode_size, self.mkfsoptions,
+                               self.backfstype, self.backdevpath)
 
-       # Even for writing logs we mount mds with supplied mount options
-       # because it will not mount smfs (if used) otherwise.
-       
-        mountfsoptions = def_mount_options(self.fstype, 'mds')
-            
-        if config.mountfsoptions:
-            if mountfsoptions:
-                mountfsoptions = mountfsoptions + ',' + config.mountfsoptions
-            else:
-                mountfsoptions = config.mountfsoptions
-            if self.mountfsoptions:
-                mountfsoptions = mountfsoptions + ',' + self.mountfsoptions
-        else:
-            if self.mountfsoptions:
+            # Even for writing logs we mount mds with supplied mount options
+            # because it will not mount smfs (if used) otherwise.
+
+            mountfsoptions = def_mount_options(self.fstype, 'mds')
+
+            if config.mountfsoptions:
                 if mountfsoptions:
-                    mountfsoptions = mountfsoptions + ',' + self.mountfsoptions
+                    mountfsoptions = mountfsoptions + ',' + config.mountfsoptions
                 else:
-                    mountfsoptions = self.mountfsoptions
-            
-        if self.fstype == 'smfs':
-            realdev = self.fstype
+                    mountfsoptions = config.mountfsoptions
+                if self.mountfsoptions:
+                    mountfsoptions = mountfsoptions + ',' + self.mountfsoptions
+            else:
+                if self.mountfsoptions:
+                    if mountfsoptions:
+                        mountfsoptions = mountfsoptions + ',' + self.mountfsoptions
+                    else:
+                        mountfsoptions = self.mountfsoptions
+
+            if self.fstype == 'smfs':
+                realdev = self.fstype
                 
-            if mountfsoptions:
-                mountfsoptions = "%s,type=%s,dev=%s" % (mountfsoptions, 
-                                                        self.backfstype, 
-                                                        blkdev)
+                if mountfsoptions:
+                    mountfsoptions = "%s,type=%s,dev=%s" % (mountfsoptions, 
+                                                            self.backfstype, 
+                                                            blkdev)
+                else:
+                    mountfsoptions = "type=%s,dev=%s" % (self.backfstype, 
+                                                         blkdev)
             else:
-                mountfsoptions = "type=%s,dev=%s" % (self.backfstype, 
-                                                     blkdev)
-        else:
-            realdev = blkdev
+                realdev = blkdev
        
-        print 'MDS mount options: ' + mountfsoptions
-        
-       # As mount options are passed by 4th param to config tool, we need 
-       # to pass something in 3rd param. But we do not want this 3rd param
-       # be counted as a profile name for reading log on MDS setup, thus,
-       # we pass there some predefined sign like 'dumb', which will be 
-        # checked in MDS code and skipped. Probably there is more nice way
-        # like pass empty string and check it in config tool and pass null
-        # as 4th param.
-        lctl.newdev("mds", self.name, self.uuid,
-                    setup ="%s %s %s %s" %(realdev, self.fstype, 
-                                           'dumb', mountfsoptions))
+                print 'MDS mount options: ' + mountfsoptions
+
+            # As mount options are passed by 4th param to config tool, we need 
+            # to pass something in 3rd param. But we do not want this 3rd param
+            # be counted as a profile name for reading log on MDS setup, thus,
+            # we pass there some predefined sign like 'dumb', which will be 
+            # checked in MDS code and skipped. Probably there is more nice way
+            # like pass empty string and check it in config tool and pass null
+            # as 4th param.
+            lctl.newdev("mds", self.name, self.uuid,
+                        setup ="%s %s %s %s" %(realdev, self.fstype, 
+                                               'dumb', mountfsoptions))
+            do_cleanup = 1
+
         # record logs for the MDS lov
         for uuid in self.filesystem_uuids:
             log("recording clients for filesystem:", uuid)
@@ -1736,6 +1755,7 @@ class MDSDEV(Module):
             client.prepare()
             lctl.mount_option(self.name, client.get_name(), "")
             lctl.end_record()
+            process_updates(self.db, self.name, self.name, client)
 
             config.cleanup = 1
             lctl.clear_log(self.name, self.name + '-clean')
@@ -1743,10 +1763,15 @@ class MDSDEV(Module):
             client.cleanup()
             lctl.del_mount_option(self.name)
             lctl.end_record()
+            process_updates(self.db, self.name, self.name + '-clean', client)
             config.cleanup = 0
             config.record = 0
 
         # record logs for each client
+        if config.noexec:
+            noexec_opt = '-n'
+        else:
+            noexec_opt = ''
         if config.ldapurl:
             config_options = "--ldapurl " + config.ldapurl + " --config " + config.config
         else:
@@ -1763,9 +1788,7 @@ class MDSDEV(Module):
                         debug("recording", client_name)
                         old_noexec = config.noexec
                         config.noexec = 0
-                        noexec_opt = ('', '-n')
-                        ret, out = run (sys.argv[0],
-                                        noexec_opt[old_noexec == 1],
+                        ret, out = run (sys.argv[0], noexec_opt,
                                         " -v --record --nomod",
                                         "--record_log", client_name,
                                         "--record_device", self.name,
@@ -1773,8 +1796,7 @@ class MDSDEV(Module):
                                         config_options)
                         if config.verbose:
                             for s in out: log("record> ", string.strip(s))
-                        ret, out = run (sys.argv[0],
-                                        noexec_opt[old_noexec == 1],
+                        ret, out = run (sys.argv[0], noexec_opt,
                                         "--cleanup -v --record --nomod",
                                         "--record_log", client_name + "-clean",
                                         "--record_device", self.name,
@@ -1783,18 +1805,19 @@ class MDSDEV(Module):
                         if config.verbose:
                             for s in out: log("record> ", string.strip(s))
                         config.noexec = old_noexec
-        try:
-            lctl.cleanup(self.name, self.uuid, 0, 0)
-        except CommandError, e:
-            log(self.module_name, "cleanup failed: ", self.name)
-            e.dump()
-            cleanup_error(e.rc)
-            Module.cleanup(self)
+        if do_cleanup:
+            try:
+                lctl.cleanup(self.name, self.uuid, 0, 0)
+            except CommandError, e:
+                log(self.module_name, "cleanup failed: ", self.name)
+                e.dump()
+                cleanup_error(e.rc)
+                Module.cleanup(self)
         
-        if self.fstype == 'smfs':
-            clean_loop(self.backdevpath)
-        else:
-            clean_loop(self.devpath)
+            if self.fstype == 'smfs':
+                clean_loop(self.backdevpath)
+            else:
+                clean_loop(self.devpath)
 
     def msd_remaining(self):
         out = lctl.device_list()
@@ -1876,7 +1899,7 @@ class OSD(Module):
             self.active = 0
         if self.active and config.group and config.group != ost.get_val('group'):
             self.active = 0
-            
+
         self.target_dev_uuid = self.uuid
         self.uuid = target_uuid
         # modules
@@ -2014,6 +2037,7 @@ class Client(Module):
         self.target_name = tgtdb.getName()
         self.target_uuid = tgtdb.getUUID()
         self.db = tgtdb
+        self.active = 1
 
         self.tgt_dev_uuid = get_active_target(tgtdb)
         if not self.tgt_dev_uuid:
@@ -2054,7 +2078,7 @@ class Client(Module):
 
     def prepare(self, ignore_connect_failure = 0):
         self.info(self.target_uuid)
-        if is_prepared(self.name):
+        if not config.record and is_prepared(self.name):
             self.cleanup()
         try:
             srv = choose_local_server(self.get_servers())
@@ -2070,7 +2094,7 @@ class Client(Module):
             if not ignore_connect_failure:
                 raise e
         if srv:
-            if self.target_uuid in config.inactive and self.permits_inactive():
+            if self.permits_inactive() and (self.target_uuid in config.inactive or self.active == 0):
                 debug("%s inactive" % self.target_uuid)
                 inactive_p = "inactive"
             else:
@@ -2098,6 +2122,13 @@ class Client(Module):
     def correct_level(self, level, op=None):
         return level
 
+    def deactivate(self):
+        try:
+            lctl.deactivate(self.name)
+        except CommandError, e:
+            log(self.module_name, "deactivate failed: ", self.name)
+            e.dump()
+            cleanup_error(e.rc)
 
 class MDC(Client):
     def __init__(self, db, uuid, fs_name):
@@ -2121,7 +2152,7 @@ class ManagementClient(Client):
         Client.__init__(self, db, uuid, 'mgmt_cli', '',
                         self_name = mgmtcli_name_for_uuid(db.getUUID()),
                         module_dir = 'mgmt')
-            
+
 class COBD(Module):
     def __init__(self, db, uuid, name, type, name_override = None):
         Module.__init__(self, 'COBD', db)
@@ -2158,7 +2189,7 @@ class COBD(Module):
     def prepare(self):
         self.real.prepare()
         self.cache.prepare()
-        if is_prepared(self.name):
+        if not config.record and is_prepared(self.name):
             return
         self.info(self.real_uuid, self.cache_uuid)
         lctl.newdev("cobd", self.name, self.uuid,
@@ -2242,7 +2273,7 @@ class ECHO_CLIENT(Module):
         self.osc = VOSC(obd, self.uuid, self.name)
 
     def prepare(self):
-        if is_prepared(self.name):
+        if not config.record and is_prepared(self.name):
             return
         run_acceptors()
         self.osc.prepare() # XXX This is so cheating. -p
@@ -2309,7 +2340,7 @@ class Mountpoint(Module):
             self.mgmtcli = None
 
     def prepare(self):
-        if fs_is_mounted(self.path):
+        if not config.record and fs_is_mounted(self.path):
             log(self.path, "already mounted.")
             return
         run_acceptors()
@@ -2388,7 +2419,7 @@ def get_ost_net(self, osd_uuid):
     return srv_list
 
 
-# the order of iniitailization is based on level. 
+# the order of iniitailization is based on level.
 def getServiceLevel(self):
     type = self.get_class()
     ret=0;
@@ -2412,7 +2443,7 @@ def getServiceLevel(self):
         panic("Unknown type: ", type)
 
     if ret < config.minlevel or ret > config.maxlevel:
-        ret = 0 
+        ret = 0
     return ret
 
 #
@@ -2434,7 +2465,7 @@ def getServices(self):
 
 
 ############################################################
-# MDC UUID hack - 
+# MDC UUID hack -
 # FIXME: clean this mess up!
 #
 # OSC is no longer in the xml, so we have to fake it.
@@ -2543,7 +2574,7 @@ def find_route(srv_list):
             if  (r[3] <= to and to <= r[4]) and cluster_id == r[2]:
                 result.append((srv, r))
     return result
-           
+
 def get_active_target(db):
     target_uuid = db.getUUID()
     target_name = db.getName()
@@ -2559,7 +2590,7 @@ def get_server_by_nid_uuid(db,  nid_uuid):
         net = Network(n)
         if net.nid_uuid == nid_uuid:
             return net
-        
+
 
 ############################################################
 # lconf level logic
@@ -2596,7 +2627,7 @@ def newService(db):
 
 #
 # Prepare the system to run lustre using a particular profile
-# in a the configuration. 
+# in a the configuration.
 #  * load & the modules
 #  * setup networking for the current node
 #  * make sure partitions are in place and prepared
@@ -2609,10 +2640,144 @@ def for_each_profile(db, prof_list, operation):
             panic("profile:", profile, "not found.")
         services = getServices(prof_db)
         operation(services)
-        
+
+def magic_get_osc(db, rec, lov):
+    if lov:
+        lov_uuid = lov.get_uuid()
+        lov_name = lov.osc.fs_name
+    else:
+        lov_uuid = rec.getAttribute('lov_uuidref')
+        # FIXME: better way to find the mountpoint?
+        filesystems = db.root_node.getElementsByTagName('filesystem')
+        fsuuid = None
+        for fs in filesystems:
+            ref = fs.getElementsByTagName('obd_ref')
+            if ref[0].getAttribute('uuidref') == lov_uuid:
+                fsuuid = fs.getAttribute('uuid')
+                break
+
+        if not fsuuid:
+            panic("malformed xml: lov uuid '" + lov_uuid + "' referenced in 'add' record is not used by any filesystems.")
+
+        mtpts = db.root_node.getElementsByTagName('mountpoint')
+        lov_name = None
+        for fs in mtpts:
+            ref = fs.getElementsByTagName('filesystem_ref')
+            if ref[0].getAttribute('uuidref') == fsuuid:
+                lov_name = fs.getAttribute('name')
+                break
+
+        if not lov_name:
+            panic("malformed xml: 'add' record references lov uuid '" + lov_uuid + "', which references filesystem uuid '" + fsuuid + "', which does not reference a mountpoint.")
+
+    print "lov_uuid: " + lov_uuid + "; lov_name: " + lov_name
+
+    ost_uuid = rec.getAttribute('ost_uuidref')
+    obd = db.lookup(ost_uuid)
+
+    if not obd:
+        panic("malformed xml: 'add' record references ost uuid '" + ost_uuid + "' which cannot be found.")
+
+    osc = get_osc(obd, lov_uuid, lov_name)
+    if not osc:
+        panic('osc not found:', obd_uuid)
+    return osc
+
+# write logs for update records.  sadly, logs of all types -- and updates in
+# particular -- are something of an afterthought.  lconf needs rewritten with
+# these as core concepts.  so this is a pretty big hack.
+def process_update_record(db, update, lov):
+    for rec in update.childNodes:
+        if rec.nodeType != rec.ELEMENT_NODE:
+            continue
+
+        log("found "+rec.nodeName+" record in update version " +
+            str(update.getAttribute('version')))
+
+        lov_uuid = rec.getAttribute('lov_uuidref')
+        ost_uuid = rec.getAttribute('ost_uuidref')
+        index = rec.getAttribute('index')
+        gen = rec.getAttribute('generation')
+
+        if not lov_uuid or not ost_uuid or not index or not gen:
+            panic("malformed xml: 'update' record requires lov_uuid, ost_uuid, index, and generation.")
+
+        if not lov:
+            tmplov = db.lookup(lov_uuid)
+            if not tmplov:
+                panic("malformed xml: 'delete' record contains lov UUID '" + lov_uuid + "', which cannot be located.")
+            lov_name = tmplov.getName()
+        else:
+            lov_name = lov.osc.name
+
+        # ------------------------------------------------------------- add
+        if rec.nodeName == 'add':
+            if config.cleanup:
+                lctl.lov_del_obd(lov_name, lov_uuid, ost_uuid, index, gen)
+                continue
+
+            osc = magic_get_osc(db, rec, lov)
+
+            try:
+                # Only ignore connect failures with --force, which
+                # isn't implemented here yet.
+                osc.prepare(ignore_connect_failure=0)
+            except CommandError, e:
+                print "Error preparing OSC %s\n" % osc.uuid
+                raise e
+
+            lctl.lov_add_obd(lov_name, lov_uuid, ost_uuid, index, gen)
+
+        # ------------------------------------------------------ deactivate
+        elif rec.nodeName == 'deactivate':
+            if config.cleanup:
+                continue
+
+            osc = magic_get_osc(db, rec, lov)
+
+            try:
+                osc.deactivate()
+            except CommandError, e:
+                print "Error deactivating OSC %s\n" % osc.uuid
+                raise e
+
+        # ---------------------------------------------------------- delete
+        elif rec.nodeName == 'delete':
+            if config.cleanup:
+                continue
+
+            osc = magic_get_osc(db, rec, lov)
+
+            try:
+                config.cleanup = 1
+                osc.cleanup()
+                config.cleanup = 0
+            except CommandError, e:
+                print "Error cleaning up OSC %s\n" % osc.uuid
+                raise e
+
+            lctl.lov_del_obd(lov_name, lov_uuid, ost_uuid, index, gen)
+
+def process_updates(db, log_device, log_name, lov = None):
+    updates = db.root_node.getElementsByTagName('update')
+    for u in updates:
+        if not u.childNodes:
+            log("ignoring empty update record (version " +
+                str(u.getAttribute('version')) + ")")
+            continue
+
+        version = u.getAttribute('version')
+        real_name = "%s-%s" % (log_name, version)
+        lctl.clear_log(log_device, real_name)
+        lctl.record(log_device, real_name)
+
+        process_update_record(db, u, lov)
+
+        lctl.end_record()
+
 def doWriteconf(services):
-    if config.nosetup:
-        return
+    #if config.nosetup:
+    #    return
     for s in services:
         if s[1].get_class() == 'mdsdev':
             n = newService(s[1])
@@ -2633,7 +2798,7 @@ def doSetup(services):
     nlist.sort()
     for n in nlist:
         n[1].prepare()
-    
+
 def doModules(services):
     if config.nomod:
         return
@@ -2669,7 +2834,7 @@ def doUnloadModules(services):
             n.cleanup_module()
 
 #
-# Load profile for 
+# Load profile for
 def doHost(lustreDB, hosts):
     global is_router, local_node_name
     node_db = None
@@ -2687,7 +2852,7 @@ def doHost(lustreDB, hosts):
     timeout = node_db.get_val_int('timeout', 0)
     ptldebug = node_db.get_val('ptldebug', '')
     subsystem = node_db.get_val('subsystem', '')
-    
+
     find_local_clusters(node_db)
     if not is_router:
         find_local_routes(lustreDB)
@@ -2794,7 +2959,7 @@ def setupModulePath(cmd, portals_dir = PORTALS_DIR):
     base = os.path.dirname(cmd)
     if development_mode():
         if not config.lustre:
-            debug('using objdir module paths')            
+            debug('using objdir module paths')
             config.lustre = (os.path.join(base, ".."))
         # normalize the portals dir, using command line arg if set
         if config.portals:
@@ -2804,7 +2969,7 @@ def setupModulePath(cmd, portals_dir = PORTALS_DIR):
         debug('config.portals', config.portals)
     elif config.lustre and config.portals:
         # production mode
-        # if --lustre and --portals, normalize portals 
+        # if --lustre and --portals, normalize portals
         # can ignore POTRALS_DIR here, since it is probly useless here
         config.portals = os.path.join(config.lustre, config.portals)
         debug('config.portals B', config.portals)
@@ -2895,8 +3060,8 @@ def sys_set_netmem_max(path, max):
         fp = open(path, 'w')
         fp.write('%d\n' %(max))
         fp.close()
-    
-    
+
+
 def sys_make_devices():
     if not os.access('/dev/portals', os.R_OK):
         run('mknod /dev/portals c 10 240')
@@ -2910,7 +3075,7 @@ def add_to_path(new_dir):
     if new_dir in syspath:
         return
     os.environ['PATH'] = os.environ['PATH'] + ':' + new_dir
-    
+
 def default_debug_path():
     path = '/tmp/lustre-log'
     if os.path.isdir('/r'):
@@ -2988,7 +3153,7 @@ lconf_options = [
               PARAM),
     ('minlevel', "Minimum level of services to configure/cleanup",
                  INTPARAM, 0),
-    ('maxlevel', """Maximum level of services to configure/cleanup 
+    ('maxlevel', """Maximum level of services to configure/cleanup
                     Levels are aproximatly like:
                             10 - netwrk
                             20 - device, ldlm
@@ -3019,14 +3184,14 @@ lconf_options = [
     ('inactive', """The name of an inactive service, to be ignored during
                     mounting (currently OST-only). Can be repeated.""",
                 PARAMLIST),
-    ]      
+    ]
 
 def main():
     global lctl, config, toplustreDB, CONFIG_FILE
 
     # in the upcall this is set to SIG_IGN
     signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-    
+
     cl = Lustre.Options("lconf", "config.xml", lconf_options)
     try:
         config, args = cl.parse(sys.argv[1:])
@@ -3049,7 +3214,7 @@ def main():
     random.seed(seed)
 
     sanitise_path()
-    
+
     init_select(config.select)
 
     if len(args) > 0:
@@ -3131,8 +3296,12 @@ def main():
 
     doHost(lustreDB, node_list)
 
-    if config.record:
-        lctl.end_record()
+    if not config.record:
+        return
+
+    lctl.end_record()
+
+    process_updates(db, config.record_device, config.record_log)
 
 if __name__ == "__main__":
     try:
index 4da2878..4528b76 100644 (file)
@@ -149,12 +149,14 @@ command_t cmdlist[] = {
         {"detach", jt_obd_detach, 0,
          "remove driver (and name and uuid) from current device\n"
          "usage: detach"},
-        {"lov_setup", jt_lcfg_lov_setup, 0,
-         "write setup an lov device\n"
-         "usage: lov_setconfig lov-uuid stripe-count stripe-size offset pattern UUID1 [UUID2 ...]"},
+        {"lov_setup", jt_lcfg_lov_setup, 0, "create a LOV device\n"
+         "usage: lov_setup lov-uuid stripe-count stripe-size offset pattern"},
         {"lmv_setup", jt_lcfg_lmv_setup, 0,
-         "write setup an lmv device\n"
-         "usage: lmv_setconfig lmv-uuid UUID1 [UUID2 ...]"},
+         "create an LMV device\n"
+         "usage: lmv_setup lmv-uuid UUID1 [UUID2 ...]"},
+        {"lov_modify_tgts", jt_lcfg_lov_modify_tgts, 0,
+         "add or delete an obd to/from a LOV device\n"
+         "usage: lov_modify_tgts add|del <lov-name> <uuid> <index> <gen>"},
         {"lov_getconfig", jt_obd_lov_getconfig, 0,
          "read lov configuration from an mds device\n"
          "usage: lov_getconfig lov-uuid"},
index 6d63f19..9504f84 100755 (executable)
@@ -35,7 +35,7 @@ def printDoc(doc, stream=sys.stdout):
     except ImportError:
         stream.write(doc.toxml())
         stream.write("\n")
-    
+
 
 PYMOD_DIR = "/usr/lib/lustre/python"
 
@@ -50,7 +50,7 @@ if not development_mode():
 
 import Lustre
 
-DEFAULT_PORT = 988 
+DEFAULT_PORT = 988
 DEFAULT_STRIPE_SZ = 1048576
 DEFAULT_STRIPE_CNT = 1
 DEFAULT_STRIPE_PATTERN = 0
@@ -73,7 +73,7 @@ Object creation command summary:
 --add net
   --node node_name
   --nid nid
-  --cluster_id 
+  --cluster_id
   --nettype tcp|elan|gm
   --hostaddr addr
   --port port
@@ -110,6 +110,7 @@ Object creation command summary:
   --ost ost_name 
   --failover
   --lov lov_name 
+  --index index
   --dev path
   --backdev path
   --size size
@@ -123,6 +124,13 @@ Object creation command summary:
   --mountfsoptions options
   --nspath
  
+--delete ost
+  --node node_name
+  --ost ost_name
+--deactivate ost
+  --node node_name
+  --ost ost_name
+
 --add mtpt  - Mountpoint
   --node node_name
   --path /mnt/point
@@ -153,12 +161,14 @@ Object creation command summary:
   --node node_name
   --real_obd obd_name
   --cache_obd obd_name
+
+--commit - Close a configuration version, and start a new one
 """
 
 PARAM = Lustre.Options.PARAM
 lmc_options = [
     # lmc input/output options
-    ('reference', "Print short reference for commands."), 
+    ('reference', "Print short reference for commands."),
     ('verbose,v', "Print system commands as they are run."),
     ('merge,m', "Append to the specified config file.", PARAM),
     ('output,o', "Write XML configuration into given output file. Overwrite existing content.", PARAM),
@@ -167,7 +177,10 @@ lmc_options = [
 
     # commands
     ('add', "", PARAM),
-    
+    ('delete', "", PARAM),
+    ('deactivate', "", PARAM),
+    ('commit', "Commit all config changes and start a new version"),
+
     # node options
     ('node', "Add a new node in the cluster configuration.", PARAM),
     ('timeout', "Set timeout to initiate recovery.", PARAM),
@@ -177,7 +190,7 @@ lmc_options = [
     ('ptldebug', "Set the portals debug level",  PARAM),
     ('subsystem', "Specify which Lustre subsystems have debug output recorded in the log",  PARAM),
 
-    # network 
+    # network
     ('nettype', "Specify the network type. This can be tcp/elan/gm.", PARAM),
     ('nid', "Give the network ID, e.g ElanID/IP Address as used by portals.", PARAM),
     ('tcpbuf', "Optional argument to specify the TCP buffer size.", PARAM, "0"),
@@ -221,6 +234,7 @@ lmc_options = [
 
     # lov
     ('lov', "Specify LOV name.", PARAM,""),
+    ('index', "Specify index for OBD in LOV target table.", PARAM),
     ('stripe_sz', "Specify the stripe size in bytes.", PARAM),
     ('stripe_cnt', "Specify the number of OSTs each file should be striped on.", PARAM, 0),
     ('stripe_pattern', "Specify the stripe pattern. RAID 0 is the only one currently supported.", PARAM, 0),
@@ -246,11 +260,11 @@ def panic(cmd, msg):
     print msg
     sys.exit(1)
 
-    
+
 def warning(*args):
     msg = string.join(map(str,args))
     print "Warning: ", msg
-    
+
 #
 # manage names and uuids
 # need to initialize this by walking tree to ensure
@@ -325,14 +339,14 @@ class GenConfig:
         ref = self.doc.createElement(tag)
         ref.setAttribute("uuidref", uuid)
         return ref
-    
+
     def newService(self, tag, name, uuid):
         """ create a new  service elmement, which requires name and uuid attributes """
         new = self.doc.createElement(tag)
         new.setAttribute("uuid", uuid);
         new.setAttribute("name", name);
         return new
-    
+
     def addText(self, node, str):
         txt = self.doc.createTextNode(str)
         node.appendChild(txt)
@@ -362,14 +376,14 @@ class GenConfig:
             self.addElement(network, "recvmem", "%d" %(tcpbuf))
         if irq_aff:
             self.addElement(network, "irqaffinity", "%d" %(irq_aff))
-            
+
         return network
 
     def routetbl(self, name, uuid):
         """create <routetbl> node"""
         rtbl = self.newService("routetbl", name, uuid)
         return rtbl
-        
+
     def route(self, gw_net_type, gw, gw_cluster_id, tgt_cluster_id, lo, hi):
         """ create one entry for the route table """
         ref = self.doc.createElement('route')
@@ -381,7 +395,7 @@ class GenConfig:
         if hi:
             ref.setAttribute("hi", hi)
         return ref
-    
+
     def profile(self, name, uuid):
         """ create a host """
         profile = self.newService("profile", name, uuid)
@@ -453,6 +467,14 @@ class GenConfig:
         lov.setAttribute("stripepattern", str(pattern))
         return lov
 
+    def lov_tgt(self, obd_uuid, index, generation):
+        tgt = self.doc.createElement('lov_tgt')
+        tgt.setAttribute("uuidref", obd_uuid)
+        tgt.setAttribute("index", index)
+        tgt.setAttribute("generation", generation)
+        tgt.setAttribute("active", '1')
+        return tgt
+
     def lovconfig(self, name, uuid, lov_uuid):
         lovconfig = self.newService("lovconfig", name, uuid)
         lovconfig.appendChild(self.ref("lov", lov_uuid))
@@ -522,12 +544,36 @@ class GenConfig:
         if mgmt_uuid:
             fs.appendChild(self.ref("mgmt", mgmt_uuid))
         return fs
-        
+
     def echo_client(self, name, uuid, osc_uuid):
         ec = self.newService("echoclient", name, uuid)
         ec.appendChild(self.ref("obd", osc_uuid))
         return ec
 
+    def update(self, version):
+        new = self.doc.createElement("update")
+        new.setAttribute("version", version)
+        return new
+
+    def add(self, lov, ost, index, gen):
+        new = self.doc.createElement("add")
+        new.setAttribute("lov_uuidref", lov)
+        new.setAttribute("ost_uuidref", ost)
+        new.setAttribute("index", index)
+        new.setAttribute("generation", gen)
+        return new
+
+    def delete(self, lov, ost, index, gen, options):
+        if options.delete:
+            new = self.doc.createElement("delete")
+        else:
+            new = self.doc.createElement("deactivate")
+        new.setAttribute("lov_uuidref", lov)
+        new.setAttribute("ost_uuidref", ost)
+        new.setAttribute("index", index)
+        new.setAttribute("generation", gen)
+        return new
+
 ############################################################
 # Utilities to query a DOM tree
 # Using this functions we can treat use config information
@@ -538,6 +584,40 @@ def getName(n):
 def getUUID(node):
     return node.getAttribute('uuid')
 
+def findLastUpdate(lustre):
+    node = None
+    version = 0
+    for n in lustre.childNodes:
+        if n.nodeType == n.ELEMENT_NODE:
+            if n.nodeName != 'update':
+                continue
+            tmp = int(n.getAttribute('version'))
+            if not tmp:
+                error('malformed XML: update tag without a version attribute')
+            if tmp != version + 1:
+                error('malformed XML: expecting update record '+str(version + 1)+', found '+str(tmp)+'.')
+            version = tmp
+            node = n
+    return node
+
+def addUpdate(gen, lustre, node):
+    update = findLastUpdate(lustre)
+    if not update:
+        return
+    #add_record = update.getElementsByTagName('add')
+    #if not add_record:
+    #    add_record = gen.add()
+    #    update.appendChild(add_record)
+    #else:
+    #    add_record = add_record[0]
+    #add_record.appendChild(node)
+    update.appendChild(node)
+
+def delUpdate(gen, lustre, node):
+    update = findLastUpdate(lustre)
+    if not update:
+        return
+    update.appendChild(node)
 
 def findByName(lustre, name, tag = ""):
     for n in lustre.childNodes:
@@ -571,7 +651,7 @@ def name2uuid(lustre, name, tag="",  fatal=1):
         else:
             return ""
     return getUUID(ret)
-    
+
 def lookup_filesystem(lustre, mds_uuid, ost_uuid):
     for n in lustre.childNodes:
         if n.nodeType == n.ELEMENT_NODE and n.nodeName == 'filesystem':
@@ -591,10 +671,76 @@ def get_net_uuid(lustre, node_name):
         return getUUID(net[0])
     return None
 
+def lov_add_obd(gen, lustre, lov, osc_uuid, options):
+    lov_name = getName(lov)
+    if options.index:
+        lov_index = get_option_int(options, 'index')
+        for tgt in lustre.getElementsByTagName('lov_tgt'):
+            if str(lov_index) == tgt.getAttribute('index'):
+                uuidref = tgt.getAttribute('uuidref')
+                if uuidref != '':
+                    raise OptionError("%s --index %d is still in use: %s" %
+                                      (lov_name, lov_index, uuidref))
+                tgt.setAttribute('uuidref', osc_uuid)
+                gener = int(tgt.getAttribute('generation')) + 1
+                tgt.setAttribute('generation', str(gener))
+                addUpdate(gen, lustre, gen.add(getUUID(lov), osc_uuid,
+                                               str(lov_index), str(gener)))
+                return
+        lov.appendChild(gen.lov_tgt(osc_uuid, str(lov_index), '1'))
+        addUpdate(gen, lustre, gen.add(getUUID(lov), osc_uuid, str(lov_index),
+                                       str(gener)))
+        return
+
+    index = -1
+    for tgt in lustre.getElementsByTagName('lov_tgt'):
+        uuidref = tgt.getAttribute('uuidref')
+        tmp = int(tgt.getAttribute('index'))
+        if tmp != index + 1:
+            error('malformed xml: LOV targets are not ordered; found index '+str(tmp)+', expected '+str(index + 1)+'.')
+        index = tmp
+
+    lov.appendChild(gen.lov_tgt(osc_uuid, str(index + 1), '1'))
+    addUpdate(gen, lustre, gen.add(getUUID(lov), osc_uuid, str(index + 1), '1'))
+
+def lov_del_obd(gen, lustre, lov, osc_uuid, options):
+    lov_name = getName(lov)
+    if options.index:
+        lov_index = get_option_int(options, 'index')
+        for tgt in lustre.getElementsByTagName('lov_tgt'):
+            index = tgt.getAttribute('index')
+            if index == lov_index:
+                uuidref = tgt.getAttribute('uuidref')
+                if uuidref != osc_uuid:
+                    raise OptionError("%s --index %d contains %s, not %s" %
+                                      (lov_name, lov_index, osc_uuid, uuidref))
+                if options.delete:
+                    tgt.setAttribute('uuidref', '')
+                else:
+                    tgt.setAttribute('active', '0')
+                # bump the generation just in case...
+                gen = int(tgt.getAttribute('generation')) + 1
+                tgt.setAttribute('generation', str(gen))
+                return
+        raise OptionError("%s --index %d not in use by %s." %
+                          (lov_name, lov_index, osc_uuid))
+
+    for tgt in lustre.getElementsByTagName('lov_tgt'):
+        uuidref = tgt.getAttribute('uuidref')
+        if uuidref == osc_uuid:
+            genera = int(tgt.getAttribute('generation'))
+            delete_rec = gen.delete(getUUID(lov),
+                                    osc_uuid,tgt.getAttribute('index'),
+                                    str(genera), options)
+            delUpdate(gen, lustre, delete_rec)
+
+            if options.delete:
+                tgt.setAttribute('uuidref', '')
+            else:
+                tgt.setAttribute('active', '0')
+            genera = genera + 1
+            tgt.setAttribute('generation', str(genera))
 
-def lov_add_obd(gen, lov, osc_uuid):
-    lov.appendChild(gen.ref("obd", osc_uuid))
-                            
 def lmv_add_obd(gen, lmv, mdc_uuid):
     lmv.appendChild(gen.ref("mds", mdc_uuid))
                             
@@ -606,7 +752,7 @@ def ref_exists(profile, uuid):
             if ref == uuid:
                 return 1
     return 0
-        
+
 # ensure that uuid is not already in the profile
 # return true if uuid is added
 def node_add_profile(gen, node, ref, uuid):
@@ -622,7 +768,7 @@ def node_add_profile(gen, node, ref, uuid):
         return 0
     profile.appendChild(gen.ref(ref, uuid))
     return 1
-    
+
 def get_attr(dom_node, attr, default=""):
     v = dom_node.getAttribute(attr)
     if v:
@@ -644,7 +790,7 @@ def set_node_options(gen, node, options):
     if default_upcall or options.lustre_upcall:
         if options.lustre_upcall:
             gen.addElement(node, 'lustreUpcall', options.lustre_upcall)
-        else: 
+        else:
             gen.addElement(node, 'lustreUpcall', default_upcall)
     if default_upcall or options.portals_upcall:
         if options.portals_upcall:
@@ -668,9 +814,10 @@ def do_add_node(gen, lustre,  options, node_name):
 
     node_add_profile(gen, node, 'ldlm', ldlm_uuid)
     set_node_options(gen, node, options)
+
     return node
 
-    
+
 def add_node(gen, lustre, options):
     """ create a node with a network config """
 
@@ -733,7 +880,7 @@ def add_route(gen, lustre, options):
     node = findByName(lustre, node_name, "node")
     if not node:
         error (node_name, " not found.")
-    
+
     rlist = node.getElementsByTagName('routetbl')
     if len(rlist) > 0:
         rtbl = rlist[0]
@@ -801,7 +948,7 @@ def add_mds(gen, lustre, options):
                      size, journal_size, inode_size, nspath, mkfsoptions, 
                      mountfsoptions, backfstype, backdevname, lmv_uuid)
     lustre.appendChild(mdd)
-                   
+
 
 def add_mgmt(gen, lustre, options):
     node_name = get_option(options, 'node')
@@ -847,7 +994,7 @@ def add_ost(gen, lustre, options):
         inode_size = get_option(options, 'inode_size')
         mkfsoptions = get_option(options, 'mkfsoptions')
         mountfsoptions = get_option(options, 'mountfsoptions')
-        
+
     nspath = get_option(options, 'nspath')
 
     ostname = get_option(options, 'ost')
@@ -868,17 +1015,18 @@ def add_ost(gen, lustre, options):
 
         ost = gen.ost(ostname, ost_uuid, osd_uuid, options.group)
         lustre.appendChild(ost)
+
         if lovname:
             lov = findByName(lustre, lovname, "lov")
             if not lov:
                 error('add_ost:', '"'+lovname+'"', "lov element not found.")
-            lov_add_obd(gen, lov, ost_uuid)
+            lov_add_obd(gen, lustre, lov, ost_uuid, options)
     else:
         ost = lookup(lustre, ost_uuid)
 
     if options.failover:
         ost.setAttribute('failover', "1")
-    
+
 
     osd = gen.osd(osdname, osd_uuid, fstype, osdtype, devname,
                   get_format_flag(options), ost_uuid, node_uuid, size,
@@ -896,7 +1044,43 @@ def add_ost(gen, lustre, options):
     node_add_profile(gen, node, 'osd', osd_uuid)
     lustre.appendChild(osd)
 
-                   
+def del_ost(gen, lustre, options):
+    ostname = get_option(options, 'ost')
+    if not ostname:
+        raise OptionError("del_ost: --ost requires a <ost name>")
+    ost = findByName(lustre, ostname, "ost")
+    if not ost:
+        error('del_ost: ', 'Unable to find ', ostname)
+    ost_uuid = name2uuid(lustre, ostname, fatal=0)
+    if not ost_uuid:
+        error('del_ost: ', 'Unable to find uuid for ', ostname)
+    lovname = get_option(options, 'lov')
+    if lovname:
+        lov = findByName(lustre, lovname, "lov")
+        if not lov:
+            error('del_ost:', '"'+lovname+'"', "lov element not found.")
+        lov_del_obd(gen, lustre, lov, ost_uuid, options)
+        # if the user specified a speficic LOV don't delete the OST itself
+        return
+
+    # remove OSD references from all LOVs
+    for n in lustre.getElementsByTagName('lov'):
+        lov_del_obd(gen, lustre, n, ost_uuid, options)
+    return
+    # delete the OSDs
+    for osd in lustre.getElementsByTagName('osd'):
+        if ref_exists(osd, ost_uuid):
+            osd_uuid = osd.getAttribute('uuid')
+            # delete all profile references to this OSD
+            for profile in lustre.getElementsByTagName('profile'):
+                for osd_ref in profile.getElementsByTagName('osd_ref'):
+                    if osd_uuid == osd_ref.getAttribute('uuidref'):
+                        profile.removeChild(osd_ref)
+            lustre.removeChild(osd)
+
+    # delete the OST
+    lustre.removeChild(ost)
+
 def add_cobd(gen, lustre, options):
     node_name = get_option(options, 'node')
     name = get_option(options, 'cobd')
@@ -904,7 +1088,7 @@ def add_cobd(gen, lustre, options):
 
     real_name = get_option(options, 'real_obd')
     cache_name = get_option(options, 'cache_obd')
-    
+
     real_uuid = name2uuid(lustre, real_name, tag='lov', fatal=0)
     cache_uuid = name2uuid(lustre, cache_name, tag='lov', fatal=0)
 
@@ -1004,7 +1188,7 @@ def add_lov(gen, lustre, options):
 
     lov = gen.lov(name, uuid, mds_uuid, stripe_sz, stripe_cnt, pattern)
     lustre.appendChild(lov)
-    
+
     # add an lovconfig entry to the active mdsdev profile
     lovconfig_name = new_name('LVCFG_' + name)
     lovconfig_uuid = new_uuid(lovconfig_name)
@@ -1019,20 +1203,20 @@ def add_lov(gen, lustre, options):
 
 def add_default_lov(gen, lustre, mds_name, lov_name):
     """ create a default lov """
-                                                                                                                                               
+
     stripe_sz = DEFAULT_STRIPE_SZ
     stripe_cnt = DEFAULT_STRIPE_CNT
     pattern = DEFAULT_STRIPE_PATTERN
     uuid = new_uuid(lov_name)
-                                                                                                                                               
+
     ret = findByName(lustre, lov_name, "lov")
     if ret:
         error("LOV: ", lov_name, " already exists.")
-                                                                                                                                               
+
     mds_uuid = name2uuid(lustre, mds_name, 'mds')
     lov = gen.lov(lov_name, uuid, mds_uuid, stripe_sz, stripe_cnt, pattern)
     lustre.appendChild(lov)
-                                                                                                                                               
+
     # add an lovconfig entry to the active mdsdev profile
     lovconfig_name = new_name('LVCFG_' + lov_name)
     lovconfig_uuid = new_uuid(lovconfig_name)
@@ -1095,7 +1279,7 @@ def get_fs_uuid(gen, lustre, mds_name, obd_name, mgmt_name):
     if not fs_uuid:
         fs_uuid = new_filesystem(gen, lustre, mds_uuid, obd_uuid, mgmt_uuid)
     return fs_uuid
-    
+
 def add_mtpt(gen, lustre, options):
     """ create mtpt on a node """
     node_name = get_option(options, 'node')
@@ -1121,7 +1305,7 @@ def add_mtpt(gen, lustre, options):
             if not ost_uuid:
                 error('add_mtpt:', '"'+ost_name+'"', "ost element not found.")
             lov = findByName(lustre, lov_name, "lov")
-            lov_add_obd(gen, lov, ost_uuid)
+            lov_add_obd(gen, lustre, lov, ost_uuid, options)
 
     if fs_name == '':
         mgmt_name = get_option(options, 'mgmt')
@@ -1144,6 +1328,17 @@ def add_mtpt(gen, lustre, options):
     node_add_profile(gen, node, "mountpoint", uuid)
     lustre.appendChild(mtpt)
 
+def commit_version(gen, lustre):
+    update = findLastUpdate(lustre)
+    if update:
+        version = int(update.getAttribute("version")) + 1
+    else:
+        version = 1
+
+    new = gen.update(str(version))
+    lustre.appendChild(new)
+    
+
 ############################################################
 # Command line processing
 #
@@ -1165,7 +1360,7 @@ def get_option_int(options, tag):
     try:
         n = int(val)
     except ValueError:
-        raise OptionError("--%s <num> (value must be integer)" % (tag))        
+        raise OptionError("--%s <num> (value must be integer)" % (tag))
     return n
 
 # simple class for profiling
@@ -1191,13 +1386,13 @@ class chrono:
 # function cmdlinesplit used to split cmd line from batch file
 #
 def cmdlinesplit(cmdline):
-                                                                                                                                               
+
     double_quote  = re.compile(r'"(([^"\\]|\\.)*)"')
     single_quote  = re.compile(r"'(.*?)'")
     escaped = re.compile(r'\\(.)')
     esc_quote = re.compile(r'\\([\\"])')
-    outside = re.compile(r"""([^\s\\'"]+)""")
-                                                                                                                                               
+    outside = re.compile(r"""([^\s\\'"]+)""") #" fucking emacs.
+
     arg_list = []
     i = 0; arg = None
     while i < len(cmdline):
@@ -1210,7 +1405,7 @@ def cmdlinesplit(cmdline):
             i = match.end()
             if arg is None: arg = esc_quote.sub(r'\1', match.group(1))
             else:           arg = arg + esc_quote.sub(r'\1', match.group(1))
-                                                                                                                                               
+
         elif c == "'":
             match = single_quote.match(cmdline, i)
             if not match:
@@ -1219,7 +1414,7 @@ def cmdlinesplit(cmdline):
             i = match.end()
             if arg is None: arg = match.group(1)
             else:           arg = arg + match.group(1)
-                                                                                                                                               
+
         elif c == "\\":
             match = escaped.match(cmdline, i)
             if not match:
@@ -1228,7 +1423,7 @@ def cmdlinesplit(cmdline):
             i = match.end()
             if arg is None: arg = match.group(1)
             else:           arg = arg + match.group(1)
-                                                                                                                                               
+
         elif c in string.whitespace:
             if arg != None:
                 arg_list.append(str(arg))
@@ -1241,9 +1436,9 @@ def cmdlinesplit(cmdline):
             i = match.end()
             if arg is None: arg = match.group()
             else:           arg = arg + match.group()
-                                                                                                                                               
+
     if arg != None: arg_list.append(str(arg))
-                                                                                                                                               
+
     return arg_list
 
 ############################################################
@@ -1275,10 +1470,29 @@ def add(devtype, gen, lustre, options):
         add_lmv(gen, lustre, options)
     else:
         error("unknown device type:", devtype)
-    
+
+def delete(devtype, gen, lustre, options):
+    if devtype == 'ost':
+        del_ost(gen, lustre, options)
+    elif options.delete:
+        error("delete not supported for device type:", devtype)
+    elif options.deactivate:
+        error("deactivate not supported for device type:", devtype)
+    else:
+        error("in delete(), but neither .delete nor .deactivate are set.  Tell CFS.")
+
+def commit(gen, lustre):
+    commit_version(gen, lustre)
+
 def do_command(gen, lustre, options, args):
     if options.add:
         add(options.add, gen, lustre, options)
+    elif options.delete:
+        delete(options.delete, gen, lustre, options)
+    elif options.deactivate:
+        delete(options.deactivate, gen, lustre, options)
+    elif options.commit:
+        commit(gen, lustre)
     else:
         error("Missing command")
 
index 3d5c86f..e8d989d 100644 (file)
@@ -108,7 +108,7 @@ int jt_lcfg_attach(int argc, char **argv)
                 lcfg.lcfg_dev_name = argv[2];
         } else {
                 fprintf(stderr, "error: %s: LCFG_ATTACH requires a name\n",
-                        jt_cmdname(argv[0])); 
+                        jt_cmdname(argv[0]));
                return -EINVAL;
        }
 
@@ -148,8 +148,8 @@ int jt_lcfg_setup(int argc, char **argv)
 
         if (lcfg_devname == NULL) {
                 fprintf(stderr, "%s: please use 'cfg_device name' to set the "
-                        "device name for config commands.\n", 
-                        jt_cmdname(argv[0])); 
+                        "device name for config commands.\n",
+                        jt_cmdname(argv[0]));
                return -EINVAL;
         }
 
@@ -197,8 +197,8 @@ int jt_obd_detach(int argc, char **argv)
 
         if (lcfg_devname == NULL) {
                 fprintf(stderr, "%s: please use 'cfg_device name' to set the "
-                        "device name for config commands.\n", 
-                        jt_cmdname(argv[0])); 
+                        "device name for config commands.\n",
+                        jt_cmdname(argv[0]));
                return -EINVAL;
         }
 
@@ -226,8 +226,8 @@ int jt_obd_cleanup(int argc, char **argv)
 
         if (lcfg_devname == NULL) {
                 fprintf(stderr, "%s: please use 'cfg_device name' to set the "
-                        "device name for config commands.\n", 
-                        jt_cmdname(argv[0])); 
+                        "device name for config commands.\n",
+                        jt_cmdname(argv[0]));
                return -EINVAL;
         }
 
@@ -236,7 +236,7 @@ int jt_obd_cleanup(int argc, char **argv)
         if (argc < 1 || argc > 3)
                 return CMD_HELP;
 
-        for (n = 1; n < argc; n++) 
+        for (n = 1; n < argc; n++)
                 if (strcmp(argv[n], "force") == 0) {
                         flags[flag_cnt++] = force;
                 } else if (strcmp(argv[n], "failover") == 0) {
@@ -258,8 +258,8 @@ int jt_obd_cleanup(int argc, char **argv)
         return rc;
 }
 
-static 
-int do_add_uuid(char * func, char *uuid, ptl_nid_t nid, int nal) 
+static
+int do_add_uuid(char * func, char *uuid, ptl_nid_t nid, int nal)
 {
         char tmp[64];
         int rc;
@@ -286,8 +286,8 @@ int jt_lcfg_add_uuid(int argc, char **argv)
 {
         ptl_nid_t nid = 0;
         int nal;
-        
-        if (argc != 4) {                
+
+        if (argc != 4) {
                 return CMD_HELP;
         }
 
@@ -328,7 +328,7 @@ int jt_lcfg_del_uuid(int argc, char **argv)
                 lcfg.lcfg_inllen1 = strlen(argv[1]) + 1;
                 lcfg.lcfg_inlbuf1 = argv[1];
         }
-        
+
         rc = lcfg_ioctl(argv[0], OBD_DEV_ID, &lcfg);
         if (rc) {
                 fprintf(stderr, "IOC_PORTAL_DEL_UUID failed: %s\n",
@@ -342,13 +342,13 @@ int jt_lcfg_lov_setup(int argc, char **argv)
 {
         struct lustre_cfg lcfg;
         struct lov_desc desc;
-        struct obd_uuid *uuidarray, *ptr;
-        int rc, i;
+        int rc;
         char *end;
 
-        LCFG_INIT(lcfg, LCFG_SETUP, lcfg_devname);
-
-        if (argc <= 6)
+        /* argv: lov_setup <LOV uuid> <stripe count> <stripe size>
+         *                 <stripe offset> <pattern> [ <max tgt index> ]
+         */
+        if (argc < 6 || argc > 7)
                 return CMD_HELP;
 
         if (strlen(argv[1]) > sizeof(desc.ld_uuid) - 1) {
@@ -360,20 +360,12 @@ int jt_lcfg_lov_setup(int argc, char **argv)
 
         memset(&desc, 0, sizeof(desc));
         obd_str2uuid(&desc.ld_uuid, argv[1]);
-        desc.ld_tgt_count = argc - 6;
         desc.ld_default_stripe_count = strtoul(argv[2], &end, 0);
         if (*end) {
                 fprintf(stderr, "error: %s: bad default stripe count '%s'\n",
                         jt_cmdname(argv[0]), argv[2]);
                 return CMD_HELP;
         }
-        if (desc.ld_default_stripe_count > desc.ld_tgt_count) {
-                fprintf(stderr,
-                        "error: %s: default stripe count %u > OST count %u\n",
-                        jt_cmdname(argv[0]), desc.ld_default_stripe_count,
-                        desc.ld_tgt_count);
-                return -EINVAL;
-        }
 
         desc.ld_default_stripe_size = strtoull(argv[3], &end, 0);
         if (*end) {
@@ -406,40 +398,32 @@ int jt_lcfg_lov_setup(int argc, char **argv)
                 return CMD_HELP;
         }
 
-        /* NOTE: it is possible to overwrite the default striping parameters,
-         *       but EXTREME care must be taken when saving the OST UUID list.
-         *       It must be EXACTLY the same, or have only additions at the
-         *       end of the list, or only overwrite individual OST entries
-         *       that are restored from backups of the previous OST.
-         */
-        uuidarray = calloc(desc.ld_tgt_count, sizeof(*uuidarray));
-        if (!uuidarray) {
-                fprintf(stderr, "error: %s: no memory for %d UUIDs\n",
-                        jt_cmdname(argv[0]), desc.ld_tgt_count);
-                rc = -ENOMEM;
-                goto out;
-        }
-        for (i = 6, ptr = uuidarray; i < argc; i++, ptr++) {
-                if (strlen(argv[i]) >= sizeof(*ptr)) {
-                        fprintf(stderr, "error: %s: arg %d (%s) too long\n",
-                                jt_cmdname(argv[0]), i, argv[i]);
-                        rc = -EINVAL;
-                        goto out;
+        if (argc == 7) {
+                desc.ld_tgt_count = strtoul(argv[6], &end, 0);
+                if (*end) {
+                        fprintf(stderr, "error: %s: bad target count '%s'\n",
+                                jt_cmdname(argv[0]), argv[6]);
+                        return CMD_HELP;
+                }
+                if (desc.ld_default_stripe_count > desc.ld_tgt_count) {
+                        fprintf(stderr,
+                                "error: %s: default stripe count %u > "
+                                "OST count %u\n", jt_cmdname(argv[0]),
+                                desc.ld_default_stripe_count,
+                                desc.ld_tgt_count);
+                        return -EINVAL;
                 }
-                strcpy((char *)ptr, argv[i]);
         }
 
+        LCFG_INIT(lcfg, LCFG_SETUP, lcfg_devname);
+
         lcfg.lcfg_inllen1 = sizeof(desc);
         lcfg.lcfg_inlbuf1 = (char *)&desc;
-        lcfg.lcfg_inllen2 = desc.ld_tgt_count * sizeof(*uuidarray);
-        lcfg.lcfg_inlbuf2 = (char *)uuidarray;
 
         rc = lcfg_ioctl(argv[0], OBD_DEV_ID, &lcfg);
         if (rc)
                 fprintf(stderr, "error: %s: ioctl error: %s\n",
                         jt_cmdname(argv[0]), strerror(rc = errno));
-out:
-        free(uuidarray);
         return rc;
 }
 
@@ -505,6 +489,64 @@ out:
         return rc;
 }
 
+int jt_lcfg_lov_modify_tgts(int argc, char **argv)
+{
+        struct lustre_cfg lcfg;
+        char *end;
+        int index;
+        int gen;
+        int rc;
+
+        /* argv: lov_modify_tgts <op> <LOV name> <OBD uuid> <index> <gen> */
+        if (argc != 6)
+                return CMD_HELP;
+
+        if (!strncmp(argv[1], "add", 4)) {
+                LCFG_INIT(lcfg, LCFG_LOV_ADD_OBD, argv[2]);
+        } else if (!strncmp(argv[1], "del", 4)) {
+                LCFG_INIT(lcfg, LCFG_LOV_DEL_OBD, argv[2]);
+        } else {
+                fprintf(stderr, "error: %s: bad operation '%s'\n",
+                        jt_cmdname(argv[0]), argv[1]);
+                return CMD_HELP;
+        }
+
+        lcfg.lcfg_inlbuf1 = argv[3];
+        lcfg.lcfg_inllen1 = strlen(lcfg.lcfg_inlbuf1) + 1;
+        if (lcfg.lcfg_inllen1 > sizeof(struct obd_uuid)) {
+                fprintf(stderr,
+                        "error: %s: OBD uuid '%s' longer than "LPSZ" chars\n",
+                        jt_cmdname(argv[0]), argv[3],
+                        sizeof(struct obd_uuid) - 1);
+                return -EINVAL;
+        }
+
+        index = strtoul(argv[4], &end, 0);
+        if (*end) {
+                fprintf(stderr, "error: %s: bad OBD index '%s'\n",
+                        jt_cmdname(argv[0]), argv[4]);
+                return CMD_HELP;
+        }
+        lcfg.lcfg_inlbuf2 = argv[4];
+        lcfg.lcfg_inllen2 = strlen(lcfg.lcfg_inlbuf2);
+
+        gen = strtoul(argv[5], &end, 0);
+        if (*end) {
+                fprintf(stderr, "error: %s: bad OBD generation '%s'\n",
+                        jt_cmdname(argv[0]), argv[5]);
+                return CMD_HELP;
+        }
+        lcfg.lcfg_inlbuf3 = argv[5];
+        lcfg.lcfg_inllen3 = strlen(lcfg.lcfg_inlbuf3);
+
+        rc = lcfg_ioctl(argv[0], OBD_DEV_ID, &lcfg);
+        if (rc)
+                fprintf(stderr, "error: %s: ioctl error: %s\n",
+                        jt_cmdname(argv[0]), strerror(rc = errno));
+
+        return rc;
+}
+
 int jt_lcfg_mount_option(int argc, char **argv)
 {
         int rc;
@@ -569,7 +611,7 @@ int jt_lcfg_set_timeout(int argc, char **argv)
                 return CMD_HELP;
 
         lcfg.lcfg_num = atoi(argv[1]);
-        
+
         rc = lcfg_ioctl(argv[0], OBD_DEV_ID, &lcfg);
         if (rc < 0) {
                 fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]),
index fb5db7b..c1dbacd 100644 (file)
@@ -781,7 +781,8 @@ int jt_obd_list(int argc, char **argv)
         FILE *fp = fopen(DEVICES_LIST, "r");
 
         if (fp == NULL) {
-                fprintf(stderr, "error: %s: %s opening "DEVICES_LIST"\n",
+                fprintf(stderr, "error: %s: %s could not open file "
+                        DEVICES_LIST " .\n",
                         jt_cmdname(argv[0]), strerror(rc =  errno));
                 return rc;
         }
@@ -1399,6 +1400,7 @@ int jt_obd_lov_getconfig(int argc, char **argv)
         struct obd_ioctl_data data;
         struct lov_desc desc;
         struct obd_uuid *uuidarray;
+        __u32 *obdgens;
         char *path;
         int rc, fd;
 
@@ -1412,13 +1414,13 @@ int jt_obd_lov_getconfig(int argc, char **argv)
         if (fd < 0) {
                 fprintf(stderr, "open \"%s\" failed: %s\n", path,
                         strerror(errno));
-                return -1;
+                return -errno;
         }
 
         memset(&desc, 0, sizeof(desc));
         obd_str2uuid(&desc.ld_uuid, argv[1]);
         desc.ld_tgt_count = ((OBD_MAX_IOCTL_BUFFER-sizeof(data)-sizeof(desc)) /
-                             sizeof(*uuidarray));
+                             (sizeof(*uuidarray) + sizeof(*obdgens)));
 
 repeat:
         uuidarray = calloc(desc.ld_tgt_count, sizeof(*uuidarray));
@@ -1428,27 +1430,38 @@ repeat:
                 rc = -ENOMEM;
                 goto out;
         }
+        obdgens = calloc(desc.ld_tgt_count, sizeof(*obdgens));
+        if (!obdgens) {
+                fprintf(stderr, "error: %s: no memory for %d generation #'s\n",
+                        jt_cmdname(argv[0]), desc.ld_tgt_count);
+                rc = -ENOMEM;
+                goto out_uuidarray;
+        }
 
         data.ioc_inllen1 = sizeof(desc);
         data.ioc_inlbuf1 = (char *)&desc;
         data.ioc_inllen2 = desc.ld_tgt_count * sizeof(*uuidarray);
         data.ioc_inlbuf2 = (char *)uuidarray;
+        data.ioc_inllen3 = desc.ld_tgt_count * sizeof(*obdgens);
+        data.ioc_inlbuf3 = (char *)obdgens;
 
         if (obd_ioctl_pack(&data, &buf, max)) {
                 fprintf(stderr, "error: %s: invalid ioctl\n",
                         jt_cmdname(argv[0]));
                 rc = -EINVAL;
-                goto out;
+                goto out_obdgens;
         }
         rc = ioctl(fd, OBD_IOC_LOV_GET_CONFIG, buf);
         if (rc == -ENOSPC) {
                 free(uuidarray);
+                free(obdgens);
                 goto repeat;
         } else if (rc) {
                 fprintf(stderr, "error: %s: ioctl error: %s\n",
                         jt_cmdname(argv[0]), strerror(rc = errno));
         } else {
-                struct obd_uuid *ptr;
+                struct obd_uuid *uuidp;
+                __u32 *genp;
                 int i;
 
                 if (obd_ioctl_unpack(&data, buf, max)) {
@@ -1465,11 +1478,17 @@ repeat:
                        desc.ld_default_stripe_offset);
                 printf("default_stripe_pattern: %u\n", desc.ld_pattern);
                 printf("obd_count: %u\n", desc.ld_tgt_count);
-                for (i = 0, ptr = uuidarray; i < desc.ld_tgt_count; i++, ptr++)
-                        printf("%u: %s\n", i, (char *)ptr);
-        }
-out:
+                printf("OBDS:\tobdidx\t\tobdgen\t\t obduuid\n");
+                uuidp = uuidarray;
+                genp = obdgens;
+                for (i = 0; i < desc.ld_tgt_count; i++, uuidp++, genp++)
+                        printf("\t%6u\t%14u\t\t %s\n", i, *genp, (char *)uuidp);
+        }
+out_obdgens:
+        free(obdgens);
+out_uuidarray:
         free(uuidarray);
+out:
         close(fd);
         return rc;
 }
@@ -1828,7 +1847,8 @@ int jt_cfg_endrecord(int argc, char **argv)
                 return CMD_HELP;
 
         if (!jt_recording) {
-                fprintf(stderr, "Not recording, so endrecord doesn't make sense.\n");
+                fprintf(stderr, "Not recording, so endrecord doesn't make "
+                        "sense.\n");
                 return 0;
         }
 
@@ -1855,14 +1875,14 @@ int jt_llog_catlist(int argc, char **argv)
         IOC_INIT(data);
         data.ioc_inllen1 = max - size_round(sizeof(data));
         IOC_PACK(argv[0], data);
-        
+
         rc = l_ioctl(OBD_DEV_ID, OBD_IOC_CATLOGLIST, buf);
-        if (rc == 0) 
+        if (rc == 0)
                 fprintf(stdout, "%s", ((struct obd_ioctl_data*)buf)->ioc_bulk);
         else
-                fprintf(stderr, "OBD_IOC_CATLOGLIST failed: %s\n", 
+                fprintf(stderr, "OBD_IOC_CATLOGLIST failed: %s\n",
                         strerror(errno));
-        
+
         return rc;
 }
 
@@ -1877,7 +1897,7 @@ int jt_llog_info(int argc, char **argv)
         IOC_INIT(data);
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
-        data.ioc_inllen2 = max - size_round(sizeof(data)) - 
+        data.ioc_inllen2 = max - size_round(sizeof(data)) -
                 size_round(data.ioc_inllen1);
         IOC_PACK(argv[0], data);
 
index 845a50e..31ed6d8 100644 (file)
@@ -105,6 +105,7 @@ int jt_lcfg_add_uuid(int argc, char **argv);
 int jt_lcfg_del_uuid(int argc, char **argv);
 int jt_lcfg_lov_setup(int argc, char **argv);
 int jt_lcfg_lmv_setup(int argc, char **argv);
+int jt_lcfg_lov_modify_tgts(int argc, char **argv);
 int jt_lcfg_mount_option(int argc, char **argv);
 int jt_lcfg_del_mount_option(int argc, char **argv);
 int jt_lcfg_set_timeout(int argc, char **argv);
index f4c576e..254c96e 100644 (file)
@@ -648,7 +648,6 @@ check_llog_log_hdr(void)
         CHECK_MEMBER(llog_log_hdr, llh_flags);
         CHECK_MEMBER(llog_log_hdr, llh_cat_idx);
         CHECK_MEMBER(llog_log_hdr, llh_tgtuuid);
-        CHECK_MEMBER(llog_log_hdr, llh_reserved);
         CHECK_MEMBER(llog_log_hdr, llh_bitmap);
         CHECK_MEMBER(llog_log_hdr, llh_tail);
 }
@@ -676,7 +675,7 @@ check_llogd_body(void)
         CHECK_MEMBER(llogd_body, lgd_len);
         CHECK_MEMBER(llogd_body, lgd_cur_offset);
 
-        CHECK_VALUE(LLOG_ORIGIN_HANDLE_CREATE);
+        CHECK_VALUE(LLOG_ORIGIN_HANDLE_OPEN);
         CHECK_VALUE(LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
         CHECK_VALUE(LLOG_ORIGIN_HANDLE_READ_HEADER);
         CHECK_VALUE(LLOG_ORIGIN_HANDLE_WRITE_REC);
index 6968cec..a08fb09 100644 (file)
@@ -25,7 +25,7 @@ int main()
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
-         * running on Linux schnapps.adilger.int 2.4.22-l32 #4 Thu Jan 8 14:32:57 MST 2004 i686 i686 
+         * running on Linux innova.tion.org 2.4.20-30.9-87k.40-HEAD.RC_1_3_0_14.200404051618 #1 Mon A
          * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
 
 
@@ -208,140 +208,140 @@ void lustre_assert_wire_constants(void)
         /* Checks for struct lustre_handle */
         LASSERTF((int)sizeof(struct lustre_handle) == 8, " found %lld\n",
                  (long long)(int)sizeof(struct lustre_handle));
-        LASSERTF(offsetof(struct lustre_handle, cookie) == 0, " found %lld\n",
-                 (long long)offsetof(struct lustre_handle, cookie));
+        LASSERTF((int)offsetof(struct lustre_handle, cookie) == 0, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_handle, cookie));
         LASSERTF((int)sizeof(((struct lustre_handle *)0)->cookie) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_handle *)0)->cookie));
 
         /* Checks for struct lustre_msg */
         LASSERTF((int)sizeof(struct lustre_msg) == 64, " found %lld\n",
                  (long long)(int)sizeof(struct lustre_msg));
-        LASSERTF(offsetof(struct lustre_msg, handle) == 0, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, handle));
+        LASSERTF((int)offsetof(struct lustre_msg, handle) == 0, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, handle));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->handle) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->handle));
-        LASSERTF(offsetof(struct lustre_msg, magic) == 8, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, magic));
+        LASSERTF((int)offsetof(struct lustre_msg, magic) == 8, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, magic));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->magic) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->magic));
-        LASSERTF(offsetof(struct lustre_msg, type) == 12, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, type));
+        LASSERTF((int)offsetof(struct lustre_msg, type) == 12, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, type));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->type) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->type));
-        LASSERTF(offsetof(struct lustre_msg, version) == 16, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, version));
+        LASSERTF((int)offsetof(struct lustre_msg, version) == 16, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, version));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->version) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->version));
-        LASSERTF(offsetof(struct lustre_msg, opc) == 20, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, opc));
+        LASSERTF((int)offsetof(struct lustre_msg, opc) == 20, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, opc));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->opc) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->opc));
-        LASSERTF(offsetof(struct lustre_msg, last_xid) == 24, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, last_xid));
+        LASSERTF((int)offsetof(struct lustre_msg, last_xid) == 24, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, last_xid));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->last_xid) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->last_xid));
-        LASSERTF(offsetof(struct lustre_msg, last_committed) == 32, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, last_committed));
+        LASSERTF((int)offsetof(struct lustre_msg, last_committed) == 32, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, last_committed));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->last_committed) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->last_committed));
-        LASSERTF(offsetof(struct lustre_msg, transno) == 40, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, transno));
+        LASSERTF((int)offsetof(struct lustre_msg, transno) == 40, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, transno));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->transno) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->transno));
-        LASSERTF(offsetof(struct lustre_msg, status) == 48, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, status));
+        LASSERTF((int)offsetof(struct lustre_msg, status) == 48, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, status));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->status) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->status));
-        LASSERTF(offsetof(struct lustre_msg, flags) == 52, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, flags));
+        LASSERTF((int)offsetof(struct lustre_msg, flags) == 52, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, flags));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->flags) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->flags));
-        LASSERTF(offsetof(struct lustre_msg, bufcount) == 60, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, bufcount));
+        LASSERTF((int)offsetof(struct lustre_msg, bufcount) == 60, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, bufcount));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->bufcount) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->bufcount));
-        LASSERTF(offsetof(struct lustre_msg, buflens[7]) == 92, " found %lld\n",
-                 (long long)offsetof(struct lustre_msg, buflens[7]));
+        LASSERTF((int)offsetof(struct lustre_msg, buflens[7]) == 92, " found %lld\n",
+                 (long long)(int)offsetof(struct lustre_msg, buflens[7]));
         LASSERTF((int)sizeof(((struct lustre_msg *)0)->buflens[7]) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct lustre_msg *)0)->buflens[7]));
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 168, " found %lld\n",
                  (long long)(int)sizeof(struct obdo));
-        LASSERTF(offsetof(struct obdo, o_id) == 0, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_id));
+        LASSERTF((int)offsetof(struct obdo, o_id) == 0, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_id));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_id) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_id));
-        LASSERTF(offsetof(struct obdo, o_gr) == 8, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_gr));
+        LASSERTF((int)offsetof(struct obdo, o_gr) == 8, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_gr));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_gr) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_gr));
-        LASSERTF(offsetof(struct obdo, o_atime) == 16, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_atime));
+        LASSERTF((int)offsetof(struct obdo, o_atime) == 16, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_atime));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_atime) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_atime));
-        LASSERTF(offsetof(struct obdo, o_mtime) == 24, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_mtime));
+        LASSERTF((int)offsetof(struct obdo, o_mtime) == 24, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_mtime));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_mtime) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_mtime));
-        LASSERTF(offsetof(struct obdo, o_ctime) == 32, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_ctime));
+        LASSERTF((int)offsetof(struct obdo, o_ctime) == 32, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_ctime));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_ctime) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_ctime));
-        LASSERTF(offsetof(struct obdo, o_size) == 40, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_size));
+        LASSERTF((int)offsetof(struct obdo, o_size) == 40, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_size));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_size) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_size));
-        LASSERTF(offsetof(struct obdo, o_blocks) == 48, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_blocks));
+        LASSERTF((int)offsetof(struct obdo, o_blocks) == 48, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_blocks));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_blocks) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_blocks));
-        LASSERTF(offsetof(struct obdo, o_grant) == 56, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_grant));
+        LASSERTF((int)offsetof(struct obdo, o_grant) == 56, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_grant));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_grant) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_grant));
-        LASSERTF(offsetof(struct obdo, o_blksize) == 64, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_blksize));
+        LASSERTF((int)offsetof(struct obdo, o_blksize) == 64, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_blksize));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_blksize) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_blksize));
-        LASSERTF(offsetof(struct obdo, o_mode) == 68, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_mode));
+        LASSERTF((int)offsetof(struct obdo, o_mode) == 68, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_mode));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_mode) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_mode));
-        LASSERTF(offsetof(struct obdo, o_uid) == 72, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_uid));
+        LASSERTF((int)offsetof(struct obdo, o_uid) == 72, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_uid));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_uid) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_uid));
-        LASSERTF(offsetof(struct obdo, o_gid) == 76, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_gid));
+        LASSERTF((int)offsetof(struct obdo, o_gid) == 76, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_gid));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_gid) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_gid));
-        LASSERTF(offsetof(struct obdo, o_flags) == 80, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_flags));
+        LASSERTF((int)offsetof(struct obdo, o_flags) == 80, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_flags));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_flags) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_flags));
-        LASSERTF(offsetof(struct obdo, o_nlink) == 84, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_nlink));
+        LASSERTF((int)offsetof(struct obdo, o_nlink) == 84, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_nlink));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_nlink) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_nlink));
-        LASSERTF(offsetof(struct obdo, o_generation) == 88, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_generation));
+        LASSERTF((int)offsetof(struct obdo, o_generation) == 88, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_generation));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_generation) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_generation));
-        LASSERTF(offsetof(struct obdo, o_valid) == 92, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_valid));
+        LASSERTF((int)offsetof(struct obdo, o_valid) == 92, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_valid));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_valid) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_valid));
-        LASSERTF(offsetof(struct obdo, o_misc) == 96, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_misc));
+        LASSERTF((int)offsetof(struct obdo, o_misc) == 96, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_misc));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_misc) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_misc));
-        LASSERTF(offsetof(struct obdo, o_easize) == 100, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_easize));
+        LASSERTF((int)offsetof(struct obdo, o_easize) == 100, " found %lld\n",
+                 (long long)(int)offsetof(struct obdo, o_easize));
         LASSERTF((int)sizeof(((struct obdo *)0)->o_easize) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct obdo *)0)->o_easize));
-        LASSERTF(offsetof(struct obdo, o_inline) == 104, " found %lld\n",
-                 (long long)offsetof(struct obdo, o_inline));
+        LASSERTF((int)offsetof(struct obdo, o_inline) == 104, " found %lld\n",
+ &nb