Whamcloud - gitweb
land clio.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index 84835e5..f946a91 100644 (file)
@@ -63,7 +63,7 @@
 #include <obd_ost.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
-#include <lustre_cache.h>
+#include <cl_object.h>
 #include <lustre/ll_fiemap.h>
 
 #include "lov_internal.h"
@@ -104,97 +104,9 @@ void lov_putref(struct obd_device *obd)
         mutex_up(&lov->lov_lock);
 }
 
-static int lov_register_page_removal_cb(struct obd_export *exp,
-                                        obd_page_removal_cb_t func,
-                                        obd_pin_extent_cb pin_cb)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
-
-        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
-                return -EBUSY;
-
-        if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
-                return -EBUSY;
-
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
-                                                   func, pin_cb);
-        }
-
-        lov->lov_page_removal_cb = func;
-        lov->lov_page_pin_cb = pin_cb;
-
-        return rc;
-}
-
-static int lov_unregister_page_removal_cb(struct obd_export *exp,
-                                        obd_page_removal_cb_t func)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
-
-        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
-                return -EINVAL;
-
-        lov->lov_page_removal_cb = NULL;
-        lov->lov_page_pin_cb = NULL;
-
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
-                                                     func);
-        }
-
-        return rc;
-}
-
-static int lov_register_lock_cancel_cb(struct obd_export *exp,
-                                         obd_lock_cancel_cb func)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
-
-        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
-                return -EBUSY;
-
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
-                                                  func);
-        }
-
-        lov->lov_lock_cancel_cb = func;
-
-        return rc;
-}
-
-static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
-                                         obd_lock_cancel_cb func)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
-
-        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
-                return -EINVAL;
-
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
-                                                    func);
-        }
-        lov->lov_lock_cancel_cb = NULL;
-        return rc;
-}
-
 #define MAX_STRING_SIZE 128
-static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
-                           struct obd_connect_data *data)
+int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
+                    struct obd_connect_data *data)
 {
         struct lov_obd *lov = &obd->u.lov;
         struct obd_uuid tgt_uuid;
@@ -236,7 +148,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
 
         if (activate) {
                 tgt_obd->obd_no_recov = 0;
-                /* FIXME this is probably supposed to be 
+                /* FIXME this is probably supposed to be
                    ptlrpc_set_import_active.  Horrible naming. */
                 ptlrpc_activate_import(imp);
         }
@@ -265,33 +177,10 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 RETURN(-ENODEV);
         }
 
-        rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                          lov->lov_page_removal_cb,
-                                          lov->lov_page_pin_cb);
-        if (rc) {
-                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
-                lov->lov_tgts[index]->ltd_exp = NULL;
-                RETURN(rc);
-        }
-
-        rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
-                                         lov->lov_lock_cancel_cb);
-        if (rc) {
-                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                               lov->lov_page_removal_cb);
-                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
-                lov->lov_tgts[index]->ltd_exp = NULL;
-                RETURN(rc);
-        }
-
         rc = obd_register_observer(tgt_obd, obd);
         if (rc) {
                 CERROR("Target %s register_observer error %d\n",
                        obd_uuid2str(&tgt_uuid), rc);
-                obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
-                                              lov->lov_lock_cancel_cb);
-                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                               lov->lov_page_removal_cb);
                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
                 lov->lov_tgts[index]->ltd_exp = NULL;
                 RETURN(rc);
@@ -396,11 +285,6 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
                obd->obd_name, osc_obd->obd_name);
 
-        obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
-                                      lov->lov_lock_cancel_cb);
-        obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                       lov->lov_page_removal_cb);
-
         if (lov->lov_tgts[index]->ltd_active) {
                 lov->lov_tgts[index]->ltd_active = 0;
                 lov->desc.ld_active_tgt_count--;
@@ -446,9 +330,6 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         RETURN(0);
 }
 
-static int lov_del_target(struct obd_device *obd, __u32 index,
-                          struct obd_uuid *uuidp, int gen);
-
 static int lov_disconnect(struct obd_export *exp)
 {
         struct obd_device *obd = class_exp2obd(exp);
@@ -608,8 +489,8 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
         RETURN(rc);
 }
 
