Whamcloud - gitweb
port llog fixes from b1_6 into HEAD
authorshadow <shadow>
Thu, 6 Mar 2008 03:10:22 +0000 (03:10 +0000)
committershadow <shadow>
Thu, 6 Mar 2008 03:10:22 +0000 (03:10 +0000)
b=13821
i=wangdi
i=umka

38 files changed:
lustre/ChangeLog
lustre/include/liblustre.h
lustre/include/lustre_commit_confd.h
lustre/include/lustre_log.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/liblustre/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_ea.c
lustre/lov/lov_internal.h
lustre/lov/lov_log.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_join.c
lustre/mds/mds_log.c
lustre/mds/mds_lov.c
lustre/mds/mds_reint.c
lustre/mgc/libmgc.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/mgs/mgs_llog.c
lustre/obdclass/genops.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_log.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/tests/replay-single.sh
lustre/tests/run-llog.sh

index d51c83d..0a639f3 100644 (file)
@@ -764,8 +764,7 @@ Details    : Adding total number of sampled request for an MDS node in snmp
 Severity   : enhancement
 Bugzilla   : 14748
 Description: Optimize ldlm waiting list processing for PR extent locks
 Severity   : enhancement
 Bugzilla   : 14748
 Description: Optimize ldlm waiting list processing for PR extent locks
-Details    : When processing waiting list for read extent lock and meeting
-read
+Details    : When processing waiting list for read extent lock and meeting read
              lock that is same or wider to it that is not contended, skip
             processing rest of the list and immediatelly return current
             status of conflictness, since we are guaranteed there are no
              lock that is same or wider to it that is not contended, skip
             processing rest of the list and immediatelly return current
             status of conflictness, since we are guaranteed there are no
@@ -778,6 +777,14 @@ Details    : When the failover node is the primary node, it is possible
             to have two identical connections in imp_conn_list. We must
             compare not conn's pointers but NIDs, otherwise we can defeat
             connection throttling.
             to have two identical connections in imp_conn_list. We must
             compare not conn's pointers but NIDs, otherwise we can defeat
             connection throttling.
+            
+Severity   : normal
+Bugzilla   : 13821
+Description: port llog fixes from b1_6 into HEAD
+Details    : Port llog reference couting and some llog cleanups from b1_6
+             (bug 10800) into HEAD, for protect from panic and access to already
+            free llog structures.
+
 
 --------------------------------------------------------------------------------
 
 
 --------------------------------------------------------------------------------
 
index 2b3533a..a96bc33 100644 (file)
@@ -780,9 +780,6 @@ typedef enum {
 cap_t   cap_get_proc(void);
 int     cap_get_flag(cap_t, cap_value_t, cap_flag_t, cap_flag_value_t *);
 
 cap_t   cap_get_proc(void);
 int     cap_get_flag(cap_t, cap_value_t, cap_flag_t, cap_flag_value_t *);
 
-/* log related */
-static inline int llog_init_commit_master(void) { return 0; }
-static inline int llog_cleanup_commit_master(int force) { return 0; }
 static inline void libcfs_run_lbug_upcall(char *file, const char *fn,
                                            const int l){}
 
 static inline void libcfs_run_lbug_upcall(char *file, const char *fn,
                                            const int l){}
 
index 40b1978..1804615 100644 (file)
@@ -51,8 +51,8 @@ struct llog_commit_daemon {
 };
 
 /* ptlrpc/recov_thread.c */
 };
 
 /* ptlrpc/recov_thread.c */
-int llog_start_commit_thread(void);
-struct llog_canceld_ctxt *llcd_grab(void);
-void llcd_send(struct llog_canceld_ctxt *llcd);
+int llog_start_commit_thread(struct llog_commit_master *);
 
 
+int llog_init_commit_master(struct llog_commit_master *);
+int llog_cleanup_commit_master(struct llog_commit_master *lcm, int force);
 #endif /* _LUSTRE_COMMIT_CONFD_H */
 #endif /* _LUSTRE_COMMIT_CONFD_H */
index e32a840..31bd543 100644 (file)
@@ -118,9 +118,10 @@ int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb, void *da
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
 
 /* llog_obd.c */
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
 
 /* llog_obd.c */
-int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
+int llog_setup(struct obd_device *obd, struct obd_llog_group *olg, int index,
                struct obd_device *disk_obd, int count,  struct llog_logid *logid,
                struct llog_operations *op);
                struct obd_device *disk_obd, int count,  struct llog_logid *logid,
                struct llog_operations *op);
+int __llog_ctxt_put(struct llog_ctxt *ctxt);
 int llog_cleanup(struct llog_ctxt *);
 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
 int llog_cleanup(struct llog_ctxt *);
 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
@@ -129,7 +130,7 @@ int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
 int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
                 int count, struct llog_cookie *cookies, int flags);
 
 int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
                 int count, struct llog_cookie *cookies, int flags);
 
-int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs
+int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg
                           int index, struct obd_device *disk_obd, int count,
                           struct llog_logid *logid);
 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
                           int index, struct obd_device *disk_obd, int count,
                           struct llog_logid *logid);
 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
@@ -137,9 +138,9 @@ int llog_obd_origin_add(struct llog_ctxt *ctxt,
                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                         struct llog_cookie *logcookies, int numcookies);
 
                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                         struct llog_cookie *logcookies, int numcookies);
 
-int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
                         int count, struct obd_uuid *uuid);
                         int count, struct obd_uuid *uuid);
-int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int obd_llog_init(struct obd_device *obd, int group,
                   struct obd_device *disk_obd, int count, 
                   struct llog_catid *logid, struct obd_uuid *uuid);
 
                   struct obd_device *disk_obd, int count, 
                   struct llog_catid *logid, struct obd_uuid *uuid);
 
@@ -182,7 +183,7 @@ struct llog_operations {
         int (*lop_close)(struct llog_handle *handle);
         int (*lop_read_header)(struct llog_handle *handle);
 
         int (*lop_close)(struct llog_handle *handle);
         int (*lop_read_header)(struct llog_handle *handle);
 
-        int (*lop_setup)(struct obd_device *obd, struct obd_llogs *llogs
+        int (*lop_setup)(struct obd_device *obd, struct obd_llog_group *olg
                          int ctxt_idx, struct obd_device *disk_obd, int count,
                          struct llog_logid *logid);
         int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp);
                          int ctxt_idx, struct obd_device *disk_obd, int count,
                          struct llog_logid *logid);
         int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp);
@@ -207,6 +208,7 @@ struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
         struct llog_gen          loc_gen;
         struct obd_device       *loc_obd; /* points back to the containing obd*/
         int                      loc_idx; /* my index the obd array of ctxt's */
         struct llog_gen          loc_gen;
         struct obd_device       *loc_obd; /* points back to the containing obd*/
+        struct obd_llog_group     *loc_olg; /* group containing that ctxt */
         struct obd_export       *loc_exp; /* parent "disk" export (e.g. MDS) */
         struct obd_import       *loc_imp; /* to use in RPC's: can be backward
                                              pointing import */
         struct obd_export       *loc_exp; /* parent "disk" export (e.g. MDS) */
         struct obd_import       *loc_imp; /* to use in RPC's: can be backward
                                              pointing import */
@@ -214,6 +216,8 @@ struct llog_ctxt {
         struct llog_handle      *loc_handle;
         struct llog_canceld_ctxt *loc_llcd;
         struct semaphore         loc_sem; /* protects loc_llcd and loc_imp */
         struct llog_handle      *loc_handle;
         struct llog_canceld_ctxt *loc_llcd;
         struct semaphore         loc_sem; /* protects loc_llcd and loc_imp */
+        atomic_t                   loc_refcount;
+        struct llog_commit_master *loc_lcm;
         void                    *llog_proc_cb;
 };
 
         void                    *llog_proc_cb;
 };
 
@@ -277,21 +281,93 @@ static inline int llog_data_len(int len)
         return size_round(len);
 }
 
         return size_round(len);
 }
 
+static inline struct llog_ctxt *llog_ctxt_get(struct llog_ctxt *ctxt)
+{
+        LASSERT(atomic_read(&ctxt->loc_refcount) > 0);
+        atomic_inc(&ctxt->loc_refcount);
+        CDEBUG(D_INFO, "GETting ctxt %p : new refcount %d\n", ctxt,
+               atomic_read(&ctxt->loc_refcount));
+        return ctxt;
+}
+
+static inline void llog_ctxt_put(struct llog_ctxt *ctxt)
+{
+        if (ctxt == NULL)
+                return;
+        CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", ctxt,
+               atomic_read(&ctxt->loc_refcount) - 1);
+        LASSERT(atomic_read(&ctxt->loc_refcount) > 0);
+        LASSERT(atomic_read(&ctxt->loc_refcount) < 0x5a5a5a);
+        __llog_ctxt_put(ctxt);
+}
+
+static inline void llog_group_init(struct obd_llog_group *olg, int group)
+{
+        cfs_waitq_init(&olg->olg_waitq);
+        spin_lock_init(&olg->olg_lock);
+        olg->olg_group = group;
+}
+
+static inline void llog_group_set_export(struct obd_llog_group *olg,
+                                         struct obd_export *exp)
+{
+        LASSERT(exp != NULL);
+        
+        spin_lock(&olg->olg_lock);
+        if (olg->olg_exp != NULL && olg->olg_exp != exp)
+                CWARN("%s: export for group %d is changed: 0x%p -> 0x%p\n",
+                      exp->exp_obd->obd_name, olg->olg_group,
+                      olg->olg_exp, exp);
+        olg->olg_exp = exp;
+        spin_unlock(&olg->olg_lock);
+}
+
+static inline int llog_group_set_ctxt(struct obd_llog_group *olg,
+                                      struct llog_ctxt *ctxt, int index)
+{
+        LASSERT(index >= 0 && index < LLOG_MAX_CTXTS);
+
+        spin_lock(&olg->olg_lock);  
+        if (olg->olg_ctxts[index] != NULL) {
+                spin_unlock(&olg->olg_lock);
+                return -EEXIST;
+        }
+        olg->olg_ctxts[index] = ctxt;
+        spin_unlock(&olg->olg_lock);
+        return 0;
+}
+
+static inline struct llog_ctxt *llog_group_get_ctxt(struct obd_llog_group *olg,
+                                                    int index)
+{
+        struct llog_ctxt *ctxt;
+
+        LASSERT(index >= 0 && index < LLOG_MAX_CTXTS);
+
+        spin_lock(&olg->olg_lock);  
+        if (olg->olg_ctxts[index] == NULL) {
+                ctxt = NULL;
+        } else {
+                ctxt = llog_ctxt_get(olg->olg_ctxts[index]);
+        }
+        spin_unlock(&olg->olg_lock);
+        return ctxt;
+}
+
 static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
                                                  int index)
 {
 static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
                                                  int index)
 {
-        if (index < 0 || index >= LLOG_MAX_CTXTS)
-                return NULL;
+        return llog_group_get_ctxt(&obd->obd_olg, index);
+}
 
 
-        return obd->obd_llog_ctxt[index];
+static inline int llog_group_ctxt_null(struct obd_llog_group *olg, int index)
+{
+        return (olg->olg_ctxts[index] == NULL);
 }
 
 }
 
-static inline struct llog_ctxt *
-llog_get_context_from_llogs(struct obd_llogs *llogs, int index)
-{                
-        if (index < 0 || index >= LLOG_MAX_CTXTS)
-                return NULL;
-        return llogs->llog_ctxt[index];
+static inline int llog_ctxt_null(struct obd_device *obd, int index)
+{
+        return (llog_group_ctxt_null(&obd->obd_olg, index));
 }
 
 static inline int llog_write_rec(struct llog_handle *handle,
 }
 
 static inline int llog_write_rec(struct llog_handle *handle,
index fef125f..c9041ee 100644 (file)
@@ -255,28 +255,22 @@ struct obd_device_target {
 /* llog contexts */
 enum llog_ctxt_id {
         LLOG_CONFIG_ORIG_CTXT  =  0,
 /* llog contexts */
 enum llog_ctxt_id {
         LLOG_CONFIG_ORIG_CTXT  =  0,
-        LLOG_CONFIG_REPL_CTXT  =  1,
-        LLOG_MDS_OST_ORIG_CTXT =  2,
-        LLOG_MDS_OST_REPL_CTXT =  3,
-        LLOG_SIZE_ORIG_CTXT    =  4,
-        LLOG_SIZE_REPL_CTXT    =  5,
-        LLOG_MD_ORIG_CTXT      =  6,
-        LLOG_MD_REPL_CTXT      =  7,
-        LLOG_RD1_ORIG_CTXT     =  8,
-        LLOG_RD1_REPL_CTXT     =  9,
-        LLOG_TEST_ORIG_CTXT    = 10,
-        LLOG_TEST_REPL_CTXT    = 11,
-        LLOG_LOVEA_ORIG_CTXT   = 12,
-        LLOG_LOVEA_REPL_CTXT   = 13,
+        LLOG_CONFIG_REPL_CTXT,
+        LLOG_MDS_OST_ORIG_CTXT,
+        LLOG_MDS_OST_REPL_CTXT,
+        LLOG_SIZE_ORIG_CTXT,
+        LLOG_SIZE_REPL_CTXT,
+        LLOG_RD1_ORIG_CTXT,
+        LLOG_RD1_REPL_CTXT,
+        LLOG_TEST_ORIG_CTXT,
+        LLOG_TEST_REPL_CTXT,
+        LLOG_LOVEA_ORIG_CTXT,
+        LLOG_LOVEA_REPL_CTXT,
         LLOG_MAX_CTXTS
 };
 
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
 
         LLOG_MAX_CTXTS
 };
 
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
 
-#define FILTER_GROUP_LLOG 1
-#define FILTER_GROUP_ECHO 2
-#define FILTER_GROUP_MDS0 3
-
 struct filter_subdirs {
        cfs_dentry_t *dentry[FILTER_SUBDIR_COUNT];
 };
 struct filter_subdirs {
        cfs_dentry_t *dentry[FILTER_SUBDIR_COUNT];
 };
@@ -287,17 +281,6 @@ struct filter_ext {
         __u64                fe_end;
 };
 
         __u64                fe_end;
 };
 
-struct obd_llogs {
-        struct llog_ctxt        *llog_ctxt[LLOG_MAX_CTXTS];
-};
-
-struct filter_group_llog {
-        struct list_head list;
-        int group;
-        struct obd_llogs *llogs;
-        struct obd_export *exp;
-};
-
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
@@ -378,6 +361,8 @@ struct filter_obd {
         unsigned int             fo_fl_oss_capa;
         struct list_head         fo_capa_keys;
         struct hlist_head       *fo_capa_hash;
         unsigned int             fo_fl_oss_capa;
         struct list_head         fo_capa_keys;
         struct hlist_head       *fo_capa_hash;
+
+        void                    *fo_lcm;
 };
 
 #define OSC_MAX_RIF_DEFAULT       8
 };
 
 #define OSC_MAX_RIF_DEFAULT       8
@@ -520,7 +505,6 @@ struct mds_obd {
         cfs_dentry_t                    *mds_logs_dir;
         cfs_dentry_t                    *mds_objects_dir;
         struct llog_handle              *mds_cfg_llh;
         cfs_dentry_t                    *mds_logs_dir;
         cfs_dentry_t                    *mds_objects_dir;
         struct llog_handle              *mds_cfg_llh;
-//        struct llog_handle              *mds_catalog;
         struct obd_device               *mds_osc_obd; /* XXX lov_obd */
         struct obd_uuid                  mds_lov_uuid;
         char                            *mds_profile;
         struct obd_device               *mds_osc_obd; /* XXX lov_obd */
         struct obd_uuid                  mds_lov_uuid;
         char                            *mds_profile;
@@ -841,6 +825,23 @@ struct target_recovery_data {
         struct completion trd_finishing;
 };
 
         struct completion trd_finishing;
 };
 
+#define OBD_LLOG_GROUP  0
+
+enum filter_groups {
+        FILTER_GROUP_LLOG = 1,
+        FILTER_GROUP_ECHO,
+        FILTER_GROUP_MDS0
+};
+
+struct obd_llog_group {
+        struct list_head   olg_list;
+        int                olg_group;
+        struct llog_ctxt  *olg_ctxts[LLOG_MAX_CTXTS];
+        cfs_waitq_t        olg_waitq;
+        spinlock_t         olg_lock;
+        struct obd_export *olg_exp;
+};
+
 /* corresponds to one of the obd's */
 #define MAX_OBD_NAME 128
 #define OBD_DEVICE_MAGIC        0XAB5CD6EF
 /* corresponds to one of the obd's */
 #define MAX_OBD_NAME 128
 #define OBD_DEVICE_MAGIC        0XAB5CD6EF
@@ -890,7 +891,7 @@ struct obd_device {
         struct obd_statfs       obd_osfs;       /* locked by obd_osfs_lock */
         __u64                   obd_osfs_age;
         struct lvfs_run_ctxt    obd_lvfs_ctxt;
         struct obd_statfs       obd_osfs;       /* locked by obd_osfs_lock */
         __u64                   obd_osfs_age;
         struct lvfs_run_ctxt    obd_lvfs_ctxt;
-        struct llog_ctxt        *obd_llog_ctxt[LLOG_MAX_CTXTS];
+        struct obd_llog_group   obd_olg; /* default llog group */
         struct obd_device       *obd_observer;
         struct obd_notify_upcall obd_upcall;
         struct obd_export       *obd_self_export;
         struct obd_device       *obd_observer;
         struct obd_notify_upcall obd_upcall;
         struct obd_export       *obd_self_export;
@@ -1195,7 +1196,7 @@ struct obd_ops {
                              int cmd, obd_off *);
 
         /* llog related obd_methods */
                              int cmd, obd_off *);
 
         /* llog related obd_methods */
-        int (*o_llog_init)(struct obd_device *obd, struct obd_llogs *llog,
+        int (*o_llog_init)(struct obd_device *obd, int group,
                            struct obd_device *disk_obd, int count,
                            struct llog_catid *logid, struct obd_uuid *uuid);
         int (*o_llog_finish)(struct obd_device *obd, int count);
                            struct obd_device *disk_obd, int count,
                            struct llog_catid *logid, struct obd_uuid *uuid);
         int (*o_llog_finish)(struct obd_device *obd, int count);
index ce5ecf0..d029d99 100644 (file)
@@ -141,6 +141,7 @@ int __obd_fail_check_set(__u32 id, __u32 value, int set);
 #define OBD_FAIL_MDS_OSC_PRECREATE       0x13d
 #define OBD_FAIL_MDS_LOV_SYNC_RACE       0x13e
 #define OBD_FAIL_MDS_CLOSE_NET_REP       0x13f
 #define OBD_FAIL_MDS_OSC_PRECREATE       0x13d
 #define OBD_FAIL_MDS_LOV_SYNC_RACE       0x13e
 #define OBD_FAIL_MDS_CLOSE_NET_REP       0x13f
+#define OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT   0x140
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
@@ -175,6 +176,8 @@ int __obd_fail_check_set(__u32 id, __u32 value, int set);
 #define OBD_FAIL_OST_SETATTR_CREDITS     0x21e
 #define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
 #define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
 #define OBD_FAIL_OST_SETATTR_CREDITS     0x21e
 #define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
 #define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
+#define OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+#define OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
index 9e5de90..3c7c3b0 100644 (file)
@@ -172,9 +172,10 @@ int liblustre_process_log(struct config_llog_instance *cfg,
 
         exp = class_conn2export(&mgc_conn);
 
 
         exp = class_conn2export(&mgc_conn);
 
-        ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
+        ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
         cfg->cfg_flags |= CFG_F_COMPAT146;
         rc = class_config_parse_llog(ctxt, profile, cfg);
         cfg->cfg_flags |= CFG_F_COMPAT146;
         rc = class_config_parse_llog(ctxt, profile, cfg);
+        llog_ctxt_put(ctxt);
         if (rc) {
                 CERROR("class_config_parse_llog failed: rc = %d\n", rc);
         }
         if (rc) {
                 CERROR("class_config_parse_llog failed: rc = %d\n", rc);
         }
index 0d6a088..cc42113 100644 (file)
@@ -498,7 +498,7 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
                         RETURN(-EINVAL);
                 }
 
                         RETURN(-EINVAL);
                 }
 
-                rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
+                rc = obd_llog_init(obd, OBD_LLOG_GROUP, mdc_obd, 0, NULL, tgt_uuid);
                 if (rc) {
                         lmv_init_unlock(lmv);
                         CERROR("lmv failed to setup llogging subsystems\n");
                 if (rc) {
                         lmv_init_unlock(lmv);
                         CERROR("lmv failed to setup llogging subsystems\n");
@@ -2467,30 +2467,39 @@ repeat:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs,
+static int lmv_llog_init(struct obd_device *obd, int group,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
+#if 0
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        LASSERT(group == OBD_LLOG_GROUP);
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                         &llog_client_ops);
         if (rc == 0) {
-                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
-                ctxt->loc_imp = tgt->u.cli.cl_import;
+                ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_CONFIG_REPL_CTXT);
+                llog_initiator_connect(ctxt, tgt);
+                llog_ctxt_put(ctxt);
         }
         }
-
         RETURN(rc);
         RETURN(rc);
+#else
+        return 0;
+#endif
 }
 
 static int lmv_llog_finish(struct obd_device *obd, int count)
 {
 }
 
 static int lmv_llog_finish(struct obd_device *obd, int count)
 {
-        int rc;
+        struct llog_ctxt *ctxt;
+        int rc = 0;
         ENTRY;
 
         ENTRY;
 
-        rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
+        ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index 3c4ac5b..b417ae0 100755 (executable)
@@ -470,7 +470,7 @@ static int lsm_revalidate_join(struct lov_stripe_md *lsm,
         LASSERT(ctxt);
 
         if (lsm->lsm_array && lsm->lsm_array->lai_ext_array)
         LASSERT(ctxt);
 
         if (lsm->lsm_array && lsm->lsm_array->lai_ext_array)
-                RETURN(0);
+                GOTO(release_ctxt, rc = 0);
 
         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
                lsm->lsm_array->lai_array_id.lgl_oid,
 
         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
                lsm->lsm_array->lai_array_id.lgl_oid,
@@ -478,7 +478,7 @@ static int lsm_revalidate_join(struct lov_stripe_md *lsm,
         OBD_ALLOC(lsm->lsm_array->lai_ext_array,lsm->lsm_array->lai_ext_count *
                                                 sizeof (struct lov_extent));
         if (!lsm->lsm_array->lai_ext_array)
         OBD_ALLOC(lsm->lsm_array->lai_ext_array,lsm->lsm_array->lai_ext_count *
                                                 sizeof (struct lov_extent));
         if (!lsm->lsm_array->lai_ext_array)
-                RETURN(-ENOMEM);
+                GOTO(release_ctxt, rc = -ENOMEM);        
 
         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
                lsm->lsm_array->lai_array_id.lgl_oid,
 
         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
                lsm->lsm_array->lai_array_id.lgl_oid,
@@ -499,6 +499,8 @@ static int lsm_revalidate_join(struct lov_stripe_md *lsm,
 out:
         if (rc)
                 lovea_free_array_info(lsm);
 out:
         if (rc)
                 lovea_free_array_info(lsm);
+release_ctxt:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -511,16 +513,15 @@ int lsm_destroy_join(struct lov_stripe_md *lsm, struct obdo *oa,
         ENTRY;
 
         LASSERT(md_exp != NULL);
         ENTRY;
 
         LASSERT(md_exp != NULL);
+        /*for those orphan inode, we should keep array id*/
+        if (!(oa->o_valid & OBD_MD_FLCOOKIE))
+                RETURN(rc);
+
         ctxt = llog_get_context(md_exp->exp_obd, LLOG_LOVEA_REPL_CTXT);
         if (!ctxt)
         ctxt = llog_get_context(md_exp->exp_obd, LLOG_LOVEA_REPL_CTXT);
         if (!ctxt)
-                GOTO(out, rc = -EINVAL);
+                RETURN(-EINVAL);
 
         LASSERT(lsm->lsm_array != NULL);
 
         LASSERT(lsm->lsm_array != NULL);
-        /*for those orphan inode, we should keep array id*/
-        if (!(oa->o_valid & OBD_MD_FLCOOKIE))
-                RETURN(0);
-
-        LASSERT(ctxt != NULL);
         rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id,
                          NULL);
         if (rc)
         rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id,
                          NULL);
         if (rc)
@@ -532,6 +533,7 @@ int lsm_destroy_join(struct lov_stripe_md *lsm, struct obdo *oa,
         }
         llog_free_handle(llh);
 out:
         }
         llog_free_handle(llh);
 out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index cfe3370..c8a9713 100644 (file)
@@ -232,7 +232,7 @@ void lov_getref(struct obd_device *obd);
 void lov_putref(struct obd_device *obd);
 
 /* lov_log.c */
 void lov_putref(struct obd_device *obd);
 
 /* lov_log.c */
-int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs
+int lov_llog_init(struct obd_device *obd, int group
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid);
 int lov_llog_finish(struct obd_device *obd, int count);
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid);
 int lov_llog_finish(struct obd_device *obd, int count);
index 90603db..8c2186b 100644 (file)
@@ -92,6 +92,7 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt,
                 LASSERT(lsm->lsm_object_gr == loi->loi_gr);
                 rc1 = llog_add(cctxt, rec, NULL, logcookies + rc,
                                numcookies - rc);
                 LASSERT(lsm->lsm_object_gr == loi->loi_gr);
                 rc1 = llog_add(cctxt, rec, NULL, logcookies + rc,
                                numcookies - rc);
+                llog_ctxt_put(cctxt);
                 if (rc1 < 0)
                         RETURN(rc1);
                 rc += rc1;
                 if (rc1 < 0)
                         RETURN(rc1);
                 rc += rc1;
@@ -123,6 +124,8 @@ static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
                 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
                 cctxt = llog_get_context(child, ctxt->loc_idx);
                 rc = llog_connect(cctxt, 1, logid, gen, uuid);
                 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
                 cctxt = llog_get_context(child, ctxt->loc_idx);
                 rc = llog_connect(cctxt, 1, logid, gen, uuid);
+                llog_ctxt_put(cctxt);
                 if (rc) {
                         CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
                         if (!err) 
                 if (rc) {
                         CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
                         if (!err) 
@@ -157,6 +160,7 @@ static int lov_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *ls
                 int err;
 
                 err = llog_cancel(cctxt, NULL, 1, cookies, flags);
                 int err;
 
                 err = llog_cancel(cctxt, NULL, 1, cookies, flags);
+                llog_ctxt_put(cctxt);
                 if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CERROR("error: objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
                 if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CERROR("error: objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
@@ -178,7 +182,7 @@ static struct llog_operations lov_size_repl_logops = {
         lop_cancel: lov_llog_repl_cancel
 };
 
         lop_cancel: lov_llog_repl_cancel
 };
 
-int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs
+int lov_llog_init(struct obd_device *obd, int group
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid)
 {
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid)
 {
@@ -187,12 +191,13 @@ int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         int i, rc = 0, err = 0;
         ENTRY;
 
         int i, rc = 0, err = 0;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+        LASSERT(group == OBD_LLOG_GROUP);
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
                         &lov_mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
                         &lov_mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
                         &lov_size_repl_logops);
         if (rc)
                 RETURN(rc);
                         &lov_size_repl_logops);
         if (rc)
                 RETURN(rc);
@@ -207,7 +212,7 @@ int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                 CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
                 CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
-                rc = obd_llog_init(child, llogs, tgt, 1, logid + i, uuid);
+                rc = obd_llog_init(child, group, tgt, 1, logid + i, uuid);
                 if (rc) {
                         CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
                                "(rc=%d)\n", i, child->obd_name, tgt->obd_name,
                 if (rc) {
                         CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
                                "(rc=%d)\n", i, child->obd_name, tgt->obd_name,
index 9228231..b0f3159 100644 (file)
@@ -1037,6 +1037,7 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_PARSE: {
                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
         case OBD_IOC_PARSE: {
                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
+                llog_ctxt_put(ctxt);
                 GOTO(out, rc);
         }
 #ifdef __KERNEL__
                 GOTO(out, rc);
         }
 #ifdef __KERNEL__
@@ -1044,7 +1045,7 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_LLOG_PRINT: {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
                 rc = llog_ioctl(ctxt, cmd, data);
         case OBD_IOC_LLOG_PRINT: {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
                 rc = llog_ioctl(ctxt, cmd, data);
-
+                llog_ctxt_put(ctxt);
                 GOTO(out, rc);
         }
 #endif
                 GOTO(out, rc);
         }
 #endif
@@ -1515,7 +1516,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
         sptlrpc_lprocfs_cliobd_attach(obd);
         ptlrpc_lprocfs_register_obd(obd);
 
         sptlrpc_lprocfs_cliobd_attach(obd);
         ptlrpc_lprocfs_register_obd(obd);
 
-        rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
         if (rc) {
                 mdc_cleanup(obd);
                 CERROR("failed to setup llogging subsystems\n");
         if (rc) {
                 mdc_cleanup(obd);
                 CERROR("failed to setup llogging subsystems\n");
@@ -1602,26 +1603,24 @@ static int mdc_cleanup(struct obd_device *obd)
 }
 
 
 }
 
 
-static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int mdc_llog_init(struct obd_device *obd, int group,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
+        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
-                        &llog_client_ops);
-        if (rc == 0) {
-                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
-                ctxt->loc_imp = obd->u.cli.cl_import;
-        }
+        LASSERT(group == OBD_LLOG_GROUP);
+        LASSERT(olg->olg_group == group);
 
 
-        rc = llog_setup(obd, llogs, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
-                       &llog_client_ops);
+        rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0,
+                        NULL, &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
-                ctxt->loc_imp = obd->u.cli.cl_import;
+                llog_initiator_connect(ctxt);
+                llog_ctxt_put(ctxt);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -1629,14 +1628,14 @@ static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
 
 static int mdc_llog_finish(struct obd_device *obd, int count)
 {
 
 static int mdc_llog_finish(struct obd_device *obd, int count)
 {
-        int rc;
+        struct llog_ctxt *ctxt;
+        int rc = 0;
         ENTRY;
 
         ENTRY;
 
-        rc = llog_cleanup(llog_get_context(obd, LLOG_LOVEA_REPL_CTXT));
-        if (rc) {
-                CERROR("can not cleanup LLOG_CONFIG_REPL_CTXT rc %d\n", rc);
-        }
-        rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
+        ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index 1f52b90..0f3abd5 100644 (file)
@@ -1927,13 +1927,9 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
 }
 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
 {
 }
 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
 {
-        int rc;
+        int rc = 0;
         ENTRY;
 
         ENTRY;
 
-        rc = llog_start_commit_thread();
-        if (rc < 0)
-                RETURN(rc);
-
         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
                 class_uuid_t uuid;
 
         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
                 class_uuid_t uuid;
 
@@ -2161,12 +2157,12 @@ static int mds_postsetup(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
-        rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
@@ -2206,7 +2202,8 @@ int mds_postrecov(struct obd_device *obd)
                 RETURN(0);
 
         LASSERT(!obd->obd_recovering);
                 RETURN(0);
 
         LASSERT(!obd->obd_recovering);
-        LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+        LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
+
         /* clean PENDING dir */
         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
                 rc = mds_cleanup_pending(obd);
         /* clean PENDING dir */
         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
                 rc = mds_cleanup_pending(obd);
index 37601a4..95efae2 100644 (file)
@@ -177,7 +177,7 @@ int mds_cleanup_pending(struct obd_device *obd);
 
 
 /* mds/mds_log.c */
 
 
 /* mds/mds_log.c */
-int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int mds_llog_init(struct obd_device *obd, int group,
                   struct obd_device *tgt, int count,
                   struct llog_catid *logid, struct obd_uuid *uuid);
 int mds_llog_finish(struct obd_device *obd, int count);
                   struct obd_device *tgt, int count,
                   struct llog_catid *logid, struct obd_uuid *uuid);
 int mds_llog_finish(struct obd_device *obd, int count);
index 0eae80f..9de0bb5 100644 (file)
@@ -345,7 +345,7 @@ int mds_join_file(struct mds_update_record *rec, struct ptlrpc_request *req,
         struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
         int lmm_size, rc = 0, cleanup_phase = 0, size;
         struct llog_handle *llh_head = NULL, *llh_tail = NULL;
         struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
         int lmm_size, rc = 0, cleanup_phase = 0, size;
         struct llog_handle *llh_head = NULL, *llh_tail = NULL;
-        struct llog_ctxt *ctxt;
+        struct llog_ctxt *ctxt = NULL;
         struct mds_rec_join *join_rec;
         ENTRY;
 
         struct mds_rec_join *join_rec;
         ENTRY;
 
@@ -394,6 +394,7 @@ int mds_join_file(struct mds_update_record *rec, struct ptlrpc_request *req,
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
         cleanup_phase = 2;
         if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
                 struct llog_logid *llog_array;
         cleanup_phase = 2;
         if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
                 struct llog_logid *llog_array;
@@ -484,6 +485,7 @@ cleanup:
         case 3:
                 llog_close(llh_head);
         case 2:
         case 3:
                 llog_close(llh_head);
         case 2:
+                llog_ctxt_put(ctxt);
                 if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
                         OBD_FREE_PTR(head_lmmj);
 
                 if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
                         OBD_FREE_PTR(head_lmmj);
 
index be4959d..4f3940b 100644 (file)
@@ -56,6 +56,8 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt,
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
         rc = llog_add(lctxt, rec, lsm, logcookies, numcookies);
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
         rc = llog_add(lctxt, rec, lsm, logcookies, numcookies);
+        llog_ctxt_put(lctxt);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -72,6 +74,7 @@ static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
         rc = llog_connect(lctxt, count, logid, gen, uuid);
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
         rc = llog_connect(lctxt, count, logid, gen, uuid);
+        llog_ctxt_put(lctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -86,6 +89,7 @@ static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *ls
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
         rc = llog_cancel(lctxt, lsm, count, cookies, flags);
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
         rc = llog_cancel(lctxt, lsm, count, cookies, flags);
+        llog_ctxt_put(lctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -119,6 +123,7 @@ int mds_log_op_unlink(struct obd_device *obd,
         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
         rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
                       cookies_size / sizeof(struct llog_cookie));
         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
         rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
                       cookies_size / sizeof(struct llog_cookie));
+        llog_ctxt_put(ctxt);
 
         OBD_FREE(lur, sizeof(*lur));
 out:
 
         OBD_FREE(lur, sizeof(*lur));
 out:
@@ -163,6 +168,8 @@ int mds_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
                       cookies_size / sizeof(struct llog_cookie));
 
         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
                       cookies_size / sizeof(struct llog_cookie));
 
+        llog_ctxt_put(ctxt);
+
         OBD_FREE(lsr, sizeof(*lsr));
  out:
         obd_free_memmd(mds->mds_osc_exp, &lsm);
         OBD_FREE(lsr, sizeof(*lsr));
  out:
         obd_free_memmd(mds->mds_osc_exp, &lsm);
@@ -179,7 +186,7 @@ static struct llog_operations mds_size_repl_logops = {
         lop_cancel:     mds_llog_repl_cancel,
 };
 
         lop_cancel:     mds_llog_repl_cancel,
 };
 
-int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int mds_llog_init(struct obd_device *obd, int group,
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid)
 {
                   struct obd_device *tgt, int count, struct llog_catid *logid, 
                   struct obd_uuid *uuid)
 {
@@ -187,17 +194,18 @@ int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+        LASSERT(group == OBD_LLOG_GROUP);
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
                         &mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
                         &mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
                         &mds_size_repl_logops);
         if (rc)
                 RETURN(rc);
 
                         &mds_size_repl_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = obd_llog_init(lov_obd, llogs, tgt, count, logid, uuid);
+        rc = obd_llog_init(lov_obd, group, tgt, count, logid, uuid);
         if (rc)
                 CERROR("lov_llog_init err %d\n", rc);
 
         if (rc)
                 CERROR("lov_llog_init err %d\n", rc);
 
index 79bc5fc..9fe4a0d 100644 (file)
@@ -409,7 +409,8 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
         /* If we added a target we have to reconnect the llogs */
         /* We only _need_ to do this at first add (idx), or the first time
            after recovery.  However, it should now be safe to call anytime. */
         /* If we added a target we have to reconnect the llogs */
         /* We only _need_ to do this at first add (idx), or the first time
            after recovery.  However, it should now be safe to call anytime. */
-        rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+        rc = llog_cat_initialize(obd, &obd->obd_olg,
+                                 mds->mds_lov_desc.ld_tgt_count, NULL);
 
         /*XXX this notifies the MDD until lov handling use old mds code */
         if (obd->obd_upcall.onu_owner) {
 
         /*XXX this notifies the MDD until lov handling use old mds code */
         if (obd->obd_upcall.onu_owner) {
@@ -547,6 +548,14 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (rc)
                 GOTO(err_reg, rc);
 
         if (rc)
                 GOTO(err_reg, rc);
 
+        /* tgt_count may be 0! */
+        rc = llog_cat_initialize(obd, &obd->obd_olg, 
+                                 mds->mds_lov_desc.ld_tgt_count, NULL);
+        if (rc) {
+                CERROR("failed to initialize catalog %d\n", rc);
+                GOTO(err_reg, rc);
+        }
+
         /* If we're mounting this code for the first time on an existing FS,
          * we need to populate the objids array from the real OST values */
         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
         /* If we're mounting this code for the first time on an existing FS,
          * we need to populate the objids array from the real OST values */
         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
@@ -622,12 +631,15 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         switch (cmd) {
         case OBD_IOC_RECORD: {
                 char *name = data->ioc_inlbuf1;
         switch (cmd) {
         case OBD_IOC_RECORD: {
                 char *name = data->ioc_inlbuf1;
+                struct llog_ctxt *ctxt;
+
                 if (mds->mds_cfg_llh)
                         RETURN(-EBUSY);
 
                 if (mds->mds_cfg_llh)
                         RETURN(-EBUSY);
 
+                ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                                 &mds->mds_cfg_llh, NULL, name);
+                rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
+                llog_ctxt_put(ctxt);
                 if (rc == 0)
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          &cfg_uuid);
                 if (rc == 0)
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          &cfg_uuid);
@@ -652,12 +664,14 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_CLEAR_LOG: {
                 char *name = data->ioc_inlbuf1;
 
         case OBD_IOC_CLEAR_LOG: {
                 char *name = data->ioc_inlbuf1;
+                struct llog_ctxt *ctxt;
                 if (mds->mds_cfg_llh)
                         RETURN(-EBUSY);
 
                 if (mds->mds_cfg_llh)
                         RETURN(-EBUSY);
 
+                ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                                 &mds->mds_cfg_llh, NULL, name);
+                rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
+                llog_ctxt_put(ctxt);
                 if (rc == 0) {
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          NULL);
                 if (rc == 0) {
                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          NULL);
@@ -710,6 +724,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                llog_ctxt_put(ctxt);
                 if (rc)
                         RETURN(rc);
 
                 if (rc)
                         RETURN(rc);
 
@@ -722,6 +737,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                llog_ctxt_put(ctxt);
                 if (rc)
                         RETURN(rc);
 
                 if (rc)
                         RETURN(rc);
 
@@ -770,8 +786,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
-                llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+                llog_cat_initialize(obd, &obd->obd_olg,
+                                    mds->mds_lov_desc.ld_tgt_count, NULL);
                 group = FILTER_GROUP_MDS0 + mds->mds_id;
                 group = FILTER_GROUP_MDS0 + mds->mds_id;
+                llog_ctxt_put(ctxt);
                 rc2 = obd_set_info_async(mds->mds_osc_exp,
                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
                                          sizeof(group), &group, NULL);
                 rc2 = obd_set_info_async(mds->mds_osc_exp,
                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
                                          sizeof(group), &group, NULL);
@@ -787,6 +805,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+                llog_ctxt_put(ctxt);
 
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
@@ -862,6 +881,7 @@ static int __mds_lov_synchronize(void *data)
         struct obd_uuid *uuid;
         __u32  idx = mlsi->mlsi_index;
         struct mds_group_info mgi;
         struct obd_uuid *uuid;
         __u32  idx = mlsi->mlsi_index;
         struct mds_group_info mgi;
+        struct llog_ctxt *ctxt;
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
@@ -891,10 +911,15 @@ static int __mds_lov_synchronize(void *data)
         if (rc)
                 GOTO(out, rc);
 
         if (rc)
                 GOTO(out, rc);
 
-        rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
-                          mds->mds_lov_desc.ld_tgt_count,
-                          NULL, NULL, uuid);
+        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+        if (!ctxt) 
+                RETURN(-ENODEV);
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
 
 
+        rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count, 
+                          NULL, NULL, uuid); 
+        llog_ctxt_put(ctxt);
         if (rc != 0) {
                 CERROR("%s failed at llog_origin_connect: %d\n",
                        obd_uuid2str(uuid), rc);
         if (rc != 0) {
                 CERROR("%s failed at llog_origin_connect: %d\n",
                        obd_uuid2str(uuid), rc);
@@ -1048,7 +1073,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 }
                 /* We should update init llog here too for replay unlink and 
                  * possiable llog init race when recovery complete */
                 }
                 /* We should update init llog here too for replay unlink and 
                  * possiable llog init race when recovery complete */
-                llog_cat_initialize(obd, NULL
+                llog_cat_initialize(obd, &obd->obd_olg
                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
                                     &watched->u.cli.cl_target_uuid);
                 mutex_up(&obd->obd_dev_sem);
                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
                                     &watched->u.cli.cl_target_uuid);
                 mutex_up(&obd->obd_dev_sem);
@@ -1056,7 +1081,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
-        LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+        LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
         rc = mds_lov_start_synchronize(obd, watched, data,
                                        !(ev == OBD_NOTIFY_SYNC));
 
         rc = mds_lov_start_synchronize(obd, watched, data,
                                        !(ev == OBD_NOTIFY_SYNC));
 
index 4dadd10..9ff31cb 100644 (file)
@@ -91,6 +91,8 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                 rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen /
                                                 sizeof(*mlcd->mlcd_cookies),
                                  mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
                 rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen /
                                                 sizeof(*mlcd->mlcd_cookies),
                                  mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
+                llog_ctxt_put(ctxt);
+
                 if (rc)
                         CERROR("error cancelling %d log cookies: rc %d\n",
                                (int)(mlcd->mlcd_cookielen /
                 if (rc)
                         CERROR("error cancelling %d log cookies: rc %d\n",
                                (int)(mlcd->mlcd_cookielen /
index 88fe853..87b51d2 100644 (file)
@@ -51,7 +51,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_decref, rc);
 
         if (rc)
                 GOTO(err_decref, rc);
 
-        rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
@@ -100,19 +100,23 @@ static int mgc_cleanup(struct obd_device *obd)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int mgc_llog_init(struct obd_device *obd, int group,
                          struct obd_device *tgt, int count, 
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
                          struct obd_device *tgt, int count, 
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
+        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        LASSERT(group == olg->olg_group);
+        LASSERT(group == OBD_LLOG_GROUP);
+        rc = llog_setup(obd, olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                         &llog_client_ops);
         if (rc == 0) {
-                ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
-                ctxt->loc_imp = obd->u.cli.cl_import;
+                ctxt = llog_group_get_ctxt(olg, LLOG_CONFIG_REPL_CTXT);
+                llog_initiator_connect(ctxt);
+                llog_ctxt_put(ctxt);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
index e6a8056..511fb84 100644 (file)
@@ -507,7 +507,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_decref, rc);
 
         if (rc)
                 GOTO(err_decref, rc);
 
-        rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
@@ -908,24 +908,29 @@ static int mgc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int mgc_llog_init(struct obd_device *obd, int group,
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
                          struct obd_device *tgt, int count,
                          struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
+        struct obd_llog_group *olg = &obd->obd_olg;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
+        LASSERT(group == OBD_LLOG_GROUP);
+        LASSERT(olg->olg_group == group);
+
+        rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
-                ctxt->loc_imp = obd->u.cli.cl_import;
+                llog_initiator_connect(ctxt);
+                llog_ctxt_put(ctxt);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -933,11 +938,20 @@ static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
 
 static int mgc_llog_finish(struct obd_device *obd, int count)
 {
 
 static int mgc_llog_finish(struct obd_device *obd, int count)
 {
-        int rc;
+        struct llog_ctxt *ctxt;
+        int rc = 0, rc2 = 0;
         ENTRY;
 
         ENTRY;
 
-        rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
-        rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
+        ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        if (ctxt)
+                rc2 = llog_cleanup(ctxt);
+
+        if (!rc)
+                rc = rc2;
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -1138,6 +1152,7 @@ static int mgc_process_log(struct obd_device *mgc,
                 /* Now, whether we copied or not, start using the local llog.
                    If we failed to copy, we'll start using whatever the old
                    log has. */
                 /* Now, whether we copied or not, start using the local llog.
                    If we failed to copy, we'll start using whatever the old
                    log has. */
+                llog_ctxt_put(ctxt);
                 ctxt = lctxt;
         }
 
                 ctxt = lctxt;
         }
 
@@ -1145,8 +1160,10 @@ static int mgc_process_log(struct obd_device *mgc,
            copy of the instance for the update.  The cfg_last_idx will
            be updated here. */
         rc = class_config_parse_llog(ctxt, cld->cld_logname, &cld->cld_cfg);
            copy of the instance for the update.  The cfg_last_idx will
            be updated here. */
         rc = class_config_parse_llog(ctxt, cld->cld_logname, &cld->cld_cfg);
-
- out_pop:
+out_pop:
+        llog_ctxt_put(ctxt);
+        if (ctxt != lctxt)
+                llog_ctxt_put(lctxt);
         if (must_pop)
                 pop_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
 
         if (must_pop)
                 pop_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
 
index c05800b..3a1a6e6 100644 (file)
@@ -121,6 +121,35 @@ static int mgs_disconnect(struct obd_export *exp)
 static int mgs_cleanup(struct obd_device *obd);
 static int mgs_handle(struct ptlrpc_request *req);
 
 static int mgs_cleanup(struct obd_device *obd);
 static int mgs_handle(struct ptlrpc_request *req);
 
+static int mgs_llog_init(struct obd_device *obd, int group,
+                         struct obd_device *tgt, int count,
+                         struct llog_catid *logid, struct obd_uuid *uuid)
+{
+        struct obd_llog_group *olg = &obd->obd_olg;
+        int rc;
+        ENTRY;
+
+        LASSERT(group == OBD_LLOG_GROUP);
+        LASSERT(olg->olg_group == group);
+
+        rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+                        &llog_lvfs_ops);
+        RETURN(rc);
+}
+
+static int mgs_llog_finish(struct obd_device *obd, int count)
+{
+        struct llog_ctxt *ctxt;
+        int rc = 0;
+        ENTRY;
+
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
+        RETURN(rc);
+}
+
 /* Start the MGS obd */
 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
 /* Start the MGS obd */
 static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
@@ -164,12 +193,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 GOTO(err_ns, rc);
         }
 
                 GOTO(err_ns, rc);
         }
 
-        rc = llog_start_commit_thread();
-        if (rc < 0)
-                GOTO(err_fs, rc);
-
-        rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
-                        &llog_lvfs_ops);
+        rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
         if (rc)
                 GOTO(err_fs, rc);
 
         if (rc)
                 GOTO(err_fs, rc);
 
@@ -192,7 +216,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 
         if (!mgs->mgs_service) {
                 CERROR("failed to start service\n");
 
         if (!mgs->mgs_service) {
                 CERROR("failed to start service\n");
-                GOTO(err_fs, rc = -ENOMEM);
+                GOTO(err_llog, rc = -ENOMEM);
         }
 
         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
         }
 
         rc = ptlrpc_start_threads(obd, mgs->mgs_service);
@@ -213,6 +237,8 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 
 err_thread:
         ptlrpc_unregister_service(mgs->mgs_service);
 
 err_thread:
         ptlrpc_unregister_service(mgs->mgs_service);
+err_llog:
+        obd_llog_finish(obd, 0);
 err_fs:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mgs_fs_cleanup(obd);
 err_fs:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mgs_fs_cleanup(obd);
@@ -237,7 +263,6 @@ static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         case OBD_CLEANUP_EXPORTS:
                 break;
         case OBD_CLEANUP_SELF_EXP:
         case OBD_CLEANUP_EXPORTS:
                 break;
         case OBD_CLEANUP_SELF_EXP:
-                llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
                 rc = obd_llog_finish(obd, 0);
                 break;
         case OBD_CLEANUP_OBD:
                 rc = obd_llog_finish(obd, 0);
                 break;
         case OBD_CLEANUP_OBD:
@@ -706,13 +731,12 @@ out_free:
         }
 
         case OBD_IOC_DUMP_LOG: {
         }
 
         case OBD_IOC_DUMP_LOG: {
-                struct llog_ctxt *ctxt =
-                        llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+                struct llog_ctxt *ctxt;
+                ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                if (rc)
-                        RETURN(rc);
+                llog_ctxt_put(ctxt);
 
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
@@ -720,12 +744,13 @@ out_free:
         case OBD_IOC_LLOG_CHECK:
         case OBD_IOC_LLOG_INFO:
         case OBD_IOC_LLOG_PRINT: {
         case OBD_IOC_LLOG_CHECK:
         case OBD_IOC_LLOG_INFO:
         case OBD_IOC_LLOG_PRINT: {
-                struct llog_ctxt *ctxt =
-                        llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+                struct llog_ctxt *ctxt;
+                ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
 
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+                llog_ctxt_put(ctxt);
 
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
@@ -747,6 +772,8 @@ static struct obd_ops mgs_obd_ops = {
         .o_cleanup         = mgs_cleanup,
         .o_destroy_export  = mgs_destroy_export,
         .o_iocontrol       = mgs_iocontrol,
         .o_cleanup         = mgs_cleanup,
         .o_destroy_export  = mgs_destroy_export,
         .o_iocontrol       = mgs_iocontrol,
+        .o_llog_init       = mgs_llog_init,
+        .o_llog_finish     = mgs_llog_finish
 };
 
 static int __init mgs_init(void)
 };
 
 static int __init mgs_init(void)
index e942879..9df8c29 100644 (file)
@@ -233,15 +233,16 @@ static int mgs_get_fsdb_from_llog(struct obd_device *obd, struct fs_db *fsdb)
         char *logname;
         struct llog_handle *loghandle;
         struct lvfs_run_ctxt saved;
         char *logname;
         struct llog_handle *loghandle;
         struct lvfs_run_ctxt saved;
+        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
         int rc, rc2;
         ENTRY;
 
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
         name_create(&logname, fsdb->fsdb_name, "-client");
         down(&fsdb->fsdb_sem);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         name_create(&logname, fsdb->fsdb_name, "-client");
         down(&fsdb->fsdb_sem);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
-        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                         &loghandle, NULL, logname);
+        rc = llog_create(ctxt, &loghandle, NULL, logname);
         if (rc)
                 GOTO(out_pop, rc);
 
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -258,11 +259,11 @@ out_close:
         rc2 = llog_close(loghandle);
         if (!rc)
                 rc = rc2;
         rc2 = llog_close(loghandle);
         if (!rc)
                 rc = rc2;
-
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         up(&fsdb->fsdb_sem);
         name_destroy(&logname);
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         up(&fsdb->fsdb_sem);
         name_destroy(&logname);
+        llog_ctxt_put(ctxt);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -606,6 +607,7 @@ static int mgs_modify(struct obd_device *obd, struct fs_db *fsdb,
 {
         struct llog_handle *loghandle;
         struct lvfs_run_ctxt saved;
 {
         struct llog_handle *loghandle;
         struct lvfs_run_ctxt saved;
+        struct llog_ctxt *ctxt;
         struct mgs_modify_lookup *mml;
         int rc, rc2;
         ENTRY;
         struct mgs_modify_lookup *mml;
         int rc, rc2;
         ENTRY;
@@ -614,8 +616,9 @@ static int mgs_modify(struct obd_device *obd, struct fs_db *fsdb,
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         
-        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                         &loghandle, NULL, logname);
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
+        rc = llog_create(ctxt, &loghandle, NULL, logname);
         if (rc)
                 GOTO(out_pop, rc);
 
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -644,13 +647,12 @@ out_close:
         rc2 = llog_close(loghandle);
         if (!rc)
                 rc = rc2;
         rc2 = llog_close(loghandle);
         if (!rc)
                 rc = rc2;
-
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc && rc != -ENODEV) 
                 CERROR("modify %s/%s failed %d\n",
                        mti->mti_svname, comment, rc);
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc && rc != -ENODEV) 
                 CERROR("modify %s/%s failed %d\n",
                        mti->mti_svname, comment, rc);
-
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -865,22 +867,25 @@ static int record_start_log(struct obd_device *obd,
 {
         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
         struct lvfs_run_ctxt saved;
 {
         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
         struct lvfs_run_ctxt saved;
+        struct llog_ctxt *ctxt;
         int rc = 0;
 
         int rc = 0;
 
-        if (*llh) {
+        if (*llh) 
                 GOTO(out, rc = -EBUSY);
                 GOTO(out, rc = -EBUSY);
-        }
 
 
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        if (!ctxt)
+                GOTO(out, rc = -ENODEV);
 
 
-        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                         llh, NULL, name);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        rc = llog_create(ctxt, llh, NULL, name);
         if (rc == 0)
                 llog_init_handle(*llh, LLOG_F_IS_PLAIN, &cfg_uuid);
         else
                 *llh = NULL;
 
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc == 0)
                 llog_init_handle(*llh, LLOG_F_IS_PLAIN, &cfg_uuid);
         else
                 *llh = NULL;
 
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        llog_ctxt_put(ctxt);
 
 out:
         if (rc) {
 
 out:
         if (rc) {
@@ -907,17 +912,20 @@ static int mgs_log_is_empty(struct obd_device *obd, char *name)
 {
         struct lvfs_run_ctxt saved;
         struct llog_handle *llh;
 {
         struct lvfs_run_ctxt saved;
         struct llog_handle *llh;
+        struct llog_ctxt *ctxt;
         int rc = 0;
 
         int rc = 0;
 
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                         &llh, NULL, name);
+        rc = llog_create(ctxt, &llh, NULL, name);
         if (rc == 0) {
                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
                 rc = llog_get_size(llh);
                 llog_close(llh);
         }
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc == 0) {
                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
                 rc = llog_get_size(llh);
                 llog_close(llh);
         }
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        llog_ctxt_put(ctxt);
         /* header is record 1 */
         return(rc <= 1);
 }
         /* header is record 1 */
         return(rc <= 1);
 }
@@ -1153,9 +1161,13 @@ static int mgs_steal_llog_for_mdt_from_client(struct obd_device *obd,
         struct llog_handle *loghandle;
         struct lvfs_run_ctxt saved;
         struct mgs_target_info *tmti;
         struct llog_handle *loghandle;
         struct lvfs_run_ctxt saved;
         struct mgs_target_info *tmti;
+        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
         int rc, rc2;
         ENTRY;
 
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
+
         OBD_ALLOC_PTR(tmti);
         if (tmti == NULL)
                 RETURN(-ENOMEM);
         OBD_ALLOC_PTR(tmti);
         if (tmti == NULL)
                 RETURN(-ENOMEM);
@@ -1165,8 +1177,7 @@ static int mgs_steal_llog_for_mdt_from_client(struct obd_device *obd,
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                         &loghandle, NULL, client_name);
+        rc = llog_create(ctxt, &loghandle, NULL, client_name);
         if (rc)
                 GOTO(out_pop, rc);
 
         if (rc)
                 GOTO(out_pop, rc);
 
@@ -1183,6 +1194,7 @@ out_close:
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         OBD_FREE_PTR(tmti);
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         OBD_FREE_PTR(tmti);
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -3030,18 +3042,22 @@ int mgs_upgrade_sv_14(struct obd_device *obd, struct mgs_target_info *mti)
 int mgs_erase_log(struct obd_device *obd, char *name)
 {
         struct lvfs_run_ctxt saved;
 int mgs_erase_log(struct obd_device *obd, char *name)
 {
         struct lvfs_run_ctxt saved;
+        struct llog_ctxt *ctxt;
         struct llog_handle *llh;
         int rc = 0;
 
         struct llog_handle *llh;
         int rc = 0;
 
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        LASSERT(ctxt != NULL);
+
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
-                         &llh, NULL, name);
+        rc = llog_create(ctxt, &llh, NULL, name);
         if (rc == 0) {
                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
                 rc = llog_destroy(llh);
                 llog_free_handle(llh);
         }
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc == 0) {
                 llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
                 rc = llog_destroy(llh);
                 llog_free_handle(llh);
         }
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        llog_ctxt_put(ctxt);
 
         if (rc)
                 CERROR("failed to clear log %s: %d\n", name, rc);
 
         if (rc)
                 CERROR("failed to clear log %s: %d\n", name, rc);
index 25d12e4..f71a33f 100644 (file)
@@ -1285,7 +1285,7 @@ int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
         struct obd_export *doomed_exp = NULL;
         int exports_evicted = 0;
 
         struct obd_export *doomed_exp = NULL;
         int exports_evicted = 0;
 
-        lnet_nid_t nid_key = libcfs_str2nid(nid);
+        lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
 
         do {
                 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
 
         do {
                 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
index f2cd312..9e2ac5f 100644 (file)
 #include "llog_internal.h"
 
 /* helper functions for calling the llog obd methods */
 #include "llog_internal.h"
 
 /* helper functions for calling the llog obd methods */
+static struct llog_ctxt* llog_new_ctxt(struct obd_device *obd)
+{
+        struct llog_ctxt *ctxt;
 
 
-int llog_cleanup(struct llog_ctxt *ctxt)
+        OBD_ALLOC_PTR(ctxt);
+        if (!ctxt)
+                return NULL;
+
+        ctxt->loc_obd = obd;
+        atomic_set(&ctxt->loc_refcount, 1);
+
+        return ctxt;
+}
+
+static void llog_ctxt_destroy(struct llog_ctxt *ctxt)
 {
 {
+        if (ctxt->loc_exp)
+                class_export_put(ctxt->loc_exp);
+        OBD_FREE_PTR(ctxt);
+        return;
+}
+
+int __llog_ctxt_put(struct llog_ctxt *ctxt)
+{
+        struct obd_llog_group *olg = ctxt->loc_olg;
+        struct obd_device *obd;
         int rc = 0;
         int rc = 0;
+
+        spin_lock(&olg->olg_lock);
+        if (!atomic_dec_and_test(&ctxt->loc_refcount)) {
+                spin_unlock(&olg->olg_lock);
+                return rc;
+        }
+        olg->olg_ctxts[ctxt->loc_idx] = NULL;
+        spin_unlock(&olg->olg_lock);
+
+        obd = ctxt->loc_obd;
+        spin_lock(&obd->obd_dev_lock);
+        spin_unlock(&obd->obd_dev_lock); /* sync with llog ctxt user thread */
+        LASSERT(obd->obd_stopping == 1 || obd->obd_set_up == 0);
+        /* cleanup the llog ctxt here */
+        if (CTXTP(ctxt, cleanup))
+                rc = CTXTP(ctxt, cleanup)(ctxt);
+
+        llog_ctxt_destroy(ctxt);
+        wake_up(&olg->olg_waitq);
+        return rc;
+}
+EXPORT_SYMBOL(__llog_ctxt_put);
+
+int llog_cleanup(struct llog_ctxt *ctxt)
+{
+        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+        struct obd_llog_group *olg;
+        int rc, idx;
         ENTRY;
 
         if (!ctxt) {
                 CERROR("No ctxt\n");
                 RETURN(-ENODEV);
         }
         ENTRY;
 
         if (!ctxt) {
                 CERROR("No ctxt\n");
                 RETURN(-ENODEV);
         }
-        
-        if (CTXTP(ctxt, cleanup))
-                rc = CTXTP(ctxt, cleanup)(ctxt);
-        ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
 
 
-        if (ctxt->loc_exp)
-                class_export_put(ctxt->loc_exp);
-        OBD_FREE(ctxt, sizeof(*ctxt));
+        olg = ctxt->loc_olg;
+        idx = ctxt->loc_idx;
+
+        /* banlance the ctxt get when calling llog_cleanup */
+        LASSERT(atomic_read(&ctxt->loc_refcount) > 1);
+        llog_ctxt_put(ctxt);
+
+        /* try to free the ctxt */
+        rc = __llog_ctxt_put(ctxt);
+
+        l_wait_event(olg->olg_waitq,
+                     llog_group_ctxt_null(olg, idx), &lwi);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cleanup);
 
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cleanup);
 
-int llog_setup(struct obd_device *obd,  struct obd_llogs *llogs, int index, 
+int llog_setup(struct obd_device *obd,  struct obd_llog_group *olg, int index,
                struct obd_device *disk_obd, int count, struct llog_logid *logid,
                struct llog_operations *op)
 {
                struct obd_device *disk_obd, int count, struct llog_logid *logid,
                struct llog_operations *op)
 {
@@ -72,46 +128,47 @@ int llog_setup(struct obd_device *obd,  struct obd_llogs *llogs, int index,
         if (index < 0 || index >= LLOG_MAX_CTXTS)
                 RETURN(-EFAULT);
 
         if (index < 0 || index >= LLOG_MAX_CTXTS)
                 RETURN(-EFAULT);
 
-        /* in some recovery cases, obd_llog_ctxt might already be set, 
+        LASSERT(olg != NULL);
+        ctxt = llog_group_get_ctxt(olg, index);
+
+        /* in some recovery cases, obd_llog_ctxt might already be set,
          * but llogs might still be zero, for example in obd_filter recovery */
          * but llogs might still be zero, for example in obd_filter recovery */
-        if (obd->obd_llog_ctxt[index] &&
-            (!llogs || (llogs && llogs->llog_ctxt[index]))) {
+        if (ctxt) {
                 /* mds_lov_update_mds might call here multiple times. So if the
                    llog is already set up then don't to do it again. */
                 /* mds_lov_update_mds might call here multiple times. So if the
                    llog is already set up then don't to do it again. */
-                CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n", 
+                CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
                        obd->obd_name, index);
                        obd->obd_name, index);
-                ctxt = obd->obd_llog_ctxt[index];
+                LASSERT(ctxt->loc_olg == olg);
                 LASSERT(ctxt->loc_obd == obd);
                 LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
                 LASSERT(ctxt->loc_logops == op);
                 LASSERT(ctxt->loc_obd == obd);
                 LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
                 LASSERT(ctxt->loc_logops == op);
+                llog_ctxt_put(ctxt);
                 GOTO(out, rc = 0);
         }
                 GOTO(out, rc = 0);
         }
-
-        OBD_ALLOC(ctxt, sizeof(*ctxt));
+        ctxt = llog_new_ctxt(obd);
         if (!ctxt)
                 GOTO(out, rc = -ENOMEM);
 
         if (!ctxt)
                 GOTO(out, rc = -ENOMEM);
 
-        if (llogs)
-                llogs->llog_ctxt[index] = ctxt;
-
-        if (!obd->obd_llog_ctxt[index])
-                obd->obd_llog_ctxt[index] = ctxt;
-
+        rc = llog_group_set_ctxt(olg, ctxt, index);
+        if (rc) {
+                llog_ctxt_destroy(ctxt);
+                if (rc == -EEXIST)
+                        rc = 0;
+                GOTO(out, rc);
+        }
         ctxt->loc_obd = obd;
         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
         ctxt->loc_obd = obd;
         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
+        ctxt->loc_olg = olg;
         ctxt->loc_idx = index;
         ctxt->loc_logops = op;
         sema_init(&ctxt->loc_sem, 1);
 
         if (op->lop_setup)
         ctxt->loc_idx = index;
         ctxt->loc_logops = op;
         sema_init(&ctxt->loc_sem, 1);
 
         if (op->lop_setup)
-                rc = op->lop_setup(obd, llogs, index, disk_obd, count, logid);
+                rc = op->lop_setup(obd, olg, index, disk_obd, count, logid);
 
         if (rc) {
 
         if (rc) {
-                obd->obd_llog_ctxt[index] = NULL;
-                class_export_put(ctxt->loc_exp);
-                OBD_FREE(ctxt, sizeof(*ctxt));
+                llog_ctxt_destroy(ctxt);
         }
         }
-
 out:
         RETURN(rc);
 }
 out:
         RETURN(rc);
 }
@@ -221,7 +278,7 @@ static int cat_cancel_cb(struct llog_handle *cathandle,
 
 /* lop_setup method for filter/osc */
 // XXX how to set exports
 
 /* lop_setup method for filter/osc */
 // XXX how to set exports
-int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg,
                           int index, struct obd_device *disk_obd, int count,
                           struct llog_logid *logid)
 {
                           int index, struct obd_device *disk_obd, int count,
                           struct llog_logid *logid)
 {
@@ -236,10 +293,8 @@ int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
 
         LASSERT(count == 1);
 
 
         LASSERT(count == 1);
 
-        if (!llogs)
-                ctxt = llog_get_context(obd, index);
-        else
-                ctxt = llog_get_context_from_llogs(llogs, index);
+        LASSERT(olg != NULL);
+        ctxt = llog_group_get_ctxt(olg, index);
         
         LASSERT(ctxt);
         llog_gen_init(ctxt);
         
         LASSERT(ctxt);
         llog_gen_init(ctxt);
@@ -264,7 +319,8 @@ int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
         rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
         if (rc)
                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
         rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
         if (rc)
                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
- out:
+out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_obd_origin_setup);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_obd_origin_setup);
@@ -300,8 +356,8 @@ int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
                                 llog_cat_set_first_idx(cathandle, index);
                                 rc = llog_cancel_rec(cathandle, index);
                                 if (rc == 0)
                                 llog_cat_set_first_idx(cathandle, index);
                                 rc = llog_cancel_rec(cathandle, index);
                                 if (rc == 0)
-                                        CDEBUG(D_HA, "cancel plain log at index"
-                                               " %u of catalog "LPX64"\n",
+                                        CDEBUG(D_RPCTRACE, "cancel plain log at"
+                                               "index %u of catalog "LPX64"\n",
                                                index,cathandle->lgh_id.lgl_oid);
                         }
                 }
                                                index,cathandle->lgh_id.lgl_oid);
                         }
                 }
@@ -329,7 +385,7 @@ int llog_obd_origin_add(struct llog_ctxt *ctxt,
 }
 EXPORT_SYMBOL(llog_obd_origin_add);
 
 }
 EXPORT_SYMBOL(llog_obd_origin_add);
 
-int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
                         int count, struct obd_uuid *uuid)
 {
         char name[32] = CATLIST;
                         int count, struct obd_uuid *uuid)
 {
         char name[32] = CATLIST;
@@ -350,7 +406,7 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
                 GOTO(out, rc);
         }
 
                 GOTO(out, rc);
         }
 
-        rc = obd_llog_init(obd, llogs, obd, count, idarray, uuid);
+        rc = obd_llog_init(obd, olg->olg_group, obd, count, idarray, uuid);
         if (rc) {
                 CERROR("rc: %d\n", rc);
                 GOTO(out, rc);
         if (rc) {
                 CERROR("rc: %d\n", rc);
                 GOTO(out, rc);
@@ -369,7 +425,7 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
 }
 EXPORT_SYMBOL(llog_cat_initialize);
 
 }
 EXPORT_SYMBOL(llog_cat_initialize);
 
-int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs
+int obd_llog_init(struct obd_device *obd, int group
                   struct obd_device *disk_obd, int count, 
                   struct llog_catid *logid, struct obd_uuid *uuid)
 {
                   struct obd_device *disk_obd, int count, 
                   struct llog_catid *logid, struct obd_uuid *uuid)
 {
@@ -378,8 +434,7 @@ int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         OBD_CHECK_DT_OP(obd, llog_init, 0);
         OBD_COUNTER_INCREMENT(obd, llog_init);
 
         OBD_CHECK_DT_OP(obd, llog_init, 0);
         OBD_COUNTER_INCREMENT(obd, llog_init);
 
-        rc = OBP(obd, llog_init)(obd, llogs, disk_obd, count, logid, 
-                                 uuid);
+        rc = OBP(obd, llog_init)(obd, group, disk_obd, count, logid, uuid);
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_init);
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_init);
index 5dfcd0c..3a839f1 100644 (file)
@@ -94,6 +94,7 @@ static int llog_test_1(struct obd_device *obd, char *name)
         rc = llog_create(ctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
         rc = llog_create(ctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
+                llog_ctxt_put(ctxt);
                 RETURN(rc);
         }
         llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
                 RETURN(rc);
         }
         llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
@@ -104,6 +105,7 @@ static int llog_test_1(struct obd_device *obd, char *name)
  out:
         CWARN("1b: close newly-created log\n");
         rc2 = llog_close(llh);
  out:
         CWARN("1b: close newly-created log\n");
         rc2 = llog_close(llh);
+        llog_ctxt_put(ctxt);
         if (rc2) {
                 CERROR("1b: close log %s failed: %d\n", name, rc2);
                 if (rc == 0)
         if (rc2) {
                 CERROR("1b: close log %s failed: %d\n", name, rc2);
                 if (rc == 0)
@@ -126,18 +128,18 @@ static int llog_test_2(struct obd_device *obd, char *name,
         rc = llog_create(ctxt, llh, NULL, name);
         if (rc) {
                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
         rc = llog_create(ctxt, llh, NULL, name);
         if (rc) {
                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
-                RETURN(rc);
+                GOTO(out, rc);
         }
         llog_init_handle(*llh, LLOG_F_IS_PLAIN, &uuid);
 
         if ((rc = verify_handle("2", *llh, 1)))
         }
         llog_init_handle(*llh, LLOG_F_IS_PLAIN, &uuid);
 
         if ((rc = verify_handle("2", *llh, 1)))
-                RETURN(rc);
+                GOTO(out, rc);
 
         CWARN("2b: create a log without specified NAME & LOGID\n");
         rc = llog_create(ctxt, &loghandle, NULL, NULL);
         if (rc) {
                 CERROR("2b: create log failed\n");
 
         CWARN("2b: create a log without specified NAME & LOGID\n");
         rc = llog_create(ctxt, &loghandle, NULL, NULL);
         if (rc) {
                 CERROR("2b: create log failed\n");
-                RETURN(rc);
+                GOTO(out, rc);
         }
         llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
         logid = loghandle->lgh_id;
         }
         llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
         logid = loghandle->lgh_id;
@@ -147,7 +149,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         rc = llog_create(ctxt, &loghandle, &logid, NULL);
         if (rc) {
                 CERROR("2b: re-open log by LOGID failed\n");
         rc = llog_create(ctxt, &loghandle, &logid, NULL);
         if (rc) {
                 CERROR("2b: re-open log by LOGID failed\n");
-                RETURN(rc);
+                GOTO(out, rc);
         }
         llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
 
         }
         llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
 
@@ -155,9 +157,11 @@ static int llog_test_2(struct obd_device *obd, char *name,
         rc = llog_destroy(loghandle);
         if (rc) {
                 CERROR("2b: destroy log failed\n");
         rc = llog_destroy(loghandle);
         if (rc) {
                 CERROR("2b: destroy log failed\n");
-                RETURN(rc);
+                GOTO(out, rc);
         }
         llog_free_handle(loghandle);
         }
         llog_free_handle(loghandle);
+out:
+        llog_ctxt_put(ctxt);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -261,10 +265,10 @@ static int llog_test_4(struct obd_device *obd)
         }
         num_recs++;
         if ((rc = verify_handle("4b", cath, 2)))
         }
         num_recs++;
         if ((rc = verify_handle("4b", cath, 2)))
-                RETURN(rc);
+                GOTO(ctxt_release, rc);
 
         if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs)))
 
         if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs)))
-                RETURN(rc);
+                GOTO(ctxt_release, rc);
 
         CWARN("4c: cancel 1 log record\n");
         rc = llog_cat_cancel_records(cath, 1, &cookie);
 
         CWARN("4c: cancel 1 log record\n");
         rc = llog_cat_cancel_records(cath, 1, &cookie);
@@ -275,7 +279,7 @@ static int llog_test_4(struct obd_device *obd)
         num_recs--;
 
         if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs)))
         num_recs--;
 
         if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs)))
-                RETURN(rc);
+                GOTO(ctxt_release, rc);
 
         CWARN("4d: write 40,000 more log records\n");
         for (i = 0; i < 40000; i++) {
 
         CWARN("4d: write 40,000 more log records\n");
         for (i = 0; i < 40000; i++) {
@@ -311,6 +315,8 @@ static int llog_test_4(struct obd_device *obd)
  out:
         CWARN("4f: put newly-created catalog\n");
         rc = llog_cat_put(cath);
  out:
         CWARN("4f: put newly-created catalog\n");
         rc = llog_cat_put(cath);
+ctxt_release:
+        llog_ctxt_put(ctxt);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
         RETURN(rc);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
         RETURN(rc);
@@ -437,6 +443,8 @@ static int llog_test_5(struct obd_device *obd)
                 rc = llog_cat_put(llh);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
                 rc = llog_cat_put(llh);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
+        llog_ctxt_put(ctxt);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -458,14 +466,14 @@ static int llog_test_6(struct obd_device *obd, char *name)
         if (mdc_obd == NULL) {
                 CERROR("6: no MDC devices connected to %s found.\n",
                        mds_uuid->uuid);
         if (mdc_obd == NULL) {
                 CERROR("6: no MDC devices connected to %s found.\n",
                        mds_uuid->uuid);
-                RETURN(-ENOENT);
+                GOTO(ctxt_release, rc = -ENOENT);
         }
 
         rc = obd_connect(NULL,
                          &exph, mdc_obd, &uuid, NULL /* obd_connect_data */);
         if (rc) {
                 CERROR("6: failed to connect to MDC: %s\n", mdc_obd->obd_name);
         }
 
         rc = obd_connect(NULL,
                          &exph, mdc_obd, &uuid, NULL /* obd_connect_data */);
         if (rc) {
                 CERROR("6: failed to connect to MDC: %s\n", mdc_obd->obd_name);
-                RETURN(rc);
+                GOTO(ctxt_release, rc);
         }
         exp = class_conn2export(&exph);
 
         }
         exp = class_conn2export(&exph);
 
@@ -473,7 +481,8 @@ static int llog_test_6(struct obd_device *obd, char *name)
         rc = llog_create(nctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("6: llog_create failed %d\n", rc);
         rc = llog_create(nctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("6: llog_create failed %d\n", rc);
-                RETURN(rc);
+                llog_ctxt_put(nctxt);
+                GOTO(ctxt_release, rc);
         }
 
         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
         }
 
         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
@@ -492,12 +501,13 @@ static int llog_test_6(struct obd_device *obd, char *name)
 
 parse_out:
         rc = llog_close(llh);
 
 parse_out:
         rc = llog_close(llh);
+        llog_ctxt_put(nctxt);
         if (rc) {
                 CERROR("6: llog_close failed: rc = %d\n", rc);
         }
         if (rc) {
                 CERROR("6: llog_close failed: rc = %d\n", rc);
         }
-
         rc = obd_disconnect(exp);
         rc = obd_disconnect(exp);
-
+ctxt_release:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -517,7 +527,7 @@ static int llog_test_7(struct obd_device *obd)
         rc = llog_create(ctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("7: llog_create with name %s failed: %d\n", name, rc);
         rc = llog_create(ctxt, &llh, NULL, name);
         if (rc) {
                 CERROR("7: llog_create with name %s failed: %d\n", name, rc);
-                RETURN(rc);
+                GOTO(ctxt_release, rc);
         }
         llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
 
         }
         llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
 
@@ -526,7 +536,7 @@ static int llog_test_7(struct obd_device *obd)
         rc = llog_write_rec(llh,  &lcr.lcr_hdr, NULL, 0, NULL, -1);
         if (rc) {
                 CERROR("7: write one log record failed: %d\n", rc);
         rc = llog_write_rec(llh,  &lcr.lcr_hdr, NULL, 0, NULL, -1);
         if (rc) {
                 CERROR("7: write one log record failed: %d\n", rc);
-                RETURN(rc);
+                GOTO(ctxt_release, rc);
         }
 
         rc = llog_destroy(llh);
         }
 
         rc = llog_destroy(llh);
@@ -534,6 +544,8 @@ static int llog_test_7(struct obd_device *obd)
                 CERROR("7: llog_destroy failed: %d\n", rc);
         else
                 llog_free_handle(llh);
                 CERROR("7: llog_destroy failed: %d\n", rc);
         else
                 llog_free_handle(llh);
+ctxt_release:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -592,19 +604,19 @@ static int llog_run_tests(struct obd_device *obd)
         case 0:
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
         }
         case 0:
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
         }
