Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
| inline |
side by side
refine locking for avoid write wrong info into lov_objid file and some races.
[fs/lustre-release.git]
/
lustre
/
mds
/
mds_lov.c
diff --git
a/lustre/mds/mds_lov.c
b/lustre/mds/mds_lov.c
index
a0ef3ce
..
7688a2d
100644
(file)
--- a/
lustre/mds/mds_lov.c
+++ b/
lustre/mds/mds_lov.c
@@
-45,15
+45,24
@@
void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
{
struct mds_obd *mds = &obd->u.mds;
int i;
+ int dirty = 0;
ENTRY;
- lock_kernel();
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+ down_read(&mds->mds_lov_objids_sem);
+
+ spin_lock(&mds->mds_lov_objids_lock);
+ for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) {
if (ids[i] > (mds->mds_lov_objids)[i]) {
(mds->mds_lov_objids)[i] = ids[i];
-
mds->mds_lov_objids_
dirty = 1;
+ dirty = 1;
}
- unlock_kernel();
+ }
+ mds->mds_lov_objids_dirty = dirty;
+
+ spin_unlock(&mds->mds_lov_objids_lock);
+
+ up_read(&mds->mds_lov_objids_sem);
+
EXIT;
}
EXPORT_SYMBOL(mds_lov_update_objids);
@@
-66,7
+75,7
@@
static int mds_lov_read_objids(struct obd_device *obd)
int i, rc, size;
ENTRY;
- LASSERT(!mds->mds_lov_objids_
size
);
+ LASSERT(!mds->mds_lov_objids_
count
);
LASSERT(!mds->mds_lov_objids_dirty);
/* Read everything in the file, even if our current lov desc
@@
-74,41
+83,46
@@
static int mds_lov_read_objids(struct obd_device *obd)
during mds setup may still have valid objids. */
size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
if (size == 0)
-
RETURN(
0);
+
GOTO(out, rc =
0);
OBD_ALLOC(ids, size);
if (ids == NULL)
- RETURN(-ENOMEM);
- mds->mds_lov_objids = ids;
- mds->mds_lov_objids_size = size;
+ GOTO(out, rc = -ENOMEM);
rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
if (rc < 0) {
+ OBD_FREE(ids, size);
CERROR("Error reading objids %d\n", rc);
-
RETURN(
rc);
+
GOTO(out,
rc);
}
+ mds->mds_lov_objids = ids;
+ mds->mds_lov_objids_count = size / sizeof(*ids);
- mds->mds_lov_objids_in_file = size / sizeof(*ids);
-
- for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
+ for (i = 0; i < mds->mds_lov_objids_count; i++) {
CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
mds->mds_lov_objids[i], i);
}
- RETURN(0);
-}
+out:
+ RETURN(rc);
+}
+/* must held mds_lov_objids_sem */
int mds_lov_write_objids(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
loff_t off = 0;
int i, rc, tgts;
ENTRY;
-
- if (!mds->mds_lov_objids_dirty)
+
+ spin_lock(&mds->mds_lov_objids_lock);
+ if (!mds->mds_lov_objids_dirty) {
+ spin_unlock(&mds->mds_lov_objids_lock);
RETURN(0);
+ }
+ mds->mds_lov_objids_dirty = 0;
+ spin_unlock(&mds->mds_lov_objids_lock);
- tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
-
+ tgts = mds->mds_lov_objids_count;
if (!tgts)
RETURN(0);
@@
-120,8
+134,10
@@
int mds_lov_write_objids(struct obd_device *obd)
mds->mds_lov_objids, tgts * sizeof(obd_id),
&off, 0);
if (rc >= 0) {
- mds->mds_lov_objids_dirty = 0;
rc = 0;
+ cfs_waitq_signal(&mds->mds_lov_objids_wait);
+ } else {
+ mds->mds_lov_objids_dirty = 1;
}
RETURN(rc);
@@
-162,10
+178,15
@@
int mds_lov_set_nextid(struct obd_device *obd)
ENTRY;
LASSERT(!obd->obd_recovering);
+ if (mds->mds_lov_objids_count == 0)
+ RETURN(0);
LASSERT(mds->mds_lov_objids != NULL);
- rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
+ /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change
+ * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all
+ * target can be connected at start time */
+ rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
KEY_NEXT_ID,
mds->mds_lov_desc.ld_tgt_count *
sizeof(*mds->mds_lov_objids),
@@
-191,36
+212,39
@@
static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
if (!ld)
RETURN(-ENOMEM);
- rc = obd_get_info(lov, s
trlen(KEY_LOVDESC) + 1
, KEY_LOVDESC,
+ rc = obd_get_info(lov, s
izeof(KEY_LOVDESC)
, KEY_LOVDESC,
&valsize, ld);
if (rc)
GOTO(out, rc);
/* The size of the LOV target table may have increased. */
- size = ld->ld_tgt_count * sizeof(obd_id);
- if ((mds->mds_lov_objids_size == 0) ||
- (size > mds->mds_lov_objids_size)) {
+ if (ld->ld_tgt_count > mds->mds_lov_objids_count) {
obd_id *ids;
- /* add room by powers of 2 */
- size = 1;
- while (size < ld->ld_tgt_count)
- size = size << 1;
- size = size * sizeof(obd_id);
+ size = ld->ld_tgt_count * sizeof(obd_id);
OBD_ALLOC(ids, size);
if (ids == NULL)
GOTO(out, rc = -ENOMEM);
memset(ids, 0, size);
- if (mds->mds_lov_objids_size) {
+
+ /* write lock is enough for protect from access to
+ * old pointer in mds_lov_update_objids and dirty == 0
+ * is enough for protect from access in write_objids */
+ if (mds->mds_lov_objids) {
+ struct l_wait_info lwi = { 0 };
obd_id *old_ids = mds->mds_lov_objids;
+
memcpy(ids, mds->mds_lov_objids,
- mds->mds_lov_objids_size);
- mds->mds_lov_objids = ids;
- OBD_FREE(old_ids, mds->mds_lov_objids_size);
+ mds_lov_objids_size(mds));
+
+ l_wait_event(mds->mds_lov_objids_wait,
+ mds->mds_lov_objids_dirty == 0, &lwi);
+
+ OBD_FREE(old_ids, mds_lov_objids_size(mds));
}
mds->mds_lov_objids = ids;
- mds->mds_lov_objids_
size = size
;
+ mds->mds_lov_objids_
count = ld->ld_tgt_count
;
}
/* Don't change the mds_lov_desc until the objids size matches the
@@
-231,7
+255,7
@@
static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
max(mds->mds_lov_desc.ld_tgt_count,
- mds->mds_lov_objids_
in_file
));
+ mds->mds_lov_objids_
count
));
mds->mds_max_mdsize = lov_mds_md_size(stripes);
mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
@@
-242,7
+266,7
@@
static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
/* We only _need_ to do this at first add (idx), or the first time
after recovery. However, it should now be safe to call anytime. */
mutex_down(&obd->obd_dev_sem);
- llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+
rc =
llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
mutex_up(&obd->obd_dev_sem);
/*XXX this notifies the MDD until lov handling use old mds code */
@@
-267,57
+291,103
@@
static int mds_lov_update_mds(struct obd_device *obd,
struct mds_obd *mds = &obd->u.mds;
int old_count;
int rc = 0;
+ obd_id lastid;
+ __u32 size = sizeof(lastid);
+
ENTRY;
old_count = mds->mds_lov_desc.ld_tgt_count;
+ down_write(&mds->mds_lov_objids_sem);
rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+ downgrade_write(&mds->mds_lov_objids_sem);
if (rc)
-
RETURN(
rc);
+
GOTO(out,
rc);
CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
idx, obd->obd_recovering, obd->obd_async_recov, old_count,
mds->mds_lov_desc.ld_tgt_count);
/* idx is set as data from lov_notify. */
- if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
- if (idx >= mds->mds_lov_desc.ld_tgt_count) {
- CERROR("index %d > count %d!\n", idx,
- mds->mds_lov_desc.ld_tgt_count);
- RETURN(-EINVAL);
- }
+ if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
+ GOTO(out, rc);
- if (idx >= mds->mds_lov_objids_in_file) {
- /* We never read this lastid; ask the osc */
- obd_id lastid;
- __u32 size = sizeof(lastid);
- rc = obd_get_info(watched->obd_self_export,
- strlen("last_id"),
- "last_id", &size, &lastid);
- if (rc)
- RETURN(rc);
- mds->mds_lov_objids[idx] = lastid;
- mds->mds_lov_objids_dirty = 1;
- mds_lov_write_objids(obd);
- } else {
- /* We have read this lastid from disk; tell the osc.
- Don't call this during recovery. */
- rc = mds_lov_set_nextid(obd);
- }
+ if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+ CERROR("index %d > count %d!\n", idx,
+ mds->mds_lov_desc.ld_tgt_count);
+ GOTO(out, rc = -EINVAL);
+ }
+ rc = obd_get_info(watched->obd_self_export, sizeof("last_id"),
+ "last_id", &size, &lastid);
+ if (rc)
+ GOTO(out, rc);
- CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[idx], idx);
+ spin_lock(&mds->mds_lov_objids_lock);
+ if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) {
+ /* last id not init or corrupted - use data from osc */
+ mds->mds_lov_objids[idx] = lastid;
+ mds->mds_lov_objids_dirty = 1;
+ spin_unlock(&mds->mds_lov_objids_lock);
+ /* not need write immediately, mark for write for avoid
+ * lock inverse */
+ } else {
+ spin_unlock(&mds->mds_lov_objids_lock);
+ /* We have read this lastid from disk; tell the osc.
+ Don't call this during recovery. */
+ rc = mds_lov_set_nextid(obd);
}
+ CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
+ mds->mds_lov_objids[idx], idx);
+
+out:
+ up_read(&mds->mds_lov_objids_sem);
+ RETURN(rc);
+}
+
+int mds_update_objids_from_lastid(struct obd_device *obd)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int size;
+ int rc, i;
+
+ if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) {
+ obd_id *ids;
+
+ size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
+ OBD_ALLOC(ids, size);
+ if (ids == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
+ mds->mds_lov_objids = ids;
+ mds->mds_lov_objids_count = size / sizeof(obd_id);
+ }
+
+ size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
+ rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"),
+ "last_id", &size, mds->mds_lov_objids);
+ if (!rc) {
+ for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+ CWARN("got last object "LPU64" from OST %d\n",
+ mds->mds_lov_objids[i], i);
+ mds->mds_lov_objids_dirty = 1;
+ rc = mds_lov_write_objids(obd);
+ if (rc)
+ CERROR("got last objids from OSTs, but error "
+ "writing objids file: %d\n", rc);
+ }
+out:
RETURN(rc);
}
+
/* update the LOV-OSC knowledge of the last used object id's */
int mds_lov_connect(struct obd_device *obd, char * lov_name)
{
struct mds_obd *mds = &obd->u.mds;
struct lustre_handle conn = {0,};
struct obd_connect_data *data;
- int rc
, i
;
+ int rc;
ENTRY;
if (IS_ERR(mds->mds_osc_obd))
@@
-364,6
+434,7
@@
int mds_lov_connect(struct obd_device *obd, char * lov_name)
/* Deny new client connections until we are sure we have some OSTs */
obd->obd_no_conn = 1;
+ down_write(&mds->mds_lov_objids_sem);
rc = mds_lov_read_objids(obd);
if (rc) {
CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
@@
-374,30
+445,12
@@
int mds_lov_connect(struct obd_device *obd, char * lov_name)
if (rc)
GOTO(err_reg, rc);
- /* tgt_count may be 0! */
- rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
- if (rc) {
- CERROR("failed to initialize catalog %d\n", rc);
- GOTO(err_reg, rc);
- }
-
/* If we're mounting this code for the first time on an existing FS,
* we need to populate the objids array from the real OST values */
- if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
- int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
- rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
- "last_id", &size, mds->mds_lov_objids);
- if (!rc) {
- for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
- CWARN("got last object "LPU64" from OST %d\n",
- mds->mds_lov_objids[i], i);
- mds->mds_lov_objids_dirty = 1;
- rc = mds_lov_write_objids(obd);
- if (rc)
- CERROR("got last objids from OSTs, but error "
- "writing objids file: %d\n", rc);
- }
+ if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) {
+ mds_update_objids_from_lastid(obd);
}
+ up_write(&mds->mds_lov_objids_sem);
/* I want to see a callback happen when the OBD moves to a
* "For General Use" state, and that's when we'll call
@@
-411,6
+464,7
@@
int mds_lov_connect(struct obd_device *obd, char * lov_name)
RETURN(rc);
err_reg:
+ up_write(&mds->mds_lov_objids_sem);
obd_register_observer(mds->mds_osc_obd, NULL);
err_discon:
obd_disconnect(mds->mds_osc_exp);
@@
-822,6
+876,7
@@
int mds_lov_start_synchronize(struct obd_device *obd,
int mds_notify(struct obd_device *obd, struct obd_device *watched,
enum obd_notify_event ev, void *data)
{
+ struct mds_obd *mds = &obd->u.mds;
int rc = 0;
ENTRY;
@@
-852,7
+907,9
@@
int mds_notify(struct obd_device *obd, struct obd_device *watched,
/* We still have to fix the lov descriptor for ost's added
after the mdt in the config log. They didn't make it into
mds_lov_connect. */
- rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
+ down_write(&mds->mds_lov_objids_sem);
+ rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+ up_write(&mds->mds_lov_objids_sem);
if (rc)
RETURN(rc);
/* We should update init llog here too for replay unlink and