-static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
-                          __u32 index, int gen, int active)
+int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
+                   __u32 index, int gen, int active)
 {
         struct lov_obd *lov = &obd->u.lov;
         struct lov_tgt_desc *tgt;
@@ -721,8 +602,8 @@ out:
 }
 
 /* Schedule a target for deletion */
-static int lov_del_target(struct obd_device *obd, __u32 index,
-                          struct obd_uuid *uuidp, int gen)
+int lov_del_target(struct obd_device *obd, __u32 index,
+                   struct obd_uuid *uuidp, int gen)
 {
         struct lov_obd *lov = &obd->u.lov;
         int count = lov->desc.ld_tgt_count;
@@ -841,7 +722,7 @@ void lov_fix_desc(struct lov_desc *desc)
         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
 }
 
-static int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct lprocfs_static_vars lvars = { 0 };
         struct lov_desc *desc;
@@ -1005,9 +886,9 @@ static int lov_cleanup(struct obd_device *obd)
         RETURN(0);
 }
 
-static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
+int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
+                            __u32 *indexp, int *genp)
 {
-        struct lustre_cfg *lcfg = buf;
         struct obd_uuid obd_uuid;
         int cmd;
         int rc = 0;
@@ -1025,10 +906,12 @@ static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
 
                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
 
-                if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
+                if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", indexp) != 1)
                         GOTO(out, rc = -EINVAL);
-                if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
+                if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", genp) != 1)
                         GOTO(out, rc = -EINVAL);
+                index = *indexp;
+                gen = *genp;
                 if (cmd == LCFG_LOV_ADD_OBD)
                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
                 else if (cmd == LCFG_LOV_ADD_INA)
@@ -1678,7 +1561,7 @@ static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
                 obd_off start, end;
 
                 if (!lov_stripe_intersects(lov_oinfo->oi_md, i, pga[i].off,
-                                           pga[i].off + pga[i].count,
+                                           pga[i].off + pga[i].count - 1,
                                            &start, &end))
                         continue;
 
@@ -1737,330 +1620,6 @@ static int lov_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
         RETURN(rc);
 }
 
