Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Revert "b=19427 correct lmm_object_id and reserve fids for fid-on-OST."
[fs/lustre-release.git]
/
lustre
/
lov
/
lov_request.c
diff --git
a/lustre/lov/lov_request.c
b/lustre/lov/lov_request.c
index
f0eee27
..
7c0e040
100644
(file)
--- a/
lustre/lov/lov_request.c
+++ b/
lustre/lov/lov_request.c
@@
-58,20
+58,22
@@
static void lov_init_set(struct lov_request_set *set)
set->set_success = 0;
set->set_cookies = 0;
CFS_INIT_LIST_HEAD(&set->set_list);
set->set_success = 0;
set->set_cookies = 0;
CFS_INIT_LIST_HEAD(&set->set_list);
- atomic_set(&set->set_refcount, 1);
+
cfs_
atomic_set(&set->set_refcount, 1);
cfs_waitq_init(&set->set_waitq);
cfs_waitq_init(&set->set_waitq);
+ cfs_spin_lock_init(&set->set_lock);
}
}
-
static
void lov_finish_set(struct lov_request_set *set)
+void lov_finish_set(struct lov_request_set *set)
{
{
-
struct list_head
*pos, *n;
+
cfs_list_t
*pos, *n;
ENTRY;
LASSERT(set);
ENTRY;
LASSERT(set);
- list_for_each_safe(pos, n, &set->set_list) {
- struct lov_request *req = list_entry(pos, struct lov_request,
- rq_link);
- list_del_init(&req->rq_link);
+ cfs_list_for_each_safe(pos, n, &set->set_list) {
+ struct lov_request *req = cfs_list_entry(pos,
+ struct lov_request,
+ rq_link);
+ cfs_list_del_init(&req->rq_link);
if (req->rq_oi.oi_oa)
OBDO_FREE(req->rq_oi.oi_oa);
if (req->rq_oi.oi_oa)
OBDO_FREE(req->rq_oi.oi_oa);
@@
-134,7
+136,7
@@
int lov_update_common_set(struct lov_request_set *set,
void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
{
void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
{
- list_add_tail(&req->rq_link, &set->set_list);
+
cfs_
list_add_tail(&req->rq_link, &set->set_list);
set->set_count++;
req->rq_rqset = set;
}
set->set_count++;
req->rq_rqset = set;
}
@@
-220,7
+222,7
@@
static int enqueue_done(struct lov_request_set *set, __u32 mode)
RETURN(0);
/* cancel enqueued/matched locks */
RETURN(0);
/* cancel enqueued/matched locks */
- list_for_each_entry(req, &set->set_list, rq_link) {
+
cfs_
list_for_each_entry(req, &set->set_list, rq_link) {
struct lustre_handle *lov_lockhp;
if (!req->rq_complete || req->rq_rc)
struct lustre_handle *lov_lockhp;
if (!req->rq_complete || req->rq_rc)
@@
-263,8
+265,7
@@
int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
} else if (set->set_lockh)
lov_llh_put(set->set_lockh);
} else if (set->set_lockh)
lov_llh_put(set->set_lockh);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc ? rc : ret);
}
RETURN(rc ? rc : ret);
}
@@
-387,8
+388,7
@@
int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
(flags & LDLM_FL_TEST_LOCK))
lov_llh_put(set->set_lockh);
(flags & LDLM_FL_TEST_LOCK))
lov_llh_put(set->set_lockh);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-479,8
+479,7
@@
int lov_fini_cancel_set(struct lov_request_set *set)
if (set->set_lockh)
lov_llh_put(set->set_lockh);
if (set->set_lockh)
lov_llh_put(set->set_lockh);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-516,7
+515,7
@@
int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
lov_lockhp = set->set_lockh->llh_handles + i;
if (!lustre_handle_is_used(lov_lockhp)) {
lov_lockhp = set->set_lockh->llh_handles + i;
if (!lustre_handle_is_used(lov_lockhp)) {
- CDEBUG(D_
RPCTRACE,
"lov idx %d subobj "LPX64" no lock\n",
+ CDEBUG(D_
INFO,
"lov idx %d subobj "LPX64" no lock\n",
loi->loi_ost_idx, loi->loi_id);
continue;
}
loi->loi_ost_idx, loi->loi_id);
continue;
}
@@
-567,7
+566,7
@@
static int create_done(struct obd_export *exp, struct lov_request_set *set,
/* try alloc objects on other osts if osc_create fails for
* exceptions: RPC failure, ENOSPC, etc */
if (set->set_count != set->set_success) {
/* try alloc objects on other osts if osc_create fails for
* exceptions: RPC failure, ENOSPC, etc */
if (set->set_count != set->set_success) {
- list_for_each_entry (req, &set->set_list, rq_link) {
+
cfs_
list_for_each_entry (req, &set->set_list, rq_link) {
if (req->rq_rc == 0)
continue;
if (req->rq_rc == 0)
continue;
@@
-592,7
+591,7
@@
static int create_done(struct obd_export *exp, struct lov_request_set *set,
if (ret_oa == NULL)
GOTO(cleanup, rc = -ENOMEM);
if (ret_oa == NULL)
GOTO(cleanup, rc = -ENOMEM);
- list_for_each_entry(req, &set->set_list, rq_link) {
+
cfs_
list_for_each_entry(req, &set->set_list, rq_link) {
if (!req->rq_complete || req->rq_rc)
continue;
lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
if (!req->rq_complete || req->rq_rc)
continue;
lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
@@
-615,7
+614,7
@@
static int create_done(struct obd_export *exp, struct lov_request_set *set,
GOTO(done, rc = 0);
cleanup:
GOTO(done, rc = 0);
cleanup:
- list_for_each_entry(req, &set->set_list, rq_link) {
+
cfs_
list_for_each_entry(req, &set->set_list, rq_link) {
struct obd_export *sub_exp;
int err = 0;
struct obd_export *sub_exp;
int err = 0;
@@
-657,9
+656,7
@@
int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
if (set->set_completes)
rc = create_done(set->set_exp, set, lsmp);
if (set->set_completes)
rc = create_done(set->set_exp, set, lsmp);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
-
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-672,9
+669,6
@@
int lov_update_create_set(struct lov_request_set *set,
struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
ENTRY;
struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
ENTRY;
- req->rq_stripe = set->set_success;
- loi = lsm->lsm_oinfo[req->rq_stripe];
-
if (rc && lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active) {
CERROR("error creating fid "LPX64" sub-object"
if (rc && lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active) {
CERROR("error creating fid "LPX64" sub-object"
@@
-686,15
+680,19
@@
int lov_update_create_set(struct lov_request_set *set,
rc = -EIO;
}
}
rc = -EIO;
}
}
- lov_update_set(set, req, rc);
- if (rc)
+
+ cfs_spin_lock(&set->set_lock);
+ req->rq_stripe = set->set_success;
+ loi = lsm->lsm_oinfo[req->rq_stripe];
+ if (rc) {
+ lov_update_set(set, req, rc);
+ cfs_spin_unlock(&set->set_lock);
RETURN(rc);
RETURN(rc);
+ }
loi->loi_id = req->rq_oi.oi_oa->o_id;
loi->loi_gr = req->rq_oi.oi_oa->o_gr;
loi->loi_ost_idx = req->rq_idx;
loi->loi_id = req->rq_oi.oi_oa->o_id;
loi->loi_gr = req->rq_oi.oi_oa->o_gr;
loi->loi_ost_idx = req->rq_idx;
- CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
- lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
loi_init(loi);
if (oti && set->set_cookies)
loi_init(loi);
if (oti && set->set_cookies)
@@
-702,7
+700,12
@@
int lov_update_create_set(struct lov_request_set *set,
if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
set->set_cookie_sent++;
if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
set->set_cookie_sent++;
- RETURN(0);
+ lov_update_set(set, req, rc);
+ cfs_spin_unlock(&set->set_lock);
+
+ CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+ lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+ RETURN(rc);
}
int cb_create_update(void *cookie, int rc)
}
int cb_create_update(void *cookie, int rc)
@@
-711,7
+714,10
@@
int cb_create_update(void *cookie, int rc)
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
- return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+ rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+ if (lov_finished_set(lovreq->rq_rqset))
+ lov_put_reqset(lovreq->rq_rqset);
+ return rc;
}
}
@@
-734,20
+740,23
@@
int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
set->set_oi->oi_md = *lsmp;
set->set_oi->oi_oa = src_oa;
set->set_oti = oti;
set->set_oi->oi_md = *lsmp;
set->set_oi->oi_oa = src_oa;
set->set_oti = oti;
+ lov_get_reqset(set);
rc = qos_prep_create(exp, set);
/* qos_shrink_lsm() may have allocated a new lsm */
*lsmp = oinfo->oi_md;
rc = qos_prep_create(exp, set);
/* qos_shrink_lsm() may have allocated a new lsm */
*lsmp = oinfo->oi_md;
- if (rc)
+ if (rc)
{
lov_fini_create_set(set, lsmp);
lov_fini_create_set(set, lsmp);
- else
+ lov_put_reqset(set);
+ } else {
*reqset = set;
*reqset = set;
+ }
RETURN(rc);
}
static int common_attr_done(struct lov_request_set *set)
{
RETURN(rc);
}
static int common_attr_done(struct lov_request_set *set)
{
-
struct list_head
*pos;
+
cfs_list_t
*pos;
struct lov_request *req;
struct obdo *tmp_oa;
int rc = 0, attrset = 0;
struct lov_request *req;
struct obdo *tmp_oa;
int rc = 0, attrset = 0;
@@
-765,8
+774,8
@@
static int common_attr_done(struct lov_request_set *set)
if (tmp_oa == NULL)
GOTO(out, rc = -ENOMEM);
if (tmp_oa == NULL)
GOTO(out, rc = -ENOMEM);
- list_for_each (pos, &set->set_list) {
- req = list_entry(pos, struct lov_request, rq_link);
+
cfs_
list_for_each (pos, &set->set_list) {
+ req =
cfs_
list_entry(pos, struct lov_request, rq_link);
if (!req->rq_complete || req->rq_rc)
continue;
if (!req->rq_complete || req->rq_rc)
continue;
@@
-780,6
+789,14
@@
static int common_attr_done(struct lov_request_set *set)
CERROR("No stripes had valid attrs\n");
rc = -EIO;
}
CERROR("No stripes had valid attrs\n");
rc = -EIO;
}
+ if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
+ (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
+ /* When we take attributes of some epoch, we require all the
+ * ost to be active. */
+ CERROR("Not all the stripes had valid attrs\n");
+ GOTO(out, rc = -EIO);
+ }
+
tmp_oa->o_id = set->set_oi->oi_oa->o_id;
memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
out:
tmp_oa->o_id = set->set_oi->oi_oa->o_id;
memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
out:
@@
-793,12
+810,12
@@
static int brw_done(struct lov_request_set *set)
{
struct lov_stripe_md *lsm = set->set_oi->oi_md;
struct lov_oinfo *loi = NULL;
{
struct lov_stripe_md *lsm = set->set_oi->oi_md;
struct lov_oinfo *loi = NULL;
-
struct list_head
*pos;
+
cfs_list_t
*pos;
struct lov_request *req;
ENTRY;
struct lov_request *req;
ENTRY;
- list_for_each (pos, &set->set_list) {
- req = list_entry(pos, struct lov_request, rq_link);
+
cfs_
list_for_each (pos, &set->set_list) {
+ req =
cfs_
list_entry(pos, struct lov_request, rq_link);
if (!req->rq_complete || req->rq_rc)
continue;
if (!req->rq_complete || req->rq_rc)
continue;
@@
-824,8
+841,7
@@
int lov_fini_brw_set(struct lov_request_set *set)
rc = brw_done(set);
/* FIXME update qos data here */
}
rc = brw_done(set);
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-963,8
+979,7
@@
int lov_fini_getattr_set(struct lov_request_set *set)
if (set->set_completes)
rc = common_attr_done(set);
if (set->set_completes)
rc = common_attr_done(set);
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-1003,6
+1018,9
@@
int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
+ /* SOM requires all the OSTs to be active. */
+ GOTO(out_set, rc = -EIO);
continue;
}
continue;
}
@@
-1046,8
+1064,7
@@
int lov_fini_destroy_set(struct lov_request_set *set)
/* FIXME update qos data here */
}
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(0);
}
RETURN(0);
}
@@
-1124,8
+1141,7
@@
int lov_fini_setattr_set(struct lov_request_set *set)
/* FIXME update qos data here */
}
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-1212,11
+1228,6
@@
int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
- CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
- "req->rq_oi.oi_oa->o_valid="LPX64" "
- "req->rq_oi.oi_oa->o_gr="LPU64"\n",
- req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
@@
-1259,8
+1270,7
@@
int lov_fini_punch_set(struct lov_request_set *set)
rc = common_attr_done(set);
}
rc = common_attr_done(set);
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-1388,8
+1398,7
@@
int lov_fini_sync_set(struct lov_request_set *set)
/* FIXME update qos data here */
}
/* FIXME update qos data here */
}
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-1479,10
+1488,10
@@
int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
if (osfs->os_ffree != LOV_U64_MAX)
do_div(osfs->os_ffree, expected_stripes);
if (osfs->os_ffree != LOV_U64_MAX)
do_div(osfs->os_ffree, expected_stripes);
- spin_lock(&obd->obd_osfs_lock);
+
cfs_
spin_lock(&obd->obd_osfs_lock);
memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
obd->obd_osfs_age = cfs_time_current_64();
memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
obd->obd_osfs_age = cfs_time_current_64();
- spin_unlock(&obd->obd_osfs_lock);
+
cfs_
spin_unlock(&obd->obd_osfs_lock);
RETURN(0);
}
RETURN(0);
}
@@
-1501,10
+1510,7
@@
int lov_fini_statfs_set(struct lov_request_set *set)
rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
set->set_success);
}
rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
set->set_success);
}
-
- if (atomic_dec_and_test(&set->set_refcount))
- lov_finish_set(set);
-
+ lov_put_reqset(set);
RETURN(rc);
}
RETURN(rc);
}
@@
-1583,38
+1589,41
@@
static int cb_statfs_update(void *cookie, int rc)
struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
- struct obd_device *obd;
struct lov_obd *lov;
struct lov_obd *lov;
+ struct lov_tgt_desc *tgt;
+ struct obd_device *lovobd, *tgtobd;
int success;
ENTRY;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
int success;
ENTRY;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
- lov = &lovreq->rq_rqset->set_obd->u.lov;
- obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
-
+ lovobd = lovreq->rq_rqset->set_obd;
+ lov = &lovobd->u.lov;
osfs = lovreq->rq_rqset->set_oi->oi_osfs;
lov_sfs = oinfo->oi_osfs;
osfs = lovreq->rq_rqset->set_oi->oi_osfs;
lov_sfs = oinfo->oi_osfs;
-
success = lovreq->rq_rqset->set_success;
/* XXX: the same is done in lov_update_common_set, however
lovset->set_exp is not initialized. */
lov_update_set(lovreq->rq_rqset, lovreq, rc);
success = lovreq->rq_rqset->set_success;
/* XXX: the same is done in lov_update_common_set, however
lovset->set_exp is not initialized. */
lov_update_set(lovreq->rq_rqset, lovreq, rc);
- if (rc) {
- /* XXX ignore error for disconnected ost ? */
- if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
- lov->lov_tgts[lovreq->rq_idx]->ltd_active))
- rc = 0;
+ if (rc)
GOTO(out, rc);
GOTO(out, rc);
- }
- spin_lock(&obd->obd_osfs_lock);
- memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+ obd_getref(lovobd);
+ tgt = lov->lov_tgts[lovreq->rq_idx];
+ if (!tgt || !tgt->ltd_active)
+ GOTO(out_update, rc);
+
+ tgtobd = class_exp2obd(tgt->ltd_exp);
+ cfs_spin_lock(&tgtobd->obd_osfs_lock);
+ memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
- obd->obd_osfs_age = cfs_time_current_64();
-
spin_unlock(&
obd->obd_osfs_lock);
+
tgt
obd->obd_osfs_age = cfs_time_current_64();
+
cfs_spin_unlock(&tgt
obd->obd_osfs_lock);
+out_update:
lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
+ obd_putref(lovobd);
+
out:
if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
lov_finished_set(lovreq->rq_rqset)) {
out:
if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
lov_finished_set(lovreq->rq_rqset)) {
@@
-1653,6
+1662,13
@@
int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
continue;
}
continue;
}
+ /* skip targets that have been explicitely disabled by the
+ * administrator */
+ if (!lov->lov_tgts[i]->ltd_exp) {
+ CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+ continue;
+ }
+
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);