-
+        llog_ctxt_put(ctxt);
         return rc;
 }
 
 
         return rc;
 }
 
 
-static int llog_test_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int llog_test_llog_init(struct obd_device *obd, int group,
                                struct obd_device *tgt, int count, 
                                struct llog_catid *logid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
 
                                struct obd_device *tgt, int count, 
                                struct llog_catid *logid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, llogs, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, 
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, 
                         &llog_lvfs_ops);
         RETURN(rc);
 }
                         &llog_lvfs_ops);
         RETURN(rc);
 }
@@ -662,7 +674,7 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(-EINVAL);
         }
 
                 RETURN(-EINVAL);
         }
 
-        rc = obd_llog_init(obd, NULL, tgt, 0, NULL, NULL);
+        rc = obd_llog_init(obd, OBD_LLOG_GROUP, tgt, 0, NULL, NULL);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
index 646e7fe..def7410 100644 (file)
@@ -188,6 +188,8 @@ int class_attach(struct lustre_cfg *lcfg)
         CFS_INIT_LIST_HEAD(&obd->obd_lock_replay_queue);
         CFS_INIT_LIST_HEAD(&obd->obd_final_req_queue);
 
         CFS_INIT_LIST_HEAD(&obd->obd_lock_replay_queue);
         CFS_INIT_LIST_HEAD(&obd->obd_final_req_queue);
 