-static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
-                             int rc)
-{
-        struct lov_request_set *lovset = (struct lov_request_set *)data;
-        ENTRY;
-
-        if (rc) {
-                lovset->set_completes = 0;
-                lov_fini_brw_set(lovset);
-        } else {
-                rc = lov_fini_brw_set(lovset);
-        }
-
-        RETURN(rc);
-}
-
-static int lov_brw_async(int cmd, struct obd_export *exp,
-                         struct obd_info *oinfo, obd_count oa_bufs,
-                         struct brw_page *pga, struct obd_trans_info *oti,
-                         struct ptlrpc_request_set *set)
-{
-        struct lov_request_set *lovset;
-        struct lov_request *req;
-        struct list_head *pos;
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(oinfo);
-        ASSERT_LSM_MAGIC(oinfo->oi_md);
-
-        if (cmd == OBD_BRW_CHECK) {
-                rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
-                RETURN(rc);
-        }
-
-        rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &lovset);
-        if (rc)
-                RETURN(rc);
-
-        list_for_each (pos, &lovset->set_list) {
-                struct obd_export *sub_exp;
-                struct brw_page *sub_pga;
-                req = list_entry(pos, struct lov_request, rq_link);
-
-                sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
-                sub_pga = lovset->set_pga + req->rq_pgaidx;
-                rc = obd_brw_async(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
-                                   sub_pga, oti, set);
-                if (rc)
-                        GOTO(out, rc);
-                lov_update_common_set(lovset, req, rc);
-        }
-        LASSERT(rc == 0);
-        LASSERT(set->set_interpret == NULL);
-        LASSERT(set->set_arg == NULL);
-        rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
-        if (rc)
-                GOTO(out, rc);
-
-        RETURN(rc);
-out:
-        lov_fini_brw_set(lovset);
-        RETURN(rc);
-}
-
-static int lov_ap_make_ready(void *data, int cmd)
-{
-        struct lov_async_page *lap = lap_from_cookie(data);
-
-        return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
-}
-
-static int lov_ap_refresh_count(void *data, int cmd)
-{
-        struct lov_async_page *lap = lap_from_cookie(data);
-
-        return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
-                                                     cmd);
-}
-
-static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
-{
-        struct lov_async_page *lap = lap_from_cookie(data);
-
-        lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
-        /* XXX woah, shouldn't we be altering more here?  size? */
-        oa->o_id = lap->lap_loi_id;
-        oa->o_gr = lap->lap_loi_gr;
-        oa->o_valid |= OBD_MD_FLGROUP;
-        oa->o_stripe_idx = lap->lap_stripe;
-}
-
-static void lov_ap_update_obdo(void *data, int cmd, struct obdo *oa,
-                               obd_valid valid)
-{
-        struct lov_async_page *lap = lap_from_cookie(data);
-
-        lap->lap_caller_ops->ap_update_obdo(lap->lap_caller_data, cmd,oa,valid);
-}
-
-static int lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
-{
-        struct lov_async_page *lap = lap_from_cookie(data);
-
-        /* in a raid1 regime this would down a count of many ios
-         * in flight, onl calling the caller_ops completion when all
-         * the raid1 ios are complete */
-        rc = lap->lap_caller_ops->ap_completion(lap->lap_caller_data,cmd,oa,rc);
-        return rc;
-}
-
-static struct obd_capa *lov_ap_lookup_capa(void *data, int cmd)
-{
-        struct lov_async_page *lap = lap_from_cookie(data);
-        return lap->lap_caller_ops->ap_lookup_capa(lap->lap_caller_data, cmd);
-}
-
-static struct obd_async_page_ops lov_async_page_ops = {
-        .ap_make_ready =        lov_ap_make_ready,
-        .ap_refresh_count =     lov_ap_refresh_count,
-        .ap_fill_obdo =         lov_ap_fill_obdo,
-        .ap_update_obdo =       lov_ap_update_obdo,
-        .ap_completion =        lov_ap_completion,
-        .ap_lookup_capa =       lov_ap_lookup_capa,
-};
-
-int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                           struct lov_oinfo *loi, cfs_page_t *page,
-                           obd_off offset, struct obd_async_page_ops *ops,
-                           void *data, void **res, int nocache,
-                           struct lustre_handle *lockh)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        struct lov_async_page *lap;
-        struct lov_lock_handles *lov_lockh = NULL;
-        int rc = 0;
-        ENTRY;
-
-        if (!page) {
-                int i = 0;
-                /* Find an existing osc so we can get it's stupid sizeof(*oap).
-                   Only because of this layering limitation will a client
-                   mount with no osts fail */
-                while (!lov->lov_tgts || !lov->lov_tgts[i] ||
-                       !lov->lov_tgts[i]->ltd_exp) {
-                        i++;
-                        if (i >= lov->desc.ld_tgt_count)
-                                RETURN(-ENOMEDIUM);
-                }
-                rc = size_round(sizeof(*lap)) +
-                        obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
-                                            NULL, NULL, 0, NULL, NULL, NULL, 0,
-                                            NULL);
-                RETURN(rc);
-        }
-        ASSERT_LSM_MAGIC(lsm);
-        LASSERT(loi == NULL);
-
-        lap = *res;
-        lap->lap_magic = LOV_AP_MAGIC;
-        lap->lap_caller_ops = ops;
-        lap->lap_caller_data = data;
-
-        /* for now only raid 0 which passes through */
-        lap->lap_stripe = lov_stripe_number(lsm, offset);
-        lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
-        loi = lsm->lsm_oinfo[lap->lap_stripe];
-
-        /* so the callback doesn't need the lsm */
-        lap->lap_loi_id = loi->loi_id;
-        lap->lap_loi_gr = lsm->lsm_object_gr;
-        LASSERT(lsm->lsm_object_gr > 0);
-        
-        lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
-
-        if (lockh) {
-                lov_lockh = lov_handle2llh(lockh);
-                if (lov_lockh) {
-                        lockh = lov_lockh->llh_handles + lap->lap_stripe;
-                }
-        }
-
-        rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
-                                 lsm, loi, page, lap->lap_sub_offset,
-                                 &lov_async_page_ops, lap,
-                                 &lap->lap_sub_cookie, nocache, lockh);
-        if (lov_lockh)
-                lov_llh_put(lov_lockh);
-        if (rc)
-                RETURN(rc);
-        CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
-               lap->lap_sub_cookie, offset);
-        RETURN(0);
-}
-
-static int lov_queue_async_io(struct obd_export *exp,
-                              struct lov_stripe_md *lsm,
-                              struct lov_oinfo *loi, void *cookie,
-                              int cmd, obd_off off, int count,
-                              obd_flag brw_flags, obd_flag async_flags)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        struct lov_async_page *lap;
-        int rc;
-
-        LASSERT(loi == NULL);
-
-        ASSERT_LSM_MAGIC(lsm);
-
-        lap = lap_from_cookie(cookie);
-
-        loi = lsm->lsm_oinfo[lap->lap_stripe];
-
-        rc = obd_queue_async_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
-                                loi, lap->lap_sub_cookie, cmd, off, count,
-                                brw_flags, async_flags);
-        RETURN(rc);
-}
-
-static int lov_set_async_flags(struct obd_export *exp,
-                               struct lov_stripe_md *lsm,
-                               struct lov_oinfo *loi, void *cookie,
-                               obd_flag async_flags)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        struct lov_async_page *lap;
-        int rc;
-
-        LASSERT(loi == NULL);
-
-        ASSERT_LSM_MAGIC(lsm);
-
-        lap = lap_from_cookie(cookie);
-
-        loi = lsm->lsm_oinfo[lap->lap_stripe];
-
-        rc = obd_set_async_flags(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
-                                 lsm, loi, lap->lap_sub_cookie, async_flags);
-        RETURN(rc);
-}
-
-static int lov_queue_group_io(struct obd_export *exp,
-                              struct lov_stripe_md *lsm,
-                              struct lov_oinfo *loi,
-                              struct obd_io_group *oig, void *cookie,
-                              int cmd, obd_off off, int count,
-                              obd_flag brw_flags, obd_flag async_flags)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        struct lov_async_page *lap;
-        int rc;
-
-        LASSERT(loi == NULL);
-
-        ASSERT_LSM_MAGIC(lsm);
-
-        lap = lap_from_cookie(cookie);
-
-        loi = lsm->lsm_oinfo[lap->lap_stripe];
-
-        rc = obd_queue_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
-                                loi, oig, lap->lap_sub_cookie, cmd, off, count,
-                                brw_flags, async_flags);
-        RETURN(rc);
-}
-
-/* this isn't exactly optimal.  we may have queued sync io in oscs on
- * all stripes, but we don't record that fact at queue time.  so we
- * trigger sync io on all stripes. */
-static int lov_trigger_group_io(struct obd_export *exp,
-                                struct lov_stripe_md *lsm,
-                                struct lov_oinfo *loi,
-                                struct obd_io_group *oig)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int rc = 0, i, err;
-
-        LASSERT(loi == NULL);
-
-        ASSERT_LSM_MAGIC(lsm);
-
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                loi = lsm->lsm_oinfo[i];
-                if (!lov->lov_tgts[loi->loi_ost_idx] ||
-                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
-                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
-                        continue;
-                }
-
-                err = obd_trigger_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
-                                           lsm, loi, oig);
-                if (rc == 0 && err != 0)
-                        rc = err;
-        };
-        RETURN(rc);
-}
-
-static int lov_teardown_async_page(struct obd_export *exp,
-                                   struct lov_stripe_md *lsm,
-                                   struct lov_oinfo *loi, void *cookie)
-{
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        struct lov_async_page *lap;
-        int rc;
-
-        LASSERT(loi == NULL);
-
-        ASSERT_LSM_MAGIC(lsm);
-
-        lap = lap_from_cookie(cookie);
-
-        loi = lsm->lsm_oinfo[lap->lap_stripe];
-
-        rc = obd_teardown_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
-                                     lsm, loi, lap->lap_sub_cookie);
-        if (rc) {
-                CERROR("unable to teardown sub cookie %p: %d\n",
-                       lap->lap_sub_cookie, rc);
-                RETURN(rc);
-        }
-        RETURN(rc);
-}
-
 static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
                                  void *data, int rc)
 {
@@ -2118,50 +1677,6 @@ out:
         RETURN(rc);
 }
 
