X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_request.c;h=672eca2832030b6ffc5d66f455a453eab0790d3e;hb=7e5be18a3ac2f5138627d7a93670fc88a06cafb5;hp=d0580f376d0eb58685560d18c7d6c53e9cc2f8e8;hpb=a5599c53b24ad99bba168cd41404b0dc3c314bb2;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index d0580f3..672eca2 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,17 +24,16 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_LOV #ifdef __KERNEL__ @@ -54,28 +51,31 @@ static void lov_init_set(struct lov_request_set *set) { set->set_count = 0; - set->set_completes = 0; - set->set_success = 0; + cfs_atomic_set(&set->set_completes, 0); + cfs_atomic_set(&set->set_success, 0); set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); - atomic_set(&set->set_refcount, 1); + cfs_atomic_set(&set->set_refcount, 1); + cfs_waitq_init(&set->set_waitq); + cfs_spin_lock_init(&set->set_lock); } -static void lov_finish_set(struct lov_request_set *set) +void lov_finish_set(struct lov_request_set *set) { - struct list_head *pos, *n; + cfs_list_t *pos, *n; ENTRY; LASSERT(set); - list_for_each_safe(pos, n, &set->set_list) { - struct lov_request *req = list_entry(pos, struct lov_request, - rq_link); - list_del_init(&req->rq_link); + cfs_list_for_each_safe(pos, n, &set->set_list) { + struct lov_request *req = cfs_list_entry(pos, + struct lov_request, + rq_link); + cfs_list_del_init(&req->rq_link); if (req->rq_oi.oi_oa) OBDO_FREE(req->rq_oi.oi_oa); if (req->rq_oi.oi_md) - OBD_FREE(req->rq_oi.oi_md, req->rq_buflen); + OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_osfs) OBD_FREE(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs)); @@ -84,7 +84,7 @@ static void lov_finish_set(struct lov_request_set *set) if (set->set_pga) { int len = set->set_oabufs * sizeof(*set->set_pga); - OBD_FREE(set->set_pga, len); + OBD_FREE_LARGE(set->set_pga, len); } if (set->set_lockh) lov_llh_put(set->set_lockh); @@ -93,15 +93,26 @@ static void lov_finish_set(struct lov_request_set *set) EXIT; } +int lov_finished_set(struct lov_request_set *set) +{ + int completes = cfs_atomic_read(&set->set_completes); + + CDEBUG(D_INFO, "check set %d/%d\n", completes, + set->set_count); + return completes == set->set_count; +} + void lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc) { req->rq_complete = 1; req->rq_rc = rc; - set->set_completes++; + cfs_atomic_inc(&set->set_completes); if (rc == 0) - set->set_success++; + cfs_atomic_inc(&set->set_success); + + cfs_waitq_signal(&set->set_waitq); } int lov_update_common_set(struct lov_request_set *set, @@ -123,8 +134,9 @@ int lov_update_common_set(struct lov_request_set *set, void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) { - list_add_tail(&req->rq_link, &set->set_list); + cfs_list_add_tail(&req->rq_link, &set->set_list); set->set_count++; + req->rq_rqset = set; } extern void osc_update_enqueue(struct lustre_handle *lov_lockhp, @@ -200,15 +212,16 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode) { struct lov_request *req; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + int completes = cfs_atomic_read(&set->set_completes); int rc = 0; ENTRY; /* enqueue/match success, just return */ - if (set->set_completes && set->set_completes == set->set_success) + if (completes && completes == cfs_atomic_read(&set->set_success)) RETURN(0); /* cancel enqueued/matched locks */ - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { struct lustre_handle *lov_lockhp; if (!req->rq_complete || req->rq_rc) @@ -246,17 +259,47 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, * succeeded. */ if (!rqset) { if (rc) - set->set_completes = 0; + cfs_atomic_set(&set->set_completes, 0); ret = enqueue_done(set, mode); } else if (set->set_lockh) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc ? rc : ret); } +static void lov_llh_addref(void *llhp) +{ + struct lov_lock_handles *llh = llhp; + + cfs_atomic_inc(&llh->llh_refcount); + CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh, + cfs_atomic_read(&llh->llh_refcount)); +} + +static struct portals_handle_ops lov_handle_ops = { + .hop_addref = lov_llh_addref, + .hop_free = NULL, +}; + +static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm) +{ + struct lov_lock_handles *llh; + + OBD_ALLOC(llh, sizeof *llh + + sizeof(*llh->llh_handles) * lsm->lsm_stripe_count); + if (llh == NULL) + return NULL; + + cfs_atomic_set(&llh->llh_refcount, 2); + llh->llh_stripe_count = lsm->lsm_stripe_count; + CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link); + class_handle_hash(&llh->llh_handle, &lov_handle_ops); + + return llh; +} + int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset) @@ -304,7 +347,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, req->rq_buflen = sizeof(*req->rq_oi.oi_md) + sizeof(struct lov_oinfo *) + sizeof(struct lov_oinfo); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -313,8 +356,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) + sizeof(struct lov_oinfo *); - - req->rq_rqset = set; /* Set lov request specific parameters. */ req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; req->rq_oi.oi_cb_up = cb_update_enqueue; @@ -332,7 +373,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; - req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oi.oi_md->lsm_stripe_count = 0; req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid = loi->loi_kms_valid; @@ -350,20 +391,6 @@ out_set: RETURN(rc); } -int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, - int rc) -{ - int ret = rc; - ENTRY; - - if (rc > 0) - ret = 0; - else if (rc == 0) - ret = 1; - lov_update_set(set, req, ret); - RETURN(rc); -} - int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) { int rc = 0; @@ -373,12 +400,11 @@ int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags) RETURN(0); LASSERT(set->set_exp); rc = enqueue_done(set, mode); - if ((set->set_count == set->set_success) && + if ((set->set_count == cfs_atomic_read(&set->set_success)) && (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -428,7 +454,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -443,7 +469,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; - req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); @@ -469,8 +495,7 @@ int lov_fini_cancel_set(struct lov_request_set *set) if (set->set_lockh) lov_llh_put(set->set_lockh); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -506,7 +531,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, lov_lockhp = set->set_lockh->llh_handles + i; if (!lustre_handle_is_used(lov_lockhp)) { - CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n", + CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n", loi->loi_ost_idx, loi->loi_id); continue; } @@ -516,7 +541,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); @@ -527,7 +552,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; - req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); @@ -541,6 +566,59 @@ out_set: RETURN(rc); } +static int lov_update_create_set(struct lov_request_set *set, + struct lov_request *req, int rc) +{ + struct obd_trans_info *oti = set->set_oti; + struct lov_stripe_md *lsm = set->set_oi->oi_md; + struct lov_oinfo *loi; + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; + ENTRY; + + if (rc && lov->lov_tgts[req->rq_idx] && + lov->lov_tgts[req->rq_idx]->ltd_active) { + /* Pre-creating objects may timeout via -ETIMEDOUT or + * -ENOTCONN both are always non-critical events. */ + CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR, + "error creating fid "LPX64" sub-object " + "on OST idx %d/%d: rc = %d\n", + set->set_oi->oi_oa->o_id, req->rq_idx, + lsm->lsm_stripe_count, rc); + if (rc > 0) { + CERROR("obd_create returned invalid err %d\n", rc); + rc = -EIO; + } + } + + cfs_spin_lock(&set->set_lock); + req->rq_stripe = cfs_atomic_read(&set->set_success); + loi = lsm->lsm_oinfo[req->rq_stripe]; + + + if (rc) { + lov_update_set(set, req, rc); + cfs_spin_unlock(&set->set_lock); + RETURN(rc); + } + + loi->loi_id = req->rq_oi.oi_oa->o_id; + loi->loi_seq = req->rq_oi.oi_oa->o_seq; + loi->loi_ost_idx = req->rq_idx; + loi_init(loi); + + if (oti && set->set_cookies) + ++oti->oti_logcookies; + if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) + set->set_cookie_sent++; + + lov_update_set(set, req, rc); + cfs_spin_unlock(&set->set_lock); + + CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", + lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); + RETURN(rc); +} + static int create_done(struct obd_export *exp, struct lov_request_set *set, struct lov_stripe_md **lsmp) { @@ -549,39 +627,33 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, struct obdo *src_oa = set->set_oi->oi_oa; struct lov_request *req; struct obdo *ret_oa = NULL; - int attrset = 0, rc = 0; + int success, attrset = 0, rc = 0; ENTRY; - LASSERT(set->set_completes); + LASSERT(cfs_atomic_read(&set->set_completes)); /* try alloc objects on other osts if osc_create fails for * exceptions: RPC failure, ENOSPC, etc */ - if (set->set_count != set->set_success) { - list_for_each_entry (req, &set->set_list, rq_link) { + if (set->set_count != cfs_atomic_read(&set->set_success)) { + cfs_list_for_each_entry (req, &set->set_list, rq_link) { if (req->rq_rc == 0) continue; - set->set_completes--; + cfs_atomic_dec(&set->set_completes); req->rq_complete = 0; rc = qos_remedy_create(set, req); lov_update_create_set(set, req, rc); - - if (rc) - break; } } + success = cfs_atomic_read(&set->set_success); /* no successful creates */ - if (set->set_success == 0) + if (success == 0) GOTO(cleanup, rc); - /* If there was an explicit stripe set, fail. Otherwise, we - * got some objects and that's not bad. */ - if (set->set_count != set->set_success) { - if (*lsmp) - GOTO(cleanup, rc); - set->set_count = set->set_success; + if (set->set_count != success) { + set->set_count = success; qos_shrink_lsm(set); } @@ -589,7 +661,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, if (ret_oa == NULL) GOTO(cleanup, rc = -ENOMEM); - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { if (!req->rq_complete || req->rq_rc) continue; lov_merge_attrs(ret_oa, req->rq_oi.oi_oa, @@ -603,7 +675,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, LBUG(); } ret_oa->o_id = src_oa->o_id; - ret_oa->o_gr = src_oa->o_gr; + ret_oa->o_seq = src_oa->o_seq; ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; memcpy(src_oa, ret_oa, sizeof(*src_oa)); OBDO_FREE(ret_oa); @@ -612,7 +684,7 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, GOTO(done, rc = 0); cleanup: - list_for_each_entry(req, &set->set_list, rq_link) { + cfs_list_for_each_entry(req, &set->set_list, rq_link) { struct obd_export *sub_exp; int err = 0; @@ -620,8 +692,8 @@ cleanup: continue; sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; - err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL, - NULL); + err = obd_destroy(NULL, sub_exp, req->rq_oi.oi_oa, NULL, oti, + NULL, NULL); if (err) CERROR("Failed to uncreate objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", @@ -651,55 +723,28 @@ int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) + if (cfs_atomic_read(&set->set_completes)) rc = create_done(set->set_exp, set, lsmp); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); - + lov_put_reqset(set); RETURN(rc); } -int lov_update_create_set(struct lov_request_set *set, - struct lov_request *req, int rc) +int cb_create_update(void *cookie, int rc) { - struct obd_trans_info *oti = set->set_oti; - struct lov_stripe_md *lsm = set->set_oi->oi_md; - struct lov_oinfo *loi; - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; - ENTRY; - - req->rq_stripe = set->set_success; - loi = lsm->lsm_oinfo[req->rq_stripe]; - - if (rc && lov->lov_tgts[req->rq_idx] && - lov->lov_tgts[req->rq_idx]->ltd_active) { - CERROR("error creating fid "LPX64" sub-object" - " on OST idx %d/%d: rc = %d\n", - set->set_oi->oi_oa->o_id, req->rq_idx, - lsm->lsm_stripe_count, rc); - if (rc > 0) { - CERROR("obd_create returned invalid err %d\n", rc); - rc = -EIO; - } - } - lov_update_set(set, req, rc); - if (rc) - RETURN(rc); + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; - loi->loi_id = req->rq_oi.oi_oa->o_id; - loi->loi_gr = req->rq_oi.oi_oa->o_gr; - loi->loi_ost_idx = req->rq_idx; - CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", - lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); - loi_init(loi); + lovreq = container_of(oinfo, struct lov_request, rq_oi); - if (oti && set->set_cookies) - ++oti->oti_logcookies; - if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) - set->set_cookie_sent++; + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL)) + if (lovreq->rq_idx == cfs_fail_val) + rc = -ENOTCONN; - RETURN(0); + rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc); + if (lov_finished_set(lovreq->rq_rqset)) + lov_put_reqset(lovreq->rq_rqset); + return rc; } int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, @@ -721,20 +766,23 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oi->oi_md = *lsmp; set->set_oi->oi_oa = src_oa; set->set_oti = oti; + lov_get_reqset(set); rc = qos_prep_create(exp, set); /* qos_shrink_lsm() may have allocated a new lsm */ *lsmp = oinfo->oi_md; - if (rc) + if (rc) { lov_fini_create_set(set, lsmp); - else + lov_put_reqset(set); + } else { *reqset = set; + } RETURN(rc); } static int common_attr_done(struct lov_request_set *set) { - struct list_head *pos; + cfs_list_t *pos; struct lov_request *req; struct obdo *tmp_oa; int rc = 0, attrset = 0; @@ -745,15 +793,15 @@ static int common_attr_done(struct lov_request_set *set) if (set->set_oi->oi_oa == NULL) RETURN(0); - if (!set->set_success) + if (!cfs_atomic_read(&set->set_success)) RETURN(-EIO); OBDO_ALLOC(tmp_oa); if (tmp_oa == NULL) GOTO(out, rc = -ENOMEM); - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + cfs_list_for_each (pos, &set->set_list) { + req = cfs_list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; @@ -767,6 +815,14 @@ static int common_attr_done(struct lov_request_set *set) CERROR("No stripes had valid attrs\n"); rc = -EIO; } + if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) && + (set->set_oi->oi_md->lsm_stripe_count != attrset)) { + /* When we take attributes of some epoch, we require all the + * ost to be active. */ + CERROR("Not all the stripes had valid attrs\n"); + GOTO(out, rc = -EIO); + } + tmp_oa->o_id = set->set_oi->oi_oa->o_id; memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa)); out: @@ -780,12 +836,12 @@ static int brw_done(struct lov_request_set *set) { struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi = NULL; - struct list_head *pos; + cfs_list_t *pos; struct lov_request *req; ENTRY; - list_for_each (pos, &set->set_list) { - req = list_entry(pos, struct lov_request, rq_link); + cfs_list_for_each (pos, &set->set_list) { + req = cfs_list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; @@ -807,12 +863,11 @@ int lov_fini_brw_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = brw_done(set); /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -841,11 +896,11 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, set->set_oti = oti; set->set_oi = oinfo; set->set_oabufs = oa_bufs; - OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga)); + OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga)); if (!set->set_pga) GOTO(out, rc = -ENOMEM); - OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); + OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (!info) GOTO(out, rc = -ENOMEM); @@ -886,10 +941,11 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, sizeof(*req->rq_oi.oi_oa)); } req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_buflen = sizeof(*req->rq_oi.oi_md); - OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); + OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBDO_FREE(req->rq_oi.oi_oa); OBD_FREE(req, sizeof(*req)); @@ -901,7 +957,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; - req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr; + req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq; req->rq_oabufs = info[i].count; req->rq_pgaidx = shift; shift += req->rq_oabufs; @@ -929,7 +985,8 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, } out: if (info) - OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count); + OBD_FREE_LARGE(info, + sizeof(*info) * oinfo->oi_md->lsm_stripe_count); if (rc == 0) *reqset = set; @@ -947,11 +1004,10 @@ int lov_fini_getattr_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) + if (cfs_atomic_read(&set->set_completes)) rc = common_attr_done(set); - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -990,6 +1046,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) + /* SOM requires all the OSTs to be active. */ + GOTO(out_set, rc = -EIO); continue; } @@ -1008,9 +1067,9 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_cb_up = cb_getattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; - req->rq_rqset = set; lov_set_add_req(req, set); } @@ -1030,12 +1089,11 @@ int lov_fini_destroy_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(0); } @@ -1088,6 +1146,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, } memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; lov_set_add_req(req, set); } if (!set->set_count) @@ -1107,13 +1166,12 @@ int lov_fini_setattr_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = common_attr_done(set); /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1200,15 +1258,10 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; - LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) || - CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr), - "req->rq_oi.oi_oa->o_valid="LPX64" " - "req->rq_oi.oi_oa->o_gr="LPU64"\n", - req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr); + req->rq_oi.oi_oa->o_seq= loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; - req->rq_rqset = set; if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) { int off = lov_stripe_offset(oinfo->oi_md, @@ -1241,15 +1294,14 @@ int lov_fini_punch_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = -EIO; /* FIXME update qos data here */ - if (set->set_success) + if (cfs_atomic_read(&set->set_success)) rc = common_attr_done(set); } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } @@ -1314,18 +1366,18 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request *req; obd_off rs, re; - if (!lov->lov_tgts[loi->loi_ost_idx] || - !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { - CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); - continue; - } - if (!lov_stripe_intersects(oinfo->oi_md, i, oinfo->oi_policy.l_extent.start, oinfo->oi_policy.l_extent.end, &rs, &re)) continue; + if (!lov->lov_tgts[loi->loi_ost_idx] || + !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { + CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); + GOTO(out_set, rc = -EIO); + } + OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); @@ -1340,12 +1392,11 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; - req->rq_oi.oi_oa->o_gr = loi->loi_gr; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_update_punch; - req->rq_rqset = set; req->rq_oi.oi_policy.l_extent.start = rs; req->rq_oi.oi_policy.l_extent.end = re; @@ -1372,20 +1423,29 @@ int lov_fini_sync_set(struct lov_request_set *set) if (set == NULL) RETURN(0); LASSERT(set->set_exp); - if (set->set_completes) { - if (!set->set_success) + if (cfs_atomic_read(&set->set_completes)) { + if (!cfs_atomic_read(&set->set_success)) rc = -EIO; /* FIXME update qos data here */ } - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); + lov_put_reqset(set); RETURN(rc); } +/* The callback for osc_sync that finilizes a request info when a + * response is recieved. */ +static int cb_sync_update(void *cookie, int rc) +{ + struct obd_info *oinfo = cookie; + struct lov_request *lovreq; + + lovreq = container_of(oinfo, struct lov_request, rq_oi); + return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); +} + int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, - struct obdo *src_oa, struct lov_stripe_md *lsm, obd_off start, obd_off end, struct lov_request_set **reqset) { @@ -1394,18 +1454,16 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, int rc = 0, i; ENTRY; - OBD_ALLOC(set, sizeof(*set)); + OBD_ALLOC_PTR(set); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; - set->set_oi->oi_md = lsm; - set->set_oi->oi_oa = src_oa; - for (i = 0; i < lsm->lsm_stripe_count; i++) { - struct lov_oinfo *loi = lsm->lsm_oinfo[i]; + for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { + struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; struct lov_request *req; obd_off rs, re; @@ -1415,10 +1473,11 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, continue; } - if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re)) + if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs, + &re)) continue; - OBD_ALLOC(req, sizeof(*req)); + OBD_ALLOC_PTR(req); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_stripe = i; @@ -1429,14 +1488,15 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo, OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } - memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); + *req->rq_oi.oi_oa = *oinfo->oi_oa; req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_seq = loi->loi_seq; req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_policy.l_extent.start = rs; req->rq_oi.oi_policy.l_extent.end = re; req->rq_oi.oi_policy.l_extent.gid = -1; - req->rq_rqset = set; + req->rq_oi.oi_cb_up = cb_sync_update; lov_set_add_req(req, set); } @@ -1463,17 +1523,17 @@ int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success) ENTRY; if (success) { - __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0); - - if (osfs->os_files != LOV_U64_MAX) - do_div(osfs->os_files, expected_stripes); - if (osfs->os_ffree != LOV_U64_MAX) - do_div(osfs->os_ffree, expected_stripes); - - spin_lock(&obd->obd_osfs_lock); + __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, + LOV_MAGIC, 0); + if (osfs->os_files != LOV_U64_MAX) + lov_do_div64(osfs->os_files, expected_stripes); + if (osfs->os_ffree != LOV_U64_MAX) + lov_do_div64(osfs->os_ffree, expected_stripes); + + cfs_spin_lock(&obd->obd_osfs_lock); memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); obd->obd_osfs_age = cfs_time_current_64(); - spin_unlock(&obd->obd_osfs_lock); + cfs_spin_unlock(&obd->obd_osfs_lock); RETURN(0); } @@ -1488,14 +1548,11 @@ int lov_fini_statfs_set(struct lov_request_set *set) if (set == NULL) RETURN(0); - if (set->set_completes) { + if (cfs_atomic_read(&set->set_completes)) { rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, - set->set_success); + cfs_atomic_read(&set->set_success)); } - - if (atomic_dec_and_test(&set->set_refcount)) - lov_finish_set(set); - + lov_put_reqset(set); RETURN(rc); } @@ -1573,46 +1630,51 @@ static int cb_statfs_update(void *cookie, int rc) { struct obd_info *oinfo = cookie; struct lov_request *lovreq; + struct lov_request_set *set; struct obd_statfs *osfs, *lov_sfs; - struct obd_device *obd; struct lov_obd *lov; + struct lov_tgt_desc *tgt; + struct obd_device *lovobd, *tgtobd; int success; ENTRY; lovreq = container_of(oinfo, struct lov_request, rq_oi); - lov = &lovreq->rq_rqset->set_obd->u.lov; - obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp); - - osfs = lovreq->rq_rqset->set_oi->oi_osfs; + set = lovreq->rq_rqset; + lovobd = set->set_obd; + lov = &lovobd->u.lov; + osfs = set->set_oi->oi_osfs; lov_sfs = oinfo->oi_osfs; - - success = lovreq->rq_rqset->set_success; + success = cfs_atomic_read(&set->set_success); /* XXX: the same is done in lov_update_common_set, however lovset->set_exp is not initialized. */ - lov_update_set(lovreq->rq_rqset, lovreq, rc); - if (rc) { - /* XXX ignore error for disconnected ost ? */ - if (rc && !(lov->lov_tgts[lovreq->rq_idx] && - lov->lov_tgts[lovreq->rq_idx]->ltd_active)) - rc = 0; + lov_update_set(set, lovreq, rc); + if (rc) GOTO(out, rc); - } - spin_lock(&obd->obd_osfs_lock); - memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); + obd_getref(lovobd); + tgt = lov->lov_tgts[lovreq->rq_idx]; + if (!tgt || !tgt->ltd_active) + GOTO(out_update, rc); + + tgtobd = class_exp2obd(tgt->ltd_exp); + cfs_spin_lock(&tgtobd->obd_osfs_lock); + memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0) - obd->obd_osfs_age = cfs_time_current_64(); - spin_unlock(&obd->obd_osfs_lock); + tgtobd->obd_osfs_age = cfs_time_current_64(); + cfs_spin_unlock(&tgtobd->obd_osfs_lock); +out_update: lov_update_statfs(osfs, lov_sfs, success); qos_update(lov); + obd_putref(lovobd); + out: - if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && - lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) { - lov_statfs_interpret(NULL, lovreq->rq_rqset, - lovreq->rq_rqset->set_success != - lovreq->rq_rqset->set_count); - qos_statfs_done(lov); + if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lov_finished_set(set)) { + lov_statfs_interpret(NULL, set, set->set_count != + cfs_atomic_read(&set->set_success)); + if (lov->lov_qos.lq_statfs_in_progress) + qos_statfs_done(lov); } RETURN(0); @@ -1644,6 +1706,13 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, continue; } + /* skip targets that have been explicitely disabled by the + * administrator */ + if (!lov->lov_tgts[i]->ltd_exp) { + CDEBUG(D_HA, "lov idx %d administratively disabled\n", i); + continue; + } + OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); @@ -1657,7 +1726,6 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, req->rq_idx = i; req->rq_oi.oi_cb_up = cb_statfs_update; req->rq_oi.oi_flags = oinfo->oi_flags; - req->rq_rqset = set; lov_set_add_req(req, set); }