+        llog_group_init(&obd->obd_olg, OBD_LLOG_GROUP);
+
         spin_lock_init(&obd->obd_uncommitted_replies_lock);
         CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
 
         spin_lock_init(&obd->obd_uncommitted_replies_lock);
         CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
 
index 053c10f..399698f 100644 (file)
@@ -63,7 +63,7 @@
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
 static struct lvfs_callback_ops filter_lvfs_ops;
 cfs_mem_cache_t *ll_fmd_cachep;
 
 static struct lvfs_callback_ops filter_lvfs_ops;
 cfs_mem_cache_t *ll_fmd_cachep;
 
@@ -82,7 +82,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         struct filter_client_data *fcd = fed->fed_fcd;
         __u64 last_rcvd;
         loff_t off;
         struct filter_client_data *fcd = fed->fed_fcd;
         __u64 last_rcvd;
         loff_t off;
-        int err, log_pri = D_HA;
+        int err, log_pri = D_RPCTRACE;
 
         /* Propagate error code. */
         if (rc)
 
         /* Propagate error code. */
         if (rc)
@@ -1931,7 +1931,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
 
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
 
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
-        INIT_LIST_HEAD(&filter->fo_export_list);
+        CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         sema_init(&filter->fo_alloc_lock, 1);
         init_brw_stats(&filter->fo_filter_stats);
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
         sema_init(&filter->fo_alloc_lock, 1);
         init_brw_stats(&filter->fo_filter_stats);
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
@@ -1962,7 +1962,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = llog_cat_initialize(obd, NULL, 1, NULL);
+        rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -2075,67 +2075,81 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int filter_llog_init(struct obd_device *obd, int group,
                             struct obd_device *tgt, int count,
                             struct llog_catid *catid,
                             struct obd_uuid *uuid)
 {
                             struct obd_device *tgt, int count,
                             struct llog_catid *catid,
                             struct obd_uuid *uuid)
 {
+        struct filter_obd *filter = &obd->u.filter;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+
+        if (group == OBD_LLOG_GROUP) {
+                LASSERT(filter->fo_lcm == NULL);
+                OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+                if (!filter->fo_lcm)
+                        RETURN(-ENOMEM);
+
+                rc = llog_init_commit_master((struct llog_commit_master *)
+                                             filter->fo_lcm);
+                if (rc)
+                        GOTO(cleanup, rc);
+
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+        } else {
+                LASSERT(filter->fo_lcm != NULL);
+        }
+        rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
                         &filter_mds_ost_repl_logops);
         if (rc)
                         &filter_mds_ost_repl_logops);
         if (rc)
