Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Tue, 19 Sep 2006 16:07:09 +0000 (16:07 +0000)
committerwangdi <wangdi>
Tue, 19 Sep 2006 16:07:09 +0000 (16:07 +0000)
merge llog group support from HEAD and some fixes.

24 files changed:
lustre/include/lustre_log.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/lmv/lmv_obd.c
lustre/lov/lov_internal.h
lustre/lov/lov_log.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_lov.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_log.c
lustre/mds/mds_lov.c
lustre/mgc/libmgc.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdclass/lprocfs_status.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_log.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c

index 412bb02..72c3287 100644 (file)
@@ -116,8 +116,9 @@ int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb, void *da
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
 
 /* llog_obd.c */
-int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
-               int count,  struct llog_logid *logid,struct llog_operations *op);
+int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
+               struct obd_device *disk_obd, int count,  struct llog_logid *logid,
+               struct llog_operations *op);
 int llog_cleanup(struct llog_ctxt *);
 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
@@ -134,10 +135,11 @@ int llog_obd_origin_add(struct llog_ctxt *ctxt,
                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                         struct llog_cookie *logcookies, int numcookies);
 
-int llog_cat_initialize(struct obd_device *obd, int count, 
-                        struct obd_uuid *uuid);
-int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
-                  int count, struct llog_catid *logid, struct obd_uuid *uuid);
+int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+                        int count, struct obd_uuid *uuid);
+int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+                  struct obd_device *disk_obd, int count, 
+                  struct llog_catid *logid, struct obd_uuid *uuid);
 
 int obd_llog_finish(struct obd_device *obd, int count);
 
index 5c1ac2c..140c51f 100644 (file)
@@ -266,6 +266,25 @@ struct obd_device_target {
         struct lustre_quota_ctxt  obt_qctxt;
 };
 
+/* llog contexts */
+enum llog_ctxt_id {
+        LLOG_CONFIG_ORIG_CTXT  =  0,
+        LLOG_CONFIG_REPL_CTXT  =  1,
+        LLOG_MDS_OST_ORIG_CTXT =  2,
+        LLOG_MDS_OST_REPL_CTXT =  3,
+        LLOG_SIZE_ORIG_CTXT    =  4,
+        LLOG_SIZE_REPL_CTXT    =  5,
+        LLOG_MD_ORIG_CTXT      =  6,
+        LLOG_MD_REPL_CTXT      =  7,
+        LLOG_RD1_ORIG_CTXT     =  8,
+        LLOG_RD1_REPL_CTXT     =  9,
+        LLOG_TEST_ORIG_CTXT    = 10,
+        LLOG_TEST_REPL_CTXT    = 11,
+        LLOG_LOVEA_ORIG_CTXT   = 12,
+        LLOG_LOVEA_REPL_CTXT   = 13,
+        LLOG_MAX_CTXTS
+};
+
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
 
 #define FILTER_GROUP_LLOG 1
@@ -282,6 +301,17 @@ struct filter_ext {
         __u64                fe_end;
 };
 
