X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=f0eee2766df1803b00b89f85f48276dfdffdda4c;hp=17b699d51fb85b5bf06a7613af83f13b3eb67d81;hb=65a3ea7ae327e1db678432e81e45d834e8932b4a;hpb=70e80ade90af09300396706b8910e196a7928520 diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 17b699d..f0eee27 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -16,8 +16,8 @@ * in the LICENSE file that accompanied this code). * * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see [sun.com URL with a - * copy of GPLv2]. + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or @@ -59,6 +59,7 @@ static void lov_init_set(struct lov_request_set *set) set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); atomic_set(&set->set_refcount, 1); + cfs_waitq_init(&set->set_waitq); } static void lov_finish_set(struct lov_request_set *set) @@ -93,6 +94,14 @@ static void lov_finish_set(struct lov_request_set *set) EXIT; } +int lov_finished_set(struct lov_request_set *set) +{ + CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes, + set->set_count); + return set->set_completes == set->set_count; +} + + void lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc) { @@ -102,6 +111,8 @@ void lov_update_set(struct lov_request_set *set, set->set_completes++; if (rc == 0) set->set_success++; + + cfs_waitq_signal(&set->set_waitq); } int lov_update_common_set(struct lov_request_set *set, @@ -113,7 +124,7 @@ int lov_update_common_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -125,20 +136,47 @@ void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) { list_add_tail(&req->rq_link, &set->set_list); set->set_count++; + req->rq_rqset = set; +} + +extern void osc_update_enqueue(struct lustre_handle *lov_lockhp, + struct lov_oinfo *loi, int flags, + struct ost_lvb *lvb, __u32 mode, int rc); + +static int lov_update_enqueue_lov(struct obd_export *exp, + struct lustre_handle *lov_lockhp, + struct lov_oinfo *loi, int flags, int idx, + __u64 oid, int rc) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + + if (rc != ELDLM_OK && + !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) { + memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) { + /* -EUSERS used by OST to report file contention */ + if (rc != -EINTR && rc != -EUSERS) + CERROR("enqueue objid "LPX64" subobj " + LPX64" on OST idx %d: rc %d\n", + oid, loi->loi_id, loi->loi_ost_idx, rc); + } else + rc = ELDLM_OK; + } + return rc; } int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) { struct lov_request_set *set = req->rq_rqset; struct lustre_handle *lov_lockhp; + struct obd_info *oi = set->set_oi; struct lov_oinfo *loi; ENTRY; - LASSERT(set != NULL); - LASSERT(set->set_oi != NULL); + LASSERT(oi != NULL); lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; - loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe]; + loi = oi->oi_md->lsm_oinfo[req->rq_stripe]; /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set * and that copy can be arbitrarily out of date. @@ -146,65 +184,22 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) * The LOV API is due for a serious rewriting anyways, and this * can be addressed then. */ - if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); - __u64 tmp; - - LASSERT(lock != NULL); - lov_stripe_lock(set->set_oi->oi_md); - loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb; - tmp = loi->loi_lvb.lvb_size; - /* Extend KMS up to the end of this lock and no further - * A lock on [x,y] means a KMS of up to y + 1 bytes! */ - if (tmp > lock->l_policy_data.l_extent.end) - tmp = lock->l_policy_data.l_extent.end + 1; - if (tmp >= loi->loi_kms) { - LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64 - ", kms="LPU64, loi->loi_lvb.lvb_size, tmp); - loi->loi_kms = tmp; - loi->loi_kms_valid = 1; - } else { - LDLM_DEBUG(lock, "lock acquired, setting rss=" - LPU64"; leaving kms="LPU64", end="LPU64, - loi->loi_lvb.lvb_size, loi->loi_kms, - lock->l_policy_data.l_extent.end); - } - lov_stripe_unlock(set->set_oi->oi_md); - ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); - } else if ((rc == ELDLM_LOCK_ABORTED) && - (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) { - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - lov_stripe_lock(set->set_oi->oi_md); - loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb; - lov_stripe_unlock(set->set_oi->oi_md); - CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" - " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); - rc = ELDLM_OK; - } else { - struct obd_export *exp = set->set_exp; - struct lov_obd *lov = &exp->exp_obd->u.lov; - - memset(lov_lockhp, 0, sizeof(*lov_lockhp)); - if (lov->lov_tgts[req->rq_idx] && - lov->lov_tgts[req->rq_idx]->ltd_active) { - /* -EUSERS used by OST to report file contention */ - if (rc != -EINTR && rc != -EUSERS) - CERROR("enqueue objid "LPX64" subobj " - LPX64" on OST idx %d: rc %d\n", - set->set_oi->oi_md->lsm_object_id, - loi->loi_id, loi->loi_ost_idx, rc); - } else { - rc = ELDLM_OK; - } - } + lov_stripe_lock(oi->oi_md); + osc_update_enqueue(lov_lockhp, loi, oi->oi_flags, + &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc); + if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT)) + memset(lov_lockhp, 0, sizeof *lov_lockhp); + rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags, + req->rq_idx, oi->oi_md->lsm_object_id, rc); + lov_stripe_unlock(oi->oi_md); lov_update_set(set, req, rc); RETURN(rc); } /* The callback for osc_enqueue that updates lov info for every OSC request. */ -static int cb_update_enqueue(struct obd_info *oinfo, int rc) +static int cb_update_enqueue(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct ldlm_enqueue_info *einfo; struct lov_request *lovreq; @@ -330,8 +325,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) + sizeof(struct lov_oinfo *); - - req->rq_rqset = set; /* Set lov request specific parameters. */ req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; req->rq_oi.oi_cb_up = cb_update_enqueue; @@ -583,9 +576,6 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, rc = qos_remedy_create(set, req); lov_update_create_set(set, req, rc); - - if (rc) - break; } } @@ -593,11 +583,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, if (set->set_success == 0) GOTO(cleanup, rc); - /* If there was an explicit stripe set, fail. Otherwise, we - * got some objects and that's not bad. */ if (set->set_count != set->set_success) { - if (*lsmp) - GOTO(cleanup, rc); set->set_count = set->set_success; qos_shrink_lsm(set); } @@ -637,7 +623,8 @@ cleanup: continue; sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; - err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL); + err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL, + NULL); if (err) CERROR("Failed to uncreate objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", @@ -718,6 +705,16 @@ int lov_update_create_set(struct lov_request_set *set, RETURN(0); } +int cb_create_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_create_set(lovreq->rq_rqset, lovreq, rc); +} + + int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md **lsmp, struct obdo *src_oa, struct obd_trans_info *oti, @@ -739,6 +736,8 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oti = oti; rc = qos_prep_create(exp, set); + /* qos_shrink_lsm() may have allocated a new lsm */ + *lsmp = oinfo->oi_md; if (rc) lov_fini_create_set(set, lsmp); else @@ -877,9 +876,9 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, if (info[i].count == 0) continue; - + loi = oinfo->oi_md->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out, rc = -EIO); @@ -971,9 +970,10 @@ int lov_fini_getattr_set(struct lov_request_set *set) } /* The callback for osc_getattr_async that finilizes a request info when a - * response is recieved. */ -static int cb_getattr_update(struct obd_info *oinfo, int rc) + * response is received. */ +static int cb_getattr_update(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); @@ -1023,7 +1023,6 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_oi.oi_oa->o_id = loi->loi_id; req->rq_oi.oi_cb_up = cb_getattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; - req->rq_rqset = set; lov_set_add_req(req, set); } @@ -1081,7 +1080,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request *req; loi = lsm->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; @@ -1140,7 +1139,7 @@ int lov_update_setattr_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -1160,9 +1159,10 @@ int lov_update_setattr_set(struct lov_request_set *set, } /* The callback for osc_setattr_async that finilizes a request info when a - * response is recieved. */ -static int cb_setattr_update(struct obd_info *oinfo, int rc) + * response is received. */ +static int cb_setattr_update(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc); @@ -1212,12 +1212,14 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; - LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) - || req->rq_oi.oi_oa->o_gr>0); + LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) || + CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr), + "req->rq_oi.oi_oa->o_valid="LPX64" " + "req->rq_oi.oi_oa->o_gr="LPU64"\n", + req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr); req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; - req->rq_rqset = set; if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) { int off = lov_stripe_offset(oinfo->oi_md, @@ -1292,9 +1294,10 @@ int lov_update_punch_set(struct lov_request_set *set, } /* The callback for osc_punch that finilizes a request info when a response - * is recieved. */ -static int cb_update_punch(struct obd_info *oinfo, int rc) + * is received. */ +static int cb_update_punch(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc); @@ -1322,18 +1325,18 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request *req; obd_off rs, re; - if (!lov->lov_tgts[loi->loi_ost_idx] || - !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - if (!lov_stripe_intersects(oinfo->oi_md, i, oinfo->oi_policy.l_extent.start, oinfo->oi_policy.l_extent.end, &rs, &re)) continue; + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + GOTO(out_set, rc = -EIO); + } + OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); @@ -1353,7 +1356,6 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_update_punch; - req->rq_rqset = set; req->rq_oi.oi_policy.l_extent.start = rs; req->rq_oi.oi_policy.l_extent.end = re; @@ -1479,7 +1481,7 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) spin_lock(&obd->obd_osfs_lock); memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); - obd->obd_osfs_age = get_jiffies_64(); + obd->obd_osfs_age = cfs_time_current_64(); spin_unlock(&obd->obd_osfs_lock); RETURN(0); } @@ -1506,15 +1508,11 @@ int lov_fini_statfs_set(struct lov_request_set *set) RETURN(rc); } -void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs, - struct obd_statfs *lov_sfs, int success) +void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs, + int success) { int shift = 0, quit = 0; __u64 tmp; - spin_lock(&obd->obd_osfs_lock); - memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); - obd->obd_osfs_age = get_jiffies_64(); - spin_unlock(&obd->obd_osfs_lock); if (success == 0) { memcpy(osfs, lov_sfs, sizeof(*lov_sfs)); @@ -1579,9 +1577,10 @@ void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs, } /* The callback for osc_statfs_async that finilizes a request info when a - * response is recieved. */ -static int cb_statfs_update(struct obd_info *oinfo, int rc) + * response is received. */ +static int cb_statfs_update(void *cookie, int rc) { + struct obd_info *oinfo = cookie; struct lov_request *lovreq; struct obd_statfs *osfs, *lov_sfs; struct obd_device *obd; @@ -1597,19 +1596,33 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc) lov_sfs = oinfo->oi_osfs; success = lovreq->rq_rqset->set_success; - /* XXX: the same is done in lov_update_common_set, however lovset->set_exp is not initialized. */ lov_update_set(lovreq->rq_rqset, lovreq, rc); if (rc) { + /* XXX ignore error for disconnected ost ? */ if (rc && !(lov->lov_tgts[lovreq->rq_idx] && lov->lov_tgts[lovreq->rq_idx]->ltd_active)) rc = 0; - RETURN(rc); + GOTO(out, rc); } - lov_update_statfs(obd, osfs, lov_sfs, success); + spin_lock(&obd->obd_osfs_lock); + memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); + if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0) + obd->obd_osfs_age = cfs_time_current_64(); + spin_unlock(&obd->obd_osfs_lock); + + lov_update_statfs(osfs, lov_sfs, success); qos_update(lov); +out: + if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lov_finished_set(lovreq->rq_rqset)) { + lov_statfs_interpret(NULL, lovreq->rq_rqset, + lovreq->rq_rqset->set_success != + lovreq->rq_rqset->set_count); + qos_statfs_done(lov); + } RETURN(0); } @@ -1634,7 +1647,8 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, for (i = 0; i < lov->desc.ld_tgt_count; i++) { struct lov_request *req; - if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) { + if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active + && (oinfo->oi_flags & OBD_STATFS_NODELAY))) { CDEBUG(D_HA, "lov idx %d inactive\n", i); continue; } @@ -1651,7 +1665,7 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, req->rq_idx = i; req->rq_oi.oi_cb_up = cb_statfs_update; - req->rq_rqset = set; + req->rq_oi.oi_flags = oinfo->oi_flags; lov_set_add_req(req, set); }