From 74ec90e7575d29821acbb39310fe7f77b6874257 Mon Sep 17 00:00:00 2001 From: bobijam Date: Fri, 6 Mar 2009 03:52:54 +0000 Subject: [PATCH] Branch b1_6 b=17536 o=johann i=zhenyu.xu (bobijam) i=adilger MDS create should not wait for statfs RPC while holding DLM lock. --- lustre/ChangeLog | 12 ++-- lustre/include/lustre/lustre_idl.h | 9 ++- lustre/include/lustre_net.h | 11 ++-- lustre/include/obd.h | 13 ++++- lustre/lov/lov_internal.h | 3 + lustre/lov/lov_obd.c | 110 +++++++++++++++++++----------------- lustre/lov/lov_qos.c | 112 +++++++++++++++++++++++++++++++++++++ lustre/lov/lov_request.c | 18 ++++-- lustre/ptlrpc/ptlrpcd.c | 68 ++++++++++++++-------- 9 files changed, 263 insertions(+), 93 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index e7a995d..e04effb 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -45,17 +45,21 @@ tbd Sun Microsystems, Inc. more information, please refer to bugzilla 17630. Severity : enhancement +Bugzilla : 17536 +Description: MDS create should not wait for statfs RPC while holding DLM lock. + +Severity : enhancement Bugzilla : 18289 Description: Update to RHEL5U3 kernel-2.6.18-128.1.1.el5. Severity : normal -Frequency : normal +Frequency : normal Bugzilla : 12069 -Descriptoin: OST grant too much space to client even there are not enough space. +Descriptoin: OST grant too much space to client even there are not enough space. Details : Client will shrink its grant cache to OST if there are no write activity over 6 mins (GRANT_SHRINK_INTERVAL), and OST will retrieve this grant cache if there are already not enough avaible space - (left_space < total_clients * 32M). + (left_space < total_clients * 32M). 
Severity : enhancement Bugzilla : 14250 @@ -74,7 +78,7 @@ Frequency : start MDS on uncleanly shutdowned MDS device Bugzilla : 16839 Descriptoin: ll_sync thread stay in waiting mds<>ost recovery finished Details : stay in waiting mds<>ost recovery finished produce random bugs - due race between two ll_sync thread for one lov target. send + due race between two ll_sync thread for one lov target. send ACTIVATE event only if connect realy finished and import have FULL state. diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 3b98b34..a1ae407 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -589,9 +589,14 @@ struct obd_statfs { extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_STATFS_NODELAY 0x0001 /* requests should be send without delay * and resends for avoid deadlocks */ - #define OBD_STATFS_FROM_CACHE 0x0002 /* the statfs callback should not update * obd_osfs_age */ +#define OBD_STATFS_PTLRPCD 0x0004 /* requests will be sent via ptlrpcd + * instead of a specific set. This + * means that we cannot rely on the set + * interpret routine to be called. + * lov_statfs_interpret() must thus be + * called from cb_statfs_update() */ /* ost_body.data values for OST_BRW */ @@ -632,7 +637,7 @@ extern void lustre_swab_niobuf_remote (struct niobuf_remote *nbr); /* lock value block communicated between the filter and llite */ -/* OST_LVB_ERR_INIT is needed because the return code in rc is +/* OST_LVB_ERR_INIT is needed because the return code in rc is * negative, i.e. because ((MASK + rc) & MASK) != MASK.
*/ #define OST_LVB_ERR_INIT 0xffbadbad80000000ULL #define OST_LVB_ERR_MASK 0xffbadbad00000000ULL diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 0b08b84..e0b7fc8 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -695,7 +695,7 @@ struct ptlrpcd_ctl { char pc_name[16]; #ifndef __KERNEL__ /** - * Async rpcs flag to make sure that ptlrpcd_check() is called only + * Async rpcs flag to make sure that ptlrpcd_check() is called only * once. */ int pc_recurred; @@ -971,19 +971,19 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase) { if (req->rq_phase == new_phase) return; - + if (new_phase == RQ_PHASE_UNREGISTERING) { req->rq_next_phase = req->rq_phase; if (req->rq_import) atomic_inc(&req->rq_import->imp_unregistering); } - + if (req->rq_phase == RQ_PHASE_UNREGISTERING) { if (req->rq_import) atomic_dec(&req->rq_import->imp_unregistering); } - DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", + DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase)); req->rq_phase = new_phase; @@ -1110,7 +1110,7 @@ int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid); /* ptlrpc/pinger.c */ enum timeout_event { - TIMEOUT_GRANT = 1 + TIMEOUT_GRANT = 1 }; struct timeout_item; typedef int (*timeout_cb_t)(struct timeout_item *, void *); @@ -1135,6 +1135,7 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc); void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force); void ptlrpcd_wake(struct ptlrpc_request *req); void ptlrpcd_add_req(struct ptlrpc_request *req); +void ptlrpcd_add_rqset(struct ptlrpc_request_set *set); int ptlrpcd_addref(void); void ptlrpcd_decref(void); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index e7ce881..5092ef2 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -190,7 +190,7 @@ struct obd_info { * level. E.g. 
it is used for update lsm->lsm_oinfo at every recieved * request in osc level for enqueue requests. It is also possible to * update some caller data from LOV layer if needed. */ - obd_enqueue_update_f oi_cb_up; + obd_enqueue_update_f oi_cb_up; }; /* compare all relevant fields. */ @@ -615,6 +615,10 @@ struct ltd_qos { unsigned int ltq_usable:1; /* usable for striping */ }; +struct lov_statfs_data { + struct obd_info lsd_oi; + struct obd_statfs lsd_statfs; +}; struct lov_qos { struct list_head lq_oss_list; /* list of OSSs that targets use */ struct rw_semaphore lq_rw_sem; @@ -627,7 +631,12 @@ struct lov_qos { lq_dirty_rr:1, /* recalc round-robin list */ lq_same_space:1,/* the ost's all have approx. the same space avail */ - lq_reset:1; /* zero current penalties */ + lq_reset:1, /* zero current penalties */ + lq_statfs_in_progress:1; /* statfs op in progress */ + /* qos statfs data */ + struct lov_statfs_data *lq_statfs_data; + cfs_waitq_t lq_statfs_waitq; /* waitqueue to notify statfs + * requests completion */ }; struct lov_tgt_desc { diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index ac8b3e1..f38e2e9 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -178,6 +178,8 @@ int qos_del_tgt(struct obd_device *obd, struct lov_tgt_desc *tgt); void qos_shrink_lsm(struct lov_request_set *set); int qos_prep_create(struct obd_export *exp, struct lov_request_set *set); void qos_update(struct lov_obd *lov); +void qos_statfs_done(struct lov_obd *lov); +void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait); int qos_remedy_create(struct lov_request_set *set, struct lov_request *req); /* lov_request.c */ @@ -248,6 +250,7 @@ void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs, int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs, int success); int lov_fini_statfs_set(struct lov_request_set *set); +int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc); /* 
lov_obd.c */ void lov_fix_desc(struct lov_desc *desc); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 85449dd..04a79d4 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -95,7 +95,7 @@ void lov_putref(struct obd_device *obd) if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) { int i; struct lov_tgt_desc *n; - CDEBUG(D_CONFIG, "destroying %d lov targets\n", + CDEBUG(D_CONFIG, "destroying %d lov targets\n", lov->lov_death_row); for (i = 0; i < lov->desc.ld_tgt_count; i++) { tgt = lov->lov_tgts[i]; @@ -241,7 +241,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched, rc = obd_notify_observer(obd, tgt_obd, ev, data); if (rc) { CERROR("%s: notify %s of %s failed %d\n", - obd->obd_name, + obd->obd_name, obd->obd_observer->obd_name, tgt_obd->obd_name, rc); break; @@ -254,7 +254,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched, } #define MAX_STRING_SIZE 128 -static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, +static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, struct obd_connect_data *data) { struct lov_obd *lov = &obd->u.lov; @@ -296,7 +296,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, if (activate) { tgt_obd->obd_no_recov = 0; - /* FIXME this is probably supposed to be + /* FIXME this is probably supposed to be ptlrpc_set_import_active. Horrible naming. */ ptlrpc_activate_import(imp); } @@ -385,7 +385,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, rc = class_connect(conn, obd, cluuid); if (rc) RETURN(rc); - + *exp = class_conn2export(conn); /* Why should there ever be more than 1 connect? 
*/ @@ -405,7 +405,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, rc = lov_connect_obd(obd, i, lov->lov_tgts[i]->ltd_activate, &lov->lov_ocd); if (rc) { - CERROR("%s: lov connect tgt %d failed: %d\n", + CERROR("%s: lov connect tgt %d failed: %d\n", obd->obd_name, i, rc); continue; } @@ -434,7 +434,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt) ENTRY; - CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", + CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", obd->obd_name, osc_obd->obd_name); if (tgt->ltd_active) { @@ -485,7 +485,7 @@ static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt) RETURN(0); } -static int lov_del_target(struct obd_device *obd, __u32 index, +static int lov_del_target(struct obd_device *obd, __u32 index, struct obd_uuid *uuidp, int gen); static int lov_disconnect(struct obd_export *exp) @@ -616,7 +616,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, __u32 newsize, oldsize = 0; newsize = max(lov->lov_tgt_size, (__u32)2); - while (newsize < index + 1) + while (newsize < index + 1) newsize = newsize << 1; OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize); if (newtgts == NULL) { @@ -625,7 +625,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, } if (lov->lov_tgt_size) { - memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * + memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * lov->lov_tgt_size); old = lov->lov_tgts; oldsize = lov->lov_tgt_size; @@ -663,8 +663,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n", index, tgt->ltd_gen, lov->desc.ld_tgt_count); - - if (lov->lov_connects == 0) { + + if (lov->lov_connects == 0) { /* lov_connect hasn't been called yet. We'll do the lov_connect_obd on this target when that fn first runs, because we don't know the connect flags yet. 
*/ @@ -681,13 +681,13 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, if (!tgt->ltd_exp) GOTO(out, rc = 0); - rc = lov_notify(obd, tgt->ltd_exp->exp_obd, + rc = lov_notify(obd, tgt->ltd_exp->exp_obd, active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE, (void *)&index); out: if (rc) { - CERROR("add failed (%d), deleting %s\n", rc, + CERROR("add failed (%d), deleting %s\n", rc, obd_uuid2str(&tgt->ltd_uuid)); lov_del_target(obd, index, 0, 0); } @@ -696,7 +696,7 @@ out: } /* Schedule a target for deletion */ -static int lov_del_target(struct obd_device *obd, __u32 index, +static int lov_del_target(struct obd_device *obd, __u32 index, struct obd_uuid *uuidp, int gen) { struct lov_obd *lov = &obd->u.lov; @@ -726,7 +726,7 @@ static int lov_del_target(struct obd_device *obd, __u32 index, CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n", lov_uuid2str(lov, index), index, - lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, + lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, lov->lov_tgts[index]->ltd_active); lov->lov_tgts[index]->ltd_reap = 1; @@ -860,6 +860,11 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) lov->lov_qos.lq_prio_free = 232; /* Default threshold for rr (roughly 17%) */ lov->lov_qos.lq_threshold_rr = 43; + /* Init statfs fields */ + OBD_ALLOC_PTR(lov->lov_qos.lq_statfs_data); + if (NULL == lov->lov_qos.lq_statfs_data) + RETURN(-ENOMEM); + cfs_waitq_init(&lov->lov_qos.lq_statfs_waitq); lprocfs_lov_init_vars(&lvars); lprocfs_obd_setup(obd, lvars.obd_vars); @@ -920,25 +925,26 @@ static int lov_cleanup(struct obd_device *obd) if (lov->lov_tgts[i]) { /* Inactive targets may never have connected */ if (lov->lov_tgts[i]->ltd_active || - atomic_read(&lov->lov_refcount)) - /* We should never get here - these - should have been removed in the + atomic_read(&lov->lov_refcount)) + /* We should never get here - these + should have been removed in the disconnect. 
*/ CERROR("lov tgt %d not cleaned!" " deathrow=%d, lovrc=%d\n", - i, lov->lov_death_row, + i, lov->lov_death_row, atomic_read(&lov->lov_refcount)); lov_del_target(obd, i, 0, 0); } } - OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * + OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * lov->lov_tgt_size); lov->lov_tgt_size = 0; } - if (lov->lov_qos.lq_rr_size) + if (lov->lov_qos.lq_rr_size) OBD_FREE(lov->lov_qos.lq_rr_array, lov->lov_qos.lq_rr_size); + OBD_FREE_PTR(lov->lov_qos.lq_statfs_data); RETURN(0); } @@ -977,12 +983,12 @@ static int lov_process_config(struct obd_device *obd, obd_count len, void *buf) case LCFG_PARAM: { struct lprocfs_static_vars lvars = { 0 }; struct lov_desc *desc = &(obd->u.lov.desc); - + if (!desc) GOTO(out, rc = -EINVAL); - + lprocfs_lov_init_vars(&lvars); - + rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars, lcfg, obd); GOTO(out, rc); @@ -1047,14 +1053,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid)) continue; - CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, + CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, obd_uuid2str(ost_uuid)); memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); LASSERT(lov->lov_tgts[i]->ltd_exp); /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ - err = obd_create(lov->lov_tgts[i]->ltd_exp, + err = obd_create(lov->lov_tgts[i]->ltd_exp, tmp_oa, &obj_mdp, oti); if (err) { /* This export will be disabled until it is recovered, @@ -1121,8 +1127,6 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, struct obd_info oinfo; struct lov_request_set *set = NULL; struct lov_request *req; - struct obd_statfs osfs; - __u64 maxage; int rc = 0; ENTRY; @@ -1148,8 +1152,11 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, GOTO(out, rc); } - maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage); - obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY); + /* issue statfs rpcs if the osfs data is 
older than qos_maxage - 1s, + * later in alloc_qos(), we will wait for those rpcs to complete if + * the osfs age is older than 2 * qos_maxage */ + qos_statfs_update(exp->exp_obd, + cfs_time_shift_64(-lov->desc.ld_qos_maxage) + HZ, 0); rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set); if (rc) @@ -1257,7 +1264,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo) req = list_entry(pos, struct lov_request, rq_link); CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " - "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, + "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, req->rq_oi.oi_oa->o_id, req->rq_idx); rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp, @@ -1278,7 +1285,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo) RETURN(rc); } -static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, +static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc) { struct lov_request_set *lovset = (struct lov_request_set *)data; @@ -1315,14 +1322,14 @@ static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n", - oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, + oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, oinfo->oi_md->lsm_stripe_size); list_for_each (pos, &lovset->set_list) { req = list_entry(pos, struct lov_request, rq_link); CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx " - "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, + "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, req->rq_oi.oi_oa->o_id, req->rq_idx); rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp, &req->rq_oi, rqset); @@ -1366,12 +1373,12 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo, RETURN(-ENODEV); /* for now, we only expect the following updates here */ - LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE | - OBD_MD_FLMODE | OBD_MD_FLATIME | + 
LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE | + OBD_MD_FLMODE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | - OBD_MD_FLFLAGS | OBD_MD_FLSIZE | - OBD_MD_FLGROUP | OBD_MD_FLUID | - OBD_MD_FLGID | OBD_MD_FLFID | + OBD_MD_FLFLAGS | OBD_MD_FLSIZE | + OBD_MD_FLGROUP | OBD_MD_FLUID | + OBD_MD_FLGID | OBD_MD_FLFID | OBD_MD_FLGENER))); lov = &exp->exp_obd->u.lov; rc = lov_prep_setattr_set(exp, oinfo, oti, &set); @@ -1381,13 +1388,13 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo, list_for_each (pos, &set->set_list) { req = list_entry(pos, struct lov_request, rq_link); - rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, + rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, &req->rq_oi, NULL); err = lov_update_setattr_set(set, req, rc); if (err) { CERROR("error: setattr objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", - set->set_oi->oi_oa->o_id, + set->set_oi->oi_oa->o_id, req->rq_oi.oi_oa->o_id, req->rq_idx, err); if (!rc) rc = err; @@ -1571,8 +1578,8 @@ static int lov_sync(struct obd_export *exp, struct obdo *oa, list_for_each (pos, &set->set_list) { req = list_entry(pos, struct lov_request, rq_link); - rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp, - req->rq_oi.oi_oa, NULL, + rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp, + req->rq_oi.oi_oa, NULL, req->rq_oi.oi_policy.l_extent.start, req->rq_oi.oi_policy.l_extent.end); err = lov_update_common_set(set, req, rc); @@ -1799,12 +1806,12 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, if (!page) { int i = 0; /* Find an existing osc so we can get it's stupid sizeof(*oap). 
- Only because of this layering limitation will a client + Only because of this layering limitation will a client mount with no osts fail */ - while (!lov->lov_tgts || !lov->lov_tgts[i] || + while (!lov->lov_tgts || !lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp) { i++; - if (i >= lov->desc.ld_tgt_count) + if (i >= lov->desc.ld_tgt_count) RETURN(-ENOMEDIUM); } rc = size_round(sizeof(*lap)) + @@ -1939,7 +1946,7 @@ static int lov_trigger_group_io(struct obd_export *exp, for (i = 0; i < lsm->lsm_stripe_count; i++) { loi = lsm->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; @@ -2265,8 +2272,7 @@ static int lov_join_lru(struct obd_export *exp, RETURN(count); } -static int lov_statfs_interpret(struct ptlrpc_request_set *rqset, - void *data, int rc) +int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc) { struct lov_request_set *lovset = (struct lov_request_set *)data; int err; @@ -2390,7 +2396,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len, genp = (__u32 *)data->ioc_inlbuf3; /* the uuid will be empty for deleted OSTs */ for (i = 0; i < count; i++, uuidp++, genp++) { - if (!lov->lov_tgts[i]) + if (!lov->lov_tgts[i]) continue; *uuidp = lov->lov_tgts[i]->ltd_uuid; *genp = lov->lov_tgts[i]->ltd_gen; @@ -3189,7 +3195,7 @@ int __init lov_init(void) ENTRY; lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo", - sizeof(struct lov_oinfo), + sizeof(struct lov_oinfo), 0, SLAB_HWCACHE_ALIGN); if (lov_oinfo_slab == NULL) return -ENOMEM; diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c index a301849..1532f50 100644 --- a/lustre/lov/lov_qos.c +++ b/lustre/lov/lov_qos.c @@ -683,6 +683,11 @@ static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt, RETURN(-EINVAL); lov_getref(exp->exp_obd); + /* wait for fresh statfs info if needed, the rpcs 
are sent in + * lov_create() */ + qos_statfs_update(exp->exp_obd, + cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1); + down_write(&lov->lov_qos.lq_rw_sem); ost_count = lov->desc.ld_tgt_count; @@ -987,3 +992,110 @@ void qos_update(struct lov_obd *lov) ENTRY; lov->lov_qos.lq_dirty = 1; } + +void qos_statfs_done(struct lov_obd *lov) +{ + LASSERT(lov->lov_qos.lq_statfs_in_progress); + down_write(&lov->lov_qos.lq_rw_sem); + lov->lov_qos.lq_statfs_in_progress = 0; + /* wake up any threads waiting for the statfs rpcs to complete */ + cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq); + up_write(&lov->lov_qos.lq_rw_sem); +} + +static int qos_statfs_ready(struct obd_device *obd, __u64 max_age) +{ + struct lov_obd *lov = &obd->u.lov; + int rc; + ENTRY; + down_read(&lov->lov_qos.lq_rw_sem); + rc = lov->lov_qos.lq_statfs_in_progress == 0 || + cfs_time_beforeq_64(max_age, obd->obd_osfs_age); + up_read(&lov->lov_qos.lq_rw_sem); + RETURN(rc); +} + +/* + * Update statfs data if the current osfs age is older than max_age. + * If wait is not set, it means that we are called from lov_create() + * and we should just issue the rpcs without waiting for them to complete. + * If wait is set, we are called from alloc_qos(): rpcs are sent only if + * none are in flight, and we wait for completion unless sending failed.
+ */ +void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait) +{ + struct lov_obd *lov = &obd->u.lov; + struct obd_info *oinfo; + int rc = 0; + struct ptlrpc_request_set *set = NULL; + ENTRY; + + if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age)) + /* statfs data are quite recent, don't need to refresh it */ + RETURN_EXIT; + + if (!wait && lov->lov_qos.lq_statfs_in_progress) + /* statfs already in progress */ + RETURN_EXIT; + + down_write(&lov->lov_qos.lq_rw_sem); + if (lov->lov_qos.lq_statfs_in_progress) { + up_write(&lov->lov_qos.lq_rw_sem); + GOTO(out, rc = 0); + } + /* no statfs in flight, send rpcs */ + lov->lov_qos.lq_statfs_in_progress = 1; + up_write(&lov->lov_qos.lq_rw_sem); + + if (wait) + CDEBUG(D_QOS, "%s: did not manage to get fresh statfs data " + "in a timely manner (osfs age "LPU64", max age "LPU64")" + ", sending new statfs rpcs\n", + obd_uuid2str(&lov->desc.ld_uuid), obd->obd_osfs_age, + max_age); + + /* need to send statfs rpcs */ + CDEBUG(D_QOS, "sending new statfs requests\n"); + memset(lov->lov_qos.lq_statfs_data, 0, + sizeof(*lov->lov_qos.lq_statfs_data)); + oinfo = &lov->lov_qos.lq_statfs_data->lsd_oi; + oinfo->oi_osfs = &lov->lov_qos.lq_statfs_data->lsd_statfs; + oinfo->oi_flags = OBD_STATFS_NODELAY; + set = ptlrpc_prep_set(); + if (!set) + GOTO(out_failed, rc = -ENOMEM); + + rc = obd_statfs_async(obd, oinfo, max_age, set); + if (rc || list_empty(&set->set_requests)) { + if (rc) + CWARN("statfs failed with %d\n", rc); + GOTO(out_failed, rc); + } + /* send requests via ptlrpcd */ + oinfo->oi_flags |= OBD_STATFS_PTLRPCD; + ptlrpcd_add_rqset(set); + GOTO(out, rc); + +out_failed: + down_write(&lov->lov_qos.lq_rw_sem); + lov->lov_qos.lq_statfs_in_progress = 0; + /* wake up any threads waiting for the statfs rpcs to complete */ + cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq); + up_write(&lov->lov_qos.lq_rw_sem); + wait = 0; +out: + if (set) + ptlrpc_set_destroy(set); + if (wait) { + struct l_wait_info lwi = { 0 }; + 
CDEBUG(D_QOS, "waiting for statfs requests to complete\n"); + l_wait_event(lov->lov_qos.lq_statfs_waitq, + qos_statfs_ready(obd, max_age), &lwi); + if (cfs_time_before_64(obd->obd_osfs_age, max_age)) + CDEBUG(D_QOS, "%s: still no fresh statfs data after " + "waiting (osfs age "LPU64", max age " + LPU64")\n", + obd_uuid2str(&lov->desc.ld_uuid), + obd->obd_osfs_age, max_age); + } +} diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index c8caa46..6b27291 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -113,7 +113,7 @@ int lov_update_common_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -874,7 +874,7 @@ int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, continue; loi = oinfo->oi_md->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out, rc = -EIO); @@ -1073,7 +1073,7 @@ int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_request *req; loi = lsm->lsm_oinfo[i]; - if (!lov->lov_tgts[loi->loi_ost_idx] || + if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; @@ -1132,7 +1132,7 @@ int lov_update_setattr_set(struct lov_request_set *set, lov_update_set(set, req, rc); /* grace error on inactive ost */ - if (rc && !(lov->lov_tgts[req->rq_idx] && + if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; @@ -1589,7 +1589,7 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc) if (rc && !(lov->lov_tgts[lovreq->rq_idx] && lov->lov_tgts[lovreq->rq_idx]->ltd_active)) rc = 0; - RETURN(rc); + GOTO(out, rc); } 
spin_lock(&obd->obd_osfs_lock); @@ -1600,6 +1600,14 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc) lov_update_statfs(osfs, lov_sfs, success); qos_update(lov); +out: + if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD && + lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) { + lov_statfs_interpret(NULL, lovreq->rq_rqset, + lovreq->rq_rqset->set_success != + lovreq->rq_rqset->set_count); + qos_statfs_done(lov); + } RETURN(0); } diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index ff316c5..8224188 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -69,7 +69,29 @@ void ptlrpcd_wake(struct ptlrpc_request *req) cfs_waitq_signal(&rq_set->set_waitq); } -/* +/* + * Move all requests from an existing request set to the ptlrpcd queue. + * All requests from the set must be in phase RQ_PHASE_NEW. + */ +void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) +{ + struct list_head *tmp, *pos; + + list_for_each_safe(pos, tmp, &set->set_requests) { + struct ptlrpc_request *req = + list_entry(pos, struct ptlrpc_request, rq_set_chain); + + LASSERT(req->rq_phase == RQ_PHASE_NEW); + list_del_init(&req->rq_set_chain); + req->rq_set = NULL; + ptlrpcd_add_req(req); + set->set_remaining--; + } + LASSERT(set->set_remaining == 0); +} +EXPORT_SYMBOL(ptlrpcd_add_rqset); + +/* + * Requests that are added to the ptlrpcd queue are sent via * ptlrpcd_check->ptlrpc_check_set(). */ @@ -87,7 +109,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req) if (rc) { int (*interpreter)(struct ptlrpc_request *, void *, int); - + interpreter = req->rq_interpret_reply; /* @@ -117,8 +139,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) req = list_entry(pos, struct ptlrpc_request, rq_set_chain); list_del_init(&req->rq_set_chain); ptlrpc_set_add_req(pc->pc_set, req); - /* - * Need to calculate its timeout. + /* + * Need to calculate its timeout.
*/ rc = 1; } @@ -127,9 +149,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) if (pc->pc_set->set_remaining) { rc = rc | ptlrpc_check_set(pc->pc_set); - /* + /* * XXX: our set never completes, so we prune the completed - * reqs after each iteration. boy could this be smarter. + * reqs after each iteration. boy could this be smarter. */ list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) { req = list_entry(pos, struct ptlrpc_request, @@ -144,8 +166,8 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } if (rc == 0) { - /* - * If new requests have been added, make sure to wake up. + /* + * If new requests have been added, make sure to wake up. */ spin_lock(&pc->pc_set->set_new_req_lock); rc = !list_empty(&pc->pc_set->set_new_requests); @@ -156,7 +178,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } #ifdef __KERNEL__ -/* +/* * ptlrpc's code paths like to execute in process context, so we have this * thread which spins on a set which contains the io rpcs. llite specifies * ptlrpcd's set when it pushes pages down into the oscs. @@ -174,18 +196,18 @@ static int ptlrpcd(void *arg) complete(&pc->pc_starting); - /* + /* * This mainloop strongly resembles ptlrpc_set_wait() except that our * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when - * there are requests in the set. New requests come in on the set's - * new_req_list and ptlrpcd_check() moves them into the set. + * there are requests in the set. New requests come in on the set's + * new_req_list and ptlrpcd_check() moves them into the set. */ do { struct l_wait_info lwi; int timeout; timeout = ptlrpc_set_next_timeout(pc->pc_set); - lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), + lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? 
timeout : 1), ptlrpc_expired_set, pc->pc_set); l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi); @@ -199,14 +221,14 @@ static int ptlrpcd(void *arg) exit++; } - /* + /* * Let's make one more loop to make sure that ptlrpcd_check() * copied all raced new rpcs into the set so we can kill them. */ } while (exit < 2); - /* - * Wait for inflight requests to drain. + /* + * Wait for inflight requests to drain. */ if (!list_empty(&pc->pc_set->set_requests)) ptlrpc_set_wait(pc->pc_set); @@ -226,8 +248,8 @@ int ptlrpcd_check_async_rpcs(void *arg) struct ptlrpcd_ctl *pc = arg; int rc = 0; - /* - * Single threaded!! + /* + * Single threaded!! */ pc->pc_recurred++; @@ -235,8 +257,8 @@ int ptlrpcd_check_async_rpcs(void *arg) rc = ptlrpcd_check(pc); if (!rc) ptlrpc_expired_set(pc->pc_set); - /* - * XXX: send replay requests. + /* + * XXX: send replay requests. */ if (pc == &ptlrpcd_recovery_pc) rc = ptlrpcd_check(pc); @@ -260,9 +282,9 @@ int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) { int rc = 0; ENTRY; - - /* - * Do not allow start second thread for one pc. + + /* + * Do not allow start second thread for one pc. */ if (test_bit(LIOD_START, &pc->pc_flags)) { CERROR("Starting second thread (%s) for same pc %p\n", -- 1.8.3.1