Severity : enhancement
Bugzilla : 14748
Description: Optimize ldlm waiting list processing for PR extent locks
-Details : When processing waiting list for read extent lock and meeting
-read
+Details : When processing waiting list for read extent lock and meeting read
lock that is same or wider to it that is not contended, skip
processing rest of the list and immediatelly return current
status of conflictness, since we are guaranteed there are no
to have two identical connections in imp_conn_list. We must
compare not conn's pointers but NIDs, otherwise we can defeat
connection throttling.
+
+Severity : normal
+Bugzilla : 13821
+Description: port llog fixes from b1_6 into HEAD
+Details : Port llog reference couting and some llog cleanups from b1_6
+ (bug 10800) into HEAD, for protect from panic and access to already
+ free llog structures.
+
--------------------------------------------------------------------------------
cap_t cap_get_proc(void);
int cap_get_flag(cap_t, cap_value_t, cap_flag_t, cap_flag_value_t *);
-/* log related */
-static inline int llog_init_commit_master(void) { return 0; }
-static inline int llog_cleanup_commit_master(int force) { return 0; }
static inline void libcfs_run_lbug_upcall(char *file, const char *fn,
const int l){}
};
/* ptlrpc/recov_thread.c */
-int llog_start_commit_thread(void);
-struct llog_canceld_ctxt *llcd_grab(void);
-void llcd_send(struct llog_canceld_ctxt *llcd);
+int llog_start_commit_thread(struct llog_commit_master *);
+int llog_init_commit_master(struct llog_commit_master *);
+int llog_cleanup_commit_master(struct llog_commit_master *lcm, int force);
#endif /* _LUSTRE_COMMIT_CONFD_H */
int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
/* llog_obd.c */
-int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
+int llog_setup(struct obd_device *obd, struct obd_llog_group *olg, int index,
struct obd_device *disk_obd, int count, struct llog_logid *logid,
struct llog_operations *op);
+int __llog_ctxt_put(struct llog_ctxt *ctxt);
int llog_cleanup(struct llog_ctxt *);
int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
int count, struct llog_cookie *cookies, int flags);
-int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg,
int index, struct obd_device *disk_obd, int count,
struct llog_logid *logid);
int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
-int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
int count, struct obd_uuid *uuid);
-int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int obd_llog_init(struct obd_device *obd, int group,
struct obd_device *disk_obd, int count,
struct llog_catid *logid, struct obd_uuid *uuid);
int (*lop_close)(struct llog_handle *handle);
int (*lop_read_header)(struct llog_handle *handle);
- int (*lop_setup)(struct obd_device *obd, struct obd_llogs *llogs,
+ int (*lop_setup)(struct obd_device *obd, struct obd_llog_group *olg,
int ctxt_idx, struct obd_device *disk_obd, int count,
struct llog_logid *logid);
int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp);
int loc_idx; /* my index the obd array of ctxt's */
struct llog_gen loc_gen;
struct obd_device *loc_obd; /* points back to the containing obd*/
+ struct obd_llog_group *loc_olg; /* group containing that ctxt */
struct obd_export *loc_exp; /* parent "disk" export (e.g. MDS) */
struct obd_import *loc_imp; /* to use in RPC's: can be backward
pointing import */
struct llog_handle *loc_handle;
struct llog_canceld_ctxt *loc_llcd;
struct semaphore loc_sem; /* protects loc_llcd and loc_imp */
+ atomic_t loc_refcount;
+ struct llog_commit_master *loc_lcm;
void *llog_proc_cb;
};
return size_round(len);
}
+static inline struct llog_ctxt *llog_ctxt_get(struct llog_ctxt *ctxt)
+{
+ LASSERT(atomic_read(&ctxt->loc_refcount) > 0);
+ atomic_inc(&ctxt->loc_refcount);
+ CDEBUG(D_INFO, "GETting ctxt %p : new refcount %d\n", ctxt,
+ atomic_read(&ctxt->loc_refcount));
+ return ctxt;
+}
+
+static inline void llog_ctxt_put(struct llog_ctxt *ctxt)
+{
+ if (ctxt == NULL)
+ return;
+ CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", ctxt,
+ atomic_read(&ctxt->loc_refcount) - 1);
+ LASSERT(atomic_read(&ctxt->loc_refcount) > 0);
+ LASSERT(atomic_read(&ctxt->loc_refcount) < 0x5a5a5a);
+ __llog_ctxt_put(ctxt);
+}
+
+static inline void llog_group_init(struct obd_llog_group *olg, int group)
+{
+ cfs_waitq_init(&olg->olg_waitq);
+ spin_lock_init(&olg->olg_lock);
+ olg->olg_group = group;
+}
+
+static inline void llog_group_set_export(struct obd_llog_group *olg,
+ struct obd_export *exp)
+{
+ LASSERT(exp != NULL);
+
+ spin_lock(&olg->olg_lock);
+ if (olg->olg_exp != NULL && olg->olg_exp != exp)
+ CWARN("%s: export for group %d is changed: 0x%p -> 0x%p\n",
+ exp->exp_obd->obd_name, olg->olg_group,
+ olg->olg_exp, exp);
+ olg->olg_exp = exp;
+ spin_unlock(&olg->olg_lock);
+}
+
+static inline int llog_group_set_ctxt(struct obd_llog_group *olg,
+ struct llog_ctxt *ctxt, int index)
+{
+ LASSERT(index >= 0 && index < LLOG_MAX_CTXTS);
+
+ spin_lock(&olg->olg_lock);
+ if (olg->olg_ctxts[index] != NULL) {
+ spin_unlock(&olg->olg_lock);
+ return -EEXIST;
+ }
+ olg->olg_ctxts[index] = ctxt;
+ spin_unlock(&olg->olg_lock);
+ return 0;
+}
+
+static inline struct llog_ctxt *llog_group_get_ctxt(struct obd_llog_group *olg,
+ int index)
+{
+ struct llog_ctxt *ctxt;
+
+ LASSERT(index >= 0 && index < LLOG_MAX_CTXTS);
+
+ spin_lock(&olg->olg_lock);
+ if (olg->olg_ctxts[index] == NULL) {
+ ctxt = NULL;
+ } else {
+ ctxt = llog_ctxt_get(olg->olg_ctxts[index]);
+ }
+ spin_unlock(&olg->olg_lock);
+ return ctxt;
+}
+
static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
int index)
{
- if (index < 0 || index >= LLOG_MAX_CTXTS)
- return NULL;
+ return llog_group_get_ctxt(&obd->obd_olg, index);
+}
- return obd->obd_llog_ctxt[index];
+static inline int llog_group_ctxt_null(struct obd_llog_group *olg, int index)
+{
+ return (olg->olg_ctxts[index] == NULL);
}
-static inline struct llog_ctxt *
-llog_get_context_from_llogs(struct obd_llogs *llogs, int index)
-{
- if (index < 0 || index >= LLOG_MAX_CTXTS)
- return NULL;
- return llogs->llog_ctxt[index];
+static inline int llog_ctxt_null(struct obd_device *obd, int index)
+{
+ return (llog_group_ctxt_null(&obd->obd_olg, index));
}
static inline int llog_write_rec(struct llog_handle *handle,
/* llog contexts */
enum llog_ctxt_id {
LLOG_CONFIG_ORIG_CTXT = 0,
- LLOG_CONFIG_REPL_CTXT = 1,
- LLOG_MDS_OST_ORIG_CTXT = 2,
- LLOG_MDS_OST_REPL_CTXT = 3,
- LLOG_SIZE_ORIG_CTXT = 4,
- LLOG_SIZE_REPL_CTXT = 5,
- LLOG_MD_ORIG_CTXT = 6,
- LLOG_MD_REPL_CTXT = 7,
- LLOG_RD1_ORIG_CTXT = 8,
- LLOG_RD1_REPL_CTXT = 9,
- LLOG_TEST_ORIG_CTXT = 10,
- LLOG_TEST_REPL_CTXT = 11,
- LLOG_LOVEA_ORIG_CTXT = 12,
- LLOG_LOVEA_REPL_CTXT = 13,
+ LLOG_CONFIG_REPL_CTXT,
+ LLOG_MDS_OST_ORIG_CTXT,
+ LLOG_MDS_OST_REPL_CTXT,
+ LLOG_SIZE_ORIG_CTXT,
+ LLOG_SIZE_REPL_CTXT,
+ LLOG_RD1_ORIG_CTXT,
+ LLOG_RD1_REPL_CTXT,
+ LLOG_TEST_ORIG_CTXT,
+ LLOG_TEST_REPL_CTXT,
+ LLOG_LOVEA_ORIG_CTXT,
+ LLOG_LOVEA_REPL_CTXT,
LLOG_MAX_CTXTS
};
#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
-#define FILTER_GROUP_LLOG 1
-#define FILTER_GROUP_ECHO 2
-#define FILTER_GROUP_MDS0 3
-
struct filter_subdirs {
cfs_dentry_t *dentry[FILTER_SUBDIR_COUNT];
};
__u64 fe_end;
};
-struct obd_llogs {
- struct llog_ctxt *llog_ctxt[LLOG_MAX_CTXTS];
-};
-
-struct filter_group_llog {
- struct list_head list;
- int group;
- struct obd_llogs *llogs;
- struct obd_export *exp;
-};
-
struct filter_obd {
/* NB this field MUST be first */
struct obd_device_target fo_obt;
unsigned int fo_fl_oss_capa;
struct list_head fo_capa_keys;
struct hlist_head *fo_capa_hash;
+
+ void *fo_lcm;
};
#define OSC_MAX_RIF_DEFAULT 8
cfs_dentry_t *mds_logs_dir;
cfs_dentry_t *mds_objects_dir;
struct llog_handle *mds_cfg_llh;
-// struct llog_handle *mds_catalog;
struct obd_device *mds_osc_obd; /* XXX lov_obd */
struct obd_uuid mds_lov_uuid;
char *mds_profile;
struct completion trd_finishing;
};
+#define OBD_LLOG_GROUP 0
+
+enum filter_groups {
+ FILTER_GROUP_LLOG = 1,
+ FILTER_GROUP_ECHO,
+ FILTER_GROUP_MDS0
+};
+
+struct obd_llog_group {
+ struct list_head olg_list;
+ int olg_group;
+ struct llog_ctxt *olg_ctxts[LLOG_MAX_CTXTS];
+ cfs_waitq_t olg_waitq;
+ spinlock_t olg_lock;
+ struct obd_export *olg_exp;
+};
+
/* corresponds to one of the obd's */
#define MAX_OBD_NAME 128
#define OBD_DEVICE_MAGIC 0XAB5CD6EF
struct obd_statfs obd_osfs; /* locked by obd_osfs_lock */
__u64 obd_osfs_age;
struct lvfs_run_ctxt obd_lvfs_ctxt;
- struct llog_ctxt *obd_llog_ctxt[LLOG_MAX_CTXTS];
+ struct obd_llog_group obd_olg; /* default llog group */
struct obd_device *obd_observer;
struct obd_notify_upcall obd_upcall;
struct obd_export *obd_self_export;
int cmd, obd_off *);
/* llog related obd_methods */
- int (*o_llog_init)(struct obd_device *obd, struct obd_llogs *llog,
+ int (*o_llog_init)(struct obd_device *obd, int group,
struct obd_device *disk_obd, int count,
struct llog_catid *logid, struct obd_uuid *uuid);
int (*o_llog_finish)(struct obd_device *obd, int count);
#define OBD_FAIL_MDS_OSC_PRECREATE 0x13d
#define OBD_FAIL_MDS_LOV_SYNC_RACE 0x13e
#define OBD_FAIL_MDS_CLOSE_NET_REP 0x13f
+#define OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x140
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
#define OBD_FAIL_OST_SETATTR_CREDITS 0x21e
#define OBD_FAIL_OST_HOLD_WRITE_RPC 0x21f
#define OBD_FAIL_OST_BRW_WRITE_BULK2 0x220
+#define OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+#define OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
exp = class_conn2export(&mgc_conn);
- ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
+ ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
cfg->cfg_flags |= CFG_F_COMPAT146;
rc = class_config_parse_llog(ctxt, profile, cfg);
+ llog_ctxt_put(ctxt);
if (rc) {
CERROR("class_config_parse_llog failed: rc = %d\n", rc);
}
RETURN(-EINVAL);
}
- rc = obd_llog_init(obd, NULL, mdc_obd, 0, NULL, tgt_uuid);
+ rc = obd_llog_init(obd, OBD_LLOG_GROUP, mdc_obd, 0, NULL, tgt_uuid);
if (rc) {
lmv_init_unlock(lmv);
CERROR("lmv failed to setup llogging subsystems\n");
RETURN(rc);
}
-static int lmv_llog_init(struct obd_device *obd, struct obd_llogs* llogs,
+static int lmv_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid)
{
+#if 0
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ LASSERT(group == OBD_LLOG_GROUP);
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
- ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
- ctxt->loc_imp = tgt->u.cli.cl_import;
+ ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_CONFIG_REPL_CTXT);
+ llog_initiator_connect(ctxt, tgt);
+ llog_ctxt_put(ctxt);
}
-
RETURN(rc);
+#else
+ return 0;
+#endif
}
static int lmv_llog_finish(struct obd_device *obd, int count)
{
- int rc;
+ struct llog_ctxt *ctxt;
+ int rc = 0;
ENTRY;
- rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
+ ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+ if (ctxt)
+ rc = llog_cleanup(ctxt);
+
RETURN(rc);
}
LASSERT(ctxt);
if (lsm->lsm_array && lsm->lsm_array->lai_ext_array)
- RETURN(0);
+ GOTO(release_ctxt, rc = 0);
CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
lsm->lsm_array->lai_array_id.lgl_oid,
OBD_ALLOC(lsm->lsm_array->lai_ext_array,lsm->lsm_array->lai_ext_count *
sizeof (struct lov_extent));
if (!lsm->lsm_array->lai_ext_array)
- RETURN(-ENOMEM);
+ GOTO(release_ctxt, rc = -ENOMEM);
CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
lsm->lsm_array->lai_array_id.lgl_oid,
out:
if (rc)
lovea_free_array_info(lsm);
+release_ctxt:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
ENTRY;
LASSERT(md_exp != NULL);
+ /*for those orphan inode, we should keep array id*/
+ if (!(oa->o_valid & OBD_MD_FLCOOKIE))
+ RETURN(rc);
+
ctxt = llog_get_context(md_exp->exp_obd, LLOG_LOVEA_REPL_CTXT);
if (!ctxt)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
LASSERT(lsm->lsm_array != NULL);
- /*for those orphan inode, we should keep array id*/
- if (!(oa->o_valid & OBD_MD_FLCOOKIE))
- RETURN(0);
-
- LASSERT(ctxt != NULL);
rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id,
NULL);
if (rc)
}
llog_free_handle(llh);
out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
void lov_putref(struct obd_device *obd);
/* lov_log.c */
-int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int lov_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count, struct llog_catid *logid,
struct obd_uuid *uuid);
int lov_llog_finish(struct obd_device *obd, int count);
LASSERT(lsm->lsm_object_gr == loi->loi_gr);
rc1 = llog_add(cctxt, rec, NULL, logcookies + rc,
numcookies - rc);
+ llog_ctxt_put(cctxt);
if (rc1 < 0)
RETURN(rc1);
rc += rc1;
child = lov->lov_tgts[i]->ltd_exp->exp_obd;
cctxt = llog_get_context(child, ctxt->loc_idx);
rc = llog_connect(cctxt, 1, logid, gen, uuid);
+ llog_ctxt_put(cctxt);
+
if (rc) {
CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
if (!err)
int err;
err = llog_cancel(cctxt, NULL, 1, cookies, flags);
+ llog_ctxt_put(cctxt);
if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CERROR("error: objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n", lsm->lsm_object_id,
lop_cancel: lov_llog_repl_cancel
};
-int lov_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int lov_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count, struct llog_catid *logid,
struct obd_uuid *uuid)
{
int i, rc = 0, err = 0;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+ LASSERT(group == OBD_LLOG_GROUP);
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
&lov_mds_ost_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
&lov_size_repl_logops);
if (rc)
RETURN(rc);
CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
LASSERT(lov->lov_tgts[i]->ltd_exp);
child = lov->lov_tgts[i]->ltd_exp->exp_obd;
- rc = obd_llog_init(child, llogs, tgt, 1, logid + i, uuid);
+ rc = obd_llog_init(child, group, tgt, 1, logid + i, uuid);
if (rc) {
CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
"(rc=%d)\n", i, child->obd_name, tgt->obd_name,
case OBD_IOC_PARSE: {
ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
+ llog_ctxt_put(ctxt);
GOTO(out, rc);
}
#ifdef __KERNEL__
case OBD_IOC_LLOG_PRINT: {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
rc = llog_ioctl(ctxt, cmd, data);
-
+ llog_ctxt_put(ctxt);
GOTO(out, rc);
}
#endif
sptlrpc_lprocfs_cliobd_attach(obd);
ptlrpc_lprocfs_register_obd(obd);
- rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
if (rc) {
mdc_cleanup(obd);
CERROR("failed to setup llogging subsystems\n");
}
-static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int mdc_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
+ struct obd_llog_group *olg = &obd->obd_olg;
int rc;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
- &llog_client_ops);
- if (rc == 0) {
- ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
- ctxt->loc_imp = obd->u.cli.cl_import;
- }
+ LASSERT(group == OBD_LLOG_GROUP);
+ LASSERT(olg->olg_group == group);
- rc = llog_setup(obd, llogs, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
- &llog_client_ops);
+ rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0,
+ NULL, &llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
- ctxt->loc_imp = obd->u.cli.cl_import;
+ llog_initiator_connect(ctxt);
+ llog_ctxt_put(ctxt);
}
RETURN(rc);
static int mdc_llog_finish(struct obd_device *obd, int count)
{
- int rc;
+ struct llog_ctxt *ctxt;
+ int rc = 0;
ENTRY;
- rc = llog_cleanup(llog_get_context(obd, LLOG_LOVEA_REPL_CTXT));
- if (rc) {
- CERROR("can not cleanup LLOG_CONFIG_REPL_CTXT rc %d\n", rc);
- }
- rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
+ ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
+ if (ctxt)
+ rc = llog_cleanup(ctxt);
+
RETURN(rc);
}
}
static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
{
- int rc;
+ int rc = 0;
ENTRY;
- rc = llog_start_commit_thread();
- if (rc < 0)
- RETURN(rc);
-
if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
class_uuid_t uuid;
int rc = 0;
ENTRY;
- rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
RETURN(0);
LASSERT(!obd->obd_recovering);
- LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+ LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
+
/* clean PENDING dir */
if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
rc = mds_cleanup_pending(obd);
/* mds/mds_log.c */
-int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int mds_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid);
int mds_llog_finish(struct obd_device *obd, int count);
struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
int lmm_size, rc = 0, cleanup_phase = 0, size;
struct llog_handle *llh_head = NULL, *llh_tail = NULL;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt = NULL;
struct mds_rec_join *join_rec;
ENTRY;
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
cleanup_phase = 2;
if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
struct llog_logid *llog_array;
case 3:
llog_close(llh_head);
case 2:
+ llog_ctxt_put(ctxt);
if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
OBD_FREE_PTR(head_lmmj);
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
rc = llog_add(lctxt, rec, lsm, logcookies, numcookies);
+ llog_ctxt_put(lctxt);
+
RETURN(rc);
}
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
rc = llog_connect(lctxt, count, logid, gen, uuid);
+ llog_ctxt_put(lctxt);
RETURN(rc);
}
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
rc = llog_cancel(lctxt, lsm, count, cookies, flags);
+ llog_ctxt_put(lctxt);
RETURN(rc);
}
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
cookies_size / sizeof(struct llog_cookie));
+ llog_ctxt_put(ctxt);
OBD_FREE(lur, sizeof(*lur));
out:
rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
cookies_size / sizeof(struct llog_cookie));
+ llog_ctxt_put(ctxt);
+
OBD_FREE(lsr, sizeof(*lsr));
out:
obd_free_memmd(mds->mds_osc_exp, &lsm);
lop_cancel: mds_llog_repl_cancel,
};
-int mds_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int mds_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count, struct llog_catid *logid,
struct obd_uuid *uuid)
{
int rc;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+ LASSERT(group == OBD_LLOG_GROUP);
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
&mds_ost_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
&mds_size_repl_logops);
if (rc)
RETURN(rc);
- rc = obd_llog_init(lov_obd, llogs, tgt, count, logid, uuid);
+ rc = obd_llog_init(lov_obd, group, tgt, count, logid, uuid);
if (rc)
CERROR("lov_llog_init err %d\n", rc);
/* If we added a target we have to reconnect the llogs */
/* We only _need_ to do this at first add (idx), or the first time
after recovery. However, it should now be safe to call anytime. */
- rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+ rc = llog_cat_initialize(obd, &obd->obd_olg,
+ mds->mds_lov_desc.ld_tgt_count, NULL);
/*XXX this notifies the MDD until lov handling use old mds code */
if (obd->obd_upcall.onu_owner) {
if (rc)
GOTO(err_reg, rc);
+ /* tgt_count may be 0! */
+ rc = llog_cat_initialize(obd, &obd->obd_olg,
+ mds->mds_lov_desc.ld_tgt_count, NULL);
+ if (rc) {
+ CERROR("failed to initialize catalog %d\n", rc);
+ GOTO(err_reg, rc);
+ }
+
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
switch (cmd) {
case OBD_IOC_RECORD: {
char *name = data->ioc_inlbuf1;
+ struct llog_ctxt *ctxt;
+
if (mds->mds_cfg_llh)
RETURN(-EBUSY);
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &mds->mds_cfg_llh, NULL, name);
+ rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
+ llog_ctxt_put(ctxt);
if (rc == 0)
llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
&cfg_uuid);
case OBD_IOC_CLEAR_LOG: {
char *name = data->ioc_inlbuf1;
+ struct llog_ctxt *ctxt;
if (mds->mds_cfg_llh)
RETURN(-EBUSY);
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &mds->mds_cfg_llh, NULL, name);
+ rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
+ llog_ctxt_put(ctxt);
if (rc == 0) {
llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
NULL);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
RETURN(rc);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
RETURN(rc);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
- llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+ llog_cat_initialize(obd, &obd->obd_olg,
+ mds->mds_lov_desc.ld_tgt_count, NULL);
group = FILTER_GROUP_MDS0 + mds->mds_id;
+ llog_ctxt_put(ctxt);
rc2 = obd_set_info_async(mds->mds_osc_exp,
strlen(KEY_MDS_CONN), KEY_MDS_CONN,
sizeof(group), &group, NULL);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
struct obd_uuid *uuid;
__u32 idx = mlsi->mlsi_index;
struct mds_group_info mgi;
+ struct llog_ctxt *ctxt;
int rc = 0;
ENTRY;
if (rc)
GOTO(out, rc);
- rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
- mds->mds_lov_desc.ld_tgt_count,
- NULL, NULL, uuid);
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+ if (!ctxt)
+ RETURN(-ENODEV);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
+ rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
+ NULL, NULL, uuid);
+ llog_ctxt_put(ctxt);
if (rc != 0) {
CERROR("%s failed at llog_origin_connect: %d\n",
obd_uuid2str(uuid), rc);
}
/* We should update init llog here too for replay unlink and
* possiable llog init race when recovery complete */
- llog_cat_initialize(obd, NULL,
+ llog_cat_initialize(obd, &obd->obd_olg,
obd->u.mds.mds_lov_desc.ld_tgt_count,
&watched->u.cli.cl_target_uuid);
mutex_up(&obd->obd_dev_sem);
RETURN(rc);
}
- LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+ LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
rc = mds_lov_start_synchronize(obd, watched, data,
!(ev == OBD_NOTIFY_SYNC));
rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen /
sizeof(*mlcd->mlcd_cookies),
mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
+ llog_ctxt_put(ctxt);
+
if (rc)
CERROR("error cancelling %d log cookies: rc %d\n",
(int)(mlcd->mlcd_cookielen /
if (rc)
GOTO(err_decref, rc);
- rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_cleanup, rc);
RETURN(rc);
}
-static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int mgc_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
+ struct obd_llog_group *olg = &obd->obd_olg;
int rc;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ LASSERT(group == olg->olg_group);
+ LASSERT(group == OBD_LLOG_GROUP);
+ rc = llog_setup(obd, olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
- ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
- ctxt->loc_imp = obd->u.cli.cl_import;
+ ctxt = llog_group_get_ctxt(olg, LLOG_CONFIG_REPL_CTXT);
+ llog_initiator_connect(ctxt);
+ llog_ctxt_put(ctxt);
}
RETURN(rc);
if (rc)
GOTO(err_decref, rc);
- rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_cleanup, rc);
RETURN(rc);
}
-static int mgc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int mgc_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid)
{
struct llog_ctxt *ctxt;
+ struct obd_llog_group *olg = &obd->obd_olg;
int rc;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
+ LASSERT(group == OBD_LLOG_GROUP);
+ LASSERT(olg->olg_group == group);
+
+ rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
- ctxt->loc_imp = obd->u.cli.cl_import;
+ llog_initiator_connect(ctxt);
+ llog_ctxt_put(ctxt);
}
RETURN(rc);
static int mgc_llog_finish(struct obd_device *obd, int count)
{
- int rc;
+ struct llog_ctxt *ctxt;
+ int rc = 0, rc2 = 0;
ENTRY;
- rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
- rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
+ ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
+ if (ctxt)
+ rc = llog_cleanup(ctxt);
+
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ if (ctxt)
+ rc2 = llog_cleanup(ctxt);
+
+ if (!rc)
+ rc = rc2;
RETURN(rc);
}
/* Now, whether we copied or not, start using the local llog.
If we failed to copy, we'll start using whatever the old
log has. */
+ llog_ctxt_put(ctxt);
ctxt = lctxt;
}
copy of the instance for the update. The cfg_last_idx will
be updated here. */
rc = class_config_parse_llog(ctxt, cld->cld_logname, &cld->cld_cfg);
-
- out_pop:
+out_pop:
+ llog_ctxt_put(ctxt);
+ if (ctxt != lctxt)
+ llog_ctxt_put(lctxt);
if (must_pop)
pop_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
static int mgs_cleanup(struct obd_device *obd);
static int mgs_handle(struct ptlrpc_request *req);
+static int mgs_llog_init(struct obd_device *obd, int group,
+ struct obd_device *tgt, int count,
+ struct llog_catid *logid, struct obd_uuid *uuid)
+{
+ struct obd_llog_group *olg = &obd->obd_olg;
+ int rc;
+ ENTRY;
+
+ LASSERT(group == OBD_LLOG_GROUP);
+ LASSERT(olg->olg_group == group);
+
+ rc = llog_setup(obd, olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+ &llog_lvfs_ops);
+ RETURN(rc);
+}
+
+static int mgs_llog_finish(struct obd_device *obd, int count)
+{
+ struct llog_ctxt *ctxt;
+ int rc = 0;
+ ENTRY;
+
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ if (ctxt)
+ rc = llog_cleanup(ctxt);
+
+ RETURN(rc);
+}
+
/* Start the MGS obd */
static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
GOTO(err_ns, rc);
}
- rc = llog_start_commit_thread();
- if (rc < 0)
- GOTO(err_fs, rc);
-
- rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
- &llog_lvfs_ops);
+ rc = obd_llog_init(obd, OBD_LLOG_GROUP, obd, 0, NULL, NULL);
if (rc)
GOTO(err_fs, rc);
if (!mgs->mgs_service) {
CERROR("failed to start service\n");
- GOTO(err_fs, rc = -ENOMEM);
+ GOTO(err_llog, rc = -ENOMEM);
}
rc = ptlrpc_start_threads(obd, mgs->mgs_service);
err_thread:
ptlrpc_unregister_service(mgs->mgs_service);
+err_llog:
+ obd_llog_finish(obd, 0);
err_fs:
/* No extra cleanup needed for llog_init_commit_thread() */
mgs_fs_cleanup(obd);
case OBD_CLEANUP_EXPORTS:
break;
case OBD_CLEANUP_SELF_EXP:
- llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
rc = obd_llog_finish(obd, 0);
break;
case OBD_CLEANUP_OBD:
}
case OBD_IOC_DUMP_LOG: {
- struct llog_ctxt *ctxt =
- llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ struct llog_ctxt *ctxt;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- if (rc)
- RETURN(rc);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
case OBD_IOC_LLOG_CHECK:
case OBD_IOC_LLOG_INFO:
case OBD_IOC_LLOG_PRINT: {
- struct llog_ctxt *ctxt =
- llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ struct llog_ctxt *ctxt;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
.o_cleanup = mgs_cleanup,
.o_destroy_export = mgs_destroy_export,
.o_iocontrol = mgs_iocontrol,
+ .o_llog_init = mgs_llog_init,
+ .o_llog_finish = mgs_llog_finish
};
static int __init mgs_init(void)
char *logname;
struct llog_handle *loghandle;
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
name_create(&logname, fsdb->fsdb_name, "-client");
down(&fsdb->fsdb_sem);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &loghandle, NULL, logname);
+ rc = llog_create(ctxt, &loghandle, NULL, logname);
if (rc)
GOTO(out_pop, rc);
rc2 = llog_close(loghandle);
if (!rc)
rc = rc2;
-
out_pop:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
up(&fsdb->fsdb_sem);
name_destroy(&logname);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
{
struct llog_handle *loghandle;
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
struct mgs_modify_lookup *mml;
int rc, rc2;
ENTRY;
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &loghandle, NULL, logname);
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+ rc = llog_create(ctxt, &loghandle, NULL, logname);
if (rc)
GOTO(out_pop, rc);
rc2 = llog_close(loghandle);
if (!rc)
rc = rc2;
-
out_pop:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
if (rc && rc != -ENODEV)
CERROR("modify %s/%s failed %d\n",
mti->mti_svname, comment, rc);
-
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
{
static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
int rc = 0;
- if (*llh) {
+ if (*llh)
GOTO(out, rc = -EBUSY);
- }
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ if (!ctxt)
+ GOTO(out, rc = -ENODEV);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- llh, NULL, name);
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ rc = llog_create(ctxt, llh, NULL, name);
if (rc == 0)
llog_init_handle(*llh, LLOG_F_IS_PLAIN, &cfg_uuid);
else
*llh = NULL;
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
out:
if (rc) {
{
struct lvfs_run_ctxt saved;
struct llog_handle *llh;
+ struct llog_ctxt *ctxt;
int rc = 0;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &llh, NULL, name);
+ rc = llog_create(ctxt, &llh, NULL, name);
if (rc == 0) {
llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
rc = llog_get_size(llh);
llog_close(llh);
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
/* header is record 1 */
return(rc <= 1);
}
struct llog_handle *loghandle;
struct lvfs_run_ctxt saved;
struct mgs_target_info *tmti;
+ struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+
OBD_ALLOC_PTR(tmti);
if (tmti == NULL)
RETURN(-ENOMEM);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &loghandle, NULL, client_name);
+ rc = llog_create(ctxt, &loghandle, NULL, client_name);
if (rc)
GOTO(out_pop, rc);
out_pop:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
OBD_FREE_PTR(tmti);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
int mgs_erase_log(struct obd_device *obd, char *name)
{
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
struct llog_handle *llh;
int rc = 0;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &llh, NULL, name);
+ rc = llog_create(ctxt, &llh, NULL, name);
if (rc == 0) {
llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
rc = llog_destroy(llh);
llog_free_handle(llh);
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
CERROR("failed to clear log %s: %d\n", name, rc);
struct obd_export *doomed_exp = NULL;
int exports_evicted = 0;
- lnet_nid_t nid_key = libcfs_str2nid(nid);
+ lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
do {
doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
#include "llog_internal.h"
/* helper functions for calling the llog obd methods */
+static struct llog_ctxt* llog_new_ctxt(struct obd_device *obd)
+{
+ struct llog_ctxt *ctxt;
-int llog_cleanup(struct llog_ctxt *ctxt)
+ OBD_ALLOC_PTR(ctxt);
+ if (!ctxt)
+ return NULL;
+
+ ctxt->loc_obd = obd;
+ atomic_set(&ctxt->loc_refcount, 1);
+
+ return ctxt;
+}
+
+static void llog_ctxt_destroy(struct llog_ctxt *ctxt)
{
+ if (ctxt->loc_exp)
+ class_export_put(ctxt->loc_exp);
+ OBD_FREE_PTR(ctxt);
+ return;
+}
+
+int __llog_ctxt_put(struct llog_ctxt *ctxt)
+{
+ struct obd_llog_group *olg = ctxt->loc_olg;
+ struct obd_device *obd;
int rc = 0;
+
+ spin_lock(&olg->olg_lock);
+ if (!atomic_dec_and_test(&ctxt->loc_refcount)) {
+ spin_unlock(&olg->olg_lock);
+ return rc;
+ }
+ olg->olg_ctxts[ctxt->loc_idx] = NULL;
+ spin_unlock(&olg->olg_lock);
+
+ obd = ctxt->loc_obd;
+ spin_lock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_dev_lock); /* sync with llog ctxt user thread */
+ LASSERT(obd->obd_stopping == 1 || obd->obd_set_up == 0);
+ /* cleanup the llog ctxt here */
+ if (CTXTP(ctxt, cleanup))
+ rc = CTXTP(ctxt, cleanup)(ctxt);
+
+ llog_ctxt_destroy(ctxt);
+ wake_up(&olg->olg_waitq);
+ return rc;
+}
+EXPORT_SYMBOL(__llog_ctxt_put);
+
+int llog_cleanup(struct llog_ctxt *ctxt)
+{
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ struct obd_llog_group *olg;
+ int rc, idx;
ENTRY;
if (!ctxt) {
CERROR("No ctxt\n");
RETURN(-ENODEV);
}
-
- if (CTXTP(ctxt, cleanup))
- rc = CTXTP(ctxt, cleanup)(ctxt);
- ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
- if (ctxt->loc_exp)
- class_export_put(ctxt->loc_exp);
- OBD_FREE(ctxt, sizeof(*ctxt));
+ olg = ctxt->loc_olg;
+ idx = ctxt->loc_idx;
+
+ /* banlance the ctxt get when calling llog_cleanup */
+ LASSERT(atomic_read(&ctxt->loc_refcount) > 1);
+ llog_ctxt_put(ctxt);
+
+ /* try to free the ctxt */
+ rc = __llog_ctxt_put(ctxt);
+
+ l_wait_event(olg->olg_waitq,
+ llog_group_ctxt_null(olg, idx), &lwi);
RETURN(rc);
}
EXPORT_SYMBOL(llog_cleanup);
-int llog_setup(struct obd_device *obd, struct obd_llogs *llogs, int index,
+int llog_setup(struct obd_device *obd, struct obd_llog_group *olg, int index,
struct obd_device *disk_obd, int count, struct llog_logid *logid,
struct llog_operations *op)
{
if (index < 0 || index >= LLOG_MAX_CTXTS)
RETURN(-EFAULT);
- /* in some recovery cases, obd_llog_ctxt might already be set,
+ LASSERT(olg != NULL);
+ ctxt = llog_group_get_ctxt(olg, index);
+
+ /* in some recovery cases, obd_llog_ctxt might already be set,
* but llogs might still be zero, for example in obd_filter recovery */
- if (obd->obd_llog_ctxt[index] &&
- (!llogs || (llogs && llogs->llog_ctxt[index]))) {
+ if (ctxt) {
/* mds_lov_update_mds might call here multiple times. So if the
llog is already set up then don't to do it again. */
- CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
+ CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
obd->obd_name, index);
- ctxt = obd->obd_llog_ctxt[index];
+ LASSERT(ctxt->loc_olg == olg);
LASSERT(ctxt->loc_obd == obd);
LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
LASSERT(ctxt->loc_logops == op);
+ llog_ctxt_put(ctxt);
GOTO(out, rc = 0);
}
-
- OBD_ALLOC(ctxt, sizeof(*ctxt));
+ ctxt = llog_new_ctxt(obd);
if (!ctxt)
GOTO(out, rc = -ENOMEM);
- if (llogs)
- llogs->llog_ctxt[index] = ctxt;
-
- if (!obd->obd_llog_ctxt[index])
- obd->obd_llog_ctxt[index] = ctxt;
-
+ rc = llog_group_set_ctxt(olg, ctxt, index);
+ if (rc) {
+ llog_ctxt_destroy(ctxt);
+ if (rc == -EEXIST)
+ rc = 0;
+ GOTO(out, rc);
+ }
ctxt->loc_obd = obd;
ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
+ ctxt->loc_olg = olg;
ctxt->loc_idx = index;
ctxt->loc_logops = op;
sema_init(&ctxt->loc_sem, 1);
if (op->lop_setup)
- rc = op->lop_setup(obd, llogs, index, disk_obd, count, logid);
+ rc = op->lop_setup(obd, olg, index, disk_obd, count, logid);
if (rc) {
- obd->obd_llog_ctxt[index] = NULL;
- class_export_put(ctxt->loc_exp);
- OBD_FREE(ctxt, sizeof(*ctxt));
+ llog_ctxt_destroy(ctxt);
}
-
out:
RETURN(rc);
}
/* lop_setup method for filter/osc */
// XXX how to set exports
-int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_obd_origin_setup(struct obd_device *obd, struct obd_llog_group *olg,
int index, struct obd_device *disk_obd, int count,
struct llog_logid *logid)
{
LASSERT(count == 1);
- if (!llogs)
- ctxt = llog_get_context(obd, index);
- else
- ctxt = llog_get_context_from_llogs(llogs, index);
+ LASSERT(olg != NULL);
+ ctxt = llog_group_get_ctxt(olg, index);
LASSERT(ctxt);
llog_gen_init(ctxt);
rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
if (rc)
CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
- out:
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
EXPORT_SYMBOL(llog_obd_origin_setup);
llog_cat_set_first_idx(cathandle, index);
rc = llog_cancel_rec(cathandle, index);
if (rc == 0)
- CDEBUG(D_HA, "cancel plain log at index"
- " %u of catalog "LPX64"\n",
+ CDEBUG(D_RPCTRACE, "cancel plain log at"
+ "index %u of catalog "LPX64"\n",
index,cathandle->lgh_id.lgl_oid);
}
}
}
EXPORT_SYMBOL(llog_obd_origin_add);
-int llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
+int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
int count, struct obd_uuid *uuid)
{
char name[32] = CATLIST;
GOTO(out, rc);
}
- rc = obd_llog_init(obd, llogs, obd, count, idarray, uuid);
+ rc = obd_llog_init(obd, olg->olg_group, obd, count, idarray, uuid);
if (rc) {
CERROR("rc: %d\n", rc);
GOTO(out, rc);
}
EXPORT_SYMBOL(llog_cat_initialize);
-int obd_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+int obd_llog_init(struct obd_device *obd, int group,
struct obd_device *disk_obd, int count,
struct llog_catid *logid, struct obd_uuid *uuid)
{
OBD_CHECK_DT_OP(obd, llog_init, 0);
OBD_COUNTER_INCREMENT(obd, llog_init);
- rc = OBP(obd, llog_init)(obd, llogs, disk_obd, count, logid,
- uuid);
+ rc = OBP(obd, llog_init)(obd, group, disk_obd, count, logid, uuid);
RETURN(rc);
}
EXPORT_SYMBOL(obd_llog_init);
rc = llog_create(ctxt, &llh, NULL, name);
if (rc) {
CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
out:
CWARN("1b: close newly-created log\n");
rc2 = llog_close(llh);
+ llog_ctxt_put(ctxt);
if (rc2) {
CERROR("1b: close log %s failed: %d\n", name, rc2);
if (rc == 0)
rc = llog_create(ctxt, llh, NULL, name);
if (rc) {
CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
- RETURN(rc);
+ GOTO(out, rc);
}
llog_init_handle(*llh, LLOG_F_IS_PLAIN, &uuid);
if ((rc = verify_handle("2", *llh, 1)))
- RETURN(rc);
+ GOTO(out, rc);
CWARN("2b: create a log without specified NAME & LOGID\n");
rc = llog_create(ctxt, &loghandle, NULL, NULL);
if (rc) {
CERROR("2b: create log failed\n");
- RETURN(rc);
+ GOTO(out, rc);
}
llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
logid = loghandle->lgh_id;
rc = llog_create(ctxt, &loghandle, &logid, NULL);
if (rc) {
CERROR("2b: re-open log by LOGID failed\n");
- RETURN(rc);
+ GOTO(out, rc);
}
llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
rc = llog_destroy(loghandle);
if (rc) {
CERROR("2b: destroy log failed\n");
- RETURN(rc);
+ GOTO(out, rc);
}
llog_free_handle(loghandle);
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
}
num_recs++;
if ((rc = verify_handle("4b", cath, 2)))
- RETURN(rc);
+ GOTO(ctxt_release, rc);
if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs)))
- RETURN(rc);
+ GOTO(ctxt_release, rc);
CWARN("4c: cancel 1 log record\n");
rc = llog_cat_cancel_records(cath, 1, &cookie);
num_recs--;
if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs)))
- RETURN(rc);
+ GOTO(ctxt_release, rc);
CWARN("4d: write 40,000 more log records\n");
for (i = 0; i < 40000; i++) {
out:
CWARN("4f: put newly-created catalog\n");
rc = llog_cat_put(cath);
+ctxt_release:
+ llog_ctxt_put(ctxt);
if (rc)
CERROR("1b: close log %s failed: %d\n", name, rc);
RETURN(rc);
rc = llog_cat_put(llh);
if (rc)
CERROR("1b: close log %s failed: %d\n", name, rc);
+ llog_ctxt_put(ctxt);
+
RETURN(rc);
}
if (mdc_obd == NULL) {
CERROR("6: no MDC devices connected to %s found.\n",
mds_uuid->uuid);
- RETURN(-ENOENT);
+ GOTO(ctxt_release, rc = -ENOENT);
}
rc = obd_connect(NULL,
&exph, mdc_obd, &uuid, NULL /* obd_connect_data */);
if (rc) {
CERROR("6: failed to connect to MDC: %s\n", mdc_obd->obd_name);
- RETURN(rc);
+ GOTO(ctxt_release, rc);
}
exp = class_conn2export(&exph);
rc = llog_create(nctxt, &llh, NULL, name);
if (rc) {
CERROR("6: llog_create failed %d\n", rc);
- RETURN(rc);
+ llog_ctxt_put(nctxt);
+ GOTO(ctxt_release, rc);
}
rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
parse_out:
rc = llog_close(llh);
+ llog_ctxt_put(nctxt);
if (rc) {
CERROR("6: llog_close failed: rc = %d\n", rc);
}
-
rc = obd_disconnect(exp);
-
+ctxt_release:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
rc = llog_create(ctxt, &llh, NULL, name);
if (rc) {
CERROR("7: llog_create with name %s failed: %d\n", name, rc);
- RETURN(rc);
+ GOTO(ctxt_release, rc);
}
llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
if (rc) {
CERROR("7: write one log record failed: %d\n", rc);
- RETURN(rc);
+ GOTO(ctxt_release, rc);
}
rc = llog_destroy(llh);
CERROR("7: llog_destroy failed: %d\n", rc);
else
llog_free_handle(llh);
+ctxt_release:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
case 0:
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
}
-
+ llog_ctxt_put(ctxt);
return rc;
}
-static int llog_test_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int llog_test_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *logid, struct obd_uuid *uuid)
{
int rc;
ENTRY;
- rc = llog_setup(obd, llogs, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL,
&llog_lvfs_ops);
RETURN(rc);
}
RETURN(-EINVAL);
}
- rc = obd_llog_init(obd, NULL, tgt, 0, NULL, NULL);
+ rc = obd_llog_init(obd, OBD_LLOG_GROUP, tgt, 0, NULL, NULL);
if (rc)
RETURN(rc);
CFS_INIT_LIST_HEAD(&obd->obd_lock_replay_queue);
CFS_INIT_LIST_HEAD(&obd->obd_final_req_queue);
+ llog_group_init(&obd->obd_olg, OBD_LLOG_GROUP);
+
spin_lock_init(&obd->obd_uncommitted_replies_lock);
CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
#include "filter_internal.h"
/* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS 3
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
static struct lvfs_callback_ops filter_lvfs_ops;
cfs_mem_cache_t *ll_fmd_cachep;
struct filter_client_data *fcd = fed->fed_fcd;
__u64 last_rcvd;
loff_t off;
- int err, log_pri = D_HA;
+ int err, log_pri = D_RPCTRACE;
/* Propagate error code. */
if (rc)
spin_lock_init(&filter->fo_translock);
spin_lock_init(&filter->fo_objidlock);
- INIT_LIST_HEAD(&filter->fo_export_list);
+ CFS_INIT_LIST_HEAD(&filter->fo_export_list);
sema_init(&filter->fo_alloc_lock, 1);
init_brw_stats(&filter->fo_filter_stats);
filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
- rc = llog_cat_initialize(obd, NULL, 1, NULL);
+ rc = llog_cat_initialize(obd, &obd->obd_olg, 1, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_post, rc);
lop_add: llog_obd_origin_add
};
-static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int filter_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *catid,
struct obd_uuid *uuid)
{
+ struct filter_obd *filter = &obd->u.filter;
+ struct obd_llog_group *olg;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
+ olg = filter_find_olg(obd, group);
+ if (IS_ERR(olg))
+ RETURN(PTR_ERR(olg));
+
+ if (group == OBD_LLOG_GROUP) {
+ LASSERT(filter->fo_lcm == NULL);
+ OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+ if (!filter->fo_lcm)
+ RETURN(-ENOMEM);
+
+ rc = llog_init_commit_master((struct llog_commit_master *)
+ filter->fo_lcm);
+ if (rc)
+ GOTO(cleanup, rc);
+
filter_mds_ost_repl_logops = llog_client_ops;
filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
-
- rc = llog_setup(obd, llogs, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+ } else {
+ LASSERT(filter->fo_lcm != NULL);
+ }
+ rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
&filter_mds_ost_repl_logops);
if (rc)
- RETURN(rc);
+ GOTO(cleanup, rc);
/* FIXME - assign unlink_cb for filter's recovery */
- if (!llogs)
- ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
- else
- ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+ LASSERT(olg);
+ ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
LASSERT(ctxt != NULL);
ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+ ctxt->loc_lcm = obd->u.filter.fo_lcm;
+ rc = llog_start_commit_thread(ctxt->loc_lcm);
+ llog_ctxt_put(ctxt);
+ if (rc)
+ GOTO(cleanup, rc);
- rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
&filter_size_orig_logops);
- RETURN(rc);
-}
-
-static int filter_group_llog_cleanup(struct llog_ctxt *ctxt)
-{
- int rc = 0;
- ENTRY;
-
- if (CTXTP(ctxt, cleanup))
- rc = CTXTP(ctxt, cleanup)(ctxt);
-
- if (ctxt->loc_exp)
- class_export_put(ctxt->loc_exp);
- OBD_FREE(ctxt, sizeof(*ctxt));
+cleanup:
+ if (rc) {
+ llog_cleanup_commit_master(filter->fo_lcm, 0);
+ OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
+ filter->fo_lcm = NULL;
+ }
RETURN(rc);
}
-static int filter_group_llog_finish(struct obd_llogs *llogs)
+static int filter_group_llog_finish(struct obd_llog_group *olg)
{
struct llog_ctxt *ctxt;
int rc = 0, rc2 = 0;
ENTRY;
- ctxt = llog_get_context_from_llogs(llogs, LLOG_MDS_OST_REPL_CTXT);
+ ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
if (ctxt)
- rc = filter_group_llog_cleanup(ctxt);
+ rc = llog_cleanup(ctxt);
- ctxt = llog_get_context_from_llogs(llogs, LLOG_SIZE_ORIG_CTXT);
+ ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
if (ctxt)
- rc2 = filter_group_llog_cleanup(ctxt);
+ rc2 = llog_cleanup(ctxt);
if (!rc)
rc = rc2;
static int filter_llog_finish(struct obd_device *obd, int count)
{
- struct llog_ctxt *ctxt;
- int rc = 0, rc2 = 0;
+ int rc;
ENTRY;
- ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
- if (ctxt)
- rc = llog_cleanup(ctxt);
-
- ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
- if (ctxt)
- rc2 = llog_cleanup(ctxt);
- if (!rc)
- rc = rc2;
+ if (obd->u.filter.fo_lcm) {
+ llog_cleanup_commit_master((struct llog_commit_master *)
+ obd->u.filter.fo_lcm, 0);
+ OBD_FREE(obd->u.filter.fo_lcm,
+ sizeof(struct llog_commit_master));
+ obd->u.filter.fo_lcm = NULL;
+ }
+ /* finish obd llog group */
+ rc = filter_group_llog_finish(&obd->obd_olg);
RETURN(rc);
}
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
- struct obd_export *export)
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
{
- struct filter_group_llog *fglog, *nlog;
+ struct obd_llog_group *olg, *nolg;
struct filter_obd *filter;
- struct llog_ctxt *ctxt;
- struct list_head *cur;
int rc;
filter = &obd->u.filter;
+ if (group == OBD_LLOG_GROUP)
+ RETURN(&obd->obd_olg);
+
spin_lock(&filter->fo_llog_list_lock);
- list_for_each(cur, &filter->fo_llog_list) {
- fglog = list_entry(cur, struct filter_group_llog, list);
- if (fglog->group == group) {
- if (!(fglog->exp == NULL || fglog->exp == export || export == NULL))
- CWARN("%s: export for group %d changes: 0x%p -> 0x%p\n",
- obd->obd_name, group, fglog->exp, export);
+ list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+ if (olg->olg_group == group) {
spin_unlock(&filter->fo_llog_list_lock);
- goto init;
+ RETURN(olg);
}
}
spin_unlock(&filter->fo_llog_list_lock);
- if (export == NULL)
- RETURN(NULL);
-
- OBD_ALLOC_PTR(fglog);
- if (fglog == NULL)
- RETURN(NULL);
- fglog->group = group;
-
- OBD_ALLOC_PTR(fglog->llogs);
- if (fglog->llogs == NULL) {
- OBD_FREE_PTR(fglog);
- RETURN(NULL);
- }
+ OBD_ALLOC_PTR(olg);
+ if (olg == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+ llog_group_init(olg, group);
spin_lock(&filter->fo_llog_list_lock);
- list_for_each(cur, &filter->fo_llog_list) {
- nlog = list_entry(cur, struct filter_group_llog, list);
- LASSERT(nlog->group != group);
+ list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
+ LASSERT(nolg->olg_group != group);
}
- list_add(&fglog->list, &filter->fo_llog_list);
+ list_add(&olg->olg_list, &filter->fo_llog_list);
spin_unlock(&filter->fo_llog_list_lock);
- rc = llog_cat_initialize(obd, fglog->llogs, 1, NULL);
+ rc = llog_cat_initialize(obd, olg, 1, NULL);
if (rc) {
- OBD_FREE_PTR(fglog->llogs);
- OBD_FREE_PTR(fglog);
- RETURN(NULL);
- }
-
-init:
- if (export) {
- fglog->exp = export;
- ctxt = llog_get_context_from_llogs(fglog->llogs,
- LLOG_MDS_OST_REPL_CTXT);
- LASSERT(ctxt != NULL);
-
- llog_receptor_accept(ctxt, export->exp_imp_reverse);
+ spin_lock(&filter->fo_llog_list_lock);
+ list_del(&olg->olg_list);
+ spin_unlock(&filter->fo_llog_list_lock);
+ OBD_FREE_PTR(olg);
+ RETURN(ERR_PTR(rc));
}
- CDEBUG(D_OTHER, "%s: new llog 0x%p for group %u\n",
- obd->obd_name, fglog->llogs, group);
+ CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
+ obd->obd_name, group, olg);
- RETURN(fglog->llogs);
+ RETURN(olg);
}
static int filter_llog_connect(struct obd_export *exp,
{
struct obd_device *obd = exp->exp_obd;
struct llog_ctxt *ctxt;
- struct obd_llogs *llog;
+ struct obd_llog_group *olg;
int rc;
ENTRY;
(unsigned) body->lgdc_logid.lgl_oid,
(unsigned) body->lgdc_logid.lgl_ogen);
- llog = filter_grab_llog_for_group(obd, body->lgdc_logid.lgl_ogr, exp);
- LASSERT(llog != NULL);
- ctxt = llog_get_context_from_llogs(llog, body->lgdc_ctxt_idx);
+ olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
+ if (IS_ERR(olg))
+ RETURN(PTR_ERR(olg));
+ llog_group_set_export(olg, exp);
+
+ ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
body->lgdc_ctxt_idx);
rc = llog_connect(ctxt, 1, &body->lgdc_logid,
&body->lgdc_gen, NULL);
+ llog_ctxt_put(ctxt);
if (rc != 0)
CERROR("failed to connect rc %d idx %d\n", rc,
body->lgdc_ctxt_idx);
static int filter_llog_preclean (struct obd_device *obd)
{
- struct filter_group_llog *log;
+ struct obd_llog_group *olg;
struct filter_obd *filter;
int rc = 0;
ENTRY;
+ rc = obd_llog_finish(obd, 0);
+ if (rc)
+ CERROR("failed to cleanup llogging subsystem\n");
+
filter = &obd->u.filter;
spin_lock(&filter->fo_llog_list_lock);
while (!list_empty(&filter->fo_llog_list)) {
- log = list_entry(filter->fo_llog_list.next,
- struct filter_group_llog, list);
- list_del(&log->list);
+ olg = list_entry(filter->fo_llog_list.next,
+ struct obd_llog_group, olg_list);
+ list_del(&olg->olg_list);
spin_unlock(&filter->fo_llog_list_lock);
- rc = filter_group_llog_finish(log->llogs);
+ rc = filter_group_llog_finish(olg);
if (rc)
CERROR("failed to cleanup llogging subsystem for %u\n",
- log->group);
- OBD_FREE_PTR(log->llogs);
- OBD_FREE_PTR(log);
+ olg->olg_group);
+ OBD_FREE_PTR(olg);
spin_lock(&filter->fo_llog_list_lock);
}
spin_unlock(&filter->fo_llog_list_lock);
- rc = obd_llog_finish(obd, 0);
- if (rc)
- CERROR("failed to cleanup llogging subsystem\n");
-
RETURN(rc);
}
exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
exp, exp->exp_filter_data.fed_pending);
+ /* Not ported yet the b1_6 quota functionality
+ * lquota_clearinfo(filter_quota_interface_ref, exp, exp->exp_obd);
+ */
+
target_destroy_export(exp);
if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp)
{
- struct filter_group_llog *fglog, *nlog;
+ struct obd_llog_group *olg_min, *olg;
struct filter_obd *filter;
int worked = 0, group;
struct llog_ctxt *ctxt;
* group order and skip already synced llogs -bzzz */
do {
/* look for group with min. number, but > worked */
- fglog = NULL;
+ olg_min = NULL;
group = 1 << 30;
spin_lock(&filter->fo_llog_list_lock);
- list_for_each_entry(nlog, &filter->fo_llog_list, list) {
- if (nlog->group <= worked) {
+ list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+ if (olg->olg_group <= worked) {
/* this group is already synced */
continue;
}
- if (group < nlog->group) {
+ if (group < olg->olg_group) {
/* we have group with smaller number to sync */
continue;
}
/* store current minimal group */
- fglog = nlog;
- group = nlog->group;
+ olg_min = olg;
+ group = olg->olg_group;
}
spin_unlock(&filter->fo_llog_list_lock);
- if (fglog == NULL)
+ if (olg_min == NULL)
break;
- worked = fglog->group;
- if (fglog->exp && (dexp == fglog->exp || dexp == NULL)) {
- ctxt = llog_get_context_from_llogs(fglog->llogs,
+ worked = olg_min->olg_group;
+ if (olg_min->olg_exp &&
+ (dexp == olg_min->olg_exp || dexp == NULL)) {
+ int err;
+ ctxt = llog_group_get_ctxt(olg_min,
LLOG_MDS_OST_REPL_CTXT);
LASSERT(ctxt != NULL);
- llog_sync(ctxt, fglog->exp);
+ err = llog_sync(ctxt, olg_min->olg_exp);
+ llog_ctxt_put(ctxt);
+ if (err)
+ CERROR("error flushing logs to MDS: rc %d\n",
+ err);
}
- } while (fglog != NULL);
+ } while (olg_min != NULL);
}
/* also incredibly similar to mds_disconnect */
oa->o_id);
/* If object already gone, cancel cookie right now */
if (oa->o_valid & OBD_MD_FLCOOKIE) {
+ struct llog_ctxt *ctxt;
+ struct obd_llog_group *olg;
fcc = obdo_logcookie(oa);
- llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
- NULL, 1, fcc, 0);
+ olg = filter_find_olg(obd, oa->o_gr);
+ if (IS_ERR(olg))
+ GOTO(cleanup, rc = PTR_ERR(olg));
+ llog_group_set_export(olg, exp);
+
+ ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
+ llog_cancel(ctxt, NULL, 1, fcc, 0);
+ llog_ctxt_put(ctxt);
fcc = NULL; /* we didn't allocate fcc, don't free it */
}
GOTO(cleanup, rc = -ENOENT);
struct lvfs_run_ctxt saved;
struct filter_obd *filter;
struct dentry *dentry;
- struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
/* flush any remaining cancel messages out to the target */
- ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
- llog_sync(ctxt, exp);
+ filter_sync_llogs(exp->exp_obd, exp);
RETURN(rc);
}
struct ptlrpc_request_set *set)
{
struct obd_device *obd;
- struct obd_llogs *llog;
+ struct obd_llog_group *olg;
struct llog_ctxt *ctxt;
int rc = 0, group;
ENTRY;
group = (int)(*(__u32 *)val);
LASSERT(group >= FILTER_GROUP_MDS0);
- llog = filter_grab_llog_for_group(obd, group, exp);
- LASSERT(llog != NULL);
- ctxt = llog_get_context_from_llogs(llog, LLOG_MDS_OST_REPL_CTXT);
- LASSERTF(ctxt != NULL, "ctxt is not null\n"),
+ olg = filter_find_olg(obd, group);
+ if (IS_ERR(olg))
+ RETURN(PTR_ERR(olg));
+ llog_group_set_export(olg, exp);
+
+ ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+ LASSERTF(ctxt != NULL, "ctxt is null\n"),
rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+ llog_ctxt_put(ctxt);
lquota_setinfo(filter_quota_interface_ref, exp, obd);
struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
-struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group,
- struct obd_export *export);
+struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group);
/* filter_lvb.c */
extern struct ldlm_valblock_ops filter_lvbo;
void *cb_data, int error)
{
struct llog_cookie *cookie = cb_data;
- struct obd_llogs *llogs;
+ struct obd_llog_group *olg;
struct llog_ctxt *ctxt;
+ int rc;
/* we have to find context for right group */
- if (error != 0) {
- CDEBUG(D_INODE, "not cancelling llog cookie on error %d\n",
- error);
- goto out;
+ if (error != 0 || obd->obd_stopping) {
+ CDEBUG(D_INODE, "not cancel logcookie err %d stopping %d \n",
+ error, obd->obd_stopping);
+ GOTO (out, rc = 0);
}
- llogs = filter_grab_llog_for_group(obd, cookie->lgc_lgl.lgl_ogr, NULL);
- if (llogs) {
- ctxt = llog_get_context_from_llogs(llogs, cookie->lgc_subsys + 1);
- if (ctxt) {
- llog_cancel(ctxt, NULL, 1, cookie, 0);
- } else
+ olg = filter_find_olg(obd, cookie->lgc_lgl.lgl_ogr);
+ if (!IS_ERR(olg)) {
+ ctxt = llog_group_get_ctxt(olg, cookie->lgc_subsys + 1);
+ if (!ctxt) {
CERROR("no valid context for group "LPU64"\n",
cookie->lgc_lgl.lgl_ogr);
+ GOTO(out, rc = 0);
+ }
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT, 30);
+
+ rc = llog_cancel(ctxt, NULL, 1, cookie, 0);
+ if (rc)
+ CERROR("error cancelling log cookies: rc = %d\n", rc);
+ llog_ctxt_put(ctxt);
} else {
CDEBUG(D_HA, "unknown group "LPU64"!\n", cookie->lgc_lgl.lgl_ogr);
}
out:
- OBD_FREE(cookie, sizeof(struct llog_cookie));
+ OBD_FREE(cookie, sizeof(*cookie));
}
/* Callback for processing the unlink log record received from MDS by
int rc = 0;
ENTRY;
+ if (ctxt->loc_obd->obd_stopping)
+ RETURN(LLOG_PROC_BREAK);
+
if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT, 30);
cookie.lgc_lgl = llh->lgh_id;
cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
cookie.lgc_index = rec->lrh_index;
"ctxt %p: %d\n", ctxt, rc);
}
+ llog_ctxt_put(ctxt);
spin_lock(&imp->imp_lock);
imp->imp_server_timeout = 1;
imp->imp_pingable = 1;
};
static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+static int osc_llog_init(struct obd_device *obd, int group,
struct obd_device *tgt, int count,
struct llog_catid *catid, struct obd_uuid *uuid)
{
int rc;
ENTRY;
-
+ LASSERT(group == OBD_LLOG_GROUP);
spin_lock(&obd->obd_dev_lock);
if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
osc_mds_ost_orig_logops = llog_lvfs_ops;
}
spin_unlock(&obd->obd_dev_lock);
- rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
&catid->lci_logid, &osc_mds_ost_orig_logops);
if (rc) {
CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
GOTO (out, rc);
}
- rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
- &osc_size_repl_logops);
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
+ NULL, &osc_size_repl_logops);
if (rc)
CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
/* flush any remaining cancel messages out to the target */
llog_sync(ctxt, exp);
+ llog_ctxt_put(ctxt);
+
rc = client_disconnect_export(exp);
return rc;
}
if (rc)
RETURN(rc);
- rc = llog_start_commit_thread();
- if (rc < 0)
- RETURN(rc);
-
lprocfs_ost_init_vars(&lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
&req_body->lgdc_gen, NULL);
+
+ llog_ctxt_put(ctxt);
if (rc != 0)
CERROR("failed at llog_relp_connect\n");
{
ENTRY;
LASSERT(ctxt);
+ LASSERTF(ctxt->loc_imp == NULL || ctxt->loc_imp == imp,
+ "%p - %p\n", ctxt->loc_imp, imp);
ctxt->loc_imp = imp;
RETURN(0);
}
int llog_initiator_connect(struct llog_ctxt *ctxt)
{
+ struct obd_import *new_imp;
ENTRY;
+
LASSERT(ctxt);
- ctxt->loc_imp = ctxt->loc_obd->u.cli.cl_import;
+ new_imp = ctxt->loc_obd->u.cli.cl_import;
+ LASSERTF(ctxt->loc_imp == NULL || ctxt->loc_imp == new_imp,
+ "%p - %p\n", ctxt->loc_imp, new_imp);
+ ctxt->loc_imp = new_imp;
RETURN(0);
}
EXPORT_SYMBOL(llog_initiator_connect);
body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
if (body == NULL)
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
if (body->lgd_logid.lgl_oid > 0)
logid = &body->lgd_logid;
if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
if (name == NULL)
- GOTO(out, rc = -EFAULT);
+ RETURN(-EFAULT);
CDEBUG(D_INFO, "opening log %s\n", name);
}
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
rc = rc2;
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
if (body == NULL)
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
if (body->lgd_logid.lgl_oid > 0)
logid = &body->lgd_logid;
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
+
disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
llog_close(loghandle);
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
if (body == NULL)
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
out_free:
OBD_FREE(buf, LLOG_CHUNK_SIZE);
-out:
RETURN(rc);
}
body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
if (body == NULL)
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
LASSERT(ctxt != NULL);
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
OBD_FREE(buf, LLOG_CHUNK_SIZE);
-out:
RETURN(rc);
}
body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
if (body == NULL)
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
rc2 = llog_close(loghandle);
if (!rc)
rc = rc2;
-
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
-out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
struct obd_device *obd = req->rq_export->exp_obd;
struct obd_device *disk_obd;
struct llog_cookie *logcookies;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt = NULL;
int num_cookies, rc = 0, err, i;
struct lvfs_run_ctxt saved;
struct llog_handle *cathandle;
if (rc)
CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
else
- CDEBUG(D_HA, "cancel %d llog-records\n", num_cookies);
+ CDEBUG(D_RPCTRACE, "cancel %d llog-records\n", num_cookies);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
EXPORT_SYMBOL(llog_origin_handle_cancel);
char *out = buf;
if (ctxt == NULL || mds == NULL)
- RETURN(-EOPNOTSUPP);
+ GOTO(release_ctxt, rc = -EOPNOTSUPP);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
}
out_pop:
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+release_ctxt:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
{
static char *out = NULL;
static int remains = 0;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt = NULL;
struct llog_handle *handle;
struct llog_logid *logid;
struct llog_logid_rec *lir;
remains = cbd->remains;
cbd->init = 0;
}
- ctxt = cbd->ctxt;
if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
RETURN(-EINVAL);
+ if (!cbd->ctxt)
+ RETURN(-EINVAL);
+
lir = (struct llog_logid_rec *)rec;
logid = &lir->lid_id;
rc = llog_create(ctxt, &handle, logid, NULL);
int rc;
if (ctxt == NULL || mds == NULL)
- RETURN(-EOPNOTSUPP);
+ GOTO(release_ctxt, rc = -EOPNOTSUPP);
count = mds->mds_lov_desc.ld_tgt_count;
size = sizeof(*idarray) * count;
OBD_ALLOC(idarray, size);
if (!idarray)
- RETURN(-ENOMEM);
+ GOTO(release_ctxt, rc = -ENOMEM);
rc = llog_get_cat_list(obd, obd, name, count, idarray);
if (rc)
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
out_free:
OBD_FREE(idarray, size);
+release_ctxt:
+ llog_ctxt_put(ctxt);
+
RETURN(rc);
}
#endif /* LPROCFS */
/* recovd_thread.c */
-int llog_init_commit_master(void);
-int llog_cleanup_commit_master(int force);
int ptlrpc_expire_one_request(struct ptlrpc_request *req);
RETURN(rc);
cleanup_phase = 2;
- ptlrpc_init_connection();
+ rc = ptlrpc_init_connection();
if (rc)
GOTO(cleanup, rc);
cleanup_phase = 3;
- rc = llog_init_commit_master();
- if (rc)
- GOTO(cleanup, rc);
- cleanup_phase = 4;
-
ptlrpc_put_connection_superhack = ptlrpc_put_connection;
rc = ptlrpc_start_pinger();
if (rc)
GOTO(cleanup, rc);
- cleanup_phase = 5;
+ cleanup_phase = 4;
rc = ldlm_init();
if (rc)
GOTO(cleanup, rc);
- cleanup_phase = 6;
+ cleanup_phase = 5;
rc = sptlrpc_init();
if (rc)
cleanup:
switch(cleanup_phase) {
- case 6:
- ldlm_exit();
case 5:
- ptlrpc_stop_pinger();
+ ldlm_exit();
case 4:
- llog_cleanup_commit_master(1);
+ ptlrpc_stop_pinger();
case 3:
ptlrpc_cleanup_connection();
case 2:
ptlrpc_stop_pinger();
ptlrpc_exit_portals();
ptlrpc_cleanup_connection();
- llog_cleanup_commit_master(0);
}
/* connection.c */
#ifdef __KERNEL__
-static struct llog_commit_master lustre_lcm;
-static struct llog_commit_master *lcm = &lustre_lcm;
-
/* Allocate new commit structs in case we do not have enough.
* Make the llcd size small enough that it fits into a single page when we
* are sending/receiving it. */
-static int llcd_alloc(void)
+static int llcd_alloc(struct llog_commit_master *lcm)
{
struct llog_canceld_ctxt *llcd;
int llcd_size;
}
/* Get a free cookie struct from the list */
-struct llog_canceld_ctxt *llcd_grab(void)
+static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
{
struct llog_canceld_ctxt *llcd;
spin_lock(&lcm->lcm_llcd_lock);
if (list_empty(&lcm->lcm_llcd_free)) {
spin_unlock(&lcm->lcm_llcd_lock);
- if (llcd_alloc() < 0) {
+ if (llcd_alloc(lcm) < 0) {
CERROR("unable to allocate log commit data!\n");
return NULL;
}
return llcd;
}
-EXPORT_SYMBOL(llcd_grab);
static void llcd_put(struct llog_canceld_ctxt *llcd)
{
+ struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+ llog_ctxt_put(llcd->llcd_ctxt);
if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
int llcd_size = llcd->llcd_size +
offsetof(struct llog_canceld_ctxt, llcd_cookies);
}
/* Send some cookies to the appropriate target */
-void llcd_send(struct llog_canceld_ctxt *llcd)
+static void llcd_send(struct llog_canceld_ctxt *llcd)
{
+ if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
- list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
+ list_add_tail(&llcd->llcd_list,
+ &llcd->llcd_lcm->lcm_llcd_pending);
spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-
+ }
cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
}
-EXPORT_SYMBOL(llcd_send);
/* deleted objects have a commit callback that cancels the MDS
* log record for the deletion. The commit callback calls this
if (count > 0 && cookies != NULL) {
if (llcd == NULL) {
- llcd = llcd_grab();
+ llcd = llcd_grab(ctxt->loc_lcm);
if (llcd == NULL) {
CERROR("couldn't get an llcd - dropped "LPX64
":%x+%u\n",
cookies->lgc_index);
GOTO(out, rc = -ENOMEM);
}
- llcd->llcd_ctxt = ctxt;
+ llcd->llcd_ctxt = llog_ctxt_get(ctxt);
ctxt->loc_llcd = llcd;
}
/* If we do not have enough pages available, allocate some */
while (atomic_read(&lcm->lcm_llcd_numfree) <
lcm->lcm_llcd_minfree) {
- if (llcd_alloc() < 0)
+ if (llcd_alloc(lcm) < 0)
break;
}
if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
- rc = llog_start_commit_thread();
+ rc = llog_start_commit_thread(lcm);
if (rc < 0)
CERROR("error starting thread: rc %d\n", rc);
}
return 0;
}
-int llog_start_commit_thread(void)
+int llog_start_commit_thread(struct llog_commit_master *lcm)
{
int rc;
ENTRY;
void *llpa_arg;
} llpa;
-int llog_init_commit_master(void)
+int llog_init_commit_master(struct llog_commit_master *lcm)
{
CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
sema_init(&llpa.llpa_sem, 1);
return 0;
}
+EXPORT_SYMBOL(llog_init_commit_master);
-int llog_cleanup_commit_master(int force)
+int llog_cleanup_commit_master(struct llog_commit_master *lcm,
+ int force)
{
lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
if (force)
atomic_read(&lcm->lcm_thread_total) == 0);
return 0;
}
+EXPORT_SYMBOL(llog_cleanup_commit_master);
static int log_process_thread(void *args)
{
rc = llog_create(ctxt, &llh, &logid, NULL);
if (rc) {
CERROR("llog_create failed %d\n", rc);
- RETURN(rc);
+ GOTO(out, rc);
}
rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
if (rc) {
CERROR("llog_init_handle failed %d\n", rc);
- GOTO(out, rc);
+ GOTO(release_llh, rc);
}
if (cb) {
CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
ctxt->loc_llcd, ctxt);
llog_sync(ctxt, NULL);
-out:
+
+release_llh:
rc = llog_cat_put(llh);
if (rc)
CERROR("llog_cat_put failed %d\n", rc);
-
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
{
+ struct obd_device *obd = ctxt->loc_obd;
int rc;
ENTRY;
+ if (obd->obd_stopping)
+ RETURN(-ENODEV);
+
mutex_down(&llpa.llpa_sem);
- llpa.llpa_ctxt = ctxt;
llpa.llpa_cb = handle;
llpa.llpa_arg = arg;
-
+ llpa.llpa_ctxt = llog_ctxt_get(ctxt); //llog_group_get_ctxt(ctxt->loc_olg, ctxt->loc_idx);
+ if (!llpa.llpa_ctxt) {
+ up(&llpa.llpa_sem);
+ RETURN(-ENODEV);
+ }
rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
if (rc < 0)
CERROR("error starting log_process_thread: %d\n", rc);
mutex_down(&ctxt->loc_sem);
ctxt->loc_gen = *gen;
- llcd = llcd_grab();
+ llcd = llcd_grab(ctxt->loc_lcm);
if (llcd == NULL) {
CERROR("couldn't get an llcd\n");
mutex_up(&ctxt->loc_sem);
RETURN(-ENOMEM);
}
- llcd->llcd_ctxt = ctxt;
+ llcd->llcd_ctxt = llog_ctxt_get(ctxt);
ctxt->loc_llcd = llcd;
mutex_up(&ctxt->loc_sem);
}
run_test 60 "test llog post recovery init vs llog unlink"
+#test race llog recovery thread vs llog cleanup
+test_61() {
+ mkdir $DIR/$tdir
+ createmany -o $DIR/$tdir/$tfile-%d 800
+ replay_barrier ost1
+# OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+ unlinkmany $DIR/$tdir/$tfile-%d 800
+ do_facet ost "sysctl -w lustre.fail_loc=0x80000221"
+ facet_failover ost1
+ sleep 10
+ fail ost1
+ sleep 30
+ do_facet ost "sysctl -w lustre.fail_loc=0x0"
+ $CHECKSTAT -t file $DIR/$tdir/$tfile-* && return 1
+ rmdir $DIR/$tdir
+}
+run_test 61 "test race llog recovery vs llog cleanup"
+
+#test race mds llog sync vs llog cleanup
+test_61b() {
+# OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x140
+ do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000140"
+ facet_failover $SINGLEMDS
+ sleep 10
+ fail $SINGLEMDS
+ do_facet client dd if=/dev/zero of=$DIR/$tfile bs=4k count=1 || return 1
+}
+run_test 61b "test race mds llog sync vs llog cleanup"
+
+#test race cancel cookie cb vs llog cleanup
+test_61c() {
+# OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
+ touch $DIR/$tfile
+ do_facet ost "sysctl -w lustre.fail_loc=0x80000222"
+ rm $DIR/$tfile
+ sleep 10
+ fail ost1
+}
+run_test 61c "test race mds llog sync vs llog cleanup"
+
equals_msg `basename $0`: test complete, cleaning up
check_and_cleanup_lustre
[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
#!/bin/bash
+
+load_llog_test() {
+ grep -q llog_test /proc/modules && return
+ # Module should have been placed with other lustre modules...
+ modprobe llog_test
+ grep -q llog_test /proc/modules && return
+ # But maybe we're running from a developer tree...
+ insmod ../obdclass/llog_test.ko
+ grep -q llog_test /proc/modules && return
+ # This is for 2.4 kernels (deprecated!)
+ insmod ../obdclass/llog_test.o
+ grep -q llog_test /proc/modules && return
+ echo "Unable to load llog_test module!"
+ false
+ return
+}
+
PATH=`dirname $0`:`dirname $0`/../utils:$PATH
TMP=${TMP:-/tmp}
MDS=`ls /proc/fs/lustre/mdt | grep -v num_refs | head -n 1`
[ -z "$MDS" ] && echo "no MDS available, skipping llog test" && exit 0
-case `uname -r` in
-2.4.*) modprobe llog_test || exit 1 ;;
-2.6.*) modprobe llog_test || exit 1 ;;
-*) echo "unknown kernel version `uname -r`" && exit 99 ;;
-esac
+load_llog_test || exit 0
lctl modules > $TMP/ogdb-`hostname`
echo "NOW reload debugging syms.."