of Lustre filesystem with 4K stack may cause a stack overflow. For
more information, please refer to bugzilla 17630.
+Severity : enhancement
+Bugzilla : 17536
+Description: MDS create should not wait for statfs RPC while holding DLM lock.
+
Severity : normal
-Frequency : normal
+Frequency : normal
Bugzilla : 12069
-Descriptoin: OST grant too much space to client even there are not enough space.
+Description: OST grants too much space to client even when there is not enough space.
Details : Client will shrink its grant cache to OST if there are no write
activity over 6 mins (GRANT_SHRINK_INTERVAL), and OST will retrieve
this grant cache if there are already not enough avaible space
- (left_space < total_clients * 32M).
+ (left_space < total_clients * 32M).
Severity : enhancement
Bugzilla : 18289
Bugzilla : 16839
Descriptoin: ll_sync thread stay in waiting mds<>ost recovery finished
Details : stay in waiting mds<>ost recovery finished produce random bugs
- due race between two ll_sync thread for one lov target. send
+             due to a race between two ll_sync threads for one lov target. send
ACTIVATE event only if connect realy finished and import have
FULL state.
extern void lustre_swab_obd_statfs (struct obd_statfs *os);
#define OBD_STATFS_NODELAY 0x0001 /* requests should be send without delay
* and resends for avoid deadlocks */
-
#define OBD_STATFS_FROM_CACHE 0x0002 /* the statfs callback should not update
* obd_osfs_age */
+#define OBD_STATFS_PTLRPCD 0x0004 /* requests will be sent via ptlrpcd
+ * instead of a specific set. This
+ * means that we cannot rely on the set
+ * interpret routine to be called.
+ * lov_statfs_fini() must thus be called
+ * by the request interpret routine */
/* ost_body.data values for OST_BRW */
fid_oid(fid), \
fid_ver(fid)
-enum {
+enum {
/** put FID sequence at this offset in ldlm_res_id. */
LUSTRE_RES_ID_SEQ_OFF = 0,
/** put FID oid at this offset in ldlm_res_id. */
/**
* Check if a fid is igif or not.
* \param fid the fid to be tested.
- * \return true if the fid is a igif; otherwise false.
+ * \return true if the fid is a igif; otherwise false.
*/
static inline int fid_is_igif(const struct lu_fid *fid)
{
/**
* Check if a fid is idif or not.
* \param fid the fid to be tested.
- * \return true if the fid is a idif; otherwise false.
+ * \return true if the fid is a idif; otherwise false.
*/
static inline int fid_is_idif(const struct lu_fid *fid)
{
/**
* Check if a fid is zero.
* \param fid the fid to be tested.
- * \return true if the fid is zero; otherwise false.
+ * \return true if the fid is zero; otherwise false.
*/
static inline int fid_is_zero(const struct lu_fid *fid)
{
* Get inode generation from a igif.
* \param fid a igif to get inode generation from.
* \return inode generation for the igif.
- */
+ */
static inline __u32 lu_igif_gen(const struct lu_fid *fid)
{
return fid_oid(fid);
char pc_name[16];
#ifndef __KERNEL__
/**
- * Async rpcs flag to make sure that ptlrpcd_check() is called only
+ * Async rpcs flag to make sure that ptlrpcd_check() is called only
* once.
*/
int pc_recurred;
{
if (req->rq_phase == new_phase)
return;
-
+
if (new_phase == RQ_PHASE_UNREGISTERING) {
req->rq_next_phase = req->rq_phase;
if (req->rq_import)
atomic_inc(&req->rq_import->imp_unregistering);
}
-
+
if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
if (req->rq_import)
atomic_dec(&req->rq_import->imp_unregistering);
}
- DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"",
+ DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"",
ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
req->rq_phase = new_phase;
/* ptlrpc/pinger.c */
enum timeout_event {
- TIMEOUT_GRANT = 1
+ TIMEOUT_GRANT = 1
};
struct timeout_item;
typedef int (*timeout_cb_t)(struct timeout_item *, void *);
void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
void ptlrpcd_wake(struct ptlrpc_request *req);
void ptlrpcd_add_req(struct ptlrpc_request *req);
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
int ptlrpcd_addref(void);
void ptlrpcd_decref(void);
* level. E.g. it is used for update lsm->lsm_oinfo at every recieved
* request in osc level for enqueue requests. It is also possible to
* update some caller data from LOV layer if needed. */
- obd_enqueue_update_f oi_cb_up;
+ obd_enqueue_update_f oi_cb_up;
};
/* compare all relevant fields. */
unsigned long lqr_dirty:1; /* recalc round-robin list */
};
+struct lov_statfs_data {
+ struct obd_info lsd_oi;
+ struct obd_statfs lsd_statfs;
+};
/* Stripe placement optimization */
struct lov_qos {
struct list_head lq_oss_list; /* list of OSSs that targets use */
unsigned long lq_dirty:1, /* recalc qos data */
lq_same_space:1,/* the ost's all have approx.
the same space avail */
- lq_reset:1; /* zero current penalties */
+ lq_reset:1, /* zero current penalties */
+ lq_statfs_in_progress:1; /* statfs op in progress */
+ /* qos statfs data */
+ struct lov_statfs_data *lq_statfs_data;
+ cfs_waitq_t lq_statfs_waitq; /* waitqueue to notify statfs
+ * requests completion */
};
struct lov_tgt_desc {
void qos_shrink_lsm(struct lov_request_set *set);
int qos_prep_create(struct obd_export *exp, struct lov_request_set *set);
void qos_update(struct lov_obd *lov);
+void qos_statfs_done(struct lov_obd *lov);
+void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait);
int qos_remedy_create(struct lov_request_set *set, struct lov_request *req);
/* lov_request.c */
int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
int success);
int lov_fini_statfs_set(struct lov_request_set *set);
+int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
/* lov_obd.c */
void lov_fix_desc(struct lov_desc *desc);
if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
int i;
struct lov_tgt_desc *n;
- CDEBUG(D_CONFIG, "destroying %d lov targets\n",
+ CDEBUG(D_CONFIG, "destroying %d lov targets\n",
lov->lov_death_row);
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
tgt = lov->lov_tgts[i];
rc = obd_notify_observer(obd, tgt_obd, ev, data);
if (rc) {
CERROR("%s: notify %s of %s failed %d\n",
- obd->obd_name,
+ obd->obd_name,
obd->obd_observer->obd_name,
tgt_obd->obd_name, rc);
break;
}
#define MAX_STRING_SIZE 128
-static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
+static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
struct obd_connect_data *data)
{
struct lov_obd *lov = &obd->u.lov;
if (activate) {
tgt_obd->obd_no_recov = 0;
- /* FIXME this is probably supposed to be
+ /* FIXME this is probably supposed to be
ptlrpc_set_import_active. Horrible naming. */
ptlrpc_activate_import(imp);
}
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
-
+
*exp = class_conn2export(conn);
/* Why should there ever be more than 1 connect? */
rc = lov_connect_obd(obd, i, lov->lov_tgts[i]->ltd_activate,
&lov->lov_ocd);
if (rc) {
- CERROR("%s: lov connect tgt %d failed: %d\n",
+ CERROR("%s: lov connect tgt %d failed: %d\n",
obd->obd_name, i, rc);
continue;
}
ENTRY;
- CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
+ CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
obd->obd_name, osc_obd->obd_name);
if (tgt->ltd_active) {
RETURN(0);
}
-static int lov_del_target(struct obd_device *obd, __u32 index,
+static int lov_del_target(struct obd_device *obd, __u32 index,
struct obd_uuid *uuidp, int gen);
static int lov_disconnect(struct obd_export *exp)
__u32 newsize, oldsize = 0;
newsize = max(lov->lov_tgt_size, (__u32)2);
- while (newsize < index + 1)
+ while (newsize < index + 1)
newsize = newsize << 1;
OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
if (newtgts == NULL) {
}
if (lov->lov_tgt_size) {
- memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
+ memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
lov->lov_tgt_size);
old = lov->lov_tgts;
oldsize = lov->lov_tgt_size;
CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
index, tgt->ltd_gen, lov->desc.ld_tgt_count);
-
- if (lov->lov_connects == 0) {
+
+ if (lov->lov_connects == 0) {
/* lov_connect hasn't been called yet. We'll do the
lov_connect_obd on this target when that fn first runs,
because we don't know the connect flags yet. */
if (!tgt->ltd_exp)
GOTO(out, rc = 0);
- rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
+ rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
(void *)&index);
out:
if (rc) {
- CERROR("add failed (%d), deleting %s\n", rc,
+ CERROR("add failed (%d), deleting %s\n", rc,
obd_uuid2str(&tgt->ltd_uuid));
lov_del_target(obd, index, 0, 0);
}
}
/* Schedule a target for deletion */
-static int lov_del_target(struct obd_device *obd, __u32 index,
+static int lov_del_target(struct obd_device *obd, __u32 index,
struct obd_uuid *uuidp, int gen)
{
struct lov_obd *lov = &obd->u.lov;
CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
lov_uuid2str(lov, index), index,
- lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
+ lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
lov->lov_tgts[index]->ltd_active);
lov->lov_tgts[index]->ltd_reap = 1;
lov->lov_qos.lq_prio_free = 232;
/* Default threshold for rr (roughly 17%) */
lov->lov_qos.lq_threshold_rr = 43;
+ /* Init statfs fields */
+ OBD_ALLOC_PTR(lov->lov_qos.lq_statfs_data);
+ if (NULL == lov->lov_qos.lq_statfs_data)
+ RETURN(-ENOMEM);
+ cfs_waitq_init(&lov->lov_qos.lq_statfs_waitq);
lov->lov_pools_hash_body = lustre_hash_init("POOLS", 7, 7,
&pool_hash_operations, 0);
if (lov->lov_tgts[i]) {
/* Inactive targets may never have connected */
if (lov->lov_tgts[i]->ltd_active ||
- atomic_read(&lov->lov_refcount))
- /* We should never get here - these
- should have been removed in the
+ atomic_read(&lov->lov_refcount))
+ /* We should never get here - these
+ should have been removed in the
disconnect. */
CERROR("lov tgt %d not cleaned!"
" deathrow=%d, lovrc=%d\n",
- i, lov->lov_death_row,
+ i, lov->lov_death_row,
atomic_read(&lov->lov_refcount));
lov_del_target(obd, i, 0, 0);
}
}
lov_putref(obd);
- OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
+ OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
lov->lov_tgt_size);
lov->lov_tgt_size = 0;
}
/* clear pools parent proc entry only after all pools is killed */
lprocfs_obd_cleanup(obd);
+ OBD_FREE_PTR(lov->lov_qos.lq_statfs_data);
RETURN(0);
}
case LCFG_PARAM: {
struct lprocfs_static_vars lvars = { 0 };
struct lov_desc *desc = &(obd->u.lov.desc);
-
+
if (!desc)
GOTO(out, rc = -EINVAL);
-
+
lprocfs_lov_init_vars(&lvars);
-
+
rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
lcfg, obd);
GOTO(out, rc);
if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
continue;
- CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
+ CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
obd_uuid2str(ost_uuid));
memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
LASSERT(lov->lov_tgts[i]->ltd_exp);
/* XXX: LOV STACKING: use real "obj_mdp" sub-data */
- err = obd_create(lov->lov_tgts[i]->ltd_exp,
+ err = obd_create(lov->lov_tgts[i]->ltd_exp,
tmp_oa, &obj_mdp, oti);
if (err) {
/* This export will be disabled until it is recovered,
struct obd_info oinfo;
struct lov_request_set *set = NULL;
struct lov_request *req;
- struct obd_statfs osfs;
- __u64 maxage;
int rc = 0;
ENTRY;
GOTO(out, rc);
}
- maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);
- obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);
+ /* issue statfs rpcs if the osfs data is older than qos_maxage - 1s,
+ * later in alloc_qos(), we will wait for those rpcs to complete if
+ * the osfs age is older than 2 * qos_maxage */
+ qos_statfs_update(exp->exp_obd,
+ cfs_time_shift_64(-lov->desc.ld_qos_maxage) + HZ, 0);
rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);
if (rc)
req = list_entry(pos, struct lov_request, rq_link);
CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
- "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
+ "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
req->rq_oi.oi_oa->o_id, req->rq_idx);
rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
RETURN(rc);
}
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
+static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
void *data, int rc)
{
struct lov_request_set *lovset = (struct lov_request_set *)data;
RETURN(rc);
CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
- oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
+ oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
oinfo->oi_md->lsm_stripe_size);
list_for_each (pos, &lovset->set_list) {
req = list_entry(pos, struct lov_request, rq_link);
CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
- "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
+ "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
req->rq_oi.oi_oa->o_id, req->rq_idx);
rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
&req->rq_oi, rqset);
RETURN(-ENODEV);
/* for now, we only expect the following updates here */
- LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
- OBD_MD_FLMODE | OBD_MD_FLATIME |
+ LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
+ OBD_MD_FLMODE | OBD_MD_FLATIME |
OBD_MD_FLMTIME | OBD_MD_FLCTIME |
- OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
- OBD_MD_FLGROUP | OBD_MD_FLUID |
- OBD_MD_FLGID | OBD_MD_FLFID |
+ OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
+ OBD_MD_FLGROUP | OBD_MD_FLUID |
+ OBD_MD_FLGID | OBD_MD_FLFID |
OBD_MD_FLGENER)));
lov = &exp->exp_obd->u.lov;
rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
list_for_each (pos, &set->set_list) {
req = list_entry(pos, struct lov_request, rq_link);
- rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
+ rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
&req->rq_oi, NULL);
err = lov_update_setattr_set(set, req, rc);
if (err) {
CERROR("error: setattr objid "LPX64" subobj "
LPX64" on OST idx %d: rc = %d\n",
- set->set_oi->oi_oa->o_id,
+ set->set_oi->oi_oa->o_id,
req->rq_oi.oi_oa->o_id, req->rq_idx, err);
if (!rc)
rc = err;
if (!page) {
int i = 0;
/* Find an existing osc so we can get it's stupid sizeof(*oap).
- Only because of this layering limitation will a client
+ Only because of this layering limitation will a client
mount with no osts fail */
- while (!lov->lov_tgts || !lov->lov_tgts[i] ||
+ while (!lov->lov_tgts || !lov->lov_tgts[i] ||
!lov->lov_tgts[i]->ltd_exp) {
i++;
- if (i >= lov->desc.ld_tgt_count)
+ if (i >= lov->desc.ld_tgt_count)
RETURN(-ENOMEDIUM);
}
rc = size_round(sizeof(*lap)) +
for (i = 0; i < lsm->lsm_stripe_count; i++) {
loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
RETURN(count);
}
-static int lov_statfs_interpret(struct ptlrpc_request_set *rqset,
- void *data, int rc)
+int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
{
struct lov_request_set *lovset = (struct lov_request_set *)data;
int err;
genp = (__u32 *)data->ioc_inlbuf3;
/* the uuid will be empty for deleted OSTs */
for (i = 0; i < count; i++, uuidp++, genp++) {
- if (!lov->lov_tgts[i])
+ if (!lov->lov_tgts[i])
continue;
*uuidp = lov->lov_tgts[i]->ltd_uuid;
*genp = lov->lov_tgts[i]->ltd_gen;
ENTRY;
lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
- sizeof(struct lov_oinfo),
+ sizeof(struct lov_oinfo),
0, SLAB_HWCACHE_ALIGN);
if (lov_oinfo_slab == NULL)
return -ENOMEM;
#include <obd_class.h>
#include <obd_lov.h>
+#include <lustre/lustre_idl.h>
#include "lov_internal.h"
/* #define QOS_DEBUG 1 */
}
lov_getref(exp->exp_obd);
+ /* wait for fresh statfs info if needed, the rpcs are sent in
+ * lov_create() */
+ qos_statfs_update(exp->exp_obd,
+ cfs_time_shift_64(-2 * lov->desc.ld_qos_maxage), 1);
+
down_write(&lov->lov_qos.lq_rw_sem);
if (lov->desc.ld_active_tgt_count < 2)
ENTRY;
lov->lov_qos.lq_dirty = 1;
}
+
+void qos_statfs_done(struct lov_obd *lov)
+{
+ LASSERT(lov->lov_qos.lq_statfs_in_progress);
+ down_write(&lov->lov_qos.lq_rw_sem);
+ lov->lov_qos.lq_statfs_in_progress = 0;
+ /* wake up any threads waiting for the statfs rpcs to complete */
+ cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+ up_write(&lov->lov_qos.lq_rw_sem);
+}
+
+static int qos_statfs_ready(struct obd_device *obd, __u64 max_age)
+{
+ struct lov_obd *lov = &obd->u.lov;
+ int rc;
+ ENTRY;
+ down_read(&lov->lov_qos.lq_rw_sem);
+ rc = lov->lov_qos.lq_statfs_in_progress == 0 ||
+ cfs_time_beforeq_64(max_age, obd->obd_osfs_age);
+ up_read(&lov->lov_qos.lq_rw_sem);
+ RETURN(rc);
+}
+
+/*
+ * Update statfs data if the current osfs age is older than max_age.
+ * If wait is not set, it means that we are called from lov_create()
+ * and we should just issue the rpcs without waiting for them to complete.
+ * If wait is set, we are called from alloc_qos() and we just have
+ * to wait for the request set to complete.
+ */
+void qos_statfs_update(struct obd_device *obd, __u64 max_age, int wait)
+{
+ struct lov_obd *lov = &obd->u.lov;
+ struct obd_info *oinfo;
+ int rc = 0;
+ struct ptlrpc_request_set *set = NULL;
+ ENTRY;
+
+ if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
+ /* statfs data are quite recent, don't need to refresh it */
+ RETURN_EXIT;
+
+ if (!wait && lov->lov_qos.lq_statfs_in_progress)
+ /* statfs already in progress */
+ RETURN_EXIT;
+
+ down_write(&lov->lov_qos.lq_rw_sem);
+ if (lov->lov_qos.lq_statfs_in_progress) {
+ up_write(&lov->lov_qos.lq_rw_sem);
+ GOTO(out, rc = 0);
+ }
+ /* no statfs in flight, send rpcs */
+ lov->lov_qos.lq_statfs_in_progress = 1;
+ up_write(&lov->lov_qos.lq_rw_sem);
+
+ if (wait)
+ CDEBUG(D_QOS, "%s: did not manage to get fresh statfs data "
+ "in a timely manner (osfs age "LPU64", max age "LPU64")"
+ ", sending new statfs rpcs\n",
+ obd_uuid2str(&lov->desc.ld_uuid), obd->obd_osfs_age,
+ max_age);
+
+ /* need to send statfs rpcs */
+ CDEBUG(D_QOS, "sending new statfs requests\n");
+ memset(lov->lov_qos.lq_statfs_data, 0,
+ sizeof(*lov->lov_qos.lq_statfs_data));
+ oinfo = &lov->lov_qos.lq_statfs_data->lsd_oi;
+ oinfo->oi_osfs = &lov->lov_qos.lq_statfs_data->lsd_statfs;
+ oinfo->oi_flags = OBD_STATFS_NODELAY;
+ set = ptlrpc_prep_set();
+ if (!set)
+ GOTO(out_failed, rc = -ENOMEM);
+
+ rc = obd_statfs_async(obd, oinfo, max_age, set);
+ if (rc || list_empty(&set->set_requests)) {
+ if (rc)
+ CWARN("statfs failed with %d\n", rc);
+ GOTO(out_failed, rc);
+ }
+ /* send requests via ptlrpcd */
+ oinfo->oi_flags |= OBD_STATFS_PTLRPCD;
+ ptlrpcd_add_rqset(set);
+ GOTO(out, rc);
+
+out_failed:
+ down_write(&lov->lov_qos.lq_rw_sem);
+ lov->lov_qos.lq_statfs_in_progress = 0;
+ /* wake up any threads waiting for the statfs rpcs to complete */
+ cfs_waitq_signal(&lov->lov_qos.lq_statfs_waitq);
+ up_write(&lov->lov_qos.lq_rw_sem);
+ wait = 0;
+out:
+ if (set)
+ ptlrpc_set_destroy(set);
+ if (wait) {
+ struct l_wait_info lwi = { 0 };
+ CDEBUG(D_QOS, "waiting for statfs requests to complete\n");
+ l_wait_event(lov->lov_qos.lq_statfs_waitq,
+ qos_statfs_ready(obd, max_age), &lwi);
+ if (cfs_time_before_64(obd->obd_osfs_age, max_age))
+ CDEBUG(D_QOS, "%s: still no fresh statfs data after "
+ "waiting (osfs age "LPU64", max age "
+ LPU64")\n",
+ obd_uuid2str(&lov->desc.ld_uuid),
+ obd->obd_osfs_age, max_age);
+ }
+}
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
continue;
loi = oinfo->oi_md->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
GOTO(out, rc = -EIO);
struct lov_request *req;
loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
+ if (!lov->lov_tgts[loi->loi_ost_idx] ||
!lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
lov_update_set(set, req, rc);
/* grace error on inactive ost */
- if (rc && !(lov->lov_tgts[req->rq_idx] &&
+ if (rc && !(lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active))
rc = 0;
}
/* The callback for osc_statfs_async that finilizes a request info when a
- * response is recieved. */
+ * response is received. */
static int cb_statfs_update(struct obd_info *oinfo, int rc)
{
struct lov_request *lovreq;
if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
lov->lov_tgts[lovreq->rq_idx]->ltd_active))
rc = 0;
- RETURN(rc);
+ GOTO(out, rc);
}
spin_lock(&obd->obd_osfs_lock);
lov_update_statfs(osfs, lov_sfs, success);
qos_update(lov);
+out:
+ if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+ lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
+ lov_statfs_interpret(NULL, lovreq->rq_rqset,
+ lovreq->rq_rqset->set_success !=
+ lovreq->rq_rqset->set_count);
+ qos_statfs_done(lov);
+ }
RETURN(0);
}
cfs_waitq_signal(&rq_set->set_waitq);
}
-/*
+/*
+ * Move all request from an existing request set to the ptlrpcd queue.
+ * All requests from the set must be in phase RQ_PHASE_NEW.
+ */
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
+{
+ struct list_head *tmp, *pos;
+
+ list_for_each_safe(pos, tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(pos, struct ptlrpc_request, rq_set_chain);
+
+ LASSERT(req->rq_phase == RQ_PHASE_NEW);
+ list_del_init(&req->rq_set_chain);
+ req->rq_set = NULL;
+ ptlrpcd_add_req(req);
+ set->set_remaining--;
+ }
+ LASSERT(set->set_remaining == 0);
+}
+EXPORT_SYMBOL(ptlrpcd_add_rqset);
+
+/*
* Requests that are added to the ptlrpcd queue are sent via
* ptlrpcd_check->ptlrpc_check_set().
*/
if (rc) {
int (*interpreter)(struct ptlrpc_request *,
void *, int);
-
+
interpreter = req->rq_interpret_reply;
/*
req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
list_del_init(&req->rq_set_chain);
ptlrpc_set_add_req(pc->pc_set, req);
- /*
- * Need to calculate its timeout.
+ /*
+ * Need to calculate its timeout.
*/
rc = 1;
}
if (pc->pc_set->set_remaining) {
rc = rc | ptlrpc_check_set(pc->pc_set);
- /*
+ /*
* XXX: our set never completes, so we prune the completed
- * reqs after each iteration. boy could this be smarter.
+ * reqs after each iteration. boy could this be smarter.
*/
list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
req = list_entry(pos, struct ptlrpc_request,
}
if (rc == 0) {
- /*
- * If new requests have been added, make sure to wake up.
+ /*
+ * If new requests have been added, make sure to wake up.
*/
spin_lock(&pc->pc_set->set_new_req_lock);
rc = !list_empty(&pc->pc_set->set_new_requests);
}
#ifdef __KERNEL__
-/*
+/*
* ptlrpc's code paths like to execute in process context, so we have this
* thread which spins on a set which contains the io rpcs. llite specifies
* ptlrpcd's set when it pushes pages down into the oscs.
complete(&pc->pc_starting);
- /*
+ /*
* This mainloop strongly resembles ptlrpc_set_wait() except that our
* set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
- * there are requests in the set. New requests come in on the set's
- * new_req_list and ptlrpcd_check() moves them into the set.
+ * there are requests in the set. New requests come in on the set's
+ * new_req_list and ptlrpcd_check() moves them into the set.
*/
do {
struct l_wait_info lwi;
int timeout;
timeout = ptlrpc_set_next_timeout(pc->pc_set);
- lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
+ lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
ptlrpc_expired_set, pc->pc_set);
l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
exit++;
}
- /*
+ /*
* Let's make one more loop to make sure that ptlrpcd_check()
* copied all raced new rpcs into the set so we can kill them.
*/
} while (exit < 2);
- /*
- * Wait for inflight requests to drain.
+ /*
+ * Wait for inflight requests to drain.
*/
if (!list_empty(&pc->pc_set->set_requests))
ptlrpc_set_wait(pc->pc_set);
struct ptlrpcd_ctl *pc = arg;
int rc = 0;
- /*
- * Single threaded!!
+ /*
+ * Single threaded!!
*/
pc->pc_recurred++;
rc = ptlrpcd_check(pc);
if (!rc)
ptlrpc_expired_set(pc->pc_set);
- /*
- * XXX: send replay requests.
+ /*
+ * XXX: send replay requests.
*/
if (pc == &ptlrpcd_recovery_pc)
rc = ptlrpcd_check(pc);
{
int rc = 0;
ENTRY;
-
- /*
- * Do not allow start second thread for one pc.
+
+ /*
+ * Do not allow start second thread for one pc.
*/
if (test_bit(LIOD_START, &pc->pc_flags)) {
CERROR("Starting second thread (%s) for same pc %p\n",