+struct obd_llogs {
+        struct llog_ctxt        *llog_ctxt[LLOG_MAX_CTXTS];
+};
+
+struct filter_group_llog {
+        struct list_head list;
+        int group;
+        struct obd_llogs *llogs;
+        struct obd_export *exp;
+};
+
 struct filter_obd {
         /* NB this field MUST be first */
         struct obd_device_target fo_obt;
@@ -358,6 +388,9 @@ struct filter_obd {
         struct obd_histogram     fo_r_dio_frags;
         struct obd_histogram     fo_w_dio_frags;
 
+        struct list_head         fo_llog_list;
+        spinlock_t               fo_llog_list_lock;
+
         struct lustre_quota_ctxt fo_quota_ctxt;
         spinlock_t               fo_quotacheck_lock;
         atomic_t                 fo_quotachecking;
@@ -520,6 +553,10 @@ struct mds_obd {
         unsigned long                    mds_lov_objids_valid:1,
                                          mds_fl_user_xattr:1,
                                          mds_fl_acl:1;
+
+        /* For CMD add mds_num */
+        int                              mds_num;
+
 };
 
 struct echo_obd {
@@ -756,25 +793,6 @@ static inline void oti_free_cookies(struct obd_trans_info *oti)
         oti->oti_numcookies = 0;
 }
 
-/* llog contexts */
-enum llog_ctxt_id {
-        LLOG_CONFIG_ORIG_CTXT  =  0,
-        LLOG_CONFIG_REPL_CTXT  =  1,
-        LLOG_MDS_OST_ORIG_CTXT =  2,
-        LLOG_MDS_OST_REPL_CTXT =  3,
-        LLOG_SIZE_ORIG_CTXT    =  4,
-        LLOG_SIZE_REPL_CTXT    =  5,
-        LLOG_MD_ORIG_CTXT      =  6,
-        LLOG_MD_REPL_CTXT      =  7,
-        LLOG_RD1_ORIG_CTXT     =  8,
-        LLOG_RD1_REPL_CTXT     =  9,
-        LLOG_TEST_ORIG_CTXT    = 10,
-        LLOG_TEST_REPL_CTXT    = 11,
-        LLOG_LOVEA_ORIG_CTXT   = 12,
-        LLOG_LOVEA_REPL_CTXT   = 13,
-        LLOG_MAX_CTXTS
-};
-
 /*
  * Events signalled through obd_notify() upcall-chain.
  */
@@ -791,6 +809,7 @@ enum obd_notify_event {
 };
 
 #include <lu_object.h>
+
 /*
  * Data structure used to pass obd_notify()-event to non-obd listeners (llite
  * and liblustre being main examples).
@@ -1070,11 +1089,12 @@ struct obd_ops {
                              int cmd, obd_off *);
 
         /* llog related obd_methods */
-        int (*o_llog_init)(struct obd_device *obd, struct obd_device *disk_obd,
-                           int count, struct llog_catid *logid
-                           struct obd_uuid *uuid);
+        int (*o_llog_init)(struct obd_device *obd, struct obd_llogs *llog, 
+                           struct obd_device *disk_obd, int count
+                           struct llog_catid *logid, struct obd_uuid *uuid);
         int (*o_llog_finish)(struct obd_device *obd, int count);
-
+        int (*o_llog_connect)(struct obd_export *, struct llogd_conn_body *);
+        
         /* metadata-only methods */
         int (*o_pin)(struct obd_export *, const struct lu_fid *fid,
                      struct obd_client_handle *, int flag);
index a893765..e7602db 100644 (file)
@@ -1432,6 +1432,18 @@ static inline void obd_import_event(struct obd_device *obd,
         }
 }
 
+static inline int obd_llog_connect(struct obd_export *exp,
+                                   struct llogd_conn_body *body)
+{
+        ENTRY;
+        
+        OBD_CHECK_DT_OP(exp->exp_obd, llog_connect, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, llog_connect);
+
+        return OBP(exp->exp_obd, llog_connect)(exp, body);
+}
+
+
 static inline int obd_notify(struct obd_device *obd,
                              struct obd_device *watched,
                              enum obd_notify_event ev,
index 7b98c8c..36c8519 100644 (file)
@@ -487,7 +487,7 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
                         RETURN(-EINVAL);
                 }
 
-                rc = obd_llog_init(obd, mdc_obd, 0, NULL, tgt_uuid);
+                rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
                 if (rc) {
                         lmv_init_unlock(lmv);
                         CERROR("lmv failed to setup llogging subsystems\n");
@@ -2073,14 +2073,15 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
-static int lmv_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *logid, struct obd_uuid *uuid)
+static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs, 
+                         struct obd_device *tgt, int count, 
+                         struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
index 5c0fa75..d5a0c12 100644 (file)
@@ -221,8 +221,9 @@ void lov_getref(struct obd_device *obd);
 void lov_putref(struct obd_device *obd);
 
 /* lov_log.c */
-int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                  int count, struct llog_catid *logid, struct obd_uuid *uuid);
+int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                  struct obd_device *tgt, int count, struct llog_catid *logid, 
+                  struct obd_uuid *uuid);
 int lov_llog_finish(struct obd_device *obd, int count);
 
 /* lov_pack.c */
index 4ce722c..cc7f8ef 100644 (file)
@@ -176,20 +176,21 @@ static struct llog_operations lov_size_repl_logops = {
         lop_cancel: lov_llog_repl_cancel
 };
 
-int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                  int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                  struct obd_device *tgt, int count, struct llog_catid *logid, 
+                  struct obd_uuid *uuid)
 {
         struct lov_obd *lov = &obd->u.lov;
         struct obd_device *child;
         int i, rc = 0, err = 0;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
                         &lov_mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
                         &lov_size_repl_logops);
         if (rc)
                 RETURN(rc);
@@ -204,7 +205,7 @@ int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
                 CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 child = lov->lov_tgts[i]->ltd_exp->exp_obd;
-                rc = obd_llog_init(child, tgt, 1, logid + i, uuid);
+                rc = obd_llog_init(child, llogs, tgt, 1, logid + i, uuid);
                 if (rc) {
                         CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
                                "(rc=%d)\n", i, child->obd_name, tgt->obd_name,
index 797d8c0..bcff1e4 100644 (file)
@@ -1316,7 +1316,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
         lprocfs_init_vars(mdc, &lvars);
         lprocfs_obd_setup(obd, lvars.obd_vars);
 
-        rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
         if (rc) {
                 mdc_cleanup(obd);
                 CERROR("failed to setup llogging subsystems\n");
@@ -1391,7 +1391,8 @@ static int mdc_cleanup(struct obd_device *obd)
 }
 
 
-static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
+static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+                         struct obd_device *tgt,
                          int count, struct llog_catid *logid, 
                          struct obd_uuid *uuid)
 {
@@ -1399,14 +1400,14 @@ static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
                 ctxt->loc_imp = obd->u.cli.cl_import;
         }
 
-        rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
                        &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
index b80fb7b..a0b4002 100644 (file)
@@ -135,6 +135,7 @@ int mdd_init_obd(const struct lu_context *ctxt, struct mdd_device *mdd,
         obd->obd_upcall.onu_owner = mdd;
         obd->obd_upcall.onu_upcall = mdd_lov_update;
         mdd->mdd_obd_dev = obd;
+        obd->u.mds.mds_num = mdd2lu_dev(mdd)->ld_site->ls_node_id;
 class_detach:
         if (rc)
                 class_detach(obd, lcfg);
index f9c9c7f..1bd28c3 100644 (file)
@@ -2109,12 +2109,12 @@ static int mds_postsetup(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
index 3eda0f4..0571def 100644 (file)
@@ -713,9 +713,10 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa,
 
         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
                             handle, 0);
-        if (!err)
-                oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-        else if (!rc)
+        if (!err) {
+                oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_num;
+                oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
+        } else if (!rc)
                 rc = err;
 out_dput:
         dput(new_child);
index ab74030..fd63280 100644 (file)
@@ -176,7 +176,8 @@ int mds_cleanup_pending(struct obd_device *obd);
 
 
 /* mds/mds_log.c */
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count,
+int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                  struct obd_device *tgt, int count,
                   struct llog_catid *logid, struct obd_uuid *uuid);
 int mds_llog_finish(struct obd_device *obd, int count);
 
index fa31b5f..3489799 100644 (file)
@@ -177,24 +177,25 @@ static struct llog_operations mds_size_repl_logops = {
         lop_cancel:     mds_llog_repl_cancel,
 };
 
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                  int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+                  struct obd_device *tgt, int count, struct llog_catid *logid, 
+                  struct obd_uuid *uuid)
 {
         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
                         &mds_ost_orig_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
                         &mds_size_repl_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = obd_llog_init(lov_obd, tgt, count, logid, uuid);
+        rc = obd_llog_init(lov_obd, llogs, tgt, count, logid, uuid);
         if (rc)
                 CERROR("lov_llog_init err %d\n", rc);
 
index eb2845d..fac2360 100644 (file)
@@ -299,7 +299,7 @@ static int mds_lov_update_mds(struct obd_device *obd,
            after recovery.  However, it should now be safe to call anytime. */
         CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
         mutex_down(&obd->obd_dev_sem);
-        llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, uuid);
+        llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, uuid);
         mutex_up(&obd->obd_dev_sem);
 
         RETURN(rc);
@@ -362,7 +362,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 GOTO(err_reg, rc);
 
         /* tgt_count may be 0! */
-        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+        rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
         if (rc) {
                 CERROR("failed to initialize catalog %d\n", rc);
                 GOTO(err_reg, rc);
@@ -591,7 +591,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
-                llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+                llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
                 group = FILTER_GROUP_MDS0 + mds->mds_id;
                 rc2 = obd_set_info_async(mds->mds_osc_exp,
                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
index 40959c1..ac27f94 100644 (file)
@@ -64,7 +64,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_decref, rc);
 
-        rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
@@ -79,15 +79,15 @@ err_decref:
         RETURN(rc);
 }
 
-static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *logid
-                         struct obd_uuid *uuid)
+static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+                         struct obd_device *tgt, int count
+                         struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
index 4af0211..1f8bcd0 100644 (file)
@@ -390,7 +390,7 @@ static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_decref, rc);
 
-        rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+        rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_cleanup, rc);
@@ -787,20 +787,20 @@ static int mgc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
-static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *logid,
-                         struct obd_uuid *uuid)
+static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                         struct obd_device *tgt, int count, 
+                         struct llog_catid *logid, struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
index 34c6459..53d4cb1 100644 (file)
@@ -163,7 +163,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc < 0)
                 GOTO(err_fs, rc);
 
-        rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 GOTO(err_fs, rc);
index 9cad6f4..381b8b4 100644 (file)
@@ -51,8 +51,8 @@ int llog_cleanup(struct llog_ctxt *ctxt)
         
         if (CTXTP(ctxt, cleanup))
                 rc = CTXTP(ctxt, cleanup)(ctxt);
-
         ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
+
         if (ctxt->loc_exp)
                 class_export_put(ctxt->loc_exp);
         OBD_FREE(ctxt, sizeof(*ctxt));
@@ -61,8 +61,9 @@ int llog_cleanup(struct llog_ctxt *ctxt)
 }
 EXPORT_SYMBOL(llog_cleanup);
 
-int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
-               int count, struct llog_logid *logid, struct llog_operations *op)
+int llog_setup(struct obd_device *obd,  struct obd_llogs *llogs, int index, 
+               struct obd_device *disk_obd, int count, struct llog_logid *logid,
+               struct llog_operations *op)
 {
         int rc = 0;
         struct llog_ctxt *ctxt;
@@ -82,11 +83,14 @@ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
                 LASSERT(ctxt->loc_logops == op);
                 GOTO(out, rc = 0);
         }
-        
+
         OBD_ALLOC(ctxt, sizeof(*ctxt));
         if (!ctxt)
                 GOTO(out, rc = -ENOMEM);
 
+        if (llogs)
+                llogs->llog_ctxt[index] = ctxt;
+
         obd->obd_llog_ctxt[index] = ctxt;
         ctxt->loc_obd = obd;
         ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
@@ -96,13 +100,13 @@ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
 
         if (op->lop_setup)
                 rc = op->lop_setup(obd, index, disk_obd, count, logid);
-        
+
         if (rc) {
                 obd->obd_llog_ctxt[index] = NULL;
                 class_export_put(ctxt->loc_exp);
                 OBD_FREE(ctxt, sizeof(*ctxt));
         }
-        
+
 out:
         RETURN(rc);
 }
@@ -313,8 +317,8 @@ int llog_obd_origin_add(struct llog_ctxt *ctxt,
 }
 EXPORT_SYMBOL(llog_obd_origin_add);
 
