merge llog group support from HEAD and some fixes.
int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
/* llog_obd.c */
-int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
- int count, struct llog_logid *logid,struct llog_operations *op);
+int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
+ struct obd_device *disk_obd, int count, struct llog_logid *logid,
+ struct llog_operations *op);
int llog_cleanup(struct llog_ctxt *);
int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
-int llog_cat_initialize(struct obd_device *obd, int count,
- struct obd_uuid *uuid);
-int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_catid *logid, struct obd_uuid *uuid);
+int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+ int count, struct obd_uuid *uuid);
+int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *disk_obd, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid);
int obd_llog_finish(struct obd_device *obd, int count);
struct lustre_quota_ctxt obt_qctxt;
};
+/* llog contexts */
+enum llog_ctxt_id {
+ LLOG_CONFIG_ORIG_CTXT = 0,
+ LLOG_CONFIG_REPL_CTXT = 1,
+ LLOG_MDS_OST_ORIG_CTXT = 2,
+ LLOG_MDS_OST_REPL_CTXT = 3,
+ LLOG_SIZE_ORIG_CTXT = 4,
+ LLOG_SIZE_REPL_CTXT = 5,
+ LLOG_MD_ORIG_CTXT = 6,
+ LLOG_MD_REPL_CTXT = 7,
+ LLOG_RD1_ORIG_CTXT = 8,
+ LLOG_RD1_REPL_CTXT = 9,
+ LLOG_TEST_ORIG_CTXT = 10,
+ LLOG_TEST_REPL_CTXT = 11,
+ LLOG_LOVEA_ORIG_CTXT = 12,
+ LLOG_LOVEA_REPL_CTXT = 13,
+ LLOG_MAX_CTXTS
+};
+
#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
#define FILTER_GROUP_LLOG 1
__u64 fe_end;
};
+struct obd_llogs {
+ struct llog_ctxt *llog_ctxt[LLOG_MAX_CTXTS];
+};
+
+struct filter_group_llog {
+ struct list_head list;
+ int group;
+ struct obd_llogs *llogs;
+ struct obd_export *exp;
+};
+
struct filter_obd {
/* NB this field MUST be first */
struct obd_device_target fo_obt;
struct obd_histogram fo_r_dio_frags;
struct obd_histogram fo_w_dio_frags;
+ struct list_head fo_llog_list;
+ spinlock_t fo_llog_list_lock;
+
struct lustre_quota_ctxt fo_quota_ctxt;
spinlock_t fo_quotacheck_lock;
atomic_t fo_quotachecking;
unsigned long mds_lov_objids_valid:1,
mds_fl_user_xattr:1,
mds_fl_acl:1;
+
+ /* For CMD add mds_num */
+ int mds_num;
+
};
struct echo_obd {
oti->oti_numcookies = 0;
}
-/* llog contexts */
-enum llog_ctxt_id {
- LLOG_CONFIG_ORIG_CTXT = 0,
- LLOG_CONFIG_REPL_CTXT = 1,
- LLOG_MDS_OST_ORIG_CTXT = 2,
- LLOG_MDS_OST_REPL_CTXT = 3,
- LLOG_SIZE_ORIG_CTXT = 4,
- LLOG_SIZE_REPL_CTXT = 5,
- LLOG_MD_ORIG_CTXT = 6,
- LLOG_MD_REPL_CTXT = 7,
- LLOG_RD1_ORIG_CTXT = 8,
- LLOG_RD1_REPL_CTXT = 9,
- LLOG_TEST_ORIG_CTXT = 10,
- LLOG_TEST_REPL_CTXT = 11,
- LLOG_LOVEA_ORIG_CTXT = 12,
- LLOG_LOVEA_REPL_CTXT = 13,
- LLOG_MAX_CTXTS
-};
-
/*
* Events signalled through obd_notify() upcall-chain.
*/
};
#include <lu_object.h>
+
/*
* Data structure used to pass obd_notify()-event to non-obd listeners (llite
* and liblustre being main examples).
int cmd, obd_off *);
/* llog related obd_methods */
- int (*o_llog_init)(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid);
+ int (*o_llog_init)(struct obd_device *obd, struct obd_llogs *llog,
+ struct obd_device *disk_obd, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid);
int (*o_llog_finish)(struct obd_device *obd, int count);
-
+ int (*o_llog_connect)(struct obd_export *, struct llogd_conn_body *);
+
/* metadata-only methods */
int (*o_pin)(struct obd_export *, const struct lu_fid *fid,
struct obd_client_handle *, int flag);
}
}
+static inline int obd_llog_connect(struct obd_export *exp,
+ struct llogd_conn_body *body)
+{
+ ENTRY;
+
+ OBD_CHECK_DT_OP(exp->exp_obd, llog_connect, 0);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, llog_connect);
+
+ return OBP(exp->exp_obd, llog_connect)(exp, body);
+}
+
+
static inline int obd_notify(struct obd_device *obd,
struct obd_device *watched,
enum obd_notify_event ev,
RETURN(-EINVAL);
}
- rc = obd_llog_init(obd, mdc_obd, 0, NULL, tgt_uuid);
+ rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
if (rc) {
lmv_init_unlock(lmv);
CERROR("lmv failed to setup llogging subsystems\n");
RETURN(rc);
}
-static int lmv_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
void lov_putref(struct obd_device *obd);
/* lov_log.c */
-int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid);
+int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count, struct llog_catid *logid,
+ struct obd_uuid *uuid);
int lov_llog_finish(struct obd_device *obd, int count);
/* lov_pack.c */
lop_cancel: lov_llog_repl_cancel
};
-int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count, struct llog_catid *logid,
+ struct obd_uuid *uuid)
{
struct lov_obd *lov = &obd->u.lov;
struct obd_device *child;
int i, rc = 0, err = 0;
ENTRY;
- rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
&lov_mds_ost_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
&lov_size_repl_logops);
if (rc)
RETURN(rc);
CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
LASSERT(lov->lov_tgts[i]->ltd_exp);
child = lov->lov_tgts[i]->ltd_exp->exp_obd;
- rc = obd_llog_init(child, tgt, 1, logid + i, uuid);
+ rc = obd_llog_init(child, llogs, tgt, 1, logid + i, uuid);
if (rc) {
CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
"(rc=%d)\n", i, child->obd_name, tgt->obd_name,
lprocfs_init_vars(mdc, &lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
- rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
if (rc) {
mdc_cleanup(obd);
CERROR("failed to setup llogging subsystems\n");
}
-static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
+static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt,
int count, struct llog_catid *logid,
struct obd_uuid *uuid)
{
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
ctxt->loc_imp = obd->u.cli.cl_import;
}
- rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
obd->obd_upcall.onu_owner = mdd;
obd->obd_upcall.onu_upcall = mdd_lov_update;
mdd->mdd_obd_dev = obd;
+ obd->u.mds.mds_num = mdd2lu_dev(mdd)->ld_site->ls_node_id;
class_detach:
if (rc)
class_detach(obd, lcfg);
int rc = 0;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
handle, 0);
- if (!err)
- oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
- else if (!rc)
+ if (!err) {
+ oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_num;
+ oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
+ } else if (!rc)
rc = err;
out_dput:
dput(new_child);
/* mds/mds_log.c */
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count,
+int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid);
int mds_llog_finish(struct obd_device *obd, int count);
lop_cancel: mds_llog_repl_cancel,
};
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count, struct llog_catid *logid,
+ struct obd_uuid *uuid)
{
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
&mds_ost_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
&mds_size_repl_logops);
if (rc)
RETURN(rc);
- rc = obd_llog_init(lov_obd, tgt, count, logid, uuid);
+ rc = obd_llog_init(lov_obd, llogs, tgt, count, logid, uuid);
if (rc)
CERROR("lov_llog_init err %d\n", rc);
after recovery. However, it should now be safe to call anytime. */
CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
mutex_down(&obd->obd_dev_sem);
- llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, uuid);
+ llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, uuid);
mutex_up(&obd->obd_dev_sem);
RETURN(rc);
GOTO(err_reg, rc);
/* tgt_count may be 0! */
- rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+ rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
if (rc) {
CERROR("failed to initialize catalog %d\n", rc);
GOTO(err_reg, rc);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
- llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+ llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
group = FILTER_GROUP_MDS0 + mds->mds_id;
rc2 = obd_set_info_async(mds->mds_osc_exp,
strlen(KEY_MDS_CONN), KEY_MDS_CONN,
if (rc)
GOTO(err_decref, rc);
- rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_cleanup, rc);
RETURN(rc);
}
-static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
if (rc)
GOTO(err_decref, rc);
- rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_cleanup, rc);
RETURN(rc);
}
-static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
if (rc < 0)
GOTO(err_fs, rc);
- rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
GOTO(err_fs, rc);
if (CTXTP(ctxt, cleanup))
rc = CTXTP(ctxt, cleanup)(ctxt);
-
ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
+
if (ctxt->loc_exp)
class_export_put(ctxt->loc_exp);
OBD_FREE(ctxt, sizeof(*ctxt));
}
EXPORT_SYMBOL(llog_cleanup);
-int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
- int count, struct llog_logid *logid, struct llog_operations *op)
+int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
+ struct obd_device *disk_obd, int count, struct llog_logid *logid,
+ struct llog_operations *op)
{
int rc = 0;
struct llog_ctxt *ctxt;
LASSERT(ctxt->loc_logops == op);
GOTO(out, rc = 0);
}
-
+
OBD_ALLOC(ctxt, sizeof(*ctxt));
if (!ctxt)
GOTO(out, rc = -ENOMEM);
+ if (llogs)
+ llogs->llog_ctxt[index] = ctxt;
+
obd->obd_llog_ctxt[index] = ctxt;
ctxt->loc_obd = obd;
ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
if (op->lop_setup)
rc = op->lop_setup(obd, index, disk_obd, count, logid);
-
+
if (rc) {
obd->obd_llog_ctxt[index] = NULL;
class_export_put(ctxt->loc_exp);
OBD_FREE(ctxt, sizeof(*ctxt));
}
-
+
out:
RETURN(rc);
}
}
EXPORT_SYMBOL(llog_obd_origin_add);
-int llog_cat_initialize(struct obd_device *obd, int count,
- struct obd_uuid *uuid)
+int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+ int count, struct obd_uuid *uuid)
{
char name[32] = CATLIST;
struct llog_catid *idarray;
GOTO(out, rc);
}
- rc = obd_llog_init(obd, obd, count, idarray, uuid);
+ rc = obd_llog_init(obd, llogs, obd, count, idarray, uuid);
if (rc) {
CERROR("rc: %d\n", rc);
GOTO(out, rc);
}
EXPORT_SYMBOL(llog_cat_initialize);
-int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *disk_obd, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid)
{
int rc;
ENTRY;
OBD_CHECK_DT_OP(obd, llog_init, 0);
OBD_COUNTER_INCREMENT(obd, llog_init);
- rc = OBP(obd, llog_init)(obd, disk_obd, count, logid, uuid);
+ rc = OBP(obd, llog_init)(obd, llogs, disk_obd, count, logid,
+ uuid);
RETURN(rc);
}
EXPORT_SYMBOL(obd_llog_init);
RETURN(rc);
}
EXPORT_SYMBOL(obd_llog_finish);
+
}
-static int llog_test_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int llog_test_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid)
{
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, &llog_lvfs_ops);
+ rc = llog_setup(obd, llogs, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL,
+ &llog_lvfs_ops);
RETURN(rc);
}
RETURN(-EINVAL);
}
- rc = obd_llog_init(obd, tgt, 0, NULL, NULL);
+ rc = obd_llog_init(obd, NULL, tgt, 0, NULL, NULL);
if (rc)
RETURN(rc);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy_export);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, extent_calc);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_init);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_connect);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, unpin);
filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
+ INIT_LIST_HEAD(&filter->fo_llog_list);
+ spin_lock_init(&filter->fo_llog_list_lock);
+
sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
if (obd->obd_namespace == NULL)
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
- rc = llog_cat_initialize(obd, 1, NULL);
+ rc = llog_cat_initialize(obd, NULL, 1, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_post, rc);
lop_add: llog_obd_origin_add
};
-static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *catid,
+
+static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *catid,
struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
- rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
&filter_mds_ost_repl_logops);
if (rc)
RETURN(rc);
/* FIXME - assign unlink_cb for filter's recovery */
- ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
+ if (!llogs)
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
+ else
+ ctxt = filter_llog_get_context(llogs, LLOG_MDS_OST_REPL_CTXT);
ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
- rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
&filter_size_orig_logops);
RETURN(rc);
}
-static int filter_llog_finish(struct obd_device *obd, int count)
+
+static int filter_group_llog_finish(struct obd_llogs *llogs)
{
struct llog_ctxt *ctxt;
int rc = 0, rc2 = 0;
ENTRY;
+
+ ctxt = filter_llog_get_context(llogs, LLOG_MDS_OST_REPL_CTXT);
+ if (ctxt)
+ rc = llog_cleanup(ctxt);
+
+ ctxt = filter_llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT);
+ if (ctxt)
+ rc2 = llog_cleanup(ctxt);
+ if (!rc)
+ rc = rc2;
+
+ RETURN(rc);
+}
+static int filter_llog_finish(struct obd_device *obd, int count)
+{
+ struct llog_ctxt *ctxt;
+ int rc = 0, rc2 = 0;
+ ENTRY;
+
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
if (ctxt)
rc = llog_cleanup(ctxt);
RETURN(rc);
}
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+ struct obd_export *export)
+{
+ struct filter_group_llog *fglog, *nlog;
+ struct filter_obd *filter;
+ struct llog_ctxt *ctxt;
+ struct list_head *cur;
+ int rc;
+
+ filter = &obd->u.filter;
+
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each(cur, &filter->fo_llog_list) {
+ fglog = list_entry(cur, struct filter_group_llog, list);
+ if (fglog->group == group) {
+ if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
+ CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
+ obd->obd_name, group, fglog->exp, export);
+ spin_unlock(&filter->fo_llog_list_lock);
+ goto init;
+ }
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ if (export == NULL)
+ RETURN(NULL);
+
+ OBD_ALLOC(fglog, sizeof(*fglog));
+ if (fglog == NULL)
+ RETURN(NULL);
+ fglog->group = group;
+
+ OBD_ALLOC(fglog->llogs, sizeof(struct obd_llogs));
+ if (fglog->llogs == NULL) {
+ OBD_FREE(fglog, sizeof(*fglog));
+ RETURN(NULL);
+ }
+
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each(cur, &filter->fo_llog_list) {
+ nlog = list_entry(cur, struct filter_group_llog, list);
+ LASSERT(nlog->group != group);
+ }
+ list_add(&fglog->list, &filter->fo_llog_list);
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+ if (rc) {
+ OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
+ OBD_FREE(fglog, sizeof(*fglog));
+ RETURN(NULL);
+ }
+
+init:
+ if (export) {
+ fglog->exp = export;
+ ctxt = filter_llog_get_context(fglog->llogs,
+ LLOG_MDS_OST_REPL_CTXT);
+ LASSERT(ctxt != NULL);
+
+ llog_receptor_accept(ctxt, export->exp_imp_reverse);
+ }
+ CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
+ obd->obd_name, fglog->llogs, group);
+
+ RETURN(fglog->llogs);
+}
+
+static int filter_llog_connect(struct obd_export *exp,
+ struct llogd_conn_body *body)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct llog_ctxt *ctxt;
+ struct obd_llogs *llog;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
+ (unsigned) body->lgdc_logid.lgl_ogr,
+ (unsigned) body->lgdc_logid.lgl_oid,
+ (unsigned) body->lgdc_logid.lgl_ogen);
+
+ llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
+ LASSERT(llog != NULL);
+ ctxt = filter_llog_get_context(llog, body->lgdc_ctxt_idx);
+ rc = llog_connect(ctxt, 1, &body->lgdc_logid,
+ &body->lgdc_gen, NULL);
+ if (rc != 0)
+ CERROR("failed to connect\n");
+
+ RETURN(rc);
+}
+
+static int filter_llog_preclean (struct obd_device *obd)
+{
+ struct filter_group_llog *log;
+ struct filter_obd *filter;
+ int rc = 0;
+ ENTRY;
+
+ filter = &obd->u.filter;
+ spin_lock(&filter->fo_llog_list_lock);
+ while (!list_empty(&filter->fo_llog_list)) {
+ log = list_entry(filter->fo_llog_list.next,
+ struct filter_group_llog, list);
+ list_del(&log->list);
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ rc = filter_group_llog_finish(log->llogs);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem for %u\n",
+ log->group);
+ OBD_FREE(log->llogs, sizeof(*(log->llogs)));
+ OBD_FREE(log, sizeof(*log));
+ spin_lock(&filter->fo_llog_list_lock);
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ rc = obd_llog_finish(obd, 0);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem\n");
+
+ RETURN(rc);
+}
+
static int filter_precleanup(struct obd_device *obd,
enum obd_cleanup_stage stage)
{
case OBD_CLEANUP_EXPORTS:
target_cleanup_recovery(obd);
break;
- case OBD_CLEANUP_SELF_EXP:
- rc = filter_llog_finish(obd, 0);
+ case OBD_CLEANUP_SELF_EXP:
+ rc = filter_llog_preclean(obd);
break;
case OBD_CLEANUP_OBD:
break;
RETURN(0);
}
+static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
+{
+ struct filter_group_llog *fglog, *nlog;
+ struct filter_obd *filter;
+ int worked = 0, group;
+ struct llog_ctxt *ctxt;
+ ENTRY;
+
+ filter = &obd->u.filter;
+
+ /* we can't sync log holding spinlock. also, we do not want to get
+ * into livelock. so we do following: loop over MDS's exports in
+ * group order and skip already synced llogs -bzzz */
+ do {
+ /* look for group with min. number, but > worked */
+ fglog = NULL;
+ group = 1 << 30;
+ spin_lock(&filter->fo_llog_list_lock);
+ list_for_each_entry(nlog, &filter->fo_llog_list, list) {
+
+ if (nlog->group <= worked) {
+ /* this group is already synced */
+ continue;
+ }
+
+ if (group < nlog->group) {
+ /* we have group with smaller number to sync */
+ continue;
+ }
+
+ /* store current minimal group */
+ fglog = nlog;
+ group = nlog->group;
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ if (fglog == NULL)
+ break;
+
+ worked = fglog->group;
+ if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
+ ctxt = filter_llog_get_context(fglog->llogs,
+ LLOG_MDS_OST_REPL_CTXT);
+ LASSERT(ctxt != NULL);
+ llog_sync(ctxt, fglog->exp);
+ }
+ } while (fglog != NULL);
+}
+
/* also incredibly similar to mds_disconnect */
static int filter_disconnect(struct obd_export *exp)
{
struct obd_device *obd = exp->exp_obd;
- struct llog_ctxt *ctxt;
- int rc, err;
+ int rc;
ENTRY;
LASSERT(exp);
fsfilt_sync(obd, obd->u.obt.obt_sb);
/* flush any remaining cancel messages out to the target */
- ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
- err = llog_sync(ctxt, exp);
- if (err)
- CERROR("error flushing logs to MDS: rc %d\n", err);
-
+ filter_sync_llogs(obd, exp);
class_export_put(exp);
RETURN(rc);
}
.o_preprw = filter_preprw,
.o_commitrw = filter_commitrw,
.o_llog_init = filter_llog_init,
+ .o_llog_connect = filter_llog_connect,
.o_llog_finish = filter_llog_finish,
.o_iocontrol = filter_iocontrol,
.o_health_check = filter_health_check,
struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
+ struct obd_export *export);
+
+static inline struct llog_ctxt *filter_llog_get_context(struct obd_llogs *llogs,
+ int index)
+{
+ if (index < 0 || index >= LLOG_MAX_CTXTS)
+ return NULL;
+ return llogs->llog_ctxt[index];
+}
+
/* filter_lvb.c */
extern struct ldlm_valblock_ops filter_lvbo;
void *cb_data, int error)
{
struct llog_cookie *cookie = cb_data;
- int rc;
-
+ struct obd_llogs *llogs;
+ struct llog_ctxt *ctxt;
+
+ /* we have to find context for right group */
if (error != 0) {
CDEBUG(D_INODE, "not cancelling llog cookie on error %d\n",
error);
return;
}
-
- rc = llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1),
- NULL, 1, cookie, 0);
- if (rc)
- CERROR("error cancelling log cookies: rc = %d\n", rc);
- OBD_FREE(cookie, sizeof(*cookie));
+ llogs = filter_grab_llog_for_group(obd, cookie->lgc_lgl.lgl_ogr, NULL);
+
+ if (llogs) {
+ ctxt = filter_llog_get_context(llogs, cookie->lgc_subsys + 1);
+ if (ctxt) {
+ llog_cancel(ctxt, NULL, 1, cookie, 0);
+ } else
+ CERROR("no valid context for group "LPU64"\n",
+ cookie->lgc_lgl.lgl_ogr);
+ } else {
+ CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr);
+ }
+ OBD_FREE(cb_data, sizeof(struct llog_cookie));
}
/* Callback for processing the unlink log record received from MDS by
};
static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *catid,
- struct obd_uuid *uuid)
+static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+ struct obd_device *tgt, int count,
+ struct llog_catid *catid, struct obd_uuid *uuid)
{
int rc;
ENTRY;
}
spin_unlock(&obd->obd_dev_lock);
- rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
+ rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
&catid->lci_logid, &osc_mds_ost_orig_logops);
if (rc) {
CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
GOTO (out, rc);
}
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
+ rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
&osc_size_repl_logops);
if (rc)
CERROR("failed LLOG_SIZE_REPL_CTXT\n");
RETURN(0);
}
+static int ost_llog_handle_connect(struct obd_export *exp,
+ struct ptlrpc_request *req)
+{
+ struct llogd_conn_body *body;
+ int rc;
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+ rc = obd_llog_connect(exp, body);
+ RETURN(rc);
+}
+
+
static int ost_filter_recovery_request(struct ptlrpc_request *req,
struct obd_device *obd, int *process)
{
/* FIXME - just reply status */
case LLOG_ORIGIN_CONNECT:
DEBUG_REQ(D_INODE, req, "log connect\n");
- rc = llog_handle_connect(req);
+ rc = ost_llog_handle_connect(req->rq_export, req);
req->rq_status = rc;
- rc = lustre_pack_reply(req, 1, NULL, NULL);
+ rc = lustre_pack_reply(req, 0, NULL, NULL);
if (rc)
RETURN(rc);
RETURN(ptlrpc_reply(req));