-                RETURN(rc);
+                GOTO(cleanup, rc);
 
         /* FIXME - assign unlink_cb for filter's recovery */
 
         /* FIXME - assign unlink_cb for filter's recovery */
-        if (!llogs)
-                ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        else
-                ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(olg);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
 
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
 
         LASSERT(ctxt != NULL);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        rc = llog_start_commit_thread(ctxt->loc_lcm);
+        llog_ctxt_put(ctxt);
+        if (rc)
+                GOTO(cleanup, rc);
 
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
                         &filter_size_orig_logops);
                         &filter_size_orig_logops);
-        RETURN(rc);
-}
-
-static int filter_group_llog_cleanup(struct llog_ctxt *ctxt)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (CTXTP(ctxt, cleanup))
-                rc = CTXTP(ctxt, cleanup)(ctxt);
-
-        if (ctxt->loc_exp)
-                class_export_put(ctxt->loc_exp);
-        OBD_FREE(ctxt, sizeof(*ctxt));
 
 
+cleanup:
+        if (rc) {
+                llog_cleanup_commit_master(filter->fo_lcm, 0);
+                OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
+                filter->fo_lcm = NULL;
+        }
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int filter_group_llog_finish(struct obd_llogs *llogs)
+static int filter_group_llog_finish(struct obd_llog_group *olg)
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
         ENTRY;
 
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
         ENTRY;
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
         if (ctxt)
         if (ctxt)
-                rc = filter_group_llog_cleanup(ctxt);
+                rc = llog_cleanup(ctxt);
 
 
-        ctxt = llog_get_context_from_llogs(llogs, LLOG_SIZE_ORIG_CTXT);
+        ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
         if (ctxt)
         if (ctxt)
-                rc2 = filter_group_llog_cleanup(ctxt);
+                rc2 = llog_cleanup(ctxt);
         if (!rc)
                 rc = rc2;
 
         if (!rc)
                 rc = rc2;
 
@@ -2144,89 +2158,66 @@ static int filter_group_llog_finish(struct obd_llogs *llogs)
 
 static int filter_llog_finish(struct obd_device *obd, int count)
 {
 
 static int filter_llog_finish(struct obd_device *obd, int count)
 {
-        struct llog_ctxt *ctxt;
-        int rc = 0, rc2 = 0;
+        int rc;
         ENTRY;
 
         ENTRY;
 
-        ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        if (ctxt)
-                rc = llog_cleanup(ctxt);
-
-        ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
-        if (ctxt)
-                rc2 = llog_cleanup(ctxt);
-        if (!rc)
-                rc = rc2;
+        if (obd->u.filter.fo_lcm) { 
+                llog_cleanup_commit_master((struct llog_commit_master *)
+                                           obd->u.filter.fo_lcm, 0);
+                OBD_FREE(obd->u.filter.fo_lcm, 
+                         sizeof(struct llog_commit_master));
+                obd->u.filter.fo_lcm = NULL;
+        }
+        /* finish obd llog group */
+        rc = filter_group_llog_finish(&obd->obd_olg);
 
         RETURN(rc);
 }
 
 
         RETURN(rc);
 }
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
-                                             struct obd_export *export)
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
 {
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg, *nolg;
         struct filter_obd *filter;
         struct filter_obd *filter;
-        struct llog_ctxt *ctxt;
-        struct list_head *cur;
         int rc;
 
         filter = &obd->u.filter;
 
         int rc;
 
         filter = &obd->u.filter;
 
+        if (group == OBD_LLOG_GROUP)
+                RETURN(&obd->obd_olg);
+
         spin_lock(&filter->fo_llog_list_lock);
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                fglog = list_entry(cur, struct filter_group_llog, list);
-                if (fglog->group == group) {
-                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
-                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
-                                      obd->obd_name, group, fglog->exp, export);
+        list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                if (olg->olg_group == group) {
                         spin_unlock(&filter->fo_llog_list_lock);
                         spin_unlock(&filter->fo_llog_list_lock);
-                        goto init;
+                        RETURN(olg);
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
                 }
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        if (export == NULL)
-                RETURN(NULL);
-
-        OBD_ALLOC_PTR(fglog);
-        if (fglog == NULL)
-                RETURN(NULL);
-        fglog->group = group;
-
-        OBD_ALLOC_PTR(fglog->llogs);
-        if (fglog->llogs == NULL) {
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
+        OBD_ALLOC_PTR(olg);
+        if (olg == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
 
+        llog_group_init(olg, group);
         spin_lock(&filter->fo_llog_list_lock);
         spin_lock(&filter->fo_llog_list_lock);
-        list_for_each(cur, &filter->fo_llog_list) {
-                nlog = list_entry(cur, struct filter_group_llog, list);
-                LASSERT(nlog->group != group);
+        list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
+                LASSERT(nolg->olg_group != group);
         }
         }
-        list_add(&fglog->list, &filter->fo_llog_list);
+        list_add(&olg->olg_list, &filter->fo_llog_list);
         spin_unlock(&filter->fo_llog_list_lock);
 
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+        rc = llog_cat_initialize(obd, olg, 1, NULL);
         if (rc) {
         if (rc) {
-                OBD_FREE_PTR(fglog->llogs);
-                OBD_FREE_PTR(fglog);
-                RETURN(NULL);
-        }
-
-init:
-        if (export) {
-                fglog->exp = export;
-                ctxt = llog_get_context_from_llogs(fglog->llogs,
-                                               LLOG_MDS_OST_REPL_CTXT);
-                LASSERT(ctxt != NULL);
-
-                llog_receptor_accept(ctxt, export->exp_imp_reverse);
+                spin_lock(&filter->fo_llog_list_lock);
+                list_del(&olg->olg_list);
+                spin_unlock(&filter->fo_llog_list_lock);
+                OBD_FREE_PTR(olg);
+                RETURN(ERR_PTR(rc));
         }
         }
-        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
-               obd->obd_name, fglog->llogs, group);
+        CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
+               obd->obd_name, group, olg);
 
 
-        RETURN(fglog->llogs);
+        RETURN(olg);
 }
 
 static int filter_llog_connect(struct obd_export *exp,
 }
 
 static int filter_llog_connect(struct obd_export *exp,
@@ -2234,7 +2225,7 @@ static int filter_llog_connect(struct obd_export *exp,
 {
         struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
 {
         struct obd_device *obd = exp->exp_obd;
         struct llog_ctxt *ctxt;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
@@ -2243,13 +2234,17 @@ static int filter_llog_connect(struct obd_export *exp,
                 (unsigned) body->lgdc_logid.lgl_oid,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
                 (unsigned) body->lgdc_logid.lgl_oid,
                 (unsigned) body->lgdc_logid.lgl_ogen);
 
-        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, body->lgdc_ctxt_idx);
+        olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
+        llog_ctxt_put(ctxt);
         if (rc != 0)
                 CERROR("failed to connect rc %d idx %d\n", rc,
                                 body->lgdc_ctxt_idx);
         if (rc != 0)
                 CERROR("failed to connect rc %d idx %d\n", rc,
                                 body->lgdc_ctxt_idx);
@@ -2259,33 +2254,32 @@ static int filter_llog_connect(struct obd_export *exp,
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
 
 static int filter_llog_preclean (struct obd_device *obd)
 {
-        struct filter_group_llog *log;
+        struct obd_llog_group *olg;
         struct filter_obd *filter;
         int rc = 0;
         ENTRY;
 
         struct filter_obd *filter;
         int rc = 0;
         ENTRY;
 
+        rc = obd_llog_finish(obd, 0);
+        if (rc)
+                CERROR("failed to cleanup llogging subsystem\n");
+
         filter = &obd->u.filter;
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
         filter = &obd->u.filter;
         spin_lock(&filter->fo_llog_list_lock);
         while (!list_empty(&filter->fo_llog_list)) {
-                log = list_entry(filter->fo_llog_list.next,
-                                 struct filter_group_llog, list);
-                list_del(&log->list);
+                olg = list_entry(filter->fo_llog_list.next,
+                                 struct obd_llog_group, olg_list);
+                list_del(&olg->olg_list);
                 spin_unlock(&filter->fo_llog_list_lock);
 
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                rc = filter_group_llog_finish(log->llogs);
+                rc = filter_group_llog_finish(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
-                                log->group);
-                OBD_FREE_PTR(log->llogs);
-                OBD_FREE_PTR(log);
+                               olg->olg_group);
+                OBD_FREE_PTR(olg);
                 spin_lock(&filter->fo_llog_list_lock);
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
                 spin_lock(&filter->fo_llog_list_lock);
         }
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = obd_llog_finish(obd, 0);
-        if (rc)
-                CERROR("failed to cleanup llogging subsystem\n");
-
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -2661,6 +2655,10 @@ static int filter_destroy_export(struct obd_export *exp)
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                        exp, exp->exp_filter_data.fed_pending);
 
                        exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
                        exp, exp->exp_filter_data.fed_pending);
 
+        /* Not ported yet the b1_6 quota functionality
+         * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
+         */
+
         target_destroy_export(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
         target_destroy_export(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
@@ -2684,7 +2682,7 @@ static int filter_destroy_export(struct obd_export *exp)
 
 static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
 
 static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
 {
-        struct filter_group_llog *fglog, *nlog;
+        struct obd_llog_group *olg_min, *olg;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
         struct filter_obd *filter;
         int worked = 0, group;
         struct llog_ctxt *ctxt;
@@ -2697,35 +2695,41 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
          * group order and skip already synced llogs -bzzz */
         do {
                 /* look for group with min. number, but > worked */
          * group order and skip already synced llogs -bzzz */
         do {
                 /* look for group with min. number, but > worked */
-                fglog = NULL;
+                olg_min = NULL;
                 group = 1 << 30;
                 spin_lock(&filter->fo_llog_list_lock);
                 group = 1 << 30;
                 spin_lock(&filter->fo_llog_list_lock);
-                list_for_each_entry(nlog, &filter->fo_llog_list, list) {
-                        if (nlog->group <= worked) {
+                list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+                        if (olg->olg_group <= worked) {
                                 /* this group is already synced */
                                 continue;
                         }
                                 /* this group is already synced */
                                 continue;
                         }
-                        if (group < nlog->group) {
+                        if (group < olg->olg_group) {
                                 /* we have group with smaller number to sync */
                                 continue;
                         }
                         /* store current minimal group */
                                 /* we have group with smaller number to sync */
                                 continue;
                         }
                         /* store current minimal group */
-                        fglog = nlog;
-                        group = nlog->group;
+                        olg_min = olg;
+                        group = olg->olg_group;
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
                 }
                 spin_unlock(&filter->fo_llog_list_lock);
 
-                if (fglog == NULL)
+                if (olg_min == NULL)
                         break;
 
                         break;
 
-                worked = fglog->group;
-                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
-                        ctxt = llog_get_context_from_llogs(fglog->llogs,
+                worked = olg_min->olg_group;
+                if (olg_min->olg_exp &&
+                    (dexp == olg_min->olg_exp || dexp == NULL)) {
+                        int err;
+                        ctxt = llog_group_get_ctxt(olg_min,
                                                 LLOG_MDS_OST_REPL_CTXT);
                         LASSERT(ctxt != NULL);
                                                 LLOG_MDS_OST_REPL_CTXT);
                         LASSERT(ctxt != NULL);
-                        llog_sync(ctxt, fglog->exp);
+                        err = llog_sync(ctxt, olg_min->olg_exp);
+                        llog_ctxt_put(ctxt);
+                        if (err)
+                                CERROR("error flushing logs to MDS: rc %d\n",
+                                       err);                        
                 }
                 }
-        } while (fglog != NULL);
+        } while (olg_min != NULL);
 }
 
 /* also incredibly similar to mds_disconnect */
 }
 
 /* also incredibly similar to mds_disconnect */
@@ -3615,9 +3619,17 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                        oa->o_id);
                 /* If object already gone, cancel cookie right now */
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        oa->o_id);
                 /* If object already gone, cancel cookie right now */
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        struct llog_ctxt *ctxt;
+                        struct obd_llog_group *olg;
                         fcc = obdo_logcookie(oa);
                         fcc = obdo_logcookie(oa);
-                        llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
-                                    NULL, 1, fcc, 0);
+                        olg = filter_find_olg(obd, oa->o_gr);
+                        if (IS_ERR(olg))
+                                GOTO(cleanup, rc = PTR_ERR(olg));
+                        llog_group_set_export(olg, exp);
+
+                        ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
+                        llog_cancel(ctxt, NULL, 1, fcc, 0);
+                        llog_ctxt_put(ctxt);
                         fcc = NULL; /* we didn't allocate fcc, don't free it */
                 }
                 GOTO(cleanup, rc = -ENOENT);
                         fcc = NULL; /* we didn't allocate fcc, don't free it */
                 }
                 GOTO(cleanup, rc = -ENOENT);
@@ -3770,7 +3782,6 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
-        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
         int rc, rc2;
         ENTRY;
 
@@ -3785,8 +3796,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
                 /* flush any remaining cancel messages out to the target */
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
                 /* flush any remaining cancel messages out to the target */