-int llog_cat_initialize(struct obd_device *obd, int count,
-                        struct obd_uuid *uuid)
+int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+                        int count, struct obd_uuid *uuid)
 {
         char name[32] = CATLIST;
         struct llog_catid *idarray;
@@ -332,7 +336,7 @@ int llog_cat_initialize(struct obd_device *obd, int count,
                 GOTO(out, rc);
         }
 
-        rc = obd_llog_init(obd, obd, count, idarray, uuid);
+        rc = obd_llog_init(obd, llogs, obd, count, idarray, uuid);
         if (rc) {
                 CERROR("rc: %d\n", rc);
                 GOTO(out, rc);
@@ -350,15 +354,17 @@ int llog_cat_initialize(struct obd_device *obd, int count,
 }
 EXPORT_SYMBOL(llog_cat_initialize);
 
-int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
-                  int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                  struct obd_device *disk_obd, int count, 
+                  struct llog_catid *logid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
         OBD_CHECK_DT_OP(obd, llog_init, 0);
         OBD_COUNTER_INCREMENT(obd, llog_init);
 
-        rc = OBP(obd, llog_init)(obd, disk_obd, count, logid, uuid);
+        rc = OBP(obd, llog_init)(obd, llogs, disk_obd, count, logid, 
+                                 uuid);
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_init);
@@ -374,3 +380,4 @@ int obd_llog_finish(struct obd_device *obd, int count)
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_finish);
+
index aedaa30..a70eed2 100644 (file)
@@ -597,14 +597,15 @@ static int llog_run_tests(struct obd_device *obd)
 }
 
 
-static int llog_test_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                               int count, struct llog_catid *logid,
-                               struct obd_uuid *uuid)
+static int llog_test_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+                               struct obd_device *tgt, int count, 
+                               struct llog_catid *logid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, &llog_lvfs_ops);
+        rc = llog_setup(obd, llogs, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, 
+                        &llog_lvfs_ops);
         RETURN(rc);
 }
 
@@ -652,7 +653,7 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(-EINVAL);
         }
 
-        rc = obd_llog_init(obd, tgt, 0, NULL, NULL);
+        rc = obd_llog_init(obd, NULL, tgt, 0, NULL, NULL);
         if (rc)
                 RETURN(rc);
 
index 1e5d5bd..3077228 100644 (file)
@@ -755,6 +755,7 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy_export);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, extent_calc);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_init);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_connect);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, unpin);
index 12e8070..a9402c7 100644 (file)
@@ -1957,6 +1957,9 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
         filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
 
+        INIT_LIST_HEAD(&filter->fo_llog_list);
+        spin_lock_init(&filter->fo_llog_list_lock);
+
         sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
         if (obd->obd_namespace == NULL)
@@ -1968,7 +1971,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = llog_cat_initialize(obd, 1, NULL);
+        rc = llog_cat_initialize(obd, NULL, 1, NULL);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -2076,8 +2079,10 @@ static struct llog_operations filter_size_orig_logops = {
         lop_add: llog_obd_origin_add
 };
 
-static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                            int count, struct llog_catid *catid,
+
+static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                            struct obd_device *tgt, int count, 
+                            struct llog_catid *catid,
                             struct obd_uuid *uuid)
 {
         struct llog_ctxt *ctxt;
@@ -2089,26 +2094,49 @@ static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
         filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
 
-        rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
                         &filter_mds_ost_repl_logops);
         if (rc)
                 RETURN(rc);
 
         /* FIXME - assign unlink_cb for filter's recovery */
-        ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
+        if (!llogs)
+                ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
+        else
+                ctxt = filter_llog_get_context(llogs, LLOG_MDS_OST_REPL_CTXT);
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
 
-        rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+        rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
                         &filter_size_orig_logops);
         RETURN(rc);
 }
 
-static int filter_llog_finish(struct obd_device *obd, int count)
+
+static int filter_group_llog_finish(struct obd_llogs *llogs)
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
         ENTRY;
