* array of UUIDs returned by the MDS. With the current
* protocol, this will limit the max number of OSTs per LOV */
+#define LOV_IDX_MAGIC 0x80000000
#define LOV_DESC_MAGIC 0xB0CCDE5C
struct lov_desc {
int mds_has_lov_desc;
struct lov_desc mds_lov_desc;
obd_id *mds_lov_objids;
- int mds_lov_objids_valid;
+ int mds_lov_objids_size;
+ int mds_lov_objids_red;
int mds_lov_nextid_set;
struct file *mds_lov_objid_filp;
struct file *mds_health_check_filp;
enum obd_import_event);
int (*o_notify)(struct obd_device *obd, struct obd_device *watched,
- enum obd_notify_event ev);
+ enum obd_notify_event ev, void *data);
int (*o_health_check)(struct obd_device *);
static inline int obd_notify(struct obd_device *obd,
struct obd_device *watched,
- enum obd_notify_event ev)
+ enum obd_notify_event ev,
+ void *data)
{
OBD_CHECK_DEV(obd);
if (!obd->obd_set_up) {
}
OBD_COUNTER_INCREMENT(obd, notify);
- return OBP(obd, notify)(obd, watched, ev);
+ return OBP(obd, notify)(obd, watched, ev, data);
}
static inline int obd_notify_observer(struct obd_device *observer,
struct obd_device *observed,
- enum obd_notify_event ev)
+ enum obd_notify_event ev,
+ void *data)
{
int rc1;
int rc2;
struct obd_notify_upcall *onu;
if (observer->obd_observer)
- rc1 = obd_notify(observer->obd_observer, observed, ev);
+ rc1 = obd_notify(observer->obd_observer, observed, ev, data);
else
rc1 = 0;
/*
else
rc2 = 0;
- return rc1 ?: rc2;
+ return rc1 ? rc1 : rc2;
}
static inline int obd_quotacheck(struct obd_export *exp,
rc = llog_connect(cctxt, 1, logid, gen, uuid);
if (rc) {
- CERROR("error osc_llog_connect %d\n", i);
+ CERROR("error osc_llog_connect tgt %d (%d)\n", i, rc);
break;
}
}
}
static int lov_notify(struct obd_device *obd, struct obd_device *watched,
- enum obd_notify_event ev)
+ enum obd_notify_event ev, void *data)
{
int rc;
struct obd_uuid *uuid;
}
/* Pass the notification up the chain. */
- rc = obd_notify_observer(obd, watched, ev);
+ rc = obd_notify_observer(obd, watched, ev, data);
RETURN(rc);
}
{
struct lov_obd *lov = &obd->u.lov;
struct lov_tgt_desc *tgt;
- struct obd_export *exp_observer;
__u32 bufsize;
- __u32 size;
- obd_id params[2];
- int rc, old_count;
+ int rc;
ENTRY;
CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
/* XXX - add a sanity check on the generation number. */
tgt->ltd_gen = gen;
- old_count = lov->desc.ld_tgt_count;
if (index >= lov->desc.ld_tgt_count)
lov->desc.ld_tgt_count = index + 1;
if (osc_obd)
osc_obd->obd_no_recov = 0;
}
-
+
/* NULL may need to change when we use flags for osc's */
rc = lov_connect_obd(obd, tgt, 1, NULL);
- if (rc || !obd->obd_observer)
- RETURN(rc);
-
- /* tell the mds_lov about the new target */
- obd_llog_finish(obd->obd_observer, old_count);
- llog_cat_initialize(obd->obd_observer, lov->desc.ld_tgt_count);
-
- params[0] = index;
- /* FIXME we must try to mds_lov_read_objids insetad of obd_get_info
- to allow the mdt to start before the ost */
- rc = obd_get_info(tgt->ltd_exp, strlen("last_id"), "last_id", &size,
- ¶ms[1]);
if (rc)
GOTO(out, rc);
- exp_observer = obd->obd_observer->obd_self_export;
- rc = obd_set_info(exp_observer, strlen("next_id"), "next_id",
- sizeof(params), params);
- if (rc)
- GOTO(out, rc);
+ /* Crazy high index, catch collision with flag here */
+ LASSERT((index & LOV_IDX_MAGIC) == 0);
- rc = lov_notify(obd, tgt->ltd_exp->exp_obd, OBD_NOTIFY_ACTIVE);
-
- /* if we added after the disconnect */
- if (lov->connects == 0) {
- CERROR("Disconnected\n");
- rc = -ENODEV;
- }
+ rc = lov_notify(obd, tgt->ltd_exp->exp_obd, OBD_NOTIFY_ACTIVE,
+ (void *)(index | LOV_IDX_MAGIC));
out:
if (rc) {
break;
}
case IMP_EVENT_INACTIVE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
break;
}
case IMP_EVENT_INVALIDATE: {
break;
}
case IMP_EVENT_ACTIVE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
break;
}
case IMP_EVENT_OCD:
mds_quota_cleanup(mds);
mds_update_server_data(obd, 1);
- if (mds->mds_lov_objids != NULL) {
- OBD_FREE(mds->mds_lov_objids,
- mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
- }
+ if (mds->mds_lov_objids != NULL)
+ OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
mds_fs_cleanup(obd);
upcall_cache_cleanup(mds->mds_group_hash);
return rc;
}
+#if 0
static int mds_set_info(struct obd_export *exp, obd_count keylen,
void *key, obd_count vallen, void *val)
{
if (vallen != sizeof(obd_id) * 2)
RETURN(-EINVAL);
-
- if (idx > mds->mds_lov_desc.ld_tgt_count)
+ if (idx >= mds->mds_lov_desc.ld_tgt_count)
RETURN(-EINVAL);
-
- /* FIXME realloc mds_lov_objids --
- see HEAD mds_dt_update_desc */
- LASSERT("FIXME must realloc mds_lov_objids");
+ if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+ obd_id *ids;
+ int size;
+
+ size = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
+ OBD_ALLOC(ids, size);
+ if (ids == NULL)
+ RETURN(-ENOMEM);
+
+ memset(ids, 0, size);
+
+ if (mds->mds_lov_objids != NULL) {
+ int oldsize = mds->mds_lov_desc.ld_tgt_count *
+ sizeof(*ids);
+ memcpy(ids, mds->mds_lov_objids, oldsize);
+ OBD_FREE(mds->mds_lov_objids, oldsize);
+ }
+ mds->mds_lov_objids = ids;
+ mds->mds_lov_desc.ld_tgt_count =
+ mds->mds_lov_desc.ld_tgt_count;
+ }
+
mds->mds_lov_objids[idx] = id;
- //mds->mds_lov_objids_valid = 1;
CWARN("got last object "LPU64" from OST %d\n",
mds->mds_lov_objids[idx], idx);
RETURN(rc);
}
-
+#endif
struct lvfs_callback_ops mds_lvfs_ops = {
l_fid2dentry: mds_lvfs_fid2dentry,
.o_llog_finish = mds_llog_finish,
.o_notify = mds_notify,
.o_health_check = mds_health_check,
- .o_set_info = mds_set_info,
};
static struct obd_ops mdt_obd_ops = {
int nonblock);
int mds_post_mds_lovconf(struct obd_device *obd);
int mds_notify(struct obd_device *obd, struct obd_device *watched,
- enum obd_notify_event ev);
+ enum obd_notify_event ev, void *data);
int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
struct lov_mds_md *lmm, int lmm_size);
void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
struct mds_obd *mds = &obd->u.mds;
obd_id *ids;
loff_t off = 0;
- int i, rc, size = mds->mds_lov_desc.ld_tgt_count * sizeof(*ids);
+ int i, rc, size;
ENTRY;
- if (mds->mds_lov_objids != NULL)
+ LASSERT(!mds->mds_lov_objids_size);
+
+ /* Read everything in the file, even if our current lov desc
+ has fewer targets. Old targets not in the lov descriptor
+ during mds setup may still have valid objids. */
+ size = mds->mds_lov_objid_filp->f_dentry->d_inode->i_size;
+ if (size == 0)
RETURN(0);
OBD_ALLOC(ids, size);
if (ids == NULL)
RETURN(-ENOMEM);
mds->mds_lov_objids = ids;
+ mds->mds_lov_objids_size = size;
- if (mds->mds_lov_objid_filp->f_dentry->d_inode->i_size == 0)
- RETURN(0);
rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
if (rc < 0) {
CERROR("Error reading objids %d\n", rc);
} else {
- mds->mds_lov_objids_valid = 1;
+ mds->mds_lov_objids_red = size / sizeof(*ids);
rc = 0;
}
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+ for (i = 0; i < mds->mds_lov_objids_red; i++)
CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
mds->mds_lov_objids[i], i);
RETURN(rc);
}
+/* Update the lov desc for a new size lov.
+ From HEAD mds_dt_lov_update_desc (but fixed) */
+static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct lov_desc *ld;
+ __u32 valsize = sizeof(mds->mds_lov_desc);
+ int rc = 0, i, size;
+ ENTRY;
+
+ OBD_ALLOC(ld, sizeof(*ld));
+ if (!ld)
+ RETURN(-ENOMEM);
+
+ rc = obd_get_info(lov, strlen("lovdesc") + 1, "lovdesc",
+ &valsize, ld);
+ if (rc)
+ GOTO(out, rc);
+
+ /* The size of the LOV target table may have increased. */
+ size = ld->ld_tgt_count * sizeof(obd_id);
+ if ((mds->mds_lov_objids_size == 0) ||
+ (size > mds->mds_lov_objids_size)) {
+ obd_id *ids;
+
+ /* add room for a bunch at a time */
+ size = (ld->ld_tgt_count + 8) * sizeof(obd_id);
+
+ OBD_ALLOC(ids, size);
+ if (ids == NULL)
+ GOTO(out, rc = -ENOMEM);
+ memset(ids, 0, size);
+ if (mds->mds_lov_objids_size) {
+ memcpy(ids, mds->mds_lov_objids,
+ mds->mds_lov_objids_size);
+ OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+ }
+ mds->mds_lov_objids = ids;
+ mds->mds_lov_objids_size = size;
+ }
+
+ /* Only update the tgt count after we have an objid ready.
+ The objid may invalidly be 0 right now - is that a problem? */
+ mds->mds_lov_desc = *ld;
+
+ i = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
+ if (i > mds->mds_max_mdsize)
+ mds->mds_max_mdsize = i;
+ mds->mds_max_cookiesize = mds->mds_lov_desc.ld_tgt_count *
+ sizeof(struct llog_cookie);
+ mds->mds_has_lov_desc = 1;
+out:
+ OBD_FREE(ld, sizeof(*ld));
+ RETURN(rc);
+}
+
int mds_lov_connect(struct obd_device *obd, char * lov_name)
{
struct mds_obd *mds = &obd->u.mds;
struct lustre_handle conn = {0,};
- int valsize;
int rc, i;
ENTRY;
GOTO(err_discon, rc);
}
- valsize = sizeof(mds->mds_lov_desc);
- rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc",
- &valsize, &mds->mds_lov_desc);
- if (rc)
- GOTO(err_reg, rc);
-
- mds->mds_max_mdsize = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
- mds->mds_max_cookiesize = mds->mds_lov_desc.ld_tgt_count*
- sizeof(struct llog_cookie);
- mds->mds_has_lov_desc = 1;
rc = mds_lov_read_objids(obd);
if (rc) {
CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
GOTO(err_reg, rc);
}
+ rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+ if (rc)
+ GOTO(err_reg, rc);
+
/* tgt_count may be 0! */
rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
if (rc) {
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
- if (!mds->mds_lov_objids_valid) {
+ if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_red) {
int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
"last_id", &size, mds->mds_lov_objids);
for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
CWARN("got last object "LPU64" from OST %d\n",
mds->mds_lov_objids[i], i);
- mds->mds_lov_objids_valid = 1;
rc = mds_lov_write_objids(obd);
if (rc)
CERROR("got last objids from OSTs, but error "
struct obd_uuid *mlsi_uuid; /* target to sync */
};
-static int __mds_lov_syncronize(void *data)
+static int __mds_lov_synchronize(void *data)
{
struct mds_lov_sync_info *mlsi = data;
struct obd_device *obd;
SIGNAL_MASK_UNLOCK(current, flags);
unlock_kernel();
- return (__mds_lov_syncronize(data));
+ return (__mds_lov_synchronize(data));
}
int mds_lov_start_synchronize(struct obd_device *obd, struct obd_uuid *uuid,
/* Although class_export_get(obd->obd_self_export) would lock
the MDS in place, since it's only a self-export
it doesn't lock the LOV in place. The LOV can be disconnected
- during MDS precleanup, leaving nothing for __mds_lov_syncronize.
+ during MDS precleanup, leaving nothing for __mds_lov_synchronize.
Simply taking an export ref on the LOV doesn't help, because it's
still disconnected. Taking an obd reference insures that we don't
disconnect the LOV. This of course means a cleanup won't
rc = 0;
}
} else {
- rc = __mds_lov_syncronize((void *)mlsi);
+ rc = __mds_lov_synchronize((void *)mlsi);
}
RETURN(rc);
}
int mds_notify(struct obd_device *obd, struct obd_device *watched,
- enum obd_notify_event ev)
+ enum obd_notify_event ev, void *data)
{
struct obd_uuid *uuid;
+ int idx = (int)data;
int rc = 0;
ENTRY;
}
uuid = &watched->u.cli.cl_import->imp_target_uuid;
+
if (obd->obd_recovering) {
CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
obd->obd_name, uuid->uuid);
- } else {
- LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
- rc = mds_lov_start_synchronize(obd, uuid, 1);
+ RETURN(rc);
+ }
+
+ /* FIXME Put all this in the sync, so that we don't change the
+ tgt_count out from under it. We also can't run multiple
+ sync threads simultaneously. This means we need a single
+ permanent sync thread that we occationally wake to re-sync.
+ Sigh, but for a single uuid or a whole lov? */
+ /* Tell the mds_lov about the new target */
+ if (idx & LOV_IDX_MAGIC) {
+ struct mds_obd *mds = &obd->u.mds;
+ int old_count = mds->mds_lov_desc.ld_tgt_count;
+
+ idx &= ~LOV_IDX_MAGIC;
+ //FIXME remove D_ERROR
+ CDEBUG(D_CONFIG|D_ERROR, "Updating mds lov for OST idx %d\n", idx);
+
+ rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+ if (rc)
+ RETURN(rc);
+
+ if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+ CERROR("index %d > count %d!\n", idx,
+ mds->mds_lov_desc.ld_tgt_count);
+ RETURN(-EINVAL);
+ }
+
+ /* Don't get_info unless we never read it -- this will
+ block on a missing OST */
+ if (idx >= mds->mds_lov_objids_red) {
+ obd_id lastid;
+ __u32 size = sizeof(lastid);
+ rc = obd_get_info(watched->obd_self_export,
+ strlen("last_id"),
+ "last_id", &size, &lastid);
+ if (rc)
+ RETURN(rc);
+ mds->mds_lov_objids[idx] = lastid;
+ CWARN("got last object "LPU64" from OST %d\n",
+ mds->mds_lov_objids[idx], idx);
+ rc = mds_lov_write_objids(obd);
+ }
+
+ obd_llog_finish(obd, old_count);
+ llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
}
+
+ LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+ rc = mds_lov_start_synchronize(obd, uuid, 1);
RETURN(rc);
}
char *s1, char *s2)
{
int rc;
- CDEBUG(D_MOUNT, "Starting obd %s\n", obdname);
+ CDEBUG(D_MOUNT, "Starting obd %s (typ=%s)\n", obdname, type);
rc = do_lcfg(obdname, 0, LCFG_ATTACH, type, uuid, 0, 0);
if (rc) {
break;
}
case IMP_EVENT_INACTIVE: {
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
break;
}
case IMP_EVENT_INVALIDATE: {
oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
spin_unlock(&oscc->oscc_lock);
}
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
break;
}
case IMP_EVENT_OCD: {
if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
- rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
break;
}
default: