From 8bac521d15e3be1f7057eb41a587c6973a9dd443 Mon Sep 17 00:00:00 2001 From: bobijam Date: Fri, 6 Mar 2009 04:06:22 +0000 Subject: [PATCH] Branch b1_6 b=17536 o=johann i=zhenyu.xu (bobijam) i=adilger MDS create should not wait for statfs RPC while holding DLM lock. --- lustre/ChangeLog | 12 ++-- lustre/include/lustre/lustre_idl.h | 17 ++++-- lustre/include/lustre_net.h | 11 ++-- lustre/include/obd.h | 13 ++++- lustre/lov/lov_internal.h | 3 + lustre/lov/lov_obd.c | 104 ++++++++++++++++++---------------- lustre/lov/lov_qos.c | 113 +++++++++++++++++++++++++++++++++++++ lustre/lov/lov_request.c | 20 +++++-- lustre/ptlrpc/ptlrpcd.c | 68 ++++++++++++++-------- 9 files changed, 266 insertions(+), 95 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 6f50845..3846142 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -31,14 +31,18 @@ tbd Sun Microsystems, Inc. of Lustre filesystem with 4K stack may cause a stack overflow. For more information, please refer to bugzilla 17630. +Severity : enhancement +Bugzilla : 17536 +Description: MDS create should not wait for statfs RPC while holding DLM lock. + Severity : normal -Frequency : normal +Frequency : normal Bugzilla : 12069 -Descriptoin: OST grant too much space to client even there are not enough space. +Descriptoin: OST grant too much space to client even there are not enough space. Details : Client will shrink its grant cache to OST if there are no write activity over 6 mins (GRANT_SHRINK_INTERVAL), and OST will retrieve this grant cache if there are already not enough avaible space - (left_space < total_clients * 32M). + (left_space < total_clients * 32M). Severity : enhancement Bugzilla : 18289 @@ -68,7 +72,7 @@ Frequency : start MDS on uncleanly shutdowned MDS device Bugzilla : 16839 Descriptoin: ll_sync thread stay in waiting mds<>ost recovery finished Details : stay in waiting mds<>ost recovery finished produce random bugs - due race between two ll_sync thread for one lov target. 
send + due race between two ll_sync thread for one lov target. send ACTIVATE event only if connect realy finished and import have FULL state. diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 1c4d9a3..b8456a8 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -627,9 +627,14 @@ struct obd_statfs { extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_STATFS_NODELAY 0x0001 /* requests should be send without delay * and resends for avoid deadlocks */ - #define OBD_STATFS_FROM_CACHE 0x0002 /* the statfs callback should not update * obd_osfs_age */ +#define OBD_STATFS_PTLRPCD 0x0004 /* requests will be sent via ptlrpcd + * instead of a specific set. This + * means that we cannot rely on the set + * interpret routine to be called. + * lov_statfs_interpret() must thus be + * called from the statfs callback */ /* ost_body.data values for OST_BRW */ @@ -790,7 +795,7 @@ struct lu_fid { fid_oid(fid), \ fid_ver(fid) -enum { +enum { /** put FID sequence at this offset in ldlm_res_id. */ LUSTRE_RES_ID_SEQ_OFF = 0, /** put FID oid at this offset in ldlm_res_id. */ @@ -950,7 +955,7 @@ static inline void fid_init(struct lu_fid *fid) /** * Check if a fid is igif or not. * \param fid the fid to be tested. - * \return true if the fid is a igif; otherwise false. + * \return true if the fid is a igif; otherwise false. */ static inline int fid_is_igif(const struct lu_fid *fid) { @@ -960,7 +965,7 @@ static inline int fid_is_igif(const struct lu_fid *fid) /** * Check if a fid is idif or not. * \param fid the fid to be tested. - * \return true if the fid is a idif; otherwise false. + * \return true if the fid is a idif; otherwise false. */ static inline int fid_is_idif(const struct lu_fid *fid) { @@ -993,7 +998,7 @@ static inline int fid_is_sane(const struct lu_fid *fid) /** * Check if a fid is zero. * \param fid the fid to be tested. - * \return true if the fid is zero; otherwise false. 
+ * \return true if the fid is zero; otherwise false. */ static inline int fid_is_zero(const struct lu_fid *fid) { @@ -1014,7 +1019,7 @@ static inline ino_t lu_igif_ino(const struct lu_fid *fid) * Get inode generation from a igif. * \param fid a igif to get inode generation from. * \return inode generation for the igif. - */ + */ static inline __u32 lu_igif_gen(const struct lu_fid *fid) { return fid_oid(fid); diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 9db4bb1..60d0977 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -695,7 +695,7 @@ struct ptlrpcd_ctl { char pc_name[16]; #ifndef __KERNEL__ /** - * Async rpcs flag to make sure that ptlrpcd_check() is called only + * Async rpcs flag to make sure that ptlrpcd_check() is called only * once. */ int pc_recurred; @@ -974,19 +974,19 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase) { if (req->rq_phase == new_phase) return; - + if (new_phase == RQ_PHASE_UNREGISTERING) { req->rq_next_phase = req->rq_phase; if (req->rq_import) atomic_inc(&req->rq_import->imp_unregistering); } - + if (req->rq_phase == RQ_PHASE_UNREGISTERING) { if (req->rq_import) atomic_dec(&req->rq_import->imp_unregistering); } - DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", + DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase)); req->rq_phase = new_phase; @@ -1113,7 +1113,7 @@ int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid); /* ptlrpc/pinger.c */ enum timeout_event { - TIMEOUT_GRANT = 1 + TIMEOUT_GRANT = 1 }; struct timeout_item; typedef int (*timeout_cb_t)(struct timeout_item *, void *); @@ -1138,6 +1138,7 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc); void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force); void ptlrpcd_wake(struct ptlrpc_request *req); void ptlrpcd_add_req(struct ptlrpc_request *req); +void ptlrpcd_add_rqset(struct ptlrpc_request_set *set); int 
ptlrpcd_addref(void); void ptlrpcd_decref(void); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 242f92d..7f31c6c 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -192,7 +192,7 @@ struct obd_info { * level. E.g. it is used for update lsm->lsm_oinfo at every recieved * request in osc level for enqueue requests. It is also possible to * update some caller data from LOV layer if needed. */ - obd_enqueue_update_f oi_cb_up; + obd_enqueue_update_f oi_cb_up; }; /* compare all relevant fields. */ @@ -661,6 +661,10 @@ struct lov_qos_rr { unsigned long lqr_dirty:1; /* recalc round-robin list */ }; +struct lov_statfs_data { + struct obd_info lsd_oi; + struct obd_statfs lsd_statfs; +}; /* Stripe placement optimization */ struct lov_qos { struct list_head lq_oss_list; /* list of OSSs that targets use */ @@ -672,7 +676,12 @@ struct lov_qos { unsigned long lq_dirty:1, /* recalc qos data */ lq_same_space:1,/* the ost's all have approx. the same space avail */ - lq_reset:1; /* zero current penalties */ + lq_reset:1, /* zero current penalties */ + lq_statfs_in_progress:1; /* statfs op in progress */ + /* qos statfs data */ + struct lov_statfs_data *lq_statfs_data; + cfs_waitq_t lq_statfs_waitq; /* waitqueue to notify statfs + * requests completion */ }; struct lov_tgt_desc { diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index d7a8f07..f19e5dd 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -180,6 +180,8 @@ int qos_del_tgt(struct obd_device *obd, struct lov_tgt_desc *tgt); void qos_shrink_lsm(struct lov_request_set *set); int qos_prep_create(struct obd_export *exp, struct lov_request_set *set); void qos_update(struct lov_obd *lov); +void qos_statfs_done(struct lov_obd *lov); +void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait); int qos_remedy_create(struct lov_request_set *set, struct lov_request *req); /* lov_request.c */ @@ -249,6 +251,7 @@ void lov_update_statfs(struct obd_statfs 
*osfs, struct obd_statfs *lov_sfs, int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs, int success); int lov_fini_statfs_set(struct lov_request_set *set); +int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc); /* lov_obd.c */ void lov_fix_desc(struct lov_desc *desc); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 9b8b014..45ddf6e 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -95,7 +95,7 @@ void lov_putref(struct obd_device *obd) if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) { int i; struct lov_tgt_desc *n; - CDEBUG(D_CONFIG, "destroying %d lov targets\n", + CDEBUG(D_CONFIG, "destroying %d lov targets\n", lov->lov_death_row); for (i = 0; i < lov->desc.ld_tgt_count; i++) { tgt = lov->lov_tgts[i]; @@ -242,7 +242,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched, rc = obd_notify_observer(obd, tgt_obd, ev, data); if (rc) { CERROR("%s: notify %s of %s failed %d\n", - obd->obd_name, + obd->obd_name, obd->obd_observer->obd_name, tgt_obd->obd_name, rc); break; @@ -255,7 +255,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched, } #define MAX_STRING_SIZE 128 -static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, +static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, struct obd_connect_data *data) { struct lov_obd *lov = &obd->u.lov; @@ -297,7 +297,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, if (activate) { tgt_obd->obd_no_recov = 0; - /* FIXME this is probably supposed to be + /* FIXME this is probably supposed to be ptlrpc_set_import_active. Horrible naming. */ ptlrpc_activate_import(imp); } @@ -386,7 +386,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, rc = class_connect(conn, obd, cluuid); if (rc) RETURN(rc); - + *exp = class_conn2export(conn); /* Why should there ever be more than 1 connect? 
*/ @@ -406,7 +406,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, rc = lov_connect_obd(obd, i, lov->lov_tgts[i]->ltd_activate, &lov->lov_ocd); if (rc) { - CERROR("%s: lov connect tgt %d failed: %d\n", + CERROR("%s: lov connect tgt %d failed: %d\n", obd->obd_name, i, rc); continue; } @@ -435,7 +435,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt) ENTRY; - CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", + CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", obd->obd_name, osc_obd->obd_name); if (tgt->ltd_active) { @@ -486,7 +486,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt) RETURN(0); } -static int lov_del_target(struct obd_device *obd, __u32 index, +static int lov_del_target(struct obd_device *obd, __u32 index, struct obd_uuid *uuidp, int gen); static int lov_disconnect(struct obd_export *exp) @@ -617,7 +617,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, __u32 newsize, oldsize = 0; newsize = max(lov->lov_tgt_size, (__u32)2); - while (newsize < index + 1) + while (newsize < index + 1) newsize = newsize << 1; OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize); if (newtgts == NULL) { @@ -626,7 +626,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, } if (lov->lov_tgt_size) { - memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * + memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * lov->lov_tgt_size); old = lov->lov_tgts; oldsize = lov->lov_tgt_size; @@ -668,8 +668,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n", index, tgt->ltd_gen, lov->desc.ld_tgt_count); - - if (lov->lov_connects == 0) { + + if (lov->lov_connects == 0) { /* lov_connect hasn't been called yet. We'll do the lov_connect_obd on this target when that fn first runs, because we don't know the connect flags yet. 
*/ @@ -686,13 +686,13 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, if (!tgt->ltd_exp) GOTO(out, rc = 0); - rc = lov_notify(obd, tgt->ltd_exp->exp_obd, + rc = lov_notify(obd, tgt->ltd_exp->exp_obd, active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE, (void *)&index); out: if (rc) { - CERROR("add failed (%d), deleting %s\n", rc, + CERROR("add failed (%d), deleting %s\n", rc, obd_uuid2str(&tgt->ltd_uuid)); lov_del_target(obd, index, 0, 0); } @@ -701,7 +701,7 @@ out: } /* Schedule a target for deletion */ -static int lov_del_target(struct obd_device *obd, __u32 index, +static int lov_del_target(struct obd_device *obd, __u32 index, struct obd_uuid *uuidp, int gen) { struct lov_obd *lov = &obd->u.lov; @@ -731,7 +731,7 @@ static int lov_del_target(struct obd_device *obd, __u32 index, CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n", lov_uuid2str(lov, index), index, - lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, + lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, lov->lov_tgts[index]->ltd_active); lov->lov_tgts[index]->ltd_reap = 1; @@ -868,6 +868,11 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) lov->lov_qos.lq_prio_free = 232; /* Default threshold for rr (roughly 17%) */ lov->lov_qos.lq_threshold_rr = 43; + /* Init statfs fields */ + OBD_ALLOC_PTR(lov->lov_qos.lq_statfs_data); + if (NULL == lov->lov_qos.lq_statfs_data) + RETURN(-ENOMEM); + cfs_waitq_init(&lov->lov_qos.lq_statfs_waitq); lov->lov_pools_hash_body = lustre_hash_init("POOLS", 7, 7, &pool_hash_operations, 0); @@ -958,19 +963,19 @@ static int lov_cleanup(struct obd_device *obd) if (lov->lov_tgts[i]) { /* Inactive targets may never have connected */ if (lov->lov_tgts[i]->ltd_active || - atomic_read(&lov->lov_refcount)) - /* We should never get here - these - should have been removed in the + atomic_read(&lov->lov_refcount)) + /* We should never get here - these + should have been removed in the disconnect. 
*/ CERROR("lov tgt %d not cleaned!" " deathrow=%d, lovrc=%d\n", - i, lov->lov_death_row, + i, lov->lov_death_row, atomic_read(&lov->lov_refcount)); lov_del_target(obd, i, 0, 0); } } lov_putref(obd); - OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * + OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * lov->lov_tgt_size); lov->lov_tgt_size = 0; } @@ -978,6 +983,7 @@ static int lov_cleanup(struct obd_device *obd) /* clear pools parent proc entry only after all pools is killed */ lprocfs_obd_cleanup(obd); + OBD_FREE_PTR(lov->lov_qos.lq_statfs_data); RETURN(0); } @@ -1016,12 +1022,12 @@ static int lov_process_config(struct obd_device *obd, obd_count len, void *buf) case LCFG_PARAM: { struct lprocfs_static_vars lvars = { 0 }; struct lov_desc *desc = &(obd->u.lov.desc); - + if (!desc) GOTO(out, rc = -EINVAL); - + lprocfs_lov_init_vars(&lvars); - + rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars, lcfg, obd); GOTO(out, rc); @@ -1092,14 +1098,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid)) continue; - CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, + CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, obd_uuid2str(ost_uuid)); memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); LASSERT(lov->lov_tgts[i]->ltd_exp); /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ - err = obd_create(lov->lov_tgts[i]->ltd_exp, + err = obd_create(lov->lov_tgts[i]->ltd_exp, tmp_oa, &obj_mdp, oti); if (err) { /* This export will be disabled until it is recovered, @@ -1166,8 +1172,6 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, struct obd_info oinfo; struct lov_request_set *set = NULL; struct lov_request *req; - struct obd_statfs osfs; - __u64 maxage; int rc = 0; ENTRY; @@ -1193,8 +1197,11 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, GOTO(out, rc); } - maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage); - obd_statfs_rqset(exp->exp_obd, &osfs, maxage, 
OBD_STATFS_NODELAY); + /* issue statfs rpcs if the osfs data is older than qos_maxage - 1s, + * later in alloc_qos(), we will wait for those rpcs to complete if + * the osfs age is older than 2 * qos_maxage */ + qos_statfs_update(exp->exp_obd, + cfs_time_shift_64(-lov->desc.ld_qos_maxage) + HZ, 0); rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set); if (rc) @@ -1303,7 +1310,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo) req = list_entry(pos, struct lov_request, rq_link); CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " - "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, + "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, req->rq_oi.oi_oa->o_id, req->rq_idx); rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp, @@ -1324,7 +1331,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo) RETURN(rc); } -static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, +static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc) { struct lov_request_set *lovset = (struct lov_request_set *)data; @@ -1361,14 +1368,14 @@ static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n", - oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, + oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, oinfo->oi_md->lsm_stripe_size); list_for_each (pos, &lovset->set_list) { req = list_entry(pos, struct lov_request, rq_link); CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " - "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, + "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, req->rq_oi.oi_oa->o_id, req->rq_idx); rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp, &req->rq_oi, rqset); @@ -1412,12 +1419,12 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo, RETURN(-ENODEV); /* for now, we only expect the following updates here */ - LASSERT(!(oinfo->oi_oa->o_valid & 
~(OBD_MD_FLID | OBD_MD_FLTYPE | - OBD_MD_FLMODE | OBD_MD_FLATIME | + LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE | + OBD_MD_FLMODE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | - OBD_MD_FLFLAGS | OBD_MD_FLSIZE | - OBD_MD_FLGROUP | OBD_MD_FLUID | - OBD_MD_FLGID | OBD_MD_FLFID | + OBD_MD_FLFLAGS | OBD_MD_FLSIZE | + OBD_MD_FLGROUP | OBD_MD_FLUID | + OBD_MD_FLGID | OBD_MD_FLFID | OBD_MD_FLGENER))); lov = &exp->exp_obd->u.lov; rc = lov_prep_setattr_set(exp, oinfo, oti, &set); @@ -1427,13 +1434,13 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo, list_for_each (pos, &set->set_list) { req = list_entry(pos, struct lov_request, rq_link); - rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, + rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, &req->rq_oi, NULL); err = lov_update_setattr_set(set, req, rc); if (err) { CERROR("error: setattr objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", - set->set_oi->oi_oa->o_id, + set->set_oi->oi_oa->o_id, req->rq_oi.oi_oa->o_id, req->rq_idx, err); if (!rc) rc = err; @@ -1866,12 +1873,12 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, if (!page) { int i = 0; /* Find an existing osc so we can get it's stupid sizeof(*oap). 
- Only because of this layering limitation will a client + Only because of this layering limitation will a client mount with no osts fail */ - while (!lov->lov_tgts || !lov->lov_tgts[i] || + while (!lov->lov_tgts || !lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp) { i++; - if (i >= lov->desc.ld_tgt_count) + if (i >= lov->desc.ld_tgt_count) RETURN(-ENOMEDIUM); } rc = size_round(sizeof(*lap)) + @@ -2006,7 +2013,7 @@ static int lov_trigger_group_io(struct obd_export *exp, for (i = 0; i < lsm->lsm_stripe_count; i++) { loi = lsm->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; @@ -2333,8 +2340,7 @@ static int lov_join_lru(struct obd_export *exp, RETURN(count); } -static int lov_statfs_interpret(struct ptlrpc_request_set *rqset, - void *data, int rc) +int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc) { struct lov_request_set *lovset = (struct lov_request_set *)data; int err; @@ -2458,7 +2464,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len, genp = (__u32 *)data->ioc_inlbuf3; /* the uuid will be empty for deleted OSTs */ for (i = 0; i < count; i++, uuidp++, genp++) { - if (!lov->lov_tgts[i]) + if (!lov->lov_tgts[i]) continue; *uuidp = lov->lov_tgts[i]->ltd_uuid; *genp = lov->lov_tgts[i]->ltd_gen; @@ -3261,7 +3267,7 @@ int __init lov_init(void) ENTRY; lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo", - sizeof(struct lov_oinfo), + sizeof(struct lov_oinfo), 0, SLAB_HWCACHE_ALIGN); if (lov_oinfo_slab == NULL) return -ENOMEM; diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c index 61025ef..3193642 100644 --- a/lustre/lov/lov_qos.c +++ b/lustre/lov/lov_qos.c @@ -47,6 +47,7 @@ #include #include +#include #include "lov_internal.h" /* #define QOS_DEBUG 1 */ @@ -752,6 +753,11 @@ static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt, 
} lov_getref(exp->exp_obd); + /* wait for fresh statfs info if needed, the rpcs are sent in + * lov_create() */ + qos_statfs_update(exp->exp_obd, + cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1); + down_write(&lov->lov_qos.lq_rw_sem); if (lov->desc.ld_active_tgt_count < 2) @@ -1074,3 +1080,110 @@ void qos_update(struct lov_obd *lov) ENTRY; lov->lov_qos.lq_dirty = 1; } + +void qos_statfs_done(struct lov_obd *lov) +{ + LASSERT(lov->lov_qos.lq_statfs_in_progress); + down_write(&lov->lov_qos.lq_rw_sem); + lov->lov_qos.lq_statfs_in_progress = 0; + /* wake up any threads waiting for the statfs rpcs to complete */ + cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq); + up_write(&lov->lov_qos.lq_rw_sem); +} + +static int qos_statfs_ready(struct obd_device *obd, __u64 max_age) +{ + struct lov_obd *lov = &obd->u.lov; + int rc; + ENTRY; + down_read(&lov->lov_qos.lq_rw_sem); + rc = lov->lov_qos.lq_statfs_in_progress == 0 || + cfs_time_beforeq_64(max_age, obd->obd_osfs_age); + up_read(&lov->lov_qos.lq_rw_sem); + RETURN(rc); +} + +/* + * Update statfs data if the current osfs age is older than max_age. + * If wait is not set, it means that we are called from lov_create() + * and we should just issue the rpcs without waiting for them to complete. + * If wait is set, we are called from alloc_qos(): new rpcs are issued only + * if none are already in flight, then we wait for them to complete. 
+ */ +void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait) +{ + struct lov_obd *lov = &obd->u.lov; + struct obd_info *oinfo; + int rc = 0; + struct ptlrpc_request_set *set = NULL; + ENTRY; + + if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age)) + /* statfs data are quite recent, don't need to refresh it */ + RETURN_EXIT; + + if (!wait && lov->lov_qos.lq_statfs_in_progress) + /* statfs already in progress */ + RETURN_EXIT; + + down_write(&lov->lov_qos.lq_rw_sem); + if (lov->lov_qos.lq_statfs_in_progress) { + up_write(&lov->lov_qos.lq_rw_sem); + GOTO(out, rc = 0); + } + /* no statfs in flight, send rpcs */ + lov->lov_qos.lq_statfs_in_progress = 1; + up_write(&lov->lov_qos.lq_rw_sem); + + if (wait) + CDEBUG(D_QOS, "%s: did not manage to get fresh statfs data " + "in a timely manner (osfs age "LPU64", max age "LPU64")" + ", sending new statfs rpcs\n", + obd_uuid2str(&lov->desc.ld_uuid), obd->obd_osfs_age, + max_age); + + /* need to send statfs rpcs */ + CDEBUG(D_QOS, "sending new statfs requests\n"); + memset(lov->lov_qos.lq_statfs_data, 0, + sizeof(*lov->lov_qos.lq_statfs_data)); + oinfo = &lov->lov_qos.lq_statfs_data->lsd_oi; + oinfo->oi_osfs = &lov->lov_qos.lq_statfs_data->lsd_statfs; + oinfo->oi_flags = OBD_STATFS_NODELAY; + set = ptlrpc_prep_set(); + if (!set) + GOTO(out_failed, rc = -ENOMEM); + + rc = obd_statfs_async(obd, oinfo, max_age, set); + if (rc || list_empty(&set->set_requests)) { + if (rc) + CWARN("statfs failed with %d\n", rc); + GOTO(out_failed, rc); + } + /* send requests via ptlrpcd */ + oinfo->oi_flags |= OBD_STATFS_PTLRPCD; + ptlrpcd_add_rqset(set); + GOTO(out, rc); + +out_failed: + down_write(&lov->lov_qos.lq_rw_sem); + lov->lov_qos.lq_statfs_in_progress = 0; + /* wake up any threads waiting for the statfs rpcs to complete */ + cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq); + up_write(&lov->lov_qos.lq_rw_sem); + wait = 0; +out: + if (set) + ptlrpc_set_destroy(set); + if (wait) { + struct l_wait_info lwi = { 0 }; + 
CDEBUG(D_QOS, "waiting for statfs requests to complete\n"); + l_wait_event(lov->lov_qos.lq_statfs_waitq, + qos_statfs_ready(obd, max_age), &lwi); + if (cfs_time_before_64(obd->obd_osfs_age, max_age)) + CDEBUG(D_QOS, "%s: still no fresh statfs data after " + "waiting (osfs age "LPU64", max age " + LPU64")\n", + obd_uuid2str(&lov->desc.ld_uuid), + obd->obd_osfs_age, max_age); + } +} diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 7e2eda5..9fb2944 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -113,7 +113,7 @@ int lov_update_common_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -876,7 +876,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, continue; loi = oinfo->oi_md->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out, rc = -EIO); @@ -1075,7 +1075,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request *req; loi = lsm->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; @@ -1134,7 +1134,7 @@ int lov_update_setattr_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -1576,7 +1576,7 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs, } /* The callback for osc_statfs_async that finilizes a request info when a - * response is recieved. 
+ * response is received. */ static int cb_statfs_update(struct obd_info *oinfo, int rc) { struct lov_request *lovreq; @@ -1602,7 +1602,7 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc) if (rc && !(lov->lov_tgts[lovreq->rq_idx] && lov->lov_tgts[lovreq->rq_idx]->ltd_active)) rc = 0; - RETURN(rc); + GOTO(out, rc); } spin_lock(&obd->obd_osfs_lock); @@ -1613,6 +1613,14 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc) lov_update_statfs(osfs, lov_sfs, success); qos_update(lov); +out: + if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) { + lov_statfs_interpret(NULL, lovreq->rq_rqset, + lovreq->rq_rqset->set_success != + lovreq->rq_rqset->set_count); + qos_statfs_done(lov); + } RETURN(0); } diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index 3ccff0f..c5bb52e 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -69,7 +69,29 @@ void ptlrpcd_wake(struct ptlrpc_request *req) cfs_waitq_signal(&rq_set->set_waitq); } -/* +/* + * Move all requests from an existing request set to the ptlrpcd queue. + * All requests from the set must be in phase RQ_PHASE_NEW. + */ +void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) +{ + struct list_head *tmp, *pos; + + list_for_each_safe(pos, tmp, &set->set_requests) { + struct ptlrpc_request *req = + list_entry(pos, struct ptlrpc_request, rq_set_chain); + + LASSERT(req->rq_phase == RQ_PHASE_NEW); + list_del_init(&req->rq_set_chain); + req->rq_set = NULL; + ptlrpcd_add_req(req); + set->set_remaining--; + } + LASSERT(set->set_remaining == 0); +} +EXPORT_SYMBOL(ptlrpcd_add_rqset); + +/* * Requests that are added to the ptlrpcd queue are sent via * ptlrpcd_check->ptlrpc_check_set(). 
*/ @@ -86,7 +108,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req) if (rc) { int (*interpreter)(struct ptlrpc_request *, void *, int); - + interpreter = req->rq_interpret_reply; /* @@ -116,8 +138,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) req = list_entry(pos, struct ptlrpc_request, rq_set_chain); list_del_init(&req->rq_set_chain); ptlrpc_set_add_req(pc->pc_set, req); - /* - * Need to calculate its timeout. + /* + * Need to calculate its timeout. */ rc = 1; } @@ -126,9 +148,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) if (pc->pc_set->set_remaining) { rc = rc | ptlrpc_check_set(pc->pc_set); - /* + /* * XXX: our set never completes, so we prune the completed - * reqs after each iteration. boy could this be smarter. + * reqs after each iteration. boy could this be smarter. */ list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) { req = list_entry(pos, struct ptlrpc_request, @@ -143,8 +165,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } if (rc == 0) { - /* - * If new requests have been added, make sure to wake up. + /* + * If new requests have been added, make sure to wake up. */ spin_lock(&pc->pc_set->set_new_req_lock); rc = !list_empty(&pc->pc_set->set_new_requests); @@ -155,7 +177,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } #ifdef __KERNEL__ -/* +/* * ptlrpc's code paths like to execute in process context, so we have this * thread which spins on a set which contains the io rpcs. llite specifies * ptlrpcd's set when it pushes pages down into the oscs. @@ -173,18 +195,18 @@ static int ptlrpcd(void *arg) complete(&pc->pc_starting); - /* + /* * This mainloop strongly resembles ptlrpc_set_wait() except that our * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when - * there are requests in the set. New requests come in on the set's - * new_req_list and ptlrpcd_check() moves them into the set. + * there are requests in the set. 
New requests come in on the set's + * new_req_list and ptlrpcd_check() moves them into the set. */ do { struct l_wait_info lwi; int timeout; timeout = ptlrpc_set_next_timeout(pc->pc_set); - lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), + lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), ptlrpc_expired_set, pc->pc_set); l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi); @@ -198,14 +220,14 @@ static int ptlrpcd(void *arg) exit++; } - /* + /* * Let's make one more loop to make sure that ptlrpcd_check() * copied all raced new rpcs into the set so we can kill them. */ } while (exit < 2); - /* - * Wait for inflight requests to drain. + /* + * Wait for inflight requests to drain. */ if (!list_empty(&pc->pc_set->set_requests)) ptlrpc_set_wait(pc->pc_set); @@ -225,8 +247,8 @@ int ptlrpcd_check_async_rpcs(void *arg) struct ptlrpcd_ctl *pc = arg; int rc = 0; - /* - * Single threaded!! + /* + * Single threaded!! */ pc->pc_recurred++; @@ -234,8 +256,8 @@ int ptlrpcd_check_async_rpcs(void *arg) rc = ptlrpcd_check(pc); if (!rc) ptlrpc_expired_set(pc->pc_set); - /* - * XXX: send replay requests. + /* + * XXX: send replay requests. */ if (pc == &ptlrpcd_recovery_pc) rc = ptlrpcd_check(pc); @@ -259,9 +281,9 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) { int rc = 0; ENTRY; - - /* - * Do not allow start second thread for one pc. + + /* + * Do not allow start second thread for one pc. */ if (test_bit(LIOD_START, &pc->pc_flags)) { CERROR("Starting second thread (%s) for same pc %p\n", -- 1.8.3.1