-static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
-                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
-                     int *flags, void *data, struct lustre_handle *lockh)
-{
-        struct lov_request_set *set;
-        struct obd_info oinfo;
-        struct lov_request *req;
-        struct list_head *pos;
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        struct lustre_handle *lov_lockhp;
-        int lov_flags, rc = 0;
-        ENTRY;
-
-        ASSERT_LSM_MAGIC(lsm);
-        LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
-
-        if (!exp || !exp->exp_obd)
-                RETURN(-ENODEV);
-
-        lov = &exp->exp_obd->u.lov;
-        rc = lov_prep_match_set(exp, &oinfo, lsm, policy, mode, lockh, &set);
-        if (rc)
-                RETURN(rc);
-
-        list_for_each (pos, &set->set_list) {
-                ldlm_policy_data_t sub_policy;
-                req = list_entry(pos, struct lov_request, rq_link);
-                lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
-                LASSERT(lov_lockhp);
-
-                lov_flags = *flags;
-                sub_policy.l_extent = req->rq_oi.oi_policy.l_extent;
-
-                rc = obd_match(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                               req->rq_oi.oi_md, type, &sub_policy,
-                               mode, &lov_flags, data, lov_lockhp);
-                rc = lov_update_match_set(set, req, rc);
-                if (rc <= 0)
-                        break;
-        }
-        lov_fini_match_set(set, mode, *flags);
-        RETURN(rc);
-}
-
 static int lov_change_cbdata(struct obd_export *exp,
                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
                              void *data)