-                ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
-                llog_sync(ctxt, exp);
+                filter_sync_llogs(exp->exp_obd, exp);
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
@@ -3877,7 +3887,7 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                                  struct ptlrpc_request_set *set)
 {
         struct obd_device *obd;
                                  struct ptlrpc_request_set *set)
 {
         struct obd_device *obd;
-        struct obd_llogs *llog;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         int rc = 0, group;
         ENTRY;
         struct llog_ctxt *ctxt;
         int rc = 0, group;
         ENTRY;
@@ -3912,12 +3922,16 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
         group = (int)(*(__u32 *)val);
         LASSERT(group >= FILTER_GROUP_MDS0);
 
-        llog = filter_grab_llog_for_group(obd, group, exp);
-        LASSERT(llog != NULL);
-        ctxt = llog_get_context_from_llogs(llog, LLOG_MDS_OST_REPL_CTXT);
-        LASSERTF(ctxt != NULL, "ctxt is not null\n"),
+        olg = filter_find_olg(obd, group);
+        if (IS_ERR(olg))
+                RETURN(PTR_ERR(olg));
+        llog_group_set_export(olg, exp);
+
+        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERTF(ctxt != NULL, "ctxt is null\n"),
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        llog_ctxt_put(ctxt);
 
         lquota_setinfo(filter_quota_interface_ref, exp, obd);
 
 
         lquota_setinfo(filter_quota_interface_ref, exp, obd);
 
index a92d621..b0ef32e 100644 (file)
@@ -124,8 +124,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
 
 struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
 
 
 struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
 
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
-                                             struct obd_export *export);
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group);
 
 /* filter_lvb.c */
 extern struct ldlm_valblock_ops filter_lvbo;
 
 /* filter_lvb.c */
 extern struct ldlm_valblock_ops filter_lvbo;
index b5cd83e..20afcf3 100644 (file)
@@ -102,29 +102,37 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                               void *cb_data, int error)
 {
         struct llog_cookie *cookie = cb_data;
                               void *cb_data, int error)
 {
         struct llog_cookie *cookie = cb_data;
-        struct obd_llogs *llogs;
+        struct obd_llog_group *olg;
         struct llog_ctxt *ctxt;
         struct llog_ctxt *ctxt;
+        int rc;        
         
         /* we have to find context for right group */
         
         /* we have to find context for right group */
-        if (error != 0) {
-                CDEBUG(D_INODE, "not cancelling llog cookie on error %d\n",
-                       error);
-                goto out;
+        if (error != 0 || obd->obd_stopping) {
+                CDEBUG(D_INODE, "not cancel logcookie err %d stopping %d \n",
+                       error, obd->obd_stopping);
+                GOTO (out, rc = 0);
         }
         }
-        llogs = filter_grab_llog_for_group(obd, cookie->lgc_lgl.lgl_ogr, NULL);
 
 
-        if (llogs) {
-                ctxt = llog_get_context_from_llogs(llogs, cookie->lgc_subsys + 1);
-                if (ctxt) {
-                        llog_cancel(ctxt, NULL, 1, cookie, 0);
-                } else
+        olg = filter_find_olg(obd, cookie->lgc_lgl.lgl_ogr);
+        if (!IS_ERR(olg)) {
+                ctxt = llog_group_get_ctxt(olg, cookie->lgc_subsys + 1);
+                if (!ctxt) {
                         CERROR("no valid context for group "LPU64"\n",
                                 cookie->lgc_lgl.lgl_ogr);
                         CERROR("no valid context for group "LPU64"\n",
                                 cookie->lgc_lgl.lgl_ogr);
+                        GOTO(out, rc = 0);
+                }
+
+                OBD_FAIL_TIMEOUT(OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT, 30);
+
+                rc = llog_cancel(ctxt, NULL, 1, cookie, 0);
+                if (rc)
+                        CERROR("error cancelling log cookies: rc = %d\n", rc);
+                llog_ctxt_put(ctxt);
         } else {
                 CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr);
         }
 out:
         } else {
                 CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr);
         }
 out:
-        OBD_FREE(cookie, sizeof(struct llog_cookie));
+        OBD_FREE(cookie, sizeof(*cookie));
 }
 
 /* Callback for processing the unlink log record received from MDS by 
 }
 
 /* Callback for processing the unlink log record received from MDS by 
@@ -218,11 +226,15 @@ int filter_recov_log_mds_ost_cb(struct llog_handle *llh,
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
+        if (ctxt->loc_obd->obd_stopping)
+                RETURN(LLOG_PROC_BREAK);
+
         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
 
         if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT, 30);
         cookie.lgc_lgl = llh->lgh_id;
         cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
         cookie.lgc_index = rec->lrh_index;
         cookie.lgc_lgl = llh->lgh_id;
         cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
         cookie.lgc_index = rec->lrh_index;
index 853d6f3..d8d5be7 100644 (file)
@@ -3452,6 +3452,7 @@ static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
                                "ctxt %p: %d\n", ctxt, rc);
         }
 
                                "ctxt %p: %d\n", ctxt, rc);
         }
 
+        llog_ctxt_put(ctxt);
         spin_lock(&imp->imp_lock);
         imp->imp_server_timeout = 1;
         imp->imp_pingable = 1;
         spin_lock(&imp->imp_lock);
         imp->imp_server_timeout = 1;
         imp->imp_pingable = 1;
@@ -3571,13 +3572,13 @@ static struct llog_operations osc_size_repl_logops = {
 };
 
 static struct llog_operations osc_mds_ost_orig_logops;
 };
 
 static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int osc_llog_init(struct obd_device *obd, int group,
                          struct obd_device *tgt, int count,
                          struct llog_catid *catid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
                          struct obd_device *tgt, int count,
                          struct llog_catid *catid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
-
+        LASSERT(group == OBD_LLOG_GROUP);
         spin_lock(&obd->obd_dev_lock);
         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                 osc_mds_ost_orig_logops = llog_lvfs_ops;
         spin_lock(&obd->obd_dev_lock);
         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                 osc_mds_ost_orig_logops = llog_lvfs_ops;
@@ -3588,15 +3589,15 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         }
         spin_unlock(&obd->obd_dev_lock);
 
         }
         spin_unlock(&obd->obd_dev_lock);
 
-        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                         &catid->lci_logid, &osc_mds_ost_orig_logops);
         if (rc) {
                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                 GOTO (out, rc);
         }
 
                         &catid->lci_logid, &osc_mds_ost_orig_logops);
         if (rc) {
                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                 GOTO (out, rc);
         }
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
-                        &osc_size_repl_logops);
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
+                        NULL, &osc_size_repl_logops);
         if (rc)
                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
 out:
         if (rc)
                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
 out:
@@ -3666,6 +3667,8 @@ static int osc_disconnect(struct obd_export *exp)
                 /* flush any remaining cancel messages out to the target */
                 llog_sync(ctxt, exp);
 
                 /* flush any remaining cancel messages out to the target */
                 llog_sync(ctxt, exp);
 
+        llog_ctxt_put(ctxt);
+
         rc = client_disconnect_export(exp);
         return rc;
 }
         rc = client_disconnect_export(exp);
         return rc;
 }
index 5a58e67..59b9e55 100644 (file)
@@ -1846,10 +1846,6 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        rc = llog_start_commit_thread();
-        if (rc < 0)
-                RETURN(rc);
-
         lprocfs_ost_init_vars(&lvars);
         lprocfs_obd_setup(obd, lvars.obd_vars);
 
         lprocfs_ost_init_vars(&lvars);
         lprocfs_obd_setup(obd, lvars.obd_vars);
 
index 492f390..a1c54fc 100644 (file)
@@ -110,6 +110,8 @@ int llog_handle_connect(struct ptlrpc_request *req)
         ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
                           &req_body->lgdc_gen, NULL);
         ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
         rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
                           &req_body->lgdc_gen, NULL);
+
+        llog_ctxt_put(ctxt);
         if (rc != 0)
                 CERROR("failed at llog_relp_connect\n");
 
         if (rc != 0)
                 CERROR("failed at llog_relp_connect\n");
 
@@ -121,6 +123,8 @@ int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp)
 {
         ENTRY;
         LASSERT(ctxt);
 {
         ENTRY;
         LASSERT(ctxt);
+        LASSERTF(ctxt->loc_imp == NULL || ctxt->loc_imp == imp,
+                 "%p - %p\n", ctxt->loc_imp, imp);
         ctxt->loc_imp = imp;
         RETURN(0);
 }
         ctxt->loc_imp = imp;
         RETURN(0);
 }
@@ -128,9 +132,14 @@ EXPORT_SYMBOL(llog_receptor_accept);
 
 int llog_initiator_connect(struct llog_ctxt *ctxt)
 {
 
 int llog_initiator_connect(struct llog_ctxt *ctxt)
 {
+        struct obd_import *new_imp;
         ENTRY;
         ENTRY;
+
         LASSERT(ctxt);
         LASSERT(ctxt);
-        ctxt->loc_imp = ctxt->loc_obd->u.cli.cl_import;
+        new_imp = ctxt->loc_obd->u.cli.cl_import;
+        LASSERTF(ctxt->loc_imp == NULL || ctxt->loc_imp == new_imp,
+                 "%p - %p\n", ctxt->loc_imp, new_imp);
+        ctxt->loc_imp = new_imp;
         RETURN(0);
 }
 EXPORT_SYMBOL(llog_initiator_connect);
         RETURN(0);
 }
 EXPORT_SYMBOL(llog_initiator_connect);
index 4ae01cf..d0b618b 100644 (file)
@@ -60,7 +60,7 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
-                GOTO(out, rc =-EFAULT);
+                RETURN(-EFAULT);
 
         if (body->lgd_logid.lgl_oid > 0)
                 logid = &body->lgd_logid;
 
         if (body->lgd_logid.lgl_oid > 0)
                 logid = &body->lgd_logid;
@@ -68,13 +68,13 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 if (name == NULL)
         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 if (name == NULL)
-                        GOTO(out, rc = -EFAULT);
+                        RETURN(-EFAULT);
                 CDEBUG(D_INFO, "opening log %s\n", name);
         }
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
                 CDEBUG(D_INFO, "opening log %s\n", name);
         }
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
-                GOTO(out, rc = -EINVAL);
+                RETURN(-EINVAL);
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
@@ -95,7 +95,7 @@ out_close:
                 rc = rc2;
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
                 rc = rc2;
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -115,14 +115,15 @@ int llog_origin_handle_destroy(struct ptlrpc_request *req)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
-                GOTO(out, rc =-EFAULT);
+                RETURN(-EFAULT);
 
         if (body->lgd_logid.lgl_oid > 0)
                 logid = &body->lgd_logid;
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
 
         if (body->lgd_logid.lgl_oid > 0)
                 logid = &body->lgd_logid;
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
-                GOTO(out, rc = -EINVAL);
+                RETURN(-EINVAL);
+
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
@@ -150,7 +151,7 @@ out_close:
                 llog_close(loghandle);
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
                 llog_close(loghandle);
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -172,11 +173,11 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
-                GOTO(out, rc =-EFAULT);
+                RETURN(-EFAULT);
 
         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
         if (!buf)
 
         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
         if (!buf)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
@@ -219,9 +220,9 @@ out_close:
 
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+        llog_ctxt_put(ctxt);
 out_free:
         OBD_FREE(buf, LLOG_CHUNK_SIZE);
 out_free:
         OBD_FREE(buf, LLOG_CHUNK_SIZE);
-out:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -243,11 +244,11 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
-                GOTO(out, rc =-EFAULT);
+                RETURN(-EFAULT);
 
         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
         if (!buf)
 
         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
         if (!buf)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         LASSERT(ctxt != NULL);
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         LASSERT(ctxt != NULL);
@@ -288,8 +289,8 @@ out_close:
 
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+        llog_ctxt_put(ctxt);
         OBD_FREE(buf, LLOG_CHUNK_SIZE);
         OBD_FREE(buf, LLOG_CHUNK_SIZE);
-out:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -309,11 +310,11 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
         if (body == NULL)
-                GOTO(out, rc =-EFAULT);
+                RETURN(-EFAULT);
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         if (ctxt == NULL)
-                GOTO(out, rc = -EINVAL);
+                RETURN(-EINVAL);
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
@@ -338,11 +339,9 @@ out_close:
         rc2 = llog_close(loghandle);
         if (!rc)
                 rc = rc2;
         rc2 = llog_close(loghandle);
         if (!rc)
                 rc = rc2;
-
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 out_pop:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
-out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -360,7 +359,7 @@ int llog_origin_handle_cancel(struct ptlrpc_request *req)
         struct obd_device *obd = req->rq_export->exp_obd;
         struct obd_device *disk_obd;
         struct llog_cookie *logcookies;
         struct obd_device *obd = req->rq_export->exp_obd;
         struct obd_device *disk_obd;
         struct llog_cookie *logcookies;
-        struct llog_ctxt *ctxt;
+        struct llog_ctxt *ctxt = NULL;
         int num_cookies, rc = 0, err, i;
         struct lvfs_run_ctxt saved;
         struct llog_handle *cathandle;
         int num_cookies, rc = 0, err, i;
         struct lvfs_run_ctxt saved;
         struct llog_handle *cathandle;
@@ -411,8 +410,9 @@ pop_ctxt:
         if (rc)
                 CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
         else
         if (rc)
                 CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
         else
-                CDEBUG(D_HA, "cancel %d llog-records\n", num_cookies);
+                CDEBUG(D_RPCTRACE, "cancel %d llog-records\n", num_cookies);
 
 
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_origin_handle_cancel);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_origin_handle_cancel);
@@ -429,7 +429,7 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
         char                 *out = buf;
 
         if (ctxt == NULL || mds == NULL)
         char                 *out = buf;
 
         if (ctxt == NULL || mds == NULL)
-                RETURN(-EOPNOTSUPP);
+                GOTO(release_ctxt, rc = -EOPNOTSUPP);
 
         push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 
 
         push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 
