struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
struct llog_cookie *logcookies, int numcookies);
-int llog_cat_initialize(struct obd_device *obd, int idx,
- struct obd_uuid *uuid);
int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_catid *logid, struct obd_uuid *uuid);
+ int *idx);
int obd_llog_finish(struct obd_device *obd, int count);
int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
char *name, int idx, int count, struct llog_catid *idarray);
+#define LLOG_CTXT_FLAG_UNINITIALIZED 0x00000001
+
struct llog_ctxt {
int loc_idx; /* my index the obd array of ctxt's */
struct llog_gen loc_gen;
atomic_t loc_refcount;
struct llog_commit_master *loc_lcm;
void *llog_proc_cb;
+ long loc_flags; /* flags, see above defines */
};
#define LCM_NAME_SIZE 64
struct lvfs_run_ctxt obd_lvfs_ctxt;
struct llog_ctxt *obd_llog_ctxt[LLOG_MAX_CTXTS];
- struct semaphore obd_llog_alloc;
struct semaphore obd_llog_cat_process;
cfs_waitq_t obd_llog_waitq;
/* llog related obd_methods */
int (*o_llog_init)(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid);
+ int *idx);
int (*o_llog_finish)(struct obd_device *obd, int count);
/* metadata-only methods */
/* lov_log.c */
int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid);
+ int *index);
int lov_llog_finish(struct obd_device *obd, int count);
/* lov_pack.c */
lop_cancel: lov_llog_repl_cancel
};
-int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int lov_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
struct lov_obd *lov = &obd->u.lov;
struct obd_device *child;
- int i, rc = 0, err = 0;
+ int i, rc = 0;
ENTRY;
- LASSERT(uuid);
-
- rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 0, NULL,
&lov_mds_ost_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 0, NULL,
&lov_size_repl_logops);
if (rc)
GOTO(err_cleanup, rc);
obd_getref(obd);
for (i = 0; i < lov->desc.ld_tgt_count ; i++) {
- if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
+ if (!lov->lov_tgts[i])
continue;
- if (!obd_uuid_equals(uuid, &lov->lov_tgts[i]->ltd_uuid))
+
+ if (index && i != *index)
continue;
- CDEBUG(D_CONFIG, "init %d/%d\n", i, count);
- LASSERT(lov->lov_tgts[i]->ltd_exp);
- child = lov->lov_tgts[i]->ltd_exp->exp_obd;
- rc = obd_llog_init(child, tgt, 1, logid, uuid);
+
+ CDEBUG(D_CONFIG, "init %s\n", lov->lov_tgts[i]->ltd_uuid.uuid);
+ child = class_find_client_obd(&lov->lov_tgts[i]->ltd_uuid,
+ LUSTRE_OSC_NAME, &obd->obd_uuid);
+ if (!child) {
+ CERROR("Can't find osc\n");
+ continue;
+ }
+
+ rc = obd_llog_init(child, disk_obd, &i);
if (rc) {
CERROR("error osc_llog_init idx %d osc '%s' tgt '%s' "
- "(rc=%d)\n", i, child->obd_name, tgt->obd_name,
+ "(rc=%d)\n", i, child->obd_name, disk_obd->obd_name,
rc);
- if (!err)
- err = rc;
+ rc = 0;
}
}
obd_putref(obd);
- GOTO(err_cleanup, err);
+ GOTO(err_cleanup, rc);
err_cleanup:
- if (err) {
- struct llog_ctxt *ctxt =
+ if (rc) {
+ struct llog_ctxt *ctxt =
llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
if (ctxt)
llog_cleanup(ctxt);
if (ctxt)
llog_cleanup(ctxt);
}
- return err;
+ return rc;
}
int lov_llog_finish(struct obd_device *obd, int count)
if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0)
ptlrpc_lprocfs_register_obd(obd);
- rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, obd, NULL);
if (rc) {
mdc_cleanup(obd);
CERROR("failed to setup llogging subsystems\n");
}
-static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int mdc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, disk_obd, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
llog_ctxt_put(ctxt);
}
- rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, disk_obd, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
int mds_log_op_setattr(struct obd_device *obd, struct inode *inode,
struct lov_mds_md *lmm, int lmm_size,
struct llog_cookie *logcookies, int cookies_size);
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count,
- struct llog_catid *logid, struct obd_uuid *uuid);
+int mds_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index);
int mds_llog_finish(struct obd_device *obd, int count);
/* mds/mds_lov.c */
lop_cancel: mds_llog_repl_cancel,
};
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+int mds_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 0, NULL,
&mds_ost_orig_logops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 0, NULL,
&mds_size_repl_logops);
if (rc)
GOTO(err_llog, rc);
- rc = obd_llog_init(lov_obd, tgt, count, logid, uuid);
+ rc = obd_llog_init(lov_obd, disk_obd, index);
if (rc) {
CERROR("lov_llog_init err %d\n", rc);
GOTO(err_cleanup, rc);
/* If we added a target we have to reconnect the llogs */
/* We only _need_ to do this at first add (idx), or the first time
after recovery. However, it should now be safe to call anytime. */
- rc = llog_cat_initialize(obd, index, uuid);
+ rc = obd_llog_init(obd, obd, (void *)&index);
out:
OBD_FREE(ld, sizeof(*ld));
GOTO(error_exit, rc);
}
+ rc = obd_llog_init(obd, obd, NULL);
+ if (rc) {
+ CERROR("MDS cannot register as observer of LOV %s (%d)\n",
+ lov_name, rc);
+ GOTO(error_exit, rc);
+ }
+
OBD_ALLOC(data, sizeof(*data));
if (data == NULL)
RETURN(-ENOMEM);
push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
rc = llog_ioctl(ctxt, cmd, data);
pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
- llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+
+ rc = obd_llog_init(obd, obd, NULL);
llog_ctxt_put(ctxt);
rc2 = obd_set_info_async(mds->mds_osc_exp,
sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
if (rc)
GOTO(err_decref, rc);
- rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, obd, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_cleanup, rc);
RETURN(rc);
}
-static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int mgc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, disk_obd, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
if (rc)
GOTO(err_decref, rc);
- rc = obd_llog_init(obd, obd, 0, NULL, NULL);
+ rc = obd_llog_init(obd, obd, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_cleanup, rc);
RETURN(rc);
}
-static int mgc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int mgc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
struct llog_ctxt *ctxt;
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, disk_obd, 0, NULL,
&llog_lvfs_ops);
if (rc)
RETURN(rc);
- rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, disk_obd, 0, NULL,
&llog_client_ops);
if (rc == 0) {
ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
int count, struct llog_logid *logid, struct llog_operations *op)
{
int rc = 0;
- struct llog_ctxt *ctxt;
+ struct llog_ctxt *ctxt, *old_ctxt;
ENTRY;
if (index < 0 || index >= LLOG_MAX_CTXTS)
RETURN(-EFAULT);
- /* someone can call lov_llog_init with NULL uuid - this can produce
- * parallel enter to this function */
- mutex_down(&obd->obd_llog_alloc);
- ctxt = llog_get_context(obd, index);
- if (ctxt) {
- /* mds_lov_update_mds might call here multiple times. So if the
- llog is already set up then don't to do it again. */
- CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
- obd->obd_name, index);
- LASSERT(ctxt->loc_obd == obd);
- LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
- LASSERT(ctxt->loc_logops == op);
- llog_ctxt_put(ctxt);
- GOTO(out, rc = 0);
- }
-
ctxt = llog_new_ctxt(obd);
if (!ctxt)
GOTO(out, rc = -ENOMEM);
- obd->obd_llog_ctxt[index] = ctxt;
ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
ctxt->loc_idx = index;
ctxt->loc_logops = op;
+ ctxt->loc_flags = LLOG_CTXT_FLAG_UNINITIALIZED;
sema_init(&ctxt->loc_sem, 1);
+ /* sync with other llog ctxt user thread */
+ spin_lock(&obd->obd_dev_lock);
+ old_ctxt = obd->obd_llog_ctxt[index];
+ if (old_ctxt) {
+ /* mds_lov_update_mds might call here multiple times. So if the
+ llog is already set up then don't to do it again. */
+ CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
+ obd->obd_name, index);
+ LASSERT(old_ctxt->loc_obd == obd);
+ LASSERT(old_ctxt->loc_exp == disk_obd->obd_self_export);
+ LASSERT(old_ctxt->loc_logops == op);
+ spin_unlock(&obd->obd_dev_lock);
+
+ llog_ctxt_destroy(ctxt);
+ ctxt = old_ctxt;
+ GOTO(out, rc = 0);
+ }
+
+ obd->obd_llog_ctxt[index] = ctxt;
+ spin_unlock(&obd->obd_dev_lock);
+
if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LLOG_SETUP)) {
rc = -EOPNOTSUPP;
} else {
CERROR("obd %s ctxt %d lop_setup=%p failed %d\n",
obd->obd_name, index, op->lop_setup, rc);
llog_ctxt_put(ctxt);
+ } else {
+ CDEBUG(D_CONFIG, "obd %s ctxt %d is initialized\n",
+ obd->obd_name, index);
+ ctxt->loc_flags &= ~LLOG_CTXT_FLAG_UNINITIALIZED;
}
out:
- mutex_up(&obd->obd_llog_alloc);
RETURN(rc);
}
EXPORT_SYMBOL(llog_setup);
CERROR("No ctxt\n");
RETURN(-ENODEV);
}
-
+
+ if (ctxt->loc_flags & LLOG_CTXT_FLAG_UNINITIALIZED)
+ RETURN(-ENXIO);
+
CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
if (!raised)
}
EXPORT_SYMBOL(llog_obd_origin_add);
-int llog_cat_initialize(struct obd_device *obd, int idx,
- struct obd_uuid *uuid)
-{
- struct llog_catid idarray;
- char name[32] = CATLIST;
- int rc;
- ENTRY;
-
- mutex_down(&obd->obd_llog_cat_process);
- rc = llog_get_cat_list(obd, obd, name, idx, 1, &idarray);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
- obd->obd_name, uuid->uuid, idx, idarray.lci_logid.lgl_oid,
- idarray.lci_logid.lgl_ogr, idarray.lci_logid.lgl_ogen);
-
- rc = obd_llog_init(obd, obd, 1, &idarray, uuid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- rc = llog_put_cat_list(obd, obd, name, idx, 1, &idarray);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- out:
- mutex_up(&obd->obd_llog_cat_process);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_cat_initialize);
-
int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
- int count, struct llog_catid *logid, struct obd_uuid *uuid)
+ int *index)
{
int rc;
ENTRY;
OBD_CHECK_OP(obd, llog_init, 0);
OBD_COUNTER_INCREMENT(obd, llog_init);
- rc = OBP(obd, llog_init)(obd, disk_obd, count, logid, uuid);
+ rc = OBP(obd, llog_init)(obd, disk_obd, index);
RETURN(rc);
}
EXPORT_SYMBOL(obd_llog_init);
}
-static int llog_test_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *logid,
- struct obd_uuid *uuid)
+static int llog_test_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
int rc;
ENTRY;
- rc = llog_setup(obd, LLOG_TEST_ORIG_CTXT, tgt, 0, NULL, &llog_lvfs_ops);
+ rc = llog_setup(obd, LLOG_TEST_ORIG_CTXT, disk_obd, 0, NULL, &llog_lvfs_ops);
RETURN(rc);
}
RETURN(-EINVAL);
}
- rc = obd_llog_init(obd, tgt, 0, NULL, NULL);
+ rc = obd_llog_init(obd, tgt, NULL);
if (rc)
RETURN(rc);
cfs_waitq_init(&obd->obd_next_transno_waitq);
cfs_waitq_init(&obd->obd_evict_inprogress_waitq);
cfs_waitq_init(&obd->obd_llog_waitq);
- init_mutex(&obd->obd_llog_alloc);
init_mutex(&obd->obd_llog_cat_process);
CFS_INIT_LIST_HEAD(&obd->obd_recovery_queue);
CFS_INIT_LIST_HEAD(&obd->obd_delayed_reply_queue);
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"filter_ldlm_cb_client", &obd->obd_ldlm_client);
- rc = obd_llog_init(obd, obd, 1, NULL, NULL);
+ rc = obd_llog_init(obd, obd, NULL);
if (rc) {
CERROR("failed to setup llogging subsystems\n");
GOTO(err_post, rc);
lop_add: llog_obd_origin_add
};
-static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *catid,
- struct obd_uuid *uuid)
+static int filter_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
struct filter_obd *filter = &obd->u.filter;
struct llog_ctxt *ctxt;
filter_mds_ost_repl_logops.lop_connect = llog_obd_repl_connect;
filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
- rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, disk_obd, 0, NULL,
&filter_mds_ost_repl_logops);
if (rc)
GOTO(cleanup_lcm, rc);
ctxt->loc_lcm = filter->fo_lcm;
llog_ctxt_put(ctxt);
- rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+ rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, disk_obd, 0, NULL,
&filter_size_orig_logops);
if (rc)
GOTO(cleanup_ctxt, rc);
};
static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
- int count, struct llog_catid *catid,
- struct obd_uuid *uuid)
+static int osc_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
+ int *index)
{
+ struct llog_catid catid;
+ static char name[32] = CATLIST;
int rc;
ENTRY;
- spin_lock(&obd->obd_dev_lock);
- if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
- osc_mds_ost_orig_logops = llog_lvfs_ops;
- osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
- osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
- osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
- osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
+ LASSERT(index);
+
+ mutex_down(&disk_obd->obd_llog_cat_process);
+
+ rc = llog_get_cat_list(disk_obd, disk_obd, name, *index, 1, &catid);
+ if (rc) {
+ CERROR("rc: %d\n", rc);
+ GOTO(out_unlock, rc);
}
- spin_unlock(&obd->obd_dev_lock);
+#if 0
+ CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
+ obd->obd_name, uuid->uuid, idx, catid.lci_logid.lgl_oid,
+ catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
+#endif
- rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
- &catid->lci_logid, &osc_mds_ost_orig_logops);
+ rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, disk_obd, 1,
+ &catid.lci_logid, &osc_mds_ost_orig_logops);
if (rc) {
CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
GOTO (out, rc);
}
- rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
+ rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, disk_obd, 1, NULL,
&osc_size_repl_logops);
if (rc) {
struct llog_ctxt *ctxt =
}
out:
if (rc) {
- CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
- obd->obd_name, tgt->obd_name, count, catid, rc);
+ CERROR("osc '%s' tgt '%s' rc=%d\n",
+ obd->obd_name, disk_obd->obd_name, rc);
CERROR("logid "LPX64":0x%x\n",
- catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
+ catid.lci_logid.lgl_oid, catid.lci_logid.lgl_ogen);
+ } else {
+ rc = llog_put_cat_list(disk_obd, disk_obd, name, *index, 1,
+ &catid);
+ if (rc)
+ CERROR("rc: %d\n", rc);
}
+out_unlock:
+ mutex_up(&disk_obd->obd_llog_cat_process);
+
RETURN(rc);
}
RETURN(rc);
}
+ osc_mds_ost_orig_logops = llog_lvfs_ops;
+ osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
+ osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
+ osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
+ osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
+
RETURN(rc);
}