@@ -2186,7 +1701,7 @@ static int lov_change_cbdata(struct obd_export *exp,
                         CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
                         continue;
                 }
-                
+
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
@@ -3149,7 +2664,7 @@ int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
                 struct ldlm_lock *lock;
                 struct obd_device *obd;
 
-                lock = ldlm_handle2lock(lov_lockhp);
+                lock = ldlm_handle2lock_long(lov_lockhp, 0);
                 if (lock == NULL) {
                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
                                loi->loi_ost_idx, loi->loi_id);
@@ -3207,93 +2722,13 @@ void lov_stripe_unlock(struct lov_stripe_md *md)
 }
 EXPORT_SYMBOL(lov_stripe_unlock);
 
-/**
- * Checks if requested extent lock is compatible with a lock under the page.
- *
- * Checks if the lock under \a page is compatible with a read or write lock
- * (specified by \a rw) for an extent [\a start , \a end].
- *
- * \param exp lov export
- * \param lsm striping information for the file
- * \param res lov_async_page placeholder
- * \param rw OBD_BRW_READ if requested for reading,
- *           OBD_BRW_WRITE if requested for writing
- * \param start start of the requested extent
- * \param end end of the requested extent
- * \param cookie transparent parameter for passing locking context
- *
- * \post result == 1, *cookie == context, appropriate lock is referenced or
- * \post result == 0
- *
- * \retval 1 owned lock is reused for the request
- * \retval 0 no lock reused for the request
- *
- * \see lov_release_short_lock
- */
-static int lov_reget_short_lock(struct obd_export *exp,
-                                struct lov_stripe_md *lsm,
-                                void **res, int rw,
-                                obd_off start, obd_off end,
-                                void **cookie)
-{
-        struct lov_async_page *l = *res;
-        obd_off stripe_start, stripe_end = start;
-
-        ENTRY;
-
-        /* ensure we don't cross stripe boundaries */
-        lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
-        if (stripe_end <= end)
-                RETURN(0);
-
-        /* map the region limits to the object limits */
-        lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
-        lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
-
-        RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
-                                    lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
-                                    ltd_exp, NULL, &l->lap_sub_cookie,
-                                    rw, stripe_start, stripe_end, cookie));
-}
-
-/**
- * Releases a reference to a lock taken in a "fast" way.
- *
- * Releases a read or a write (specified by \a rw) lock
- * referenced by \a cookie.
- *
- * \param exp lov export
- * \param lsm striping information for the file
- * \param end end of the locked extent
- * \param rw OBD_BRW_READ if requested for reading,
- *           OBD_BRW_WRITE if requested for writing
- * \param cookie transparent parameter for passing locking context
- *
- * \post appropriate lock is dereferenced
- *
- * \see lov_reget_short_lock
- */
-static int lov_release_short_lock(struct obd_export *exp,
-                                  struct lov_stripe_md *lsm, obd_off end,
-                                  void *cookie, int rw)
-{
-        int stripe;
-
-        ENTRY;
-
-        stripe = lov_stripe_number(lsm, end);
-
-        RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
-                                      lsm_oinfo[stripe]->loi_ost_idx]->
-                                      ltd_exp, NULL, end, cookie, rw));
-}
 
 struct obd_ops lov_obd_ops = {
         .o_owner               = THIS_MODULE,
         .o_setup               = lov_setup,
         .o_precleanup          = lov_precleanup,
         .o_cleanup             = lov_cleanup,
-        .o_process_config      = lov_process_config,
+        //.o_process_config      = lov_process_config,
         .o_connect             = lov_connect,
         .o_disconnect          = lov_disconnect,
         .o_statfs              = lov_statfs,
@@ -3308,21 +2743,11 @@ struct obd_ops lov_obd_ops = {
         .o_setattr             = lov_setattr,
         .o_setattr_async       = lov_setattr_async,
         .o_brw                 = lov_brw,
-        .o_brw_async           = lov_brw_async,
-        .o_prep_async_page     = lov_prep_async_page,
-        .o_reget_short_lock    = lov_reget_short_lock,
-        .o_release_short_lock  = lov_release_short_lock,
-        .o_queue_async_io      = lov_queue_async_io,
-        .o_set_async_flags     = lov_set_async_flags,
-        .o_queue_group_io      = lov_queue_group_io,
-        .o_trigger_group_io    = lov_trigger_group_io,
-        .o_teardown_async_page = lov_teardown_async_page,
         .o_merge_lvb           = lov_merge_lvb,
         .o_adjust_kms          = lov_adjust_kms,
         .o_punch               = lov_punch,
         .o_sync                = lov_sync,
         .o_enqueue             = lov_enqueue,
-        .o_match               = lov_match,
         .o_change_cbdata       = lov_change_cbdata,
         .o_cancel              = lov_cancel,
         .o_cancel_unused       = lov_cancel_unused,
@@ -3333,10 +2758,6 @@ struct obd_ops lov_obd_ops = {
         .o_llog_init           = lov_llog_init,
         .o_llog_finish         = lov_llog_finish,
         .o_notify              = lov_notify,
-        .o_register_page_removal_cb = lov_register_page_removal_cb,
-        .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
-        .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
-        .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
         .o_pool_new            = lov_pool_new,
         .o_pool_rem            = lov_pool_remove,
         .o_pool_add            = lov_pool_add,
@@ -3348,17 +2769,30 @@ extern quota_interface_t lov_quota_interface;
 
 cfs_mem_cache_t *lov_oinfo_slab;
 
+extern struct lu_kmem_descr lov_caches[];
+
 int __init lov_init(void)
 {
         struct lprocfs_static_vars lvars = { 0 };
         int rc, rc2;
         ENTRY;
 
+        /* print an address of _any_ initialized kernel symbol from this
+         * module, to allow debugging with gdb that doesn't support data
+         * symbols from modules.*/
+        CDEBUG(D_CONSOLE, "Lustre LOV module (%p).\n", &lov_caches);
+
+        rc = lu_kmem_init(lov_caches);
+        if (rc)
+                return rc;
+
         lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
-                                              sizeof(struct lov_oinfo), 
+                                              sizeof(struct lov_oinfo),
                                               0, SLAB_HWCACHE_ALIGN);
-        if (lov_oinfo_slab == NULL)
+        if (lov_oinfo_slab == NULL) {
+                lu_kmem_fini(lov_caches);
                 return -ENOMEM;
+        }
         lprocfs_lov_init_vars(&lvars);
 
         request_module("lquota");
@@ -3366,12 +2800,14 @@ int __init lov_init(void)
         init_obd_quota_ops(quota_interface, &lov_obd_ops);
 
         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
-                                 LUSTRE_LOV_NAME, NULL);
+                                 LUSTRE_LOV_NAME, &lov_device_type);
+
         if (rc) {
                 if (quota_interface)
                         PORTAL_SYMBOL_PUT(lov_quota_interface);
                 rc2 = cfs_mem_cache_destroy(lov_oinfo_slab);
                 LASSERT(rc2 == 0);
+                lu_kmem_fini(lov_caches);
         }
 
         RETURN(rc);
@@ -3381,7 +2817,10 @@ int __init lov_init(void)
 static void /*__exit*/ lov_exit(void)
 {
         int rc;
-        
+
+        lu_device_type_fini(&lov_device_type);
+        lu_kmem_fini(lov_caches);
+
         if (quota_interface)
                 PORTAL_SYMBOL_PUT(lov_quota_interface);