@@ -468,6 +468,8 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
         }
 out_pop:
         pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
         }
 out_pop:
         pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+release_ctxt:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -483,7 +485,7 @@ static int llog_catinfo_cb(struct llog_handle *cat,
 {
         static char *out = NULL;
         static int remains = 0;
 {
         static char *out = NULL;
         static int remains = 0;
-        struct llog_ctxt *ctxt;
+        struct llog_ctxt *ctxt = NULL;
         struct llog_handle *handle;
         struct llog_logid *logid;
         struct llog_logid_rec *lir;
         struct llog_handle *handle;
         struct llog_logid *logid;
         struct llog_logid_rec *lir;
@@ -495,11 +497,13 @@ static int llog_catinfo_cb(struct llog_handle *cat,
                 remains = cbd->remains;
                 cbd->init = 0;
         }
                 remains = cbd->remains;
                 cbd->init = 0;
         }
-        ctxt = cbd->ctxt;
 
         if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
                 RETURN(-EINVAL);
 
 
         if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
                 RETURN(-EINVAL);
 
+        if (!cbd->ctxt)
+                RETURN(-EINVAL);
+        
         lir = (struct llog_logid_rec *)rec;
         logid = &lir->lid_id;
         rc = llog_create(ctxt, &handle, logid, NULL);
         lir = (struct llog_logid_rec *)rec;
         logid = &lir->lid_id;
         rc = llog_create(ctxt, &handle, logid, NULL);
@@ -549,14 +553,14 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
         int rc;
 
         if (ctxt == NULL || mds == NULL)
         int rc;
 
         if (ctxt == NULL || mds == NULL)
-                RETURN(-EOPNOTSUPP);
+                GOTO(release_ctxt, rc = -EOPNOTSUPP);
 
         count = mds->mds_lov_desc.ld_tgt_count;
         size = sizeof(*idarray) * count;
 
         OBD_ALLOC(idarray, size);
         if (!idarray)
 
         count = mds->mds_lov_desc.ld_tgt_count;
         size = sizeof(*idarray) * count;
 
         OBD_ALLOC(idarray, size);
         if (!idarray)
-                RETURN(-ENOMEM);
+                GOTO(release_ctxt, rc = -ENOMEM);
 
         rc = llog_get_cat_list(obd, obd, name, count, idarray);
         if (rc)
 
         rc = llog_get_cat_list(obd, obd, name, count, idarray);
         if (rc)
@@ -603,6 +607,9 @@ out_pop:
         pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 out_free:
         OBD_FREE(idarray, size);
         pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 out_free:
         OBD_FREE(idarray, size);
+release_ctxt:
+        llog_ctxt_put(ctxt);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index 1cce741..0e4adb8 100644 (file)
@@ -62,8 +62,6 @@ void ptlrpc_lprocfs_do_request_stat (struct ptlrpc_request *req,
 #endif /* LPROCFS */
 
 /* recovd_thread.c */
 #endif /* LPROCFS */
 
 /* recovd_thread.c */
-int llog_init_commit_master(void);
-int llog_cleanup_commit_master(int force);
 
 int ptlrpc_expire_one_request(struct ptlrpc_request *req);
 
 
 int ptlrpc_expire_one_request(struct ptlrpc_request *req);
 
index 42fb922..1bf4afd 100644 (file)
@@ -69,27 +69,22 @@ __init int ptlrpc_init(void)
                 RETURN(rc);
         cleanup_phase = 2;
 
                 RETURN(rc);
         cleanup_phase = 2;
 
-        ptlrpc_init_connection();
+        rc = ptlrpc_init_connection();
         if (rc)
                 GOTO(cleanup, rc);
         cleanup_phase = 3;
 
         if (rc)
                 GOTO(cleanup, rc);
         cleanup_phase = 3;
 
-        rc = llog_init_commit_master();
-        if (rc)
-                GOTO(cleanup, rc);
-        cleanup_phase = 4;
-
         ptlrpc_put_connection_superhack = ptlrpc_put_connection;
 
         rc = ptlrpc_start_pinger();
         if (rc)
                 GOTO(cleanup, rc);
         ptlrpc_put_connection_superhack = ptlrpc_put_connection;
 
         rc = ptlrpc_start_pinger();
         if (rc)
                 GOTO(cleanup, rc);
-        cleanup_phase = 5;
+        cleanup_phase = 4;
 
         rc = ldlm_init();
         if (rc)
                 GOTO(cleanup, rc);
 
         rc = ldlm_init();
         if (rc)
                 GOTO(cleanup, rc);
-        cleanup_phase = 6;
+        cleanup_phase = 5;
 
         rc = sptlrpc_init();
         if (rc)
 
         rc = sptlrpc_init();
         if (rc)
@@ -99,12 +94,10 @@ __init int ptlrpc_init(void)
 
 cleanup:
         switch(cleanup_phase) {
 
 cleanup:
         switch(cleanup_phase) {
-        case 6:
-                ldlm_exit();
         case 5:
         case 5:
-                ptlrpc_stop_pinger();
+                ldlm_exit();
         case 4:
         case 4:
-                llog_cleanup_commit_master(1);
+                ptlrpc_stop_pinger();
         case 3:
                 ptlrpc_cleanup_connection();
         case 2:
         case 3:
                 ptlrpc_cleanup_connection();
         case 2:
@@ -125,7 +118,6 @@ static void __exit ptlrpc_exit(void)
         ptlrpc_stop_pinger();
         ptlrpc_exit_portals();
         ptlrpc_cleanup_connection();
         ptlrpc_stop_pinger();
         ptlrpc_exit_portals();
         ptlrpc_cleanup_connection();
-        llog_cleanup_commit_master(0);
 }
 
 /* connection.c */
 }
 
 /* connection.c */
index 91bd48f..faffe65 100644 (file)
 
 #ifdef __KERNEL__
 
 
 #ifdef __KERNEL__
 
-static struct llog_commit_master lustre_lcm;
-static struct llog_commit_master *lcm = &lustre_lcm;
-
 /* Allocate new commit structs in case we do not have enough.
  * Make the llcd size small enough that it fits into a single page when we
  * are sending/receiving it. */
 /* Allocate new commit structs in case we do not have enough.
  * Make the llcd size small enough that it fits into a single page when we
  * are sending/receiving it. */
-static int llcd_alloc(void)
+static int llcd_alloc(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
@@ -85,7 +82,7 @@ static int llcd_alloc(void)
 }
 
 /* Get a free cookie struct from the list */
 }
 
 /* Get a free cookie struct from the list */
-struct llog_canceld_ctxt *llcd_grab(void)
+static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
 
 {
         struct llog_canceld_ctxt *llcd;
 
@@ -93,7 +90,7 @@ repeat:
         spin_lock(&lcm->lcm_llcd_lock);
         if (list_empty(&lcm->lcm_llcd_free)) {
                 spin_unlock(&lcm->lcm_llcd_lock);
         spin_lock(&lcm->lcm_llcd_lock);
         if (list_empty(&lcm->lcm_llcd_free)) {
                 spin_unlock(&lcm->lcm_llcd_lock);
-                if (llcd_alloc() < 0) {
+                if (llcd_alloc(lcm) < 0) {
                         CERROR("unable to allocate log commit data!\n");
                         return NULL;
                 }
                         CERROR("unable to allocate log commit data!\n");
                         return NULL;
                 }
@@ -110,10 +107,12 @@ repeat:
 
         return llcd;
 }
 
         return llcd;
 }
-EXPORT_SYMBOL(llcd_grab);
 
 static void llcd_put(struct llog_canceld_ctxt *llcd)
 {
 
 static void llcd_put(struct llog_canceld_ctxt *llcd)
 {
+        struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+        llog_ctxt_put(llcd->llcd_ctxt);
         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
                 int llcd_size = llcd->llcd_size +
                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
                 int llcd_size = llcd->llcd_size +
                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
@@ -127,15 +126,16 @@ static void llcd_put(struct llog_canceld_ctxt *llcd)
 }
 
 /* Send some cookies to the appropriate target */
 }
 
 /* Send some cookies to the appropriate target */
-void llcd_send(struct llog_canceld_ctxt *llcd)
+static void llcd_send(struct llog_canceld_ctxt *llcd)
 {
 {
+        if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
         spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
         spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
-        list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
+                list_add_tail(&llcd->llcd_list,
+                              &llcd->llcd_lcm->lcm_llcd_pending);
         spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
         spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-
+        }
         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
 }
         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
 }
-EXPORT_SYMBOL(llcd_send);
 
 /* deleted objects have a commit callback that cancels the MDS
  * log record for the deletion.  The commit callback calls this
 
 /* deleted objects have a commit callback that cancels the MDS
  * log record for the deletion.  The commit callback calls this
@@ -161,7 +161,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
 
         if (count > 0 && cookies != NULL) {
                 if (llcd == NULL) {
 
         if (count > 0 && cookies != NULL) {
                 if (llcd == NULL) {
-                        llcd = llcd_grab();
+                        llcd = llcd_grab(ctxt->loc_lcm);
                         if (llcd == NULL) {
                                 CERROR("couldn't get an llcd - dropped "LPX64
                                        ":%x+%u\n",
                         if (llcd == NULL) {
                                 CERROR("couldn't get an llcd - dropped "LPX64
                                        ":%x+%u\n",
@@ -170,7 +170,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                                        cookies->lgc_index);
                                 GOTO(out, rc = -ENOMEM);
                         }
                                        cookies->lgc_index);
                                 GOTO(out, rc = -ENOMEM);
                         }
-                        llcd->llcd_ctxt = ctxt;
+                        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
                         ctxt->loc_llcd = llcd;
                 }
 
                         ctxt->loc_llcd = llcd;
                 }
 
@@ -266,7 +266,7 @@ static int log_commit_thread(void *arg)
                 /* If we do not have enough pages available, allocate some */
                 while (atomic_read(&lcm->lcm_llcd_numfree) <
                        lcm->lcm_llcd_minfree) {
                 /* If we do not have enough pages available, allocate some */
                 while (atomic_read(&lcm->lcm_llcd_numfree) <
                        lcm->lcm_llcd_minfree) {
-                        if (llcd_alloc() < 0)
+                        if (llcd_alloc(lcm) < 0)
                                 break;
                 }
 
                                 break;
                 }
 
@@ -303,7 +303,7 @@ static int log_commit_thread(void *arg)
 
                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                     atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
 
                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                     atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
-                        rc = llog_start_commit_thread();
+                        rc = llog_start_commit_thread(lcm);
                         if (rc < 0)
                                 CERROR("error starting thread: rc %d\n", rc);
                 }
                         if (rc < 0)
                                 CERROR("error starting thread: rc %d\n", rc);
                 }
@@ -460,7 +460,7 @@ static int log_commit_thread(void *arg)
         return 0;
 }
 
         return 0;
 }
 
-int llog_start_commit_thread(void)
+int llog_start_commit_thread(struct llog_commit_master *lcm)
 {
         int rc;
         ENTRY;
 {
         int rc;
         ENTRY;
@@ -486,7 +486,7 @@ static struct llog_process_args {
         void                    *llpa_arg;
 } llpa;
 
         void                    *llpa_arg;
 } llpa;
 
-int llog_init_commit_master(void)
+int llog_init_commit_master(struct llog_commit_master *lcm)
 {
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
 {
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
@@ -504,8 +504,10 @@ int llog_init_commit_master(void)
         sema_init(&llpa.llpa_sem, 1);
         return 0;
 }
         sema_init(&llpa.llpa_sem, 1);
         return 0;
 }
+EXPORT_SYMBOL(llog_init_commit_master);
 
 
-int llog_cleanup_commit_master(int force)
+int llog_cleanup_commit_master(struct llog_commit_master *lcm,
+                               int force)
 {
         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
         if (force)
 {
         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
         if (force)
@@ -516,6 +518,7 @@ int llog_cleanup_commit_master(int force)
                                  atomic_read(&lcm->lcm_thread_total) == 0);
         return 0;
 }
                                  atomic_read(&lcm->lcm_thread_total) == 0);
         return 0;
 }
+EXPORT_SYMBOL(llog_cleanup_commit_master);
 
 static int log_process_thread(void *args)
 {
 
 static int log_process_thread(void *args)
 {
@@ -533,12 +536,12 @@ static int log_process_thread(void *args)
         rc = llog_create(ctxt, &llh, &logid, NULL);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
         rc = llog_create(ctxt, &llh, &logid, NULL);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
-                RETURN(rc);
+                GOTO(out, rc);
         }
         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
         if (rc) {
                 CERROR("llog_init_handle failed %d\n", rc);
         }
         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
         if (rc) {
                 CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(out, rc);
+                GOTO(release_llh, rc);
         }
 
         if (cb) {
         }
 
         if (cb) {
@@ -552,24 +555,33 @@ static int log_process_thread(void *args)
         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
                ctxt->loc_llcd, ctxt);
         llog_sync(ctxt, NULL);
         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
                ctxt->loc_llcd, ctxt);
         llog_sync(ctxt, NULL);
-out:
+
+release_llh:
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put failed %d\n", rc);
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put failed %d\n", rc);
-
+out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
 {
         RETURN(rc);
 }
 
 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
 {
+        struct obd_device *obd = ctxt->loc_obd;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
+        if (obd->obd_stopping)
+                RETURN(-ENODEV);
+
         mutex_down(&llpa.llpa_sem);
         mutex_down(&llpa.llpa_sem);
-        llpa.llpa_ctxt = ctxt;
         llpa.llpa_cb = handle;
         llpa.llpa_arg = arg;
         llpa.llpa_cb = handle;
         llpa.llpa_arg = arg;
-
+        llpa.llpa_ctxt = llog_ctxt_get(ctxt); //llog_group_get_ctxt(ctxt->loc_olg, ctxt->loc_idx);
+        if (!llpa.llpa_ctxt) {
+                up(&llpa.llpa_sem);
+                RETURN(-ENODEV);
+        }
         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
         if (rc < 0)
                 CERROR("error starting log_process_thread: %d\n", rc);
         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
         if (rc < 0)
                 CERROR("error starting log_process_thread: %d\n", rc);
@@ -597,13 +609,13 @@ int llog_repl_connect(struct llog_ctxt *ctxt, int count,
 
         mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
 
         mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
-        llcd = llcd_grab();
+        llcd = llcd_grab(ctxt->loc_lcm);
         if (llcd == NULL) {
                 CERROR("couldn't get an llcd\n");
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
         if (llcd == NULL) {
                 CERROR("couldn't get an llcd\n");
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        llcd->llcd_ctxt = ctxt;
+        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
         ctxt->loc_llcd = llcd;
         mutex_up(&ctxt->loc_sem);
 
         ctxt->loc_llcd = llcd;
         mutex_up(&ctxt->loc_sem);
 
index 4266132..b7efe65 100755 (executable)
@@ -1414,6 +1414,46 @@ test_60() {
 }
 run_test 60 "test llog post recovery init vs llog unlink"
 
 }
 run_test 60 "test llog post recovery init vs llog unlink"
 
+#test race  llog recovery thread vs llog cleanup
+test_61() {
+    mkdir $DIR/$tdir
+    createmany -o $DIR/$tdir/$tfile-%d 800
+    replay_barrier ost1 
+#   OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221 
+    unlinkmany $DIR/$tdir/$tfile-%d 800 
+    do_facet ost "sysctl -w lustre.fail_loc=0x80000221"
+    facet_failover ost1
+    sleep 10 
+    fail ost1
+    sleep 30
+    do_facet ost "sysctl -w lustre.fail_loc=0x0"
+    $CHECKSTAT -t file $DIR/$tdir/$tfile-* && return 1
+    rmdir $DIR/$tdir
+}
+run_test 61 "test race llog recovery vs llog cleanup"
+
+#test race  mds llog sync vs llog cleanup
+test_61b() {
+#   OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x140
+    do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000140"
+    facet_failover $SINGLEMDS 
+    sleep 10
+    fail $SINGLEMDS
+    do_facet client dd if=/dev/zero of=$DIR/$tfile bs=4k count=1 || return 1
+}
+run_test 61b "test race mds llog sync vs llog cleanup"
+
+#test race  cancel cookie cb vs llog cleanup
+test_61c() {
+#   OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222 
+    touch $DIR/$tfile 
+    do_facet ost "sysctl -w lustre.fail_loc=0x80000222"
+    rm $DIR/$tfile    
+    sleep 10
+    fail ost1
+}
+run_test 61c "test race mds llog sync vs llog cleanup"
+
 equals_msg `basename $0`: test complete, cleaning up
 check_and_cleanup_lustre
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
 equals_msg `basename $0`: test complete, cleaning up
 check_and_cleanup_lustre
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
index 4d0041e..1c023ca 100644 (file)
@@ -1,15 +1,28 @@
 #!/bin/bash
 #!/bin/bash
+
+load_llog_test() {
+    grep -q llog_test /proc/modules && return
+    # Module should have been placed with other lustre modules...
+    modprobe llog_test
+    grep -q llog_test /proc/modules && return
+    # But maybe we're running from a developer tree...
+    insmod ../obdclass/llog_test.ko
+    grep -q llog_test /proc/modules && return
+    # This is for 2.4 kernels (deprecated!)
+    insmod ../obdclass/llog_test.o
+    grep -q llog_test /proc/modules && return
+    echo "Unable to load llog_test module!"
+    false
+    return
+}
+
 PATH=`dirname $0`:`dirname $0`/../utils:$PATH
 TMP=${TMP:-/tmp}
 
 MDS=`ls /proc/fs/lustre/mdt | grep -v num_refs | head -n 1`
 [ -z "$MDS" ] && echo "no MDS available, skipping llog test" && exit 0
 
 PATH=`dirname $0`:`dirname $0`/../utils:$PATH
 TMP=${TMP:-/tmp}
 
 MDS=`ls /proc/fs/lustre/mdt | grep -v num_refs | head -n 1`
 [ -z "$MDS" ] && echo "no MDS available, skipping llog test" && exit 0
 
-case `uname -r` in
-2.4.*) modprobe llog_test || exit 1 ;;
-2.6.*) modprobe llog_test || exit 1 ;;
-*) echo "unknown kernel version `uname -r`" && exit 99 ;;
-esac
+load_llog_test || exit 0
 lctl modules > $TMP/ogdb-`hostname`
 echo "NOW reload debugging syms.."
 
 lctl modules > $TMP/ogdb-`hostname`
 echo "NOW reload debugging syms.."