};
struct ptlrpc_request_set;
-typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
/**
atomic_t set_remaining;
/** wait queue to wait on for request events */
wait_queue_head_t set_waitq;
- wait_queue_head_t *set_wakeup_ptr;
/** List of requests in the set */
struct list_head set_requests;
/**
- * List of completion callbacks to be called when the set is completed
- * This is only used if \a set_interpret is NULL.
- * Links struct ptlrpc_set_cbdata.
- */
- struct list_head set_cblist;
- /** Completion callback, if only one. */
- set_interpreter_func set_interpret;
- /** opaq argument passed to completion \a set_interpret callback. */
- void *set_arg;
- /**
* Lock for \a set_new_requests manipulations
* locked so that any old caller can communicate requests to
* the set holder who can then fold them into the lock-free set
unsigned int set_allow_intr:1;
};
-/**
- * Description of a single ptrlrpc_set callback
- */
-struct ptlrpc_set_cbdata {
- /** List linkage item */
- struct list_head psc_item;
- /** Pointer to interpreting function */
- set_interpreter_func psc_interpret;
- /** Opaq argument to pass to the callback */
- void *psc_data;
-};
-
struct ptlrpc_bulk_desc;
struct ptlrpc_service_part;
struct ptlrpc_service;
struct ptlrpc_request_set *ptlrpc_prep_set(void);
struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
void *arg);
-int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
- set_interpreter_func fn, void *data);
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
int ptlrpc_set_wait(struct ptlrpc_request_set *);
void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
RETURN(rc);
}
-static inline int obd_statfs_rqset(struct obd_export *exp,
- struct obd_statfs *osfs, __u64 max_age,
- __u32 flags)
-{
- struct ptlrpc_request_set *set = NULL;
- struct obd_info oinfo = {
- .oi_osfs = osfs,
- .oi_flags = flags,
- };
- int rc = 0;
-
- ENTRY;
-
- set = ptlrpc_prep_set();
- if (set == NULL)
- RETURN(-ENOMEM);
-
- rc = obd_statfs_async(exp, &oinfo, max_age, set);
- if (rc == 0)
- rc = ptlrpc_set_wait(set);
-
- ptlrpc_set_destroy(set);
-
- RETURN(rc);
-}
-
/* @max_age is the oldest time in jiffies that we accept using a cached data.
* If the cache is older than @max_age we will get a new value from the
* target. Use a value of "cfs_time_current() + HZ" to guarantee freshness. */
#define OBD_FAIL_OST_LIST_ASSERT 0x239
#define OBD_FAIL_OST_GL_WORK_ALLOC 0x240
#define OBD_FAIL_OST_SKIP_LV_CHECK 0x241
+#define OBD_FAIL_OST_STATFS_DELAY 0x242
#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
if (sbi->ll_flags & LL_SBI_LAZYSTATFS)
flags |= OBD_STATFS_NODELAY;
- rc = obd_statfs_rqset(sbi->ll_dt_exp, &obd_osfs, max_age, flags);
+ rc = obd_statfs(NULL, sbi->ll_dt_exp, &obd_osfs, max_age, flags);
if (rc) {
CERROR("obd_statfs fails: rc = %d\n", rc);
RETURN(rc);
RETURN(rc);
}
-static int
-lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
-{
- struct lov_request_set *lovset = (struct lov_request_set *)data;
- int err;
- ENTRY;
-
- if (rc)
- atomic_set(&lovset->set_completes, 0);
-
- err = lov_fini_statfs_set(lovset);
- RETURN(rc ? rc : err);
-}
-
-static int lov_statfs_async(struct obd_export *exp, struct obd_info *oinfo,
- __u64 max_age, struct ptlrpc_request_set *rqset)
-{
- struct obd_device *obd = class_exp2obd(exp);
- struct lov_request_set *set;
- struct lov_request *req;
- struct list_head *pos;
- struct lov_obd *lov;
- int rc = 0;
- ENTRY;
-
- LASSERT(oinfo != NULL);
- LASSERT(oinfo->oi_osfs != NULL);
-
- lov = &obd->u.lov;
- rc = lov_prep_statfs_set(obd, oinfo, &set);
- if (rc)
- RETURN(rc);
-
- list_for_each(pos, &set->set_list) {
- req = list_entry(pos, struct lov_request, rq_link);
- rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
- &req->rq_oi, max_age, rqset);
- if (rc)
- break;
- }
-
- if (rc || list_empty(&rqset->set_requests)) {
- int err;
- if (rc)
- atomic_set(&set->set_completes, 0);
- err = lov_fini_statfs_set(set);
- RETURN(rc ? rc : err);
- }
-
- LASSERT(rqset->set_interpret == NULL);
- rqset->set_interpret = lov_statfs_interpret;
- rqset->set_arg = (void *)set;
- RETURN(0);
-}
-
static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
- struct ptlrpc_request_set *set = NULL;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lov_obd *lov = &obd->u.lov;
struct obd_info oinfo = {
.oi_osfs = osfs,
.oi_flags = flags,
};
+ struct ptlrpc_request_set *rqset;
+ struct lov_request_set *set = NULL;
+ struct lov_request *req;
int rc = 0;
+ int rc2;
ENTRY;
- /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
- * statfs requests */
- set = ptlrpc_prep_set();
- if (set == NULL)
+ rqset = ptlrpc_prep_set();
+ if (rqset == NULL)
RETURN(-ENOMEM);
- rc = lov_statfs_async(exp, &oinfo, max_age, set);
+ rc = lov_prep_statfs_set(obd, &oinfo, &set);
+ if (rc < 0)
+ GOTO(out_rqset, rc);
+
+ list_for_each_entry(req, &set->set_list, rq_link) {
+ rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
+ &req->rq_oi, max_age, rqset);
+ if (rc < 0)
+ GOTO(out_set, rc);
+ }
+
+ rc = ptlrpc_set_wait(rqset);
+
+out_set:
+ if (rc < 0)
+ atomic_set(&set->set_completes, 0);
+
+ rc2 = lov_fini_statfs_set(set);
if (rc == 0)
- rc = ptlrpc_set_wait(set);
+ rc = rc2;
- ptlrpc_set_destroy(set);
+out_rqset:
+ ptlrpc_set_destroy(rqset);
RETURN(rc);
}
.o_connect = lov_connect,
.o_disconnect = lov_disconnect,
.o_statfs = lov_statfs,
- .o_statfs_async = lov_statfs_async,
.o_iocontrol = lov_iocontrol,
.o_get_info = lov_get_info,
.o_set_info_async = lov_set_info_async,
ENTRY;
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_STATFS_DELAY, 10);
+
osfs = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_STATFS);
rc = ofd_statfs(tsi->tsi_env, tsi->tsi_exp, osfs,
atomic_set(&set->set_remaining, 0);
spin_lock_init(&set->set_new_req_lock);
INIT_LIST_HEAD(&set->set_new_requests);
- INIT_LIST_HEAD(&set->set_cblist);
set->set_max_inflight = UINT_MAX;
set->set_producer = NULL;
set->set_producer_arg = NULL;
EXPORT_SYMBOL(ptlrpc_set_destroy);
/**
- * Add a callback function \a fn to the set.
- * This function would be called when all requests on this set are completed.
- * The function will be passed \a data argument.
- */
-int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
- set_interpreter_func fn, void *data)
-{
- struct ptlrpc_set_cbdata *cbdata;
-
- OBD_ALLOC_PTR(cbdata);
- if (cbdata == NULL)
- RETURN(-ENOMEM);
-
- cbdata->psc_interpret = fn;
- cbdata->psc_data = data;
- list_add_tail(&cbdata->psc_item, &set->set_cblist);
-
- RETURN(0);
-}
-
-/**
* Add a new request to the general purpose request set.
* Assumes request reference from the caller.
*/
rc = req->rq_status;
}
- if (set->set_interpret != NULL) {
- int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
- set->set_interpret;
- rc = interpreter (set, set->set_arg, rc);
- } else {
- struct ptlrpc_set_cbdata *cbdata, *n;
- int err;
-
- list_for_each_entry_safe(cbdata, n,
- &set->set_cblist, psc_item) {
- list_del_init(&cbdata->psc_item);
- err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
- if (err && !rc)
- rc = err;
- OBD_FREE_PTR(cbdata);
- }
- }
-
- RETURN(rc);
+ RETURN(rc);
}
EXPORT_SYMBOL(ptlrpc_set_wait);
[ "$SLOW" = "no" ] && [ -n "$OLD_RESENDCOUNT" ] && set_resend_count $OLD_RESENDCOUNT
+test_118n()
+{
+ local begin
+ local end
+
+ [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+ remote_ost_nodsh && skip "remote OSTs with nodsh" && return
+
+ # Sleep to avoid a cached response.
+ #define OBD_STATFS_CACHE_SECONDS 1
+ sleep 2
+
+ # Inject a 10 second delay in the OST_STATFS handler.
+ #define OBD_FAIL_OST_STATFS_DELAY 0x242
+ set_nodes_failloc "$(osts_nodes)" 0x242
+
+ begin=$SECONDS
+ stat --file-system $MOUNT > /dev/null
+ end=$SECONDS
+
+ set_nodes_failloc "$(osts_nodes)" 0
+
+ if ((end - begin > 20)); then
+ error "statfs took $((end - begin)) seconds, expected 10"
+ fi
+}
+run_test 118n "statfs() sends OST_STATFS requests in parallel"
+
test_119a() # bug 11737
{
BSIZE=$((512 * 1024))