X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=7c0e040ea79f2af7fa20216c64fc6d4eee771099;hb=3ec611d61138de3901944e4fdb50d8872fd472d5;hp=9e898e32f63590f3458e761396e87b01a3d7be41;hpb=c73fbea0f8fe64c371c978449b702b9a84fb3067;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 9e898e3..7c0e040 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -58,21 +58,22 @@ static void lov_init_set(struct lov_request_set *set) set->set_success = 0; set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); - atomic_set(&set->set_refcount, 1); + cfs_atomic_set(&set->set_refcount, 1); cfs_waitq_init(&set->set_waitq); - spin_lock_init(&set->set_lock); + cfs_spin_lock_init(&set->set_lock); } -static void lov_finish_set(struct lov_request_set *set) +void lov_finish_set(struct lov_request_set *set) { - struct list_head *pos, *n; + cfs_list_t *pos, *n; ENTRY; LASSERT(set); - list_for_each_safe(pos, n, &set->set_list) { - struct lov_request *req = list_entry(pos, struct lov_request, - rq_link); - list_del_init(&req->rq_link); + cfs_list_for_each_safe(pos, n, &set->set_list) { + struct lov_request *req = cfs_list_entry(pos, + struct lov_request, + rq_link); + cfs_list_del_init(&req->rq_link); if (req->rq_oi.oi_oa) OBDO_FREE(req->rq_oi.oi_oa); @@ -135,7 +136,7 @@ int lov_update_common_set(struct lov_request_set *set, void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) { - list_add_tail(&req->rq_link, &set->set_list); + cfs_list_add_tail(&req->rq_link, &set->set_list); set->set_count++; req->rq_rqset = set; } @@ -221,7 +222,7 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) RETURN(0); /* cancel enqueued/matched locks */ - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { struct lustre_handle *lov_lockhp; if (!req->rq_complete || req->rq_rc) @@ -264,8 +265,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, } else if (set->set_lockh) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc ? rc : ret); } @@ -388,8 +388,7 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -480,8 +479,7 @@ int lov_fini_cancel_set(struct lov_request_set *set) if (set->set_lockh) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -517,7 +515,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, lov_lockhp = set->set_lockh->llh_handles + i; if (!lustre_handle_is_used(lov_lockhp)) { - CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n", + CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n", loi->loi_ost_idx, loi->loi_id); continue; } @@ -568,7 +566,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, /* try alloc objects on other osts if osc_create fails for * exceptions: RPC failure, ENOSPC, etc */ if (set->set_count != set->set_success) { - list_for_each_entry (req, &set->set_list, rq_link) { + cfs_list_for_each_entry (req, &set->set_list, rq_link) { if (req->rq_rc == 0) continue; @@ -593,7 +591,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, if (ret_oa == NULL) GOTO(cleanup, rc = -ENOMEM); - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { if (!req->rq_complete || req->rq_rc) continue; lov_merge_attrs(ret_oa, req->rq_oi.oi_oa, @@ -616,7 +614,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, GOTO(done, rc = 0); cleanup: - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { struct obd_export *sub_exp; int err = 0; @@ -658,9 +656,7 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) if (set->set_completes) rc = create_done(set->set_exp, set, lsmp); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); - + lov_put_reqset(set); RETURN(rc); } @@ -685,12 +681,12 @@ int lov_update_create_set(struct lov_request_set *set, } } - spin_lock(&set->set_lock); + cfs_spin_lock(&set->set_lock); req->rq_stripe = set->set_success; loi = lsm->lsm_oinfo[req->rq_stripe]; if (rc) { lov_update_set(set, req, rc); - spin_unlock(&set->set_lock); + cfs_spin_unlock(&set->set_lock); RETURN(rc); } @@ -705,7 +701,7 @@ int lov_update_create_set(struct lov_request_set *set, set->set_cookie_sent++; lov_update_set(set, req, rc); - spin_unlock(&set->set_lock); + cfs_spin_unlock(&set->set_lock); CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); @@ -718,7 +714,10 @@ int cb_create_update(void *cookie, int rc) struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); - return lov_update_create_set(lovreq->rq_rqset, lovreq, rc); + rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc); + if (lov_finished_set(lovreq->rq_rqset)) + lov_put_reqset(lovreq->rq_rqset); + return rc; } @@ -741,20 +740,23 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oi->oi_md = *lsmp; set->set_oi->oi_oa = src_oa; set->set_oti = oti; + lov_get_reqset(set); rc = qos_prep_create(exp, set); /* qos_shrink_lsm() may have allocated a new lsm */ *lsmp = oinfo->oi_md; - if (rc) + if (rc) { lov_fini_create_set(set, lsmp); - else + lov_put_reqset(set); + } else { *reqset = set; + } RETURN(rc); } static int common_attr_done(struct lov_request_set *set) { - struct list_head *pos; + cfs_list_t *pos; struct lov_request *req; struct obdo *tmp_oa; int rc = 0, attrset = 0; @@ -772,8 +774,8 @@ static int common_attr_done(struct lov_request_set *set) if (tmp_oa == NULL) GOTO(out, rc = -ENOMEM); - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + cfs_list_for_each (pos, &set->set_list) { + req = cfs_list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; @@ -787,6 +789,14 @@ static int common_attr_done(struct lov_request_set *set) CERROR("No stripes had valid attrs\n"); rc = -EIO; } + if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) && + (set->set_oi->oi_md->lsm_stripe_count != attrset)) { + /* When we take attributes of some epoch, we require all the + * ost to be active. */ + CERROR("Not all the stripes had valid attrs\n"); + GOTO(out, rc = -EIO); + } + tmp_oa->o_id = set->set_oi->oi_oa->o_id; memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa)); out: @@ -800,12 +810,12 @@ static int brw_done(struct lov_request_set *set) { struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi = NULL; - struct list_head *pos; + cfs_list_t *pos; struct lov_request *req; ENTRY; - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + cfs_list_for_each (pos, &set->set_list) { + req = cfs_list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; @@ -831,8 +841,7 @@ int lov_fini_brw_set(struct lov_request_set *set) rc = brw_done(set); /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -970,8 +979,7 @@ int lov_fini_getattr_set(struct lov_request_set *set) if (set->set_completes) rc = common_attr_done(set); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1010,6 +1018,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) + /* SOM requires all the OSTs to be active. */ + GOTO(out_set, rc = -EIO); continue; } @@ -1053,8 +1064,7 @@ int lov_fini_destroy_set(struct lov_request_set *set) /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(0); } @@ -1131,8 +1141,7 @@ int lov_fini_setattr_set(struct lov_request_set *set) /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1219,11 +1228,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; - LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) || - CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr), - "req->rq_oi.oi_oa->o_valid="LPX64" " - "req->rq_oi.oi_oa->o_gr="LPU64"\n", - req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr); req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; @@ -1266,8 +1270,7 @@ int lov_fini_punch_set(struct lov_request_set *set) rc = common_attr_done(set); } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1395,8 +1398,7 @@ int lov_fini_sync_set(struct lov_request_set *set) /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1486,10 +1488,10 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) if (osfs->os_ffree != LOV_U64_MAX) do_div(osfs->os_ffree, expected_stripes); - spin_lock(&obd->obd_osfs_lock); + cfs_spin_lock(&obd->obd_osfs_lock); memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); obd->obd_osfs_age = cfs_time_current_64(); - spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->obd_osfs_lock); RETURN(0); } @@ -1508,10 +1510,7 @@ int lov_fini_statfs_set(struct lov_request_set *set) rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, set->set_success); } - - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); - + lov_put_reqset(set); RETURN(rc); } @@ -1590,38 +1589,41 @@ static int cb_statfs_update(void *cookie, int rc) struct obd_info *oinfo = cookie; struct lov_request *lovreq; struct obd_statfs *osfs, *lov_sfs; - struct obd_device *obd; struct lov_obd *lov; + struct lov_tgt_desc *tgt; + struct obd_device *lovobd, *tgtobd; int success; ENTRY; lovreq = container_of(oinfo, struct lov_request, rq_oi); - lov = &lovreq->rq_rqset->set_obd->u.lov; - obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp); - + lovobd = lovreq->rq_rqset->set_obd; + lov = &lovobd->u.lov; osfs = lovreq->rq_rqset->set_oi->oi_osfs; lov_sfs = oinfo->oi_osfs; - success = lovreq->rq_rqset->set_success; /* XXX: the same is done in lov_update_common_set, however lovset->set_exp is not initialized. */ lov_update_set(lovreq->rq_rqset, lovreq, rc); - if (rc) { - /* XXX ignore error for disconnected ost ? */ - if (rc && !(lov->lov_tgts[lovreq->rq_idx] && - lov->lov_tgts[lovreq->rq_idx]->ltd_active)) - rc = 0; + if (rc) GOTO(out, rc); - } - spin_lock(&obd->obd_osfs_lock); - memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); + obd_getref(lovobd); + tgt = lov->lov_tgts[lovreq->rq_idx]; + if (!tgt || !tgt->ltd_active) + GOTO(out_update, rc); + + tgtobd = class_exp2obd(tgt->ltd_exp); + cfs_spin_lock(&tgtobd->obd_osfs_lock); + memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0) - obd->obd_osfs_age = cfs_time_current_64(); - spin_unlock(&obd->obd_osfs_lock); + tgtobd->obd_osfs_age = cfs_time_current_64(); + cfs_spin_unlock(&tgtobd->obd_osfs_lock); +out_update: lov_update_statfs(osfs, lov_sfs, success); qos_update(lov); + obd_putref(lovobd); + out: if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && lov_finished_set(lovreq->rq_rqset)) { @@ -1660,6 +1662,13 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, continue; } + /* skip targets that have been explicitely disabled by the + * administrator */ + if (!lov->lov_tgts[i]->ltd_exp) { + CDEBUG(D_HA, "lov idx %d administratively disabled\n", i); + continue; + } + OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM);