land 10800(adding llog ctxt refcount) to b1_6.
b=10800
i=green
i=shadow
Description: lfs setstripe enhancement
Details : Make lfs setstripe understand 'k', 'm' and 'g' for stripe size.
+Severity : normal
+Frequency : mds/oss recovery
+Bugzilla : 10800
+Description: llog ctxt is refrenced after it has been freed.
+Details : llog ctxt refcount was added to avoide the race between ctxt free
+ and llog recovery process. Each llog user must hold ctxt refcount
+ before it access the llog. And the llog ctxt can only be freed
+ when its refcount is zero.
+
--------------------------------------------------------------------------------
2007-07-30 Cluster File Systems, Inc. <info@clusterfs.com>
cap_t cap_get_proc(void);
int cap_get_flag(cap_t, cap_value_t, cap_flag_t, cap_flag_value_t *);
-/* log related */
-static inline int llog_init_commit_master(void) { return 0; }
-static inline int llog_cleanup_commit_master(int force) { return 0; }
static inline void libcfs_run_lbug_upcall(char *file, const char *fn,
const int l){}
};
/* ptlrpc/recov_thread.c */
-int llog_start_commit_thread(void);
-struct llog_canceld_ctxt *llcd_grab(void);
-void llcd_send(struct llog_canceld_ctxt *llcd);
+int llog_start_commit_thread(struct llog_commit_master *);
+int llog_init_commit_master(struct llog_commit_master *);
+int llog_cleanup_commit_master(struct llog_commit_master *lcm, int force);
#endif /* _LUSTRE_COMMIT_CONFD_H */
/* llog_obd.c */
int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
int count, struct llog_logid *logid,struct llog_operations *op);
+int __llog_ctxt_put(struct llog_ctxt *ctxt);
int llog_cleanup(struct llog_ctxt *);
int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
struct llog_handle *loc_handle;
struct llog_canceld_ctxt *loc_llcd;
struct semaphore loc_sem; /* protects loc_llcd and loc_imp */
+ atomic_t loc_refcount;
+ struct llog_commit_master *loc_lcm;
void *llog_proc_cb;
};
return size_round(len);
}
+#define llog_ctxt_get(ctxt) \
+({ \
+ struct llog_ctxt *ctxt_ = ctxt; \
+ LASSERT(atomic_read(&ctxt_->loc_refcount) > 0); \
+ atomic_inc(&ctxt_->loc_refcount); \
+ CDEBUG(D_INFO, "GETting ctxt %p : new refcount %d\n", ctxt_, \
+ atomic_read(&ctxt_->loc_refcount)); \
+ ctxt_; \
+})
+
+#define llog_ctxt_put(ctxt) \
+do { \
+ if ((ctxt) == NULL) \
+ break; \
+ CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", (ctxt), \
+ atomic_read(&(ctxt)->loc_refcount) - 1); \
+ LASSERT(atomic_read(&(ctxt)->loc_refcount) > 0); \
+ LASSERT(atomic_read(&(ctxt)->loc_refcount) < 0x5a5a5a); \
+ __llog_ctxt_put(ctxt); \
+} while (0)
+
static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
- int index)
+ int index)
{
- if (index < 0 || index >= LLOG_MAX_CTXTS)
- return NULL;
+ struct llog_ctxt *ctxt;
+
+ if (index < 0 || index >= LLOG_MAX_CTXTS)
+ return NULL;
+
+ spin_lock(&obd->obd_dev_lock);
+ if (obd->obd_llog_ctxt[index] == NULL) {
+ spin_unlock(&obd->obd_dev_lock);
+ CWARN("obd %p and ctxt index %d is NULL \n", obd, index);
+ return NULL;
+ }
+ ctxt = llog_ctxt_get(obd->obd_llog_ctxt[index]);
+ spin_unlock(&obd->obd_dev_lock);
+ return ctxt;
+}
- return obd->obd_llog_ctxt[index];
+static inline int llog_ctxt_null(struct obd_device *obd, int index)
+{
+ return (obd->obd_llog_ctxt[index] == NULL);
}
static inline int llog_write_rec(struct llog_handle *handle,
int fo_fmd_max_num; /* per exp filter_mod_data */
int fo_fmd_max_age; /* jiffies to fmd expiry */
+ void *fo_lcm;
};
#define OSC_MAX_RIF_DEFAULT 8
struct lustre_class_hash_body *obd_nid_hash_body;
atomic_t obd_refcount;
cfs_waitq_t obd_refcount_waitq;
+ cfs_waitq_t obd_llog_waitq;
struct list_head obd_exports;
int obd_num_exports;
struct ldlm_namespace *obd_namespace;
#define OBD_FAIL_MDS_LLOG_CREATE_FAILED 0x137
#define OBD_FAIL_MDS_LOV_SYNC_RACE 0x138
#define OBD_FAIL_MDS_OSC_PRECREATE 0x139
+#define OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x13a
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
#define OBD_FAIL_OST_SETATTR_CREDITS 0x21e
#define OBD_FAIL_OST_HOLD_WRITE_RPC 0x21f
#define OBD_FAIL_OST_BRW_WRITE_BULK2 0x220
+#define OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+#define OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
exp = class_conn2export(&mgc_conn);
- ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
+ ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
cfg->cfg_flags |= CFG_F_COMPAT146;
rc = class_config_parse_llog(ctxt, profile, cfg);
+ llog_ctxt_put(ctxt);
if (rc) {
CERROR("class_config_parse_llog failed: rc = %d\n", rc);
}
*/
rc = class_config_dump_llog(ctxt, profile, cfg);
#endif
+ llog_ctxt_put(ctxt);
switch (rc) {
case 0: {
/* Set the caller's profile name to the old-style */
LASSERT(ctxt);
if (lsm->lsm_array && lsm->lsm_array->lai_ext_array)
- RETURN(0);
+ GOTO(release_ctxt, rc = 0);
CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
lsm->lsm_array->lai_array_id.lgl_oid,
OBD_ALLOC(lsm->lsm_array->lai_ext_array,lsm->lsm_array->lai_ext_count *
sizeof (struct lov_extent));
if (!lsm->lsm_array->lai_ext_array)
- RETURN(-ENOMEM);
+ GOTO(release_ctxt, rc = -ENOMEM);
CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
lsm->lsm_array->lai_array_id.lgl_oid,
out:
if (rc)
lovea_free_array_info(lsm);
+release_ctxt:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
ENTRY;
LASSERT(md_exp != NULL);
+ /*for those orphan inode, we should keep array id*/
+ if (!(oa->o_valid & OBD_MD_FLCOOKIE))
+ RETURN(rc);
+
ctxt = llog_get_context(md_exp->exp_obd, LLOG_LOVEA_REPL_CTXT);
if (!ctxt)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
LASSERT(lsm->lsm_array != NULL);
- /*for those orphan inode, we should keep array id*/
- if (!(oa->o_valid & OBD_MD_FLCOOKIE))
- RETURN(0);
-
- LASSERT(ctxt != NULL);
rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id,
NULL);
if (rc)
}
llog_free_handle(llh);
out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
rc += llog_add(cctxt, rec, NULL, logcookies + rc,
numcookies - rc);
+ llog_ctxt_put(cctxt);
}
RETURN(rc);
child = lov->lov_tgts[i]->ltd_exp->exp_obd;
cctxt = llog_get_context(child, ctxt->loc_idx);
rc = llog_connect(cctxt, 1, logid, gen, uuid);
+ llog_ctxt_put(cctxt);
+
if (rc) {
CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
if (!err)
int err;
err = llog_cancel(cctxt, NULL, 1, cookies, flags);
+ llog_ctxt_put(cctxt);
if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CERROR("error: objid "LPX64" subobj "LPX64
" on OST idx %d: rc = %d\n", lsm->lsm_object_id,
case OBD_IOC_PARSE: {
ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
+ llog_ctxt_put(ctxt);
GOTO(out, rc);
}
#ifdef __KERNEL__
case OBD_IOC_LLOG_PRINT: {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
rc = llog_ioctl(ctxt, cmd, data);
-
+ llog_ctxt_put(ctxt);
GOTO(out, rc);
}
#endif
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
ctxt->loc_imp = obd->u.cli.cl_import;
+ llog_ctxt_put(ctxt);
}
rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
ctxt->loc_imp = obd->u.cli.cl_import;
+ llog_ctxt_put(ctxt);
}
RETURN(rc);
GOTO(err_ns, rc);
}
- rc = llog_start_commit_thread();
- if (rc < 0)
- GOTO(err_fs, rc);
-
if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
class_uuid_t uuid;
int mds_postrecov(struct obd_device *obd)
{
+ struct llog_ctxt *ctxt;
int rc;
ENTRY;
RETURN(0);
LASSERT(!obd->obd_recovering);
- LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+ llog_ctxt_put(ctxt);
/* set nextid first, so we are sure it happens */
mutex_down(&obd->obd_dev_sem);
struct lov_mds_md_join *head_lmmj = NULL, *tail_lmmj = NULL;
int lmm_size, rc = 0, cleanup_phase = 0, size;
struct llog_handle *llh_head = NULL, *llh_tail = NULL;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt = NULL;
struct mds_rec_join *join_rec;
ENTRY;
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
cleanup_phase = 2;
if (le32_to_cpu(head_lmm->lmm_magic) == LOV_MAGIC) { /*simple file */
struct llog_logid *llog_array;
case 3:
llog_close(llh_head);
case 2:
+ llog_ctxt_put(ctxt);
if (head_lmmj && ((void*)head_lmmj != (void*)head_lmm))
OBD_FREE_PTR(head_lmmj);
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
rc = llog_add(lctxt, rec, lsm, logcookies, numcookies);
+ llog_ctxt_put(lctxt);
+
RETURN(rc);
}
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
rc = llog_connect(lctxt, count, logid, gen, uuid);
+ llog_ctxt_put(lctxt);
RETURN(rc);
}
lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
rc = llog_cancel(lctxt, lsm, count, cookies, flags);
+ llog_ctxt_put(lctxt);
RETURN(rc);
}
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies,
cookies_size / sizeof(struct llog_cookie));
+ llog_ctxt_put(ctxt);
OBD_FREE(lur, sizeof(*lur));
out:
rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
cookies_size / sizeof(struct llog_cookie));
+ llog_ctxt_put(ctxt);
+
OBD_FREE(lsr, sizeof(*lsr));
out:
obd_free_memmd(mds->mds_osc_exp, &lsm);
switch (cmd) {
case OBD_IOC_RECORD: {
char *name = data->ioc_inlbuf1;
+ struct llog_ctxt *ctxt;
+
if (mds->mds_cfg_llh)
RETURN(-EBUSY);
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &mds->mds_cfg_llh, NULL, name);
+ rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
+ llog_ctxt_put(ctxt);
if (rc == 0)
llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
&cfg_uuid);
case OBD_IOC_CLEAR_LOG: {
char *name = data->ioc_inlbuf1;
+ struct llog_ctxt *ctxt;
if (mds->mds_cfg_llh)
RETURN(-EBUSY);
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &mds->mds_cfg_llh, NULL, name);
+ rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
+ llog_ctxt_put(ctxt);
if (rc == 0) {
llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
NULL);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
RETURN(rc);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
RETURN(rc);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+ llog_ctxt_put(ctxt);
rc2 = obd_set_info_async(mds->mds_osc_exp,
strlen(KEY_MDS_CONN), KEY_MDS_CONN,
0, NULL, NULL);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
struct mds_obd *mds = &obd->u.mds;
struct obd_uuid *uuid;
__u32 idx = mlsi->mlsi_index;
+ struct llog_ctxt *ctxt;
int rc = 0;
ENTRY;
if (rc != 0)
GOTO(out, rc);
- rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
- mds->mds_lov_desc.ld_tgt_count,
- NULL, NULL, uuid);
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+ if (!ctxt)
+ RETURN(-ENODEV);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
+
+ rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
+ NULL, NULL, uuid);
+ llog_ctxt_put(ctxt);
if (rc != 0) {
CERROR("%s failed at llog_origin_connect: %d\n",
RETURN(rc);
}
- LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+ LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
rc = mds_lov_start_synchronize(obd, watched, data,
!(ev == OBD_NOTIFY_SYNC));
rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen /
sizeof(*mlcd->mlcd_cookies),
mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
+ llog_ctxt_put(ctxt);
+
if (rc)
CERROR("error cancelling %d log cookies: rc %d\n",
(int)(mlcd->mlcd_cookielen /
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
ctxt->loc_imp = obd->u.cli.cl_import;
+ llog_ctxt_put(ctxt);
}
RETURN(rc);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
ctxt->loc_imp = obd->u.cli.cl_import;
+ llog_ctxt_put(ctxt);
}
RETURN(rc);
/* Now, whether we copied or not, start using the local llog.
If we failed to copy, we'll start using whatever the old
log has. */
+ llog_ctxt_put(ctxt);
ctxt = lctxt;
}
copy of the instance for the update. The cfg_last_idx will
be updated here. */
rc = class_config_parse_llog(ctxt, cld->cld_logname, &cld->cld_cfg);
-
- out_pop:
+
+out_pop:
+ llog_ctxt_put(ctxt);
+ if (ctxt != lctxt)
+ llog_ctxt_put(lctxt);
if (must_pop)
pop_ctxt(&saved, &mgc->obd_lvfs_ctxt, NULL);
GOTO(err_ns, rc);
}
- rc = llog_start_commit_thread();
- if (rc < 0)
- GOTO(err_fs, rc);
-
rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
RETURN(rc);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
char *logname;
struct llog_handle *loghandle;
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
int rc, rc2;
ENTRY;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
name_create(&logname, fsdb->fsdb_name, "-client");
down(&fsdb->fsdb_sem);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &loghandle, NULL, logname);
+ rc = llog_create(ctxt, &loghandle, NULL, logname);
if (rc)
GOTO(out_pop, rc);
rc2 = llog_close(loghandle);
if (!rc)
rc = rc2;
-
out_pop:
+ llog_ctxt_put(ctxt);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
up(&fsdb->fsdb_sem);
name_destroy(&logname);
{
struct llog_handle *loghandle;
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
struct mgs_modify_lookup *mml;
int rc, rc2;
ENTRY;
CDEBUG(D_MGS, "modify %s/%s/%s\n", logname, devname, comment);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &loghandle, NULL, logname);
+
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+ rc = llog_create(ctxt, &loghandle, NULL, logname);
if (rc)
GOTO(out_pop, rc);
rc2 = llog_close(loghandle);
if (!rc)
rc = rc2;
-
out_pop:
+ llog_ctxt_put(ctxt);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
if (rc && rc != -ENODEV)
CERROR("modify %s/%s failed %d\n",
{
static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
int rc = 0;
- if (*llh) {
+ if (*llh)
GOTO(out, rc = -EBUSY);
- }
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ if (!ctxt)
+ GOTO(out, rc = -ENODEV);
+
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- llh, NULL, name);
+ rc = llog_create(ctxt, llh, NULL, name);
if (rc == 0)
llog_init_handle(*llh, LLOG_F_IS_PLAIN, &cfg_uuid);
else
*llh = NULL;
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
out:
if (rc) {
{
struct lvfs_run_ctxt saved;
struct llog_handle *llh;
+ struct llog_ctxt *ctxt;
int rc = 0;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &llh, NULL, name);
+ rc = llog_create(ctxt, &llh, NULL, name);
if (rc == 0) {
llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
rc = llog_get_size(llh);
llog_close(llh);
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
/* header is record 1 */
return(rc <= 1);
}
int mgs_erase_log(struct obd_device *obd, char *name)
{
struct lvfs_run_ctxt saved;
+ struct llog_ctxt *ctxt;
struct llog_handle *llh;
int rc = 0;
+ ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
- &llh, NULL, name);
+ rc = llog_create(ctxt, &llh, NULL, name);
if (rc == 0) {
llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
rc = llog_destroy(llh);
llog_free_handle(llh);
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
if (rc)
CERROR("failed to clear log %s: %d\n", name, rc);
#include "llog_internal.h"
/* helper functions for calling the llog obd methods */
+static struct llog_ctxt* llog_new_ctxt(struct obd_device *obd)
+{
+ struct llog_ctxt *ctxt;
-int llog_cleanup(struct llog_ctxt *ctxt)
+ OBD_ALLOC(ctxt, sizeof(*ctxt));
+ if (!ctxt)
+ return NULL;
+
+ ctxt->loc_obd = obd;
+ atomic_set(&ctxt->loc_refcount, 1);
+
+ return ctxt;
+}
+
+static void llog_ctxt_destroy(struct llog_ctxt *ctxt)
{
+ if (ctxt->loc_exp)
+ class_export_put(ctxt->loc_exp);
+ OBD_FREE(ctxt, sizeof(*ctxt));
+ return;
+}
+
+int __llog_ctxt_put(struct llog_ctxt *ctxt)
+{
+ struct obd_device *obd;
int rc = 0;
+
+ obd = ctxt->loc_obd;
+ spin_lock(&obd->obd_dev_lock);
+ if (!atomic_dec_and_test(&ctxt->loc_refcount)) {
+ spin_unlock(&obd->obd_dev_lock);
+ return rc;
+ }
+ obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
+ spin_unlock(&obd->obd_dev_lock);
+
+ LASSERT(obd->obd_stopping == 1);
+ /* cleanup the llog ctxt here */
+ if (CTXTP(ctxt, cleanup))
+ rc = CTXTP(ctxt, cleanup)(ctxt);
+
+ llog_ctxt_destroy(ctxt);
+ wake_up(&obd->obd_llog_waitq);
+ return rc;
+}
+EXPORT_SYMBOL(__llog_ctxt_put);
+
+int llog_cleanup(struct llog_ctxt *ctxt)
+{
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ struct obd_device *obd = ctxt->loc_obd;
+ int rc, idx;
ENTRY;
if (!ctxt) {
CERROR("No ctxt\n");
RETURN(-ENODEV);
}
-
- if (CTXTP(ctxt, cleanup))
- rc = CTXTP(ctxt, cleanup)(ctxt);
- ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
- if (ctxt->loc_exp)
- class_export_put(ctxt->loc_exp);
- OBD_FREE(ctxt, sizeof(*ctxt));
+ /*banlance the ctxt get when calling llog_cleanup */
+ llog_ctxt_put(ctxt);
+
+ /* sync with other llog ctxt user thread */
+ spin_lock(&obd->obd_dev_lock);
+ LASSERT(obd->obd_stopping == 1);
+ spin_unlock(&obd->obd_dev_lock);
+
+ idx = ctxt->loc_idx;
+ /*try to free the ctxt */
+ rc = __llog_ctxt_put(ctxt);
+
+ l_wait_event(obd->obd_llog_waitq, llog_ctxt_null(obd, idx), &lwi);
RETURN(rc);
}
if (index < 0 || index >= LLOG_MAX_CTXTS)
RETURN(-EFAULT);
- if (obd->obd_llog_ctxt[index]) {
+ ctxt = llog_get_context(obd, index);
+ if (ctxt) {
/* mds_lov_update_mds might call here multiple times. So if the
llog is already set up then don't to do it again. */
CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
obd->obd_name, index);
- ctxt = obd->obd_llog_ctxt[index];
LASSERT(ctxt->loc_obd == obd);
LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
LASSERT(ctxt->loc_logops == op);
+ llog_ctxt_put(ctxt);
GOTO(out, rc = 0);
}
-
- OBD_ALLOC(ctxt, sizeof(*ctxt));
+ ctxt = llog_new_ctxt(obd);
if (!ctxt)
GOTO(out, rc = -ENOMEM);
obd->obd_llog_ctxt[index] = ctxt;
- ctxt->loc_obd = obd;
ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
ctxt->loc_idx = index;
ctxt->loc_logops = op;
if (op->lop_setup)
rc = op->lop_setup(obd, index, disk_obd, count, logid);
-
- if (rc) {
- obd->obd_llog_ctxt[index] = NULL;
- class_export_put(ctxt->loc_exp);
- OBD_FREE(ctxt, sizeof(*ctxt));
- }
-
+
+ if (rc)
+ llog_ctxt_destroy(ctxt);
out:
RETURN(rc);
}
rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
if (rc)
CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
- out:
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
EXPORT_SYMBOL(llog_obd_origin_setup);
rc = llog_create(ctxt, &llh, NULL, name);
if (rc) {
CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
out:
CWARN("1b: close newly-created log\n");
rc2 = llog_close(llh);
+ llog_ctxt_put(ctxt);
if (rc2) {
CERROR("1b: close log %s failed: %d\n", name, rc2);
if (rc == 0)
rc = llog_create(ctxt, llh, NULL, name);
if (rc) {
CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
- RETURN(rc);
+ GOTO(out, rc);
}
llog_init_handle(*llh, LLOG_F_IS_PLAIN, &uuid);
if ((rc = verify_handle("2", *llh, 1)))
- RETURN(rc);
+ GOTO(out, rc);
CWARN("2b: create a log without specified NAME & LOGID\n");
rc = llog_create(ctxt, &loghandle, NULL, NULL);
if (rc) {
CERROR("2b: create log failed\n");
- RETURN(rc);
+ GOTO(out, rc);
}
llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
logid = loghandle->lgh_id;
rc = llog_create(ctxt, &loghandle, &logid, NULL);
if (rc) {
CERROR("2b: re-open log by LOGID failed\n");
- RETURN(rc);
+ GOTO(out, rc);
}
llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
rc = llog_destroy(loghandle);
if (rc) {
CERROR("2b: destroy log failed\n");
- RETURN(rc);
+ GOTO(out, rc);
}
llog_free_handle(loghandle);
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
}
num_recs++;
if ((rc = verify_handle("4b", cath, 2)))
- RETURN(rc);
+ GOTO(ctxt_release, rc);
if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs)))
- RETURN(rc);
+ GOTO(ctxt_release, rc);
CWARN("4c: cancel 1 log record\n");
rc = llog_cat_cancel_records(cath, 1, &cookie);
num_recs--;
if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs)))
- RETURN(rc);
+ GOTO(ctxt_release, rc);
CWARN("4d: write 40,000 more log records\n");
for (i = 0; i < 40000; i++) {
out:
CWARN("4f: put newly-created catalog\n");
rc = llog_cat_put(cath);
+ctxt_release:
+ llog_ctxt_put(ctxt);
if (rc)
CERROR("1b: close log %s failed: %d\n", name, rc);
RETURN(rc);
rc = llog_cat_put(llh);
if (rc)
CERROR("1b: close log %s failed: %d\n", name, rc);
+ llog_ctxt_put(ctxt);
+
RETURN(rc);
}
if (mdc_obd == NULL) {
CERROR("6: no MDC devices connected to %s found.\n",
mds_uuid->uuid);
- RETURN(-ENOENT);
+ GOTO(ctxt_release, rc = -ENOENT);
}
rc = obd_connect(&exph, mdc_obd, &uuid, NULL /* obd_connect_data */);
if (rc) {
CERROR("6: failed to connect to MDC: %s\n", mdc_obd->obd_name);
- RETURN(rc);
+ GOTO(ctxt_release, rc);
}
exp = class_conn2export(&exph);
rc = llog_create(nctxt, &llh, NULL, name);
if (rc) {
CERROR("6: llog_create failed %d\n", rc);
- RETURN(rc);
+ llog_ctxt_put(nctxt);
+ GOTO(ctxt_release, rc);
}
rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
parse_out:
rc = llog_close(llh);
+ llog_ctxt_put(nctxt);
if (rc) {
CERROR("6: llog_close failed: rc = %d\n", rc);
}
-
rc = obd_disconnect(exp);
-
+ctxt_release:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
rc = llog_create(ctxt, &llh, NULL, name);
if (rc) {
CERROR("7: llog_create with name %s failed: %d\n", name, rc);
- RETURN(rc);
+ GOTO(ctxt_release, rc);
}
llog_init_handle(llh, LLOG_F_IS_PLAIN, &uuid);
rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
if (rc) {
CERROR("7: write one log record failed: %d\n", rc);
- RETURN(rc);
+ GOTO(ctxt_release, rc);
}
rc = llog_destroy(llh);
CERROR("7: llog_destroy failed: %d\n", rc);
else
llog_free_handle(llh);
+ctxt_release:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
case 0:
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
}
-
+ llog_ctxt_put(ctxt);
return rc;
}
cfs_init_timer(&obd->obd_recovery_timer);
spin_lock_init(&obd->obd_processing_task_lock);
cfs_waitq_init(&obd->obd_next_transno_waitq);
+ cfs_waitq_init(&obd->obd_llog_waitq);
CFS_INIT_LIST_HEAD(&obd->obd_recovery_queue);
CFS_INIT_LIST_HEAD(&obd->obd_delayed_reply_queue);
int count, struct llog_catid *catid,
struct obd_uuid *uuid)
{
+ struct filter_obd *filter = &obd->u.filter;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
+ OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+ if (!filter->fo_lcm)
+ RETURN(-ENOMEM);
+
+ rc = llog_init_commit_master((struct llog_commit_master *)
+ filter->fo_lcm);
+ if (rc)
+ GOTO(cleanup, rc);
+
filter_mds_ost_repl_logops = llog_client_ops;
filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
&filter_mds_ost_repl_logops);
if (rc)
- RETURN(rc);
+ GOTO(cleanup, rc);
/* FIXME - assign unlink_cb for filter's recovery */
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+ ctxt->loc_lcm = obd->u.filter.fo_lcm;
+ rc = llog_start_commit_thread(ctxt->loc_lcm);
+ llog_ctxt_put(ctxt);
+ if (rc)
+ GOTO(cleanup, rc);
rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
&filter_size_orig_logops);
+
+cleanup:
+ if (rc) {
+ llog_cleanup_commit_master(filter->fo_lcm, 0);
+ OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
+ filter->fo_lcm = NULL;
+ }
RETURN(rc);
}
int rc = 0, rc2 = 0;
ENTRY;
+ if (obd->u.filter.fo_lcm) {
+ llog_cleanup_commit_master((struct llog_commit_master *)
+ obd->u.filter.fo_lcm, 0);
+ OBD_FREE(obd->u.filter.fo_lcm,
+ sizeof(struct llog_commit_master));
+ obd->u.filter.fo_lcm = NULL;
+ }
+
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
if (ctxt)
rc = llog_cleanup(ctxt);
/* flush any remaining cancel messages out to the target */
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
err = llog_sync(ctxt, exp);
+ llog_ctxt_put(ctxt);
+
if (err)
CERROR("error flushing logs to MDS: rc %d\n", err);
oa->o_id);
/* If object already gone, cancel cookie right now */
if (oa->o_valid & OBD_MD_FLCOOKIE) {
+ struct llog_ctxt *ctxt;
fcc = obdo_logcookie(oa);
- llog_cancel(llog_get_context(obd, fcc->lgc_subsys + 1),
- NULL, 1, fcc, 0);
+ ctxt = llog_get_context(obd, fcc->lgc_subsys + 1);
+ llog_cancel(ctxt, NULL, 1, fcc, 0);
+ llog_ctxt_put(ctxt);
fcc = NULL; /* we didn't allocate fcc, don't free it */
}
GOTO(cleanup, rc = -ENOENT);
}
-
+
filter_prepare_destroy(obd, oa->o_id);
/* Our MDC connection is established by the MDS to us */
/* flush any remaining cancel messages out to the target */
ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_REPL_CTXT);
llog_sync(ctxt, exp);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
/* setup llog imports */
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+ llog_ctxt_put(ctxt);
lquota_setinfo(filter_quota_interface_ref, exp, obd);
void *cb_data, int error)
{
struct llog_cookie *cookie = cb_data;
+ struct llog_ctxt *ctxt;
int rc;
- if (error != 0) {
- CDEBUG(D_INODE, "not cancelling llog cookie on error %d\n",
- error);
+ if (error != 0 || obd->obd_stopping) {
+ CDEBUG(D_INODE, "not cancel logcookie err %d stopping %d \n",
+ error, obd->obd_stopping);
OBD_FREE(cookie, sizeof(*cookie));
return;
}
- rc = llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1),
- NULL, 1, cookie, 0);
+ ctxt = llog_get_context(obd, cookie->lgc_subsys + 1);
+ if (!ctxt)
+ GOTO(out, rc = 0);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT, 30);
+
+ rc = llog_cancel(ctxt, NULL, 1, cookie, 0);
if (rc)
CERROR("error cancelling log cookies: rc = %d\n", rc);
+out:
+ llog_ctxt_put(ctxt);
OBD_FREE(cookie, sizeof(*cookie));
}
lur = (struct llog_unlink_rec *)rec;
OBDO_ALLOC(oa);
- if (oa == NULL)
+ if (oa == NULL)
RETURN(-ENOMEM);
oa->o_valid |= OBD_MD_FLCOOKIE;
oa->o_id = lur->lur_oid;
int rc = 0;
ENTRY;
+ if (ctxt->loc_obd->obd_stopping)
+ RETURN(LLOG_PROC_BREAK);
+
if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
CERROR("log is not plain\n");
RETURN(-EINVAL);
}
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT, 30);
cookie.lgc_lgl = llh->lgh_id;
cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
cookie.lgc_index = rec->lrh_index;
"ctxt %p: %d\n", ctxt, rc);
}
+ llog_ctxt_put(ctxt);
spin_lock(&imp->imp_lock);
imp->imp_server_timeout = 1;
imp->imp_pingable = 1;
if (obd->u.cli.cl_conn_count == 1)
/* flush any remaining cancel messages out to the target */
llog_sync(ctxt, exp);
+
+ llog_ctxt_put(ctxt);
rc = client_disconnect_export(exp);
return rc;
rc = cleanup_group_info();
if (rc)
RETURN(rc);
-
- rc = llog_start_commit_thread();
- if (rc < 0)
- RETURN(rc);
-
lprocfs_init_vars(ost, &lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
&req_body->lgdc_gen, NULL);
+
+ llog_ctxt_put(ctxt);
if (rc != 0)
CERROR("failed at llog_relp_connect\n");
lustre_swab_llogd_body);
if (body == NULL) {
CERROR ("Can't unpack llogd_body\n");
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
}
if (body->lgd_logid.lgl_oid > 0)
name = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
if (name == NULL) {
CERROR("Can't unpack name\n");
- GOTO(out, rc = -EFAULT);
+ RETURN(-EFAULT);
}
CDEBUG(D_INFO, "opening log %s\n", name);
}
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
rc = rc2;
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
lustre_swab_llogd_body);
if (body == NULL) {
CERROR ("Can't unpack llogd_body\n");
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
}
if (body->lgd_logid.lgl_oid > 0)
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
+
disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
llog_close(loghandle);
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
lustre_swab_llogd_body);
if (body == NULL) {
CERROR ("Can't unpack llogd_body\n");
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
}
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
out_free:
OBD_FREE(buf, LLOG_CHUNK_SIZE);
-out:
RETURN(rc);
}
lustre_swab_llogd_body);
if (body == NULL) {
CERROR ("Can't unpack llogd_body\n");
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
}
OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
if (!buf)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
LASSERT(ctxt != NULL);
if (rc)
GOTO(out_close, rc);
-
rc = lustre_pack_reply(req, 3, size, NULL);
if (rc)
GOTO(out_close, rc = -ENOMEM);
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+ llog_ctxt_put(ctxt);
OBD_FREE(buf, LLOG_CHUNK_SIZE);
-out:
RETURN(rc);
}
lustre_swab_llogd_body);
if (body == NULL) {
CERROR ("Can't unpack llogd_body\n");
- GOTO(out, rc =-EFAULT);
+ RETURN(-EFAULT);
}
ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
if (ctxt == NULL)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
disk_obd = ctxt->loc_exp->exp_obd;
push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
rc2 = llog_close(loghandle);
if (!rc)
rc = rc2;
-
out_pop:
pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
-out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
struct obd_device *obd = req->rq_export->exp_obd;
struct obd_device *disk_obd;
struct llog_cookie *logcookies;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt = NULL;
int num_cookies, rc = 0, err, i;
struct lvfs_run_ctxt saved;
struct llog_handle *cathandle;
else
CDEBUG(D_RPCTRACE, "cancel %d llog-records\n", num_cookies);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
EXPORT_SYMBOL(llog_origin_handle_cancel);
char *out = buf;
if (ctxt == NULL || mds == NULL)
- RETURN(-EOPNOTSUPP);
+ GOTO(release_ctxt, rc = -EOPNOTSUPP);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
}
out_pop:
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
+release_ctxt:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
{
static char *out = NULL;
static int remains = 0;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt = NULL;
struct llog_handle *handle;
struct llog_logid *logid;
struct llog_logid_rec *lir;
remains = cbd->remains;
cbd->init = 0;
}
- ctxt = cbd->ctxt;
- if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
+ if (!(cat->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
RETURN(-EINVAL);
+ if (!cbd->ctxt)
+ RETURN(-EINVAL);
+
lir = (struct llog_logid_rec *)rec;
logid = &lir->lid_id;
rc = llog_create(ctxt, &handle, logid, NULL);
struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
if (ctxt == NULL || mds == NULL)
- RETURN(-EOPNOTSUPP);
-
+ GOTO(release_ctxt, rc = -EOPNOTSUPP);
+
count = mds->mds_lov_desc.ld_tgt_count;
size = sizeof(*idarray) * count;
OBD_ALLOC(idarray, size);
if (!idarray)
- RETURN(-ENOMEM);
+ GOTO(release_ctxt, rc = -ENOMEM);
rc = llog_get_cat_list(obd, obd, name, count, idarray);
if (rc)
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
out_free:
OBD_FREE(idarray, size);
+release_ctxt:
+ llog_ctxt_put(ctxt);
+
RETURN(rc);
}
#endif /* LPROCFS */
/* recovd_thread.c */
-int llog_init_commit_master(void);
-int llog_cleanup_commit_master(int force);
static inline int opcode_offset(__u32 opc) {
if (opc < OST_LAST_OPC) {
GOTO(cleanup, rc);
cleanup_phase = 2;
- rc = llog_init_commit_master();
- if (rc)
- GOTO(cleanup, rc);
- cleanup_phase = 3;
-
ptlrpc_put_connection_superhack = ptlrpc_put_connection;
rc = ptlrpc_start_pinger();
if (rc)
GOTO(cleanup, rc);
- cleanup_phase = 4;
+ cleanup_phase = 3;
rc = ldlm_init();
if (rc)
cleanup:
switch(cleanup_phase) {
- case 4:
- ptlrpc_stop_pinger();
case 3:
- llog_cleanup_commit_master(1);
+ ptlrpc_stop_pinger();
case 2:
ptlrpc_cleanup_connection();
case 1:
ptlrpc_stop_pinger();
ptlrpc_exit_portals();
ptlrpc_cleanup_connection();
- llog_cleanup_commit_master(0);
}
/* connection.c */
#ifdef __KERNEL__
-static struct llog_commit_master lustre_lcm;
-static struct llog_commit_master *lcm = &lustre_lcm;
-
/* Allocate new commit structs in case we do not have enough.
* Make the llcd size small enough that it fits into a single page when we
* are sending/receiving it. */
-static int llcd_alloc(void)
+static int llcd_alloc(struct llog_commit_master *lcm)
{
struct llog_canceld_ctxt *llcd;
int llcd_size;
}
/* Get a free cookie struct from the list */
-struct llog_canceld_ctxt *llcd_grab(void)
+static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
{
struct llog_canceld_ctxt *llcd;
spin_lock(&lcm->lcm_llcd_lock);
if (list_empty(&lcm->lcm_llcd_free)) {
spin_unlock(&lcm->lcm_llcd_lock);
- if (llcd_alloc() < 0) {
+ if (llcd_alloc(lcm) < 0) {
CERROR("unable to allocate log commit data!\n");
return NULL;
}
return llcd;
}
-EXPORT_SYMBOL(llcd_grab);
static void llcd_put(struct llog_canceld_ctxt *llcd)
{
+ struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+ llog_ctxt_put(llcd->llcd_ctxt);
if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
int llcd_size = llcd->llcd_size +
offsetof(struct llog_canceld_ctxt, llcd_cookies);
}
/* Send some cookies to the appropriate target */
-void llcd_send(struct llog_canceld_ctxt *llcd)
+static void llcd_send(struct llog_canceld_ctxt *llcd)
{
- spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
- list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
- spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-
+ if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
+ spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+ list_add_tail(&llcd->llcd_list,
+ &llcd->llcd_lcm->lcm_llcd_pending);
+ spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+ }
cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
}
-EXPORT_SYMBOL(llcd_send);
/* deleted objects have a commit callback that cancels the MDS
* log record for the deletion. The commit callback calls this
if (count > 0 && cookies != NULL) {
if (llcd == NULL) {
- llcd = llcd_grab();
+ llcd = llcd_grab(ctxt->loc_lcm);
if (llcd == NULL) {
CERROR("couldn't get an llcd - dropped "LPX64
":%x+%u\n",
cookies->lgc_index);
GOTO(out, rc = -ENOMEM);
}
- llcd->llcd_ctxt = ctxt;
+ llcd->llcd_ctxt = llog_ctxt_get(ctxt);
ctxt->loc_llcd = llcd;
}
/* If we do not have enough pages available, allocate some */
while (atomic_read(&lcm->lcm_llcd_numfree) <
lcm->lcm_llcd_minfree) {
- if (llcd_alloc() < 0)
+ if (llcd_alloc(lcm) < 0)
break;
}
if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
- rc = llog_start_commit_thread();
+ rc = llog_start_commit_thread(lcm);
if (rc < 0)
CERROR("error starting thread: rc %d\n", rc);
}
return 0;
}
-int llog_start_commit_thread(void)
+int llog_start_commit_thread(struct llog_commit_master *lcm)
{
int rc;
ENTRY;
void *llpa_arg;
} llpa;
-int llog_init_commit_master(void)
+int llog_init_commit_master(struct llog_commit_master *lcm)
{
CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
sema_init(&llpa.llpa_sem, 1);
return 0;
}
+EXPORT_SYMBOL(llog_init_commit_master);
-int llog_cleanup_commit_master(int force)
+int llog_cleanup_commit_master(struct llog_commit_master *lcm,
+ int force)
{
lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
if (force)
atomic_read(&lcm->lcm_thread_total) == 0);
return 0;
}
+EXPORT_SYMBOL(llog_cleanup_commit_master);
static int log_process_thread(void *args)
{
rc = llog_create(ctxt, &llh, &logid, NULL);
if (rc) {
CERROR("llog_create failed %d\n", rc);
- RETURN(rc);
+ GOTO(out, rc);
}
rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
if (rc) {
CERROR("llog_init_handle failed %d\n", rc);
- GOTO(out, rc);
+ GOTO(release_llh, rc);
}
if (cb) {
CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
ctxt->loc_llcd, ctxt);
llog_sync(ctxt, NULL);
-out:
+
+release_llh:
rc = llog_cat_put(llh);
if (rc)
CERROR("llog_cat_put failed %d\n", rc);
-
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
{
+ struct obd_device *obd = ctxt->loc_obd;
int rc;
ENTRY;
+ if (obd->obd_stopping)
+ RETURN(-ENODEV);
+
mutex_down(&llpa.llpa_sem);
- llpa.llpa_ctxt = ctxt;
llpa.llpa_cb = handle;
llpa.llpa_arg = arg;
-
+ llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx);
+ if (!llpa.llpa_ctxt) {
+ up(&llpa.llpa_sem);
+ RETURN(-ENODEV);
+ }
rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
if (rc < 0)
CERROR("error starting log_process_thread: %d\n", rc);
mutex_down(&ctxt->loc_sem);
ctxt->loc_gen = *gen;
- llcd = llcd_grab();
+ llcd = llcd_grab(ctxt->loc_lcm);
if (llcd == NULL) {
CERROR("couldn't get an llcd\n");
mutex_up(&ctxt->loc_sem);
RETURN(-ENOMEM);
}
- llcd->llcd_ctxt = ctxt;
+ llcd->llcd_ctxt = llog_ctxt_get(ctxt);
ctxt->loc_llcd = llcd;
mutex_up(&ctxt->loc_sem);
set -e
-# bug number: 6088 10124 10800
-ALWAYS_EXCEPT="8 15c 17 $REPLAY_DUAL_EXCEPT"
+# bug number: 6088 10124
+ALWAYS_EXCEPT="8 15c $REPLAY_DUAL_EXCEPT"
PTLDEBUG=${PTLDEBUG:--1}
LUSTRE=${LUSTRE:-`dirname $0`/..}
}
run_test 60 "test llog post recovery init vs llog unlink"
+#test race llog recovery thread vs llog cleanup
+test_61() {
+ mkdir $DIR/$tdir
+ createmany -o $DIR/$tdir/$tfile-%d 800
+ replay_barrier ost1
+# OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+ unlinkmany $DIR/$tdir/$tfile-%d 800
+ do_facet ost "sysctl -w lustre.fail_loc=0x80000221"
+ facet_failover ost1
+ sleep 10
+ fail ost1
+ sleep 30
+ do_facet ost "sysctl -w lustre.fail_loc=0x0"
+ $CHECKSTAT -t file $DIR/$tdir/$tfile-* && return 1
+ rmdir $DIR/$tdir
+}
+run_test 61 "test race llog recovery vs llog cleanup"
+
+#test race mds llog sync vs llog cleanup
+test_61b() {
+# OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x13a
+ do_facet mds "sysctl -w lustre.fail_loc=0x8000013a"
+ facet_failover mds
+ sleep 10
+ fail mds
+ do_facet client dd if=/dev/zero of=$DIR/$tfile bs=4k count=1 || return 1
+}
+run_test 61b "test race mds llog sync vs llog cleanup"
+
+#test race cancel cookie cb vs llog cleanup
+test_61c() {
+# OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
+ touch $DIR/$tfile
+ do_facet ost "sysctl -w lustre.fail_loc=0x80000222"
+ rm $DIR/$tfile
+ sleep 10
+ fail ost1
+}
+run_test 61c "test race mds llog sync vs llog cleanup"
+
equals_msg `basename $0`: test complete, cleaning up
$CLEANUP