{
struct mds_obd *mds = &obd->u.mds;
int i;
- int dirty = 0;
ENTRY;
- down_read(&mds->mds_lov_objids_sem);
-
- spin_lock(&mds->mds_lov_objids_lock);
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) {
+ lock_kernel();
+ for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
if (ids[i] > (mds->mds_lov_objids)[i]) {
(mds->mds_lov_objids)[i] = ids[i];
- dirty = 1;
+ mds->mds_lov_objids_dirty = 1;
}
- }
- mds->mds_lov_objids_dirty = dirty;
-
- spin_unlock(&mds->mds_lov_objids_lock);
-
- up_read(&mds->mds_lov_objids_sem);
-
+ unlock_kernel();
EXIT;
}
EXPORT_SYMBOL(mds_lov_update_objids);
int i, rc, size;
ENTRY;
- LASSERT(!mds->mds_lov_objids_count);
+ LASSERT(!mds->mds_lov_objids_size);
LASSERT(!mds->mds_lov_objids_dirty);
/* Read everything in the file, even if our current lov desc
during mds setup may still have valid objids. */
size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
if (size == 0)
- GOTO(out, rc = 0);
+ RETURN(0);
OBD_ALLOC(ids, size);
if (ids == NULL)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
+ mds->mds_lov_objids = ids;
+ mds->mds_lov_objids_size = size;
rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
if (rc < 0) {
- OBD_FREE(ids, size);
CERROR("Error reading objids %d\n", rc);
- GOTO(out, rc);
+ RETURN(rc);
}
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_count = size / sizeof(*ids);
- for (i = 0; i < mds->mds_lov_objids_count; i++) {
+ mds->mds_lov_objids_in_file = size / sizeof(*ids);
+
+ for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
mds->mds_lov_objids[i], i);
}
-
-out:
- RETURN(rc);
+ RETURN(0);
}
-/* must held mds_lov_objids_sem */
+
int mds_lov_write_objids(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
loff_t off = 0;
int i, rc, tgts;
ENTRY;
-
- spin_lock(&mds->mds_lov_objids_lock);
- if (!mds->mds_lov_objids_dirty) {
- spin_unlock(&mds->mds_lov_objids_lock);
+
+ if (!mds->mds_lov_objids_dirty)
RETURN(0);
- }
- mds->mds_lov_objids_dirty = 0;
- spin_unlock(&mds->mds_lov_objids_lock);
- tgts = mds->mds_lov_objids_count;
+ tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
+
if (!tgts)
RETURN(0);
mds->mds_lov_objids, tgts * sizeof(obd_id),
&off, 0);
if (rc >= 0) {
+ mds->mds_lov_objids_dirty = 0;
rc = 0;
- cfs_waitq_signal(&mds->mds_lov_objids_wait);
- } else {
- mds->mds_lov_objids_dirty = 1;
}
RETURN(rc);
ENTRY;
LASSERT(!obd->obd_recovering);
- if (mds->mds_lov_objids_count == 0)
- RETURN(0);
LASSERT(mds->mds_lov_objids != NULL);
- /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change
- * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all
- * target can be connected at start time */
- rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
+ rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
KEY_NEXT_ID,
mds->mds_lov_desc.ld_tgt_count *
sizeof(*mds->mds_lov_objids),
if (!ld)
RETURN(-ENOMEM);
- rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
+ rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
&valsize, ld);
if (rc)
GOTO(out, rc);
/* The size of the LOV target table may have increased. */
- if (ld->ld_tgt_count > mds->mds_lov_objids_count) {
+ size = ld->ld_tgt_count * sizeof(obd_id);
+ if ((mds->mds_lov_objids_size == 0) ||
+ (size > mds->mds_lov_objids_size)) {
obd_id *ids;
- size = ld->ld_tgt_count * sizeof(obd_id);
+ /* add room by powers of 2 */
+ size = 1;
+ while (size < ld->ld_tgt_count)
+ size = size << 1;
+ size = size * sizeof(obd_id);
OBD_ALLOC(ids, size);
if (ids == NULL)
GOTO(out, rc = -ENOMEM);
memset(ids, 0, size);
-
- /* write lock is enough for protect from access to
- * old pointer in mds_lov_update_objids and dirty == 0
- * is enough for protect from access in write_objids */
- if (mds->mds_lov_objids) {
- struct l_wait_info lwi = { 0 };
+ if (mds->mds_lov_objids_size) {
obd_id *old_ids = mds->mds_lov_objids;
-
memcpy(ids, mds->mds_lov_objids,
- mds_lov_objids_size(mds));
-
- l_wait_event(mds->mds_lov_objids_wait,
- mds->mds_lov_objids_dirty == 0, &lwi);
-
- OBD_FREE(old_ids, mds_lov_objids_size(mds));
+ mds->mds_lov_objids_size);
+ mds->mds_lov_objids = ids;
+ OBD_FREE(old_ids, mds->mds_lov_objids_size);
}
mds->mds_lov_objids = ids;
- mds->mds_lov_objids_count = ld->ld_tgt_count;
+ mds->mds_lov_objids_size = size;
}
/* Don't change the mds_lov_desc until the objids size matches the
stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
max(mds->mds_lov_desc.ld_tgt_count,
- mds->mds_lov_objids_count));
+ mds->mds_lov_objids_in_file));
mds->mds_max_mdsize = lov_mds_md_size(stripes);
mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
/* We only _need_ to do this at first add (idx), or the first time
after recovery. However, it should now be safe to call anytime. */
mutex_down(&obd->obd_dev_sem);
- rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+ llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
mutex_up(&obd->obd_dev_sem);
/*XXX this notifies the MDD until lov handling use old mds code */
struct mds_obd *mds = &obd->u.mds;
int old_count;
int rc = 0;
- obd_id lastid;
- __u32 size = sizeof(lastid);
-
ENTRY;
old_count = mds->mds_lov_desc.ld_tgt_count;
- down_write(&mds->mds_lov_objids_sem);
rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
- downgrade_write(&mds->mds_lov_objids_sem);
if (rc)
- GOTO(out, rc);
+ RETURN(rc);
CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
idx, obd->obd_recovering, obd->obd_async_recov, old_count,
mds->mds_lov_desc.ld_tgt_count);
/* idx is set as data from lov_notify. */
- if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
- GOTO(out, rc);
+ if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
+ if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+ CERROR("index %d > count %d!\n", idx,
+ mds->mds_lov_desc.ld_tgt_count);
+ RETURN(-EINVAL);
+ }
- if (idx >= mds->mds_lov_desc.ld_tgt_count) {
- CERROR("index %d > count %d!\n", idx,
- mds->mds_lov_desc.ld_tgt_count);
- GOTO(out, rc = -EINVAL);
- }
- rc = obd_get_info(watched->obd_self_export, sizeof("last_id"),
- "last_id", &size, &lastid);
- if (rc)
- GOTO(out, rc);
+ if (idx >= mds->mds_lov_objids_in_file) {
+ /* We never read this lastid; ask the osc */
+ obd_id lastid;
+ __u32 size = sizeof(lastid);
+ rc = obd_get_info(watched->obd_self_export,
+ strlen("last_id"),
+ "last_id", &size, &lastid);
+ if (rc)
+ RETURN(rc);
+ mds->mds_lov_objids[idx] = lastid;
+ mds->mds_lov_objids_dirty = 1;
+ mds_lov_write_objids(obd);
+ } else {
+ /* We have read this lastid from disk; tell the osc.
+ Don't call this during recovery. */
+ rc = mds_lov_set_nextid(obd);
+ }
- spin_lock(&mds->mds_lov_objids_lock);
- if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) {
- /* last id not init or corrupted - use data from osc */
- mds->mds_lov_objids[idx] = lastid;
- mds->mds_lov_objids_dirty = 1;
- spin_unlock(&mds->mds_lov_objids_lock);
- /* not need write immediately, mark for write for avoid
- * lock inverse */
- } else {
- spin_unlock(&mds->mds_lov_objids_lock);
- /* We have read this lastid from disk; tell the osc.
- Don't call this during recovery. */
- rc = mds_lov_set_nextid(obd);
+ CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
+ mds->mds_lov_objids[idx], idx);
}
- CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[idx], idx);
-
-out:
- up_read(&mds->mds_lov_objids_sem);
- RETURN(rc);
-}
-
-int mds_update_objids_from_lastid(struct obd_device *obd)
-{
- struct mds_obd *mds = &obd->u.mds;
- int size;
- int rc, i;
-
- if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) {
- obd_id *ids;
-
- size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
- OBD_ALLOC(ids, size);
- if (ids == NULL)
- GOTO(out, rc = -ENOMEM);
-
- OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_count = size / sizeof(obd_id);
- }
-
- size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
- rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"),
- "last_id", &size, mds->mds_lov_objids);
- if (!rc) {
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
- CWARN("got last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[i], i);
- mds->mds_lov_objids_dirty = 1;
- rc = mds_lov_write_objids(obd);
- if (rc)
- CERROR("got last objids from OSTs, but error "
- "writing objids file: %d\n", rc);
- }
-out:
RETURN(rc);
}
-
/* update the LOV-OSC knowledge of the last used object id's */
int mds_lov_connect(struct obd_device *obd, char * lov_name)
{
struct mds_obd *mds = &obd->u.mds;
struct lustre_handle conn = {0,};
struct obd_connect_data *data;
- int rc;
+ int rc, i;
ENTRY;
if (IS_ERR(mds->mds_osc_obd))
/* Deny new client connections until we are sure we have some OSTs */
obd->obd_no_conn = 1;
- down_write(&mds->mds_lov_objids_sem);
rc = mds_lov_read_objids(obd);
if (rc) {
CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
if (rc)
GOTO(err_reg, rc);
+ /* tgt_count may be 0! */
+ rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+ if (rc) {
+ CERROR("failed to initialize catalog %d\n", rc);
+ GOTO(err_reg, rc);
+ }
+
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
- if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) {
- mds_update_objids_from_lastid(obd);
+ if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
+ int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
+ rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
+ "last_id", &size, mds->mds_lov_objids);
+ if (!rc) {
+ for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+ CWARN("got last object "LPU64" from OST %d\n",
+ mds->mds_lov_objids[i], i);
+ mds->mds_lov_objids_dirty = 1;
+ rc = mds_lov_write_objids(obd);
+ if (rc)
+ CERROR("got last objids from OSTs, but error "
+ "writing objids file: %d\n", rc);
+ }
}
- up_write(&mds->mds_lov_objids_sem);
/* I want to see a callback happen when the OBD moves to a
* "For General Use" state, and that's when we'll call
RETURN(rc);
err_reg:
- up_write(&mds->mds_lov_objids_sem);
obd_register_observer(mds->mds_osc_obd, NULL);
err_discon:
obd_disconnect(mds->mds_osc_exp);
int mds_notify(struct obd_device *obd, struct obd_device *watched,
enum obd_notify_event ev, void *data)
{
- struct mds_obd *mds = &obd->u.mds;
int rc = 0;
ENTRY;
/* We still have to fix the lov descriptor for ost's added
after the mdt in the config log. They didn't make it into
mds_lov_connect. */
- down_write(&mds->mds_lov_objids_sem);
- rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
- up_write(&mds->mds_lov_objids_sem);
+ rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
if (rc)
RETURN(rc);
/* We should update init llog here too for replay unlink and