-static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
- int rc)
-{
- struct lov_request_set *lovset = (struct lov_request_set *)data;
- ENTRY;
-
- if (rc) {
- lovset->set_completes = 0;
- lov_fini_brw_set(lovset);
- } else {
- rc = lov_fini_brw_set(lovset);
- }
-
- RETURN(rc);
-}
-
-static int lov_brw_async(int cmd, struct obd_export *exp,
- struct obd_info *oinfo, obd_count oa_bufs,
- struct brw_page *pga, struct obd_trans_info *oti,
- struct ptlrpc_request_set *set)
-{
- struct lov_request_set *lovset;
- struct lov_request *req;
- struct list_head *pos;
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- int rc = 0;
- ENTRY;
-
- LASSERT(oinfo);
- ASSERT_LSM_MAGIC(oinfo->oi_md);
-
- if (cmd == OBD_BRW_CHECK) {
- rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
- RETURN(rc);
- }
-
- rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &lovset);
- if (rc)
- RETURN(rc);
-
- list_for_each (pos, &lovset->set_list) {
- struct obd_export *sub_exp;
- struct brw_page *sub_pga;
- req = list_entry(pos, struct lov_request, rq_link);
-
- sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
- sub_pga = lovset->set_pga + req->rq_pgaidx;
- rc = obd_brw_async(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
- sub_pga, oti, set);
- if (rc)
- GOTO(out, rc);
- lov_update_common_set(lovset, req, rc);
- }
- LASSERT(rc == 0);
- LASSERT(set->set_interpret == NULL);
- LASSERT(set->set_arg == NULL);
- rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
- if (rc)
- GOTO(out, rc);
-
- RETURN(rc);
-out:
- lov_fini_brw_set(lovset);
- RETURN(rc);
-}
-
-static int lov_ap_make_ready(void *data, int cmd)
-{
- struct lov_async_page *lap = LAP_FROM_COOKIE(data);
-
- return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
-}
-
-static int lov_ap_refresh_count(void *data, int cmd)
-{
- struct lov_async_page *lap = LAP_FROM_COOKIE(data);
-
- return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
- cmd);
-}
-
-static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
-{
- struct lov_async_page *lap = LAP_FROM_COOKIE(data);
-
- lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
- /* XXX woah, shouldn't we be altering more here? size? */
- oa->o_id = lap->lap_loi_id;
- oa->o_gr = lap->lap_loi_gr;
- oa->o_valid |= OBD_MD_FLGROUP;
- oa->o_stripe_idx = lap->lap_stripe;
-}
-
-static void lov_ap_update_obdo(void *data, int cmd, struct obdo *oa,
- obd_valid valid)
-{
- struct lov_async_page *lap = LAP_FROM_COOKIE(data);
-
- lap->lap_caller_ops->ap_update_obdo(lap->lap_caller_data, cmd,oa,valid);
-}
-
-static int lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
-{
- struct lov_async_page *lap = LAP_FROM_COOKIE(data);
-
- /* in a raid1 regime this would down a count of many ios
- * in flight, onl calling the caller_ops completion when all
- * the raid1 ios are complete */
- rc = lap->lap_caller_ops->ap_completion(lap->lap_caller_data,cmd,oa,rc);
- return rc;
-}
-
-static struct obd_capa *lov_ap_lookup_capa(void *data, int cmd)
-{
- struct lov_async_page *lap = LAP_FROM_COOKIE(data);
- return lap->lap_caller_ops->ap_lookup_capa(lap->lap_caller_data, cmd);
-}
-
-static struct obd_async_page_ops lov_async_page_ops = {
- .ap_make_ready = lov_ap_make_ready,
- .ap_refresh_count = lov_ap_refresh_count,
- .ap_fill_obdo = lov_ap_fill_obdo,
- .ap_update_obdo = lov_ap_update_obdo,
- .ap_completion = lov_ap_completion,
- .ap_lookup_capa = lov_ap_lookup_capa,
-};
-
-int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, cfs_page_t *page,
- obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
- struct lustre_handle *lockh)
-{
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- struct lov_async_page *lap;
- struct lov_lock_handles *lov_lockh = NULL;
- int rc = 0;
- ENTRY;
-
- if (!page) {
- int i = 0;
- /* Find an existing osc so we can get it's stupid sizeof(*oap).
- Only because of this layering limitation will a client
- mount with no osts fail */
- while (!lov->lov_tgts || !lov->lov_tgts[i] ||
- !lov->lov_tgts[i]->ltd_exp) {
- i++;
- if (i >= lov->desc.ld_tgt_count)
- RETURN(-ENOMEDIUM);
- }
- rc = size_round(sizeof(*lap)) +
- obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
- NULL, NULL, 0, NULL, NULL, NULL, 0,
- NULL);
- RETURN(rc);
- }
- ASSERT_LSM_MAGIC(lsm);
- LASSERT(loi == NULL);
-
- lap = *res;
- lap->lap_magic = LOV_AP_MAGIC;
- lap->lap_caller_ops = ops;
- lap->lap_caller_data = data;
-
- /* for now only raid 0 which passes through */
- lap->lap_stripe = lov_stripe_number(lsm, offset);
- lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
- loi = lsm->lsm_oinfo[lap->lap_stripe];
-
- /* so the callback doesn't need the lsm */
- lap->lap_loi_id = loi->loi_id;
- lap->lap_loi_gr = lsm->lsm_object_gr;
- LASSERT(lsm->lsm_object_gr > 0);
-
- lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
-
- if (lockh) {
- lov_lockh = lov_handle2llh(lockh);
- if (lov_lockh) {
- lockh = lov_lockh->llh_handles + lap->lap_stripe;
- }
- }
-
- rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
- lsm, loi, page, lap->lap_sub_offset,
- &lov_async_page_ops, lap,
- &lap->lap_sub_cookie, nocache, lockh);
- if (lov_lockh)
- lov_llh_put(lov_lockh);
- if (rc)
- RETURN(rc);
- CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
- lap->lap_sub_cookie, offset);
- RETURN(0);
-}
-
-static int lov_queue_async_io(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, void *cookie,
- int cmd, obd_off off, int count,
- obd_flag brw_flags, obd_flag async_flags)
-{
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- struct lov_async_page *lap;
- int rc;
-
- LASSERT(loi == NULL);
-
- ASSERT_LSM_MAGIC(lsm);
-
- lap = LAP_FROM_COOKIE(cookie);
-
- loi = lsm->lsm_oinfo[lap->lap_stripe];
-
- rc = obd_queue_async_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
- loi, lap->lap_sub_cookie, cmd, off, count,
- brw_flags, async_flags);
- RETURN(rc);
-}
-
-static int lov_set_async_flags(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, void *cookie,
- obd_flag async_flags)
-{
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- struct lov_async_page *lap;
- int rc;
-
- LASSERT(loi == NULL);
-
- ASSERT_LSM_MAGIC(lsm);
-
- lap = LAP_FROM_COOKIE(cookie);
-
- loi = lsm->lsm_oinfo[lap->lap_stripe];
-
- rc = obd_set_async_flags(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
- lsm, loi, lap->lap_sub_cookie, async_flags);
- RETURN(rc);
-}
-
-static int lov_queue_group_io(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct lov_oinfo *loi,
- struct obd_io_group *oig, void *cookie,
- int cmd, obd_off off, int count,
- obd_flag brw_flags, obd_flag async_flags)
-{
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- struct lov_async_page *lap;
- int rc;
-
- LASSERT(loi == NULL);
-
- ASSERT_LSM_MAGIC(lsm);
-
- lap = LAP_FROM_COOKIE(cookie);
-
- loi = lsm->lsm_oinfo[lap->lap_stripe];
-
- rc = obd_queue_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
- loi, oig, lap->lap_sub_cookie, cmd, off, count,
- brw_flags, async_flags);
- RETURN(rc);
-}
-
-/* this isn't exactly optimal. we may have queued sync io in oscs on
- * all stripes, but we don't record that fact at queue time. so we
- * trigger sync io on all stripes. */
-static int lov_trigger_group_io(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct lov_oinfo *loi,
- struct obd_io_group *oig)
-{
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- int rc = 0, i, err;
-
- LASSERT(loi == NULL);
-
- ASSERT_LSM_MAGIC(lsm);
-
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx] ||
- !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
- CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
- continue;
- }
-
- err = obd_trigger_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
- lsm, loi, oig);
- if (rc == 0 && err != 0)
- rc = err;
- };
- RETURN(rc);
-}
-
-static int lov_teardown_async_page(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, void *cookie)
-{
- struct lov_obd *lov = &exp->exp_obd->u.lov;
- struct lov_async_page *lap;
- int rc;
-
- LASSERT(loi == NULL);
-
- ASSERT_LSM_MAGIC(lsm);
-
- lap = LAP_FROM_COOKIE(cookie);
-
- loi = lsm->lsm_oinfo[lap->lap_stripe];
-
- rc = obd_teardown_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
- lsm, loi, lap->lap_sub_cookie);
- if (rc) {
- CERROR("unable to teardown sub cookie %p: %d\n",
- lap->lap_sub_cookie, rc);
- RETURN(rc);
- }
- RETURN(rc);
-}
-