+       
+        ctxt = filter_llog_get_context(llogs, LLOG_MDS_OST_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
+        ctxt = filter_llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT);
+        if (ctxt)
+                rc2 = llog_cleanup(ctxt);
+        if (!rc)
+                rc = rc2;
+
+        RETURN(rc);
+}
 
+static int filter_llog_finish(struct obd_device *obd, int count)
+{
+        struct llog_ctxt *ctxt;
+        int rc = 0, rc2 = 0;
+        ENTRY;
+       
         ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
         if (ctxt)
                 rc = llog_cleanup(ctxt);
@@ -2122,6 +2150,131 @@ static int filter_llog_finish(struct obd_device *obd, int count)
         RETURN(rc);
 }
 
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+                                             struct obd_export *export)
+{
+        struct filter_group_llog *fglog, *nlog;
+        struct filter_obd *filter;
+        struct llog_ctxt *ctxt;
+        struct list_head *cur;
+        int rc;
+
+        filter = &obd->u.filter;
+
+        spin_lock(&filter->fo_llog_list_lock);
+        list_for_each(cur, &filter->fo_llog_list) {
+                fglog = list_entry(cur, struct filter_group_llog, list);
+                if (fglog->group == group) {
+                        if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
+                                CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
+                                      obd->obd_name, group, fglog->exp, export);
+                        spin_unlock(&filter->fo_llog_list_lock);
+                        goto init;
+                }
+        }
+        spin_unlock(&filter->fo_llog_list_lock);
+
+        if (export == NULL)
+                RETURN(NULL);
+
+        OBD_ALLOC(fglog, sizeof(*fglog));
+        if (fglog == NULL)
+                RETURN(NULL);
+        fglog->group = group;
+
+        OBD_ALLOC(fglog->llogs, sizeof(struct obd_llogs));
+        if (fglog->llogs == NULL) {
+                OBD_FREE(fglog, sizeof(*fglog));
+                RETURN(NULL);
+        }
+
+        spin_lock(&filter->fo_llog_list_lock);
+        list_for_each(cur, &filter->fo_llog_list) {
+                nlog = list_entry(cur, struct filter_group_llog, list);
+                LASSERT(nlog->group != group);
+        }
+        list_add(&fglog->list, &filter->fo_llog_list);
+        spin_unlock(&filter->fo_llog_list_lock);
+
+        rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+        if (rc) {
+                OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
+                OBD_FREE(fglog, sizeof(*fglog));
+                RETURN(NULL);
+        }
+
+init:
+        if (export) {
+                fglog->exp = export;
+                ctxt = filter_llog_get_context(fglog->llogs, 
+                                               LLOG_MDS_OST_REPL_CTXT);
+                LASSERT(ctxt != NULL);
+
+                llog_receptor_accept(ctxt, export->exp_imp_reverse);
+        }
+        CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
+               obd->obd_name, fglog->llogs, group);
+
+        RETURN(fglog->llogs);
+}
+
+static int filter_llog_connect(struct obd_export *exp,
+                               struct llogd_conn_body *body)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct llog_ctxt *ctxt;
+        struct obd_llogs *llog;
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
+                (unsigned) body->lgdc_logid.lgl_ogr,
+                (unsigned) body->lgdc_logid.lgl_oid,
+                (unsigned) body->lgdc_logid.lgl_ogen);
+
+        llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
+        LASSERT(llog != NULL);
+        ctxt = filter_llog_get_context(llog, body->lgdc_ctxt_idx);
+        rc = llog_connect(ctxt, 1, &body->lgdc_logid,
+                          &body->lgdc_gen, NULL);
+        if (rc != 0)
+                CERROR("failed to connect\n");
+
+        RETURN(rc);
+}
+
+static int filter_llog_preclean (struct obd_device *obd)
+{
+        struct filter_group_llog *log;
+        struct filter_obd *filter;
+        int rc = 0;
+        ENTRY;
+
+        filter = &obd->u.filter;
+        spin_lock(&filter->fo_llog_list_lock);
+        while (!list_empty(&filter->fo_llog_list)) {
+                log = list_entry(filter->fo_llog_list.next, 
+                                 struct filter_group_llog, list);
+                list_del(&log->list);
+                spin_unlock(&filter->fo_llog_list_lock);
+
+                rc = filter_group_llog_finish(log->llogs);
+                if (rc)
+                        CERROR("failed to cleanup llogging subsystem for %u\n",
+                                log->group);
+                OBD_FREE(log->llogs, sizeof(*(log->llogs)));
+                OBD_FREE(log, sizeof(*log));
+                spin_lock(&filter->fo_llog_list_lock);
+        }
+        spin_unlock(&filter->fo_llog_list_lock);
+        
+        rc = obd_llog_finish(obd, 0);
+        if (rc)
+                CERROR("failed to cleanup llogging subsystem\n");
+
+        RETURN(rc);
+}
+
 static int filter_precleanup(struct obd_device *obd,
                              enum obd_cleanup_stage stage)
 {
@@ -2134,8 +2287,8 @@ static int filter_precleanup(struct obd_device *obd,
         case OBD_CLEANUP_EXPORTS:
                 target_cleanup_recovery(obd);
                 break;
-        case OBD_CLEANUP_SELF_EXP:
-                rc = filter_llog_finish(obd, 0);
+        case OBD_CLEANUP_SELF_EXP: 
+                rc = filter_llog_preclean(obd);
                 break;
         case OBD_CLEANUP_OBD:
                 break;
@@ -2513,12 +2666,60 @@ static int filter_destroy_export(struct obd_export *exp)
         RETURN(0);
 }
 
+static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
+{
+        struct filter_group_llog *fglog, *nlog;
+        struct filter_obd *filter;
+        int worked = 0, group;
+        struct llog_ctxt *ctxt;
+        ENTRY;
+
+        filter = &obd->u.filter;
+
+        /* we can't sync log holding spinlock. also, we do not want to get
+         * into livelock. so we do following: loop over MDS's exports in
+         * group order and skip already synced llogs -bzzz */
+        do {
+                /* look for group with min. number, but > worked */
+                fglog = NULL;
+                group = 1 << 30;
+                spin_lock(&filter->fo_llog_list_lock);
+                list_for_each_entry(nlog, &filter->fo_llog_list, list) {
+                       
+                        if (nlog->group <= worked) {
+                                /* this group is already synced */
+                                continue;
+                        }
+        
+                        if (group < nlog->group) {
+                                /* we have group with smaller number to sync */
+                                continue;
+                        }
+
+                        /* store current minimal group */
+                        fglog = nlog;
+                        group = nlog->group;
+                }
+                spin_unlock(&filter->fo_llog_list_lock);
+
+                if (fglog == NULL)
+                        break;
+
+                worked = fglog->group;
+                if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
+                        ctxt = filter_llog_get_context(fglog->llogs,
+                                                LLOG_MDS_OST_REPL_CTXT);
+                        LASSERT(ctxt != NULL);
+                        llog_sync(ctxt, fglog->exp);
+                }
+        } while (fglog != NULL);
+}
+
 /* also incredibly similar to mds_disconnect */
 static int filter_disconnect(struct obd_export *exp)
 {
         struct obd_device *obd = exp->exp_obd;
-        struct llog_ctxt *ctxt;
-        int rc, err;
+        int rc;
         ENTRY;
 
         LASSERT(exp);
@@ -2536,11 +2737,7 @@ static int filter_disconnect(struct obd_export *exp)
         fsfilt_sync(obd, obd->u.obt.obt_sb);
 
         /* flush any remaining cancel messages out to the target */
-        ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
-        err = llog_sync(ctxt, exp);
-        if (err)
-                CERROR("error flushing logs to MDS: rc %d\n", err);
-
+        filter_sync_llogs(obd, exp);
         class_export_put(exp);
         RETURN(rc);
 }
@@ -3710,6 +3907,7 @@ static struct obd_ops filter_obd_ops = {
         .o_preprw         = filter_preprw,
         .o_commitrw       = filter_commitrw,
         .o_llog_init      = filter_llog_init,
+        .o_llog_connect   = filter_llog_connect,
         .o_llog_finish    = filter_llog_finish,
         .o_iocontrol      = filter_iocontrol,
         .o_health_check   = filter_health_check,
index 4021e6c..5f7f04d 100644 (file)
@@ -115,6 +115,17 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
 
 struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
 
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+                                             struct obd_export *export);
+
+static inline struct llog_ctxt *filter_llog_get_context(struct obd_llogs *llogs,
+                                                        int index)
+{                
+        if (index < 0 || index >= LLOG_MAX_CTXTS)
+                return NULL;
+        return llogs->llog_ctxt[index];
+}
+
 /* filter_lvb.c */
 extern struct ldlm_valblock_ops filter_lvbo;
 
index 4ee0b79..fc0dc92 100644 (file)
@@ -100,19 +100,28 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                               void *cb_data, int error)
 {
         struct llog_cookie *cookie = cb_data;
-        int rc;
-
+        struct obd_llogs *llogs;
+        struct llog_ctxt *ctxt;
+        
+        /* we have to find context for right group */
         if (error != 0) {
                 CDEBUG(D_INODE, "not cancelling llog cookie on error %d\n",
                        error);
                 return;
         }
-
-        rc = llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1),
-                         NULL, 1, cookie, 0);
-        if (rc)
-                CERROR("error cancelling log cookies: rc = %d\n", rc);
-        OBD_FREE(cookie, sizeof(*cookie));
+        llogs = filter_grab_llog_for_group(obd, cookie->lgc_lgl.lgl_ogr, NULL);
+
+        if (llogs) {
+                ctxt = filter_llog_get_context(llogs, cookie->lgc_subsys + 1);
+                if (ctxt) {
+                        llog_cancel(ctxt, NULL, 1, cookie, 0);
+                } else
+                        CERROR("no valid context for group "LPU64"\n",
+                                cookie->lgc_lgl.lgl_ogr);
+        } else {
+                CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr);
+        }
+        OBD_FREE(cb_data, sizeof(struct llog_cookie));
 }
 
 /* Callback for processing the unlink log record received from MDS by 
index 13d7e03..973fbec 100644 (file)
@@ -3217,9 +3217,9 @@ static struct llog_operations osc_size_repl_logops = {
 };
 
 static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *catid
-                         struct obd_uuid *uuid)
+static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
+                         struct obd_device *tgt, int count
+                         struct llog_catid *catid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
@@ -3234,14 +3234,14 @@ static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
         }
         spin_unlock(&obd->obd_dev_lock);
 
-        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
+        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                         &catid->lci_logid, &osc_mds_ost_orig_logops);
         if (rc) {
                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                 GOTO (out, rc);
         }
 
-        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
+        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                         &osc_size_repl_logops);
         if (rc) 
                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
index d883f47..d69e4b3 100644 (file)
@@ -1180,6 +1180,19 @@ static int ost_handle_quotacheck(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static int ost_llog_handle_connect(struct obd_export *exp,
+                                                   struct ptlrpc_request *req)
+{
+        struct llogd_conn_body *body;
+        int rc;
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        rc = obd_llog_connect(exp, body);
+        RETURN(rc);
+}
+
+
 static int ost_filter_recovery_request(struct ptlrpc_request *req,
                                        struct obd_device *obd, int *process)
 {
@@ -1428,9 +1441,9 @@ static int ost_handle(struct ptlrpc_request *req)
         /* FIXME - just reply status */
         case LLOG_ORIGIN_CONNECT:
                 DEBUG_REQ(D_INODE, req, "log connect\n");
-                rc = llog_handle_connect(req);
+                rc = ost_llog_handle_connect(req->rq_export, req);
                 req->rq_status = rc;
-                rc = lustre_pack_reply(req, 1, NULL, NULL);
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
                 if (rc)
                         RETURN(rc);
                 RETURN(ptlrpc_reply(req));