};
struct lov_request_set {
- struct ldlm_enqueue_info*set_ei;
- struct obd_info *set_oi;
- cfs_atomic_t set_refcount;
- struct obd_export *set_exp;
- /* XXX: There is @set_exp already, however obd_statfs gets obd_device
- only. */
- struct obd_device *set_obd;
- int set_count;
- cfs_atomic_t set_completes;
- cfs_atomic_t set_success;
- struct llog_cookie *set_cookies;
- int set_cookie_sent;
- struct obd_trans_info *set_oti;
- obd_count set_oabufs;
- struct brw_page *set_pga;
- struct lov_lock_handles *set_lockh;
- cfs_list_t set_list;
- cfs_waitq_t set_waitq;
- cfs_spinlock_t set_lock;
+ struct ldlm_enqueue_info *set_ei;
+ struct obd_info *set_oi;
+ cfs_atomic_t set_refcount;
+ struct obd_export *set_exp;
+ /* XXX: There is @set_exp already, however obd_statfs gets obd_device
+ only. */
+ struct obd_device *set_obd;
+ int set_count;
+ cfs_atomic_t set_completes;
+ cfs_atomic_t set_success;
+ cfs_atomic_t set_finish_checked;
+ struct llog_cookie *set_cookies;
+ int set_cookie_sent;
+ struct obd_trans_info *set_oti;
+ obd_count set_oabufs;
+ struct brw_page *set_pga;
+ struct lov_lock_handles *set_lockh;
+ cfs_list_t set_list;
+ cfs_waitq_t set_waitq;
+ cfs_spinlock_t set_lock;
};
extern cfs_mem_cache_t *lov_oinfo_slab;
/* lov_request.c */
void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
-int lov_finished_set(struct lov_request_set *set);
+int lov_set_finished(struct lov_request_set *set, int idempotent);
void lov_update_set(struct lov_request_set *set,
struct lov_request *req, int rc);
int lov_update_common_set(struct lov_request_set *set,
/* osc_create have timeout equ obd_timeout/2 so waiting don't be
* longer then this */
- l_wait_event(set->set_waitq, lov_finished_set(set), &lwi);
+ l_wait_event(set->set_waitq, lov_set_finished(set, 1), &lwi);
/* we not have ptlrpc set for assign set->interpret and should
* be call interpret function himself. calling from cb_create_update
int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
{
- struct lov_stripe_md *lsm = set->set_oi->oi_md;
- struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
- unsigned ost_idx = 0, ost_count;
- struct pool_desc *pool;
- struct ost_pool *osts = NULL;
- int i, rc = -EIO;
- ENTRY;
-
- /* First check whether we can create the objects on the pool */
- pool = lov_find_pool(lov, lsm->lsm_pool_name);
- if (pool != NULL) {
- cfs_down_read(&pool_tgt_rw_sem(pool));
- osts = &(pool->pool_obds);
- ost_count = osts->op_count;
- for (i = 0; i < ost_count; i++, ost_idx = osts->op_array[i]) {
- rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
- set->set_oti);
- if (rc == 0)
- break;
- }
- cfs_up_read(&pool_tgt_rw_sem(pool));
- lov_pool_putref(pool);
- RETURN(rc);
- }
+ struct lov_stripe_md *lsm = set->set_oi->oi_md;
+ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
+ unsigned ost_idx;
+ unsigned ost_count;
+ struct pool_desc *pool;
+ struct ost_pool *osts = NULL;
+ int i;
+ int rc = -EIO;
+ ENTRY;
+
+ /* First check whether we can create the objects on the pool */
+ pool = lov_find_pool(lov, lsm->lsm_pool_name);
+ if (pool != NULL) {
+ cfs_down_read(&pool_tgt_rw_sem(pool));
+ osts = &(pool->pool_obds);
+ ost_count = osts->op_count;
+ for (i = 0, ost_idx = osts->op_array[0]; i < ost_count;
+ i++, ost_idx = osts->op_array[i]) {
+ rc = lov_check_and_create_object(lov, ost_idx, lsm, req,
+ set->set_oti);
+ if (rc == 0)
+ break;
+ }
+ cfs_up_read(&pool_tgt_rw_sem(pool));
+ lov_pool_putref(pool);
+ RETURN(rc);
+ }
ost_count = lov->desc.ld_tgt_count;
/* Then check whether we can create the objects on other OSTs */
static void lov_init_set(struct lov_request_set *set)
{
- set->set_count = 0;
- cfs_atomic_set(&set->set_completes, 0);
- cfs_atomic_set(&set->set_success, 0);
- set->set_cookies = 0;
- CFS_INIT_LIST_HEAD(&set->set_list);
- cfs_atomic_set(&set->set_refcount, 1);
- cfs_waitq_init(&set->set_waitq);
- cfs_spin_lock_init(&set->set_lock);
+ set->set_count = 0;
+ cfs_atomic_set(&set->set_completes, 0);
+ cfs_atomic_set(&set->set_success, 0);
+ cfs_atomic_set(&set->set_finish_checked, 0);
+ set->set_cookies = 0;
+ CFS_INIT_LIST_HEAD(&set->set_list);
+ cfs_atomic_set(&set->set_refcount, 1);
+ cfs_waitq_init(&set->set_waitq);
+ cfs_spin_lock_init(&set->set_lock);
}
void lov_finish_set(struct lov_request_set *set)
EXIT;
}
-int lov_finished_set(struct lov_request_set *set)
+int lov_set_finished(struct lov_request_set *set, int idempotent)
{
- int completes = cfs_atomic_read(&set->set_completes);
+ int completes = cfs_atomic_read(&set->set_completes);
+
+ CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
- CDEBUG(D_INFO, "check set %d/%d\n", completes,
- set->set_count);
- return completes == set->set_count;
+ if (completes == set->set_count) {
+ if (idempotent)
+ return 1;
+ if (cfs_atomic_inc_return(&set->set_finish_checked) == 1)
+ return 1;
+ }
+ return 0;
}
void lov_update_set(struct lov_request_set *set,
if (lovreq->rq_idx == cfs_fail_val)
rc = -ENOTCONN;
- rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
- if (lov_finished_set(lovreq->rq_rqset))
- lov_put_reqset(lovreq->rq_rqset);
- return rc;
+ rc = lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+ if (lov_set_finished(lovreq->rq_rqset, 0))
+ lov_put_reqset(lovreq->rq_rqset);
+ return rc;
}
int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
obd_putref(lovobd);
out:
- if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
- lov_finished_set(set)) {
- lov_statfs_interpret(NULL, set, set->set_count !=
- cfs_atomic_read(&set->set_success));
- if (lov->lov_qos.lq_statfs_in_progress)
- qos_statfs_done(lov);
- }
-
- RETURN(0);
+ if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
+ lov_set_finished(set, 0)) {
+ lov_statfs_interpret(NULL, set, set->set_count !=
+ cfs_atomic_read(&set->set_success));
+ if (lov->lov_qos.lq_statfs_in_progress)
+ qos_statfs_done(lov);
+ }
+
+ RETURN(0);
}
int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,