set->set_cookies = 0;
CFS_INIT_LIST_HEAD(&set->set_list);
atomic_set(&set->set_refcount, 1);
+ cfs_waitq_init(&set->set_waitq);
+ spin_lock_init(&set->set_lock);
}
static void lov_finish_set(struct lov_request_set *set)
EXIT;
}
+int lov_finished_set(struct lov_request_set *set)
+{
+ CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+ set->set_count);
+ return set->set_completes == set->set_count;
+}
+
+
void lov_update_set(struct lov_request_set *set,
struct lov_request *req, int rc)
{
set->set_completes++;
if (rc == 0)
set->set_success++;
+
+ cfs_waitq_signal(&set->set_waitq);
}
int lov_update_common_set(struct lov_request_set *set,
{
list_add_tail(&req->rq_link, &set->set_list);
set->set_count++;
+ req->rq_rqset = set;
}
extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
sizeof(struct lov_oinfo *);
-
- req->rq_rqset = set;
/* Set lov request specific parameters. */
req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
req->rq_oi.oi_cb_up = cb_update_enqueue;
rc = qos_remedy_create(set, req);
lov_update_create_set(set, req, rc);
-
- if (rc)
- break;
}
}
if (set->set_success == 0)
GOTO(cleanup, rc);
- /* If there was an explicit stripe set, fail. Otherwise, we
- * got some objects and that's not bad. */
if (set->set_count != set->set_success) {
- if (*lsmp)
- GOTO(cleanup, rc);
set->set_count = set->set_success;
qos_shrink_lsm(set);
}
struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
ENTRY;
- req->rq_stripe = set->set_success;
- loi = lsm->lsm_oinfo[req->rq_stripe];
-
if (rc && lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active) {
CERROR("error creating fid "LPX64" sub-object"
rc = -EIO;
}
}
- lov_update_set(set, req, rc);
- if (rc)
+
+ spin_lock(&set->set_lock);
+ req->rq_stripe = set->set_success;
+ loi = lsm->lsm_oinfo[req->rq_stripe];
+ if (rc) {
+ lov_update_set(set, req, rc);
+ spin_unlock(&set->set_lock);
RETURN(rc);
+ }
loi->loi_id = req->rq_oi.oi_oa->o_id;
loi->loi_gr = req->rq_oi.oi_oa->o_gr;
loi->loi_ost_idx = req->rq_idx;
- CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
- lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
loi_init(loi);
if (oti && set->set_cookies)
if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
set->set_cookie_sent++;
- RETURN(0);
+ lov_update_set(set, req, rc);
+ spin_unlock(&set->set_lock);
+
+ CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
+ lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
+ RETURN(rc);
+}
+
+int cb_create_update(void *cookie, int rc)
+{
+ struct obd_info *oinfo = cookie;
+ struct lov_request *lovreq;
+
+ lovreq = container_of(oinfo, struct lov_request, rq_oi);
+ return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
}
+
int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
struct lov_stripe_md **lsmp, struct obdo *src_oa,
struct obd_trans_info *oti,
req->rq_oi.oi_oa->o_id = loi->loi_id;
req->rq_oi.oi_cb_up = cb_getattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
- req->rq_rqset = set;
lov_set_add_req(req, set);
}
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
- CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
- "req->rq_oi.oi_oa->o_valid="LPX64" "
- "req->rq_oi.oi_oa->o_gr="LPU64"\n",
- req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
- req->rq_rqset = set;
if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
int off = lov_stripe_offset(oinfo->oi_md,
struct lov_request *req;
obd_off rs, re;
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
- !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
if (!lov_stripe_intersects(oinfo->oi_md, i,
oinfo->oi_policy.l_extent.start,
oinfo->oi_policy.l_extent.end,
&rs, &re))
continue;
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
+ CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+ GOTO(out_set, rc = -EIO);
+ }
+
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_update_punch;
- req->rq_rqset = set;
req->rq_oi.oi_policy.l_extent.start = rs;
req->rq_oi.oi_policy.l_extent.end = re;
req->rq_oi.oi_policy.l_extent.start = rs;
req->rq_oi.oi_policy.l_extent.end = re;
req->rq_oi.oi_policy.l_extent.gid = -1;
- req->rq_rqset = set;
lov_set_add_req(req, set);
}
struct obd_info *oinfo = cookie;
struct lov_request *lovreq;
struct obd_statfs *osfs, *lov_sfs;
- struct obd_device *obd;
struct lov_obd *lov;
+ struct lov_tgt_desc *tgt;
+ struct obd_device *lovobd, *tgtobd;
int success;
ENTRY;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
- lov = &lovreq->rq_rqset->set_obd->u.lov;
- obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
-
+ lovobd = lovreq->rq_rqset->set_obd;
+ lov = &lovobd->u.lov;
osfs = lovreq->rq_rqset->set_oi->oi_osfs;
lov_sfs = oinfo->oi_osfs;
-
success = lovreq->rq_rqset->set_success;
/* XXX: the same is done in lov_update_common_set, however
lovset->set_exp is not initialized. */
lov_update_set(lovreq->rq_rqset, lovreq, rc);
- if (rc) {
- /* XXX ignore error for disconnected ost ? */
- if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
- lov->lov_tgts[lovreq->rq_idx]->ltd_active))
- rc = 0;
+ if (rc)
GOTO(out, rc);
- }
-
- spin_lock(&obd->obd_osfs_lock);
- memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
+
+ obd_getref(lovobd);
+ tgt = lov->lov_tgts[lovreq->rq_idx];
+ if (!tgt || !tgt->ltd_active)
+ GOTO(out_update, rc);
+
+ tgtobd = class_exp2obd(tgt->ltd_exp);
+ spin_lock(&tgtobd->obd_osfs_lock);
+ memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
- obd->obd_osfs_age = cfs_time_current_64();
- spin_unlock(&obd->obd_osfs_lock);
+ tgtobd->obd_osfs_age = cfs_time_current_64();
+ spin_unlock(&tgtobd->obd_osfs_lock);
+out_update:
lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
+ obd_putref(lovobd);
+
out:
if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
- lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
+ lov_finished_set(lovreq->rq_rqset)) {
lov_statfs_interpret(NULL, lovreq->rq_rqset,
lovreq->rq_rqset->set_success !=
lovreq->rq_rqset->set_count);
continue;
}
+ /* skip targets that have been explicitely disabled by the
+ * administrator */
+ if (!lov->lov_tgts[i]->ltd_exp) {
+ CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
+ continue;
+ }
+
OBD_ALLOC(req, sizeof(*req));
if (req == NULL)
GOTO(out_set, rc = -ENOMEM);
req->rq_idx = i;
req->rq_oi.oi_cb_up = cb_statfs_update;
req->rq_oi.oi_flags = oinfo->oi_flags;
- req->rq_rqset = set;
lov_set_add_req(req, set);
}