Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index a63d1a8..f293cfd 100644 (file)
@@ -74,7 +74,7 @@ void lov_putref(struct obd_device *obd)
         /* ok to dec to 0 more than once -- ltd_exp's will be null */
         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
                 int i;
-                CDEBUG(D_CONFIG, "destroying %d lov targets\n", 
+                CDEBUG(D_CONFIG, "destroying %d lov targets\n",
                        lov->lov_death_row);
                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_reap)
@@ -88,7 +88,7 @@ void lov_putref(struct obd_device *obd)
 }
 
 #define MAX_STRING_SIZE 128
-static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, 
+static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                            struct obd_connect_data *data)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -113,7 +113,6 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 CERROR("Target %s not attached\n", obd_uuid2str(&tgt_uuid));
                 RETURN(-EINVAL);
         }
-        
         if (!tgt_obd->obd_set_up) {
                 CERROR("Target %s not set up\n", obd_uuid2str(&tgt_uuid));
                 RETURN(-EINVAL);
@@ -146,7 +145,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 RETURN(0);
         }
 
-        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, data);
+        rc = obd_connect(NULL, &conn, tgt_obd, &lov_osc_uuid, data);
         if (rc) {
                 CERROR("Target %s connect error %d\n",
                        obd_uuid2str(&tgt_uuid), rc);
@@ -201,13 +200,14 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
 #endif
 
         rc = qos_add_tgt(obd, index);
-        if (rc) 
+        if (rc)
                 CERROR("qos_add_tgt failed %d\n", rc);
 
         RETURN(0);
 }
 
-static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
+static int lov_connect(const struct lu_env *env,
+                       struct lustre_handle *conn, struct obd_device *obd,
                        struct obd_uuid *cluuid, struct obd_connect_data *data)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -224,7 +224,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         /* Why should there ever be more than 1 connect? */
         lov->lov_connects++;
         LASSERT(lov->lov_connects == 1);
-        
+
         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
         if (data)
                 lov->lov_ocd = *data;
@@ -235,16 +235,15 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
                         continue;
                 /* Flags will be lowest common denominator */
-                rc = lov_connect_obd(obd, i, lov->lov_tgts[i]->ltd_activate,
-                                     &lov->lov_ocd);
+                rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
                 if (rc) {
-                        CERROR("%s: lov connect tgt %d failed: %d\n", 
+                        CERROR("%s: lov connect tgt %d failed: %d\n",
                                obd->obd_name, i, rc);
                         continue;
                 }
         }
         lov_putref(obd);
-        
+
         RETURN(0);
 }
 
@@ -257,7 +256,7 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         int rc;
         ENTRY;
 
-        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", 
+        CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
                obd->obd_name, osc_obd->obd_name);
 
         if (lov->lov_tgts[index]->ltd_active) {
@@ -279,13 +278,14 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
                 }
         }
 
-        if (obd->obd_no_recov) {
+        if (osc_obd) {
                 /* Pass it on to our clients.
                  * XXX This should be an argument to disconnect,
                  * XXX not a back-door flag on the OBD.  Ah well.
                  */
-                if (osc_obd)
-                        osc_obd->obd_no_recov = 1;
+                osc_obd->obd_force = obd->obd_force;
+                osc_obd->obd_fail = obd->obd_fail;
+                osc_obd->obd_no_recov = obd->obd_no_recov;
         }
 
         obd_register_observer(osc_obd, NULL);
@@ -303,7 +303,7 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         RETURN(0);
 }
 
-static int lov_del_target(struct obd_device *obd, __u32 index, 
+static int lov_del_target(struct obd_device *obd, __u32 index,
                           struct obd_uuid *uuidp, int gen);
 
 static int lov_disconnect(struct obd_export *exp)
@@ -405,7 +405,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                 struct obd_uuid *uuid;
 
                 LASSERT(watched);
-                
+
                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
                         CERROR("unexpected notification of %s %s!\n",
                                watched->obd_type->typ_name,
@@ -442,7 +442,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
                         if (rc) {
                                 CERROR("%s: notify %s of %s failed %d\n",
-                                       obd->obd_name, 
+                                       obd->obd_name,
                                        obd->obd_observer->obd_name,
                                        tgt_obd->obd_name, rc);
                                 break;
@@ -487,7 +487,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 __u32 newsize, oldsize = 0;
 
                 newsize = max(lov->lov_tgt_size, (__u32)2);
-                while (newsize < index + 1) 
+                while (newsize < index + 1)
                         newsize = newsize << 1;
                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
                 if (newtgts == NULL) {
@@ -496,7 +496,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 }
 
                 if (lov->lov_tgt_size) {
-                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * 
+                        memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
                                lov->lov_tgt_size);
                         old = lov->lov_tgts;
                         oldsize = lov->lov_tgt_size;
@@ -512,7 +512,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 
                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
                        lov->lov_tgts, lov->lov_tgt_size);
-        }       
+        }
 
 
         OBD_ALLOC_PTR(tgt);
@@ -534,8 +534,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
 
         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
-        
-        if (lov->lov_connects == 0) { 
+
+        if (lov->lov_connects == 0) {
                 /* lov_connect hasn't been called yet. We'll do the
                    lov_connect_obd on this target when that fn first runs,
                    because we don't know the connect flags yet. */
@@ -548,13 +548,13 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         if (rc)
                 GOTO(out, rc);
 
-        rc = lov_notify(obd, tgt->ltd_exp->exp_obd, 
+        rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
                         active ? OBD_NOTIFY_ACTIVE : OBD_NOTIFY_INACTIVE,
                         (void *)&index);
 
 out:
         if (rc) {
-                CERROR("add failed (%d), deleting %s\n", rc, 
+                CERROR("add failed (%d), deleting %s\n", rc,
                        obd_uuid2str(&tgt->ltd_uuid));
                 lov_del_target(obd, index, 0, 0);
         }
@@ -563,7 +563,7 @@ out:
 }
 
 /* Schedule a target for deletion */
-static int lov_del_target(struct obd_device *obd, __u32 index, 
+static int lov_del_target(struct obd_device *obd, __u32 index,
                           struct obd_uuid *uuidp, int gen)
 {
         struct lov_obd *lov = &obd->u.lov;
@@ -593,7 +593,7 @@ static int lov_del_target(struct obd_device *obd, __u32 index,
 
         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
                lov_uuid2str(lov, index), index,
-               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp, 
+               lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
                lov->lov_tgts[index]->ltd_active);
 
         lov->lov_tgts[index]->ltd_reap = 1;
@@ -618,7 +618,7 @@ static void __lov_del_obd(struct obd_device *obd, __u32 index)
         osc_obd = class_exp2obd(tgt->ltd_exp);
 
         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
-               lov_uuid2str(lov, index), 
+               lov_uuid2str(lov, index),
                osc_obd ? osc_obd->obd_name : "<no obd>");
 
         if (tgt->ltd_exp)
@@ -629,17 +629,13 @@ static void __lov_del_obd(struct obd_device *obd, __u32 index)
          * shrink it. */
 
         lov->lov_tgts[index] = NULL;
-        OBD_FREE_PTR(tgt);        
+        OBD_FREE_PTR(tgt);
 
         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
            do it ourselves. And we can't do it from lov_cleanup,
            because we just lost our only reference to it. */
-        if (osc_obd) {
-                /* Use lov's force/fail flags. */
-                osc_obd->obd_force = obd->obd_force;
-                osc_obd->obd_fail = obd->obd_fail;
+        if (osc_obd)
                 class_manual_cleanup(osc_obd);
-        }
 }
 
 void lov_fix_desc(struct lov_desc *desc)
@@ -659,20 +655,19 @@ void lov_fix_desc(struct lov_desc *desc)
                 desc->ld_default_stripe_count = 1;
 
         /* from lov_setstripe */
-        if ((desc->ld_pattern != 0) && 
+        if ((desc->ld_pattern != 0) &&
             (desc->ld_pattern != LOV_PATTERN_RAID0)) {
                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n",desc->ld_pattern);
                 desc->ld_pattern = 0;
         }
-        /* fix qos_maxage */
+
         if (desc->ld_qos_maxage == 0)
                 desc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
 }
 
-static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
+static int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct lprocfs_static_vars lvars;
-        struct lustre_cfg *lcfg = buf;
         struct lov_desc *desc;
         struct lov_obd *lov = &obd->u.lov;
         int count;
@@ -794,17 +789,17 @@ static int lov_cleanup(struct obd_device *obd)
                                            disconnect. */
                                         CERROR("lov tgt %d not cleaned!"
                                                " deathrow=%d, lovrc=%d\n",
-                                               i, lov->lov_death_row, 
+                                               i, lov->lov_death_row,
                                                atomic_read(&lov->lov_refcount));
                                 lov_del_target(obd, i, 0, 0);
                         }
                 }
-                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) * 
+                OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
                          lov->lov_tgt_size);
                 lov->lov_tgt_size = 0;
         }
-        
-        if (lov->lov_qos.lq_rr_size) 
+
+        if (lov->lov_qos.lq_rr_size)
                 OBD_FREE(lov->lov_qos.lq_rr_array, lov->lov_qos.lq_rr_size);
 
         RETURN(0);
@@ -845,12 +840,12 @@ static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
         case LCFG_PARAM: {
                 struct lprocfs_static_vars lvars;
                 struct lov_desc *desc = &(obd->u.lov.desc);
-                
+
                 if (!desc)
                         GOTO(out, rc = -EINVAL);
-                
+
                 lprocfs_init_vars(lov, &lvars);
-                
+
                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
                                               lcfg, obd);
                 GOTO(out, rc);
@@ -884,7 +879,7 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
 
         lov = &export->exp_obd->u.lov;
 
-        tmp_oa = obdo_alloc();
+        OBDO_ALLOC(tmp_oa);
         if (tmp_oa == NULL)
                 RETURN(-ENOMEM);
 
@@ -915,14 +910,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
                         continue;
 
-                CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i, 
+                CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
                        obd_uuid2str(ost_uuid));
 
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
 
                 LASSERT(lov->lov_tgts[i]->ltd_exp);
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                err = obd_create(lov->lov_tgts[i]->ltd_exp, 
+                err = obd_create(lov->lov_tgts[i]->ltd_exp,
                                  tmp_oa, &obj_mdp, oti);
                 if (err)
                         /* This export will be disabled until it is recovered,
@@ -935,7 +930,7 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
         }
         lov_putref(export->exp_obd);
 
-        obdo_free(tmp_oa);
+        OBDO_FREE(tmp_oa);
         RETURN(rc);
 }
 
@@ -964,8 +959,8 @@ static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
                 GOTO(out, rc = -EINVAL);
 
         for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
-                        if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
+                if (lsm->lsm_oinfo[i]->loi_ost_idx == ost_idx) {
+                        if (lsm->lsm_oinfo[i]->loi_id != src_oa->o_id)
                                 GOTO(out, rc = -EINVAL);
                         break;
                 }
@@ -1118,7 +1113,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, 
+                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
                        req->rq_oi.oi_oa->o_id, req->rq_idx);
 
                 rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
@@ -1139,7 +1134,7 @@ static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
         RETURN(rc);
 }
 
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, 
+static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
                                  void *data, int rc)
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
@@ -1176,14 +1171,14 @@ static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
 
         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
-               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count, 
+               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
                oinfo->oi_md->lsm_stripe_size);
 
         list_for_each (pos, &lovset->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
-                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe, 
+                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
                        req->rq_oi.oi_oa->o_id, req->rq_idx);
                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
                                        &req->rq_oi, rqset);
@@ -1227,11 +1222,11 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(-ENODEV);
 
         /* for now, we only expect the following updates here */
-        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE | 
-                                            OBD_MD_FLMODE | OBD_MD_FLATIME | 
+        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
+                                            OBD_MD_FLMODE | OBD_MD_FLATIME |
                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE | 
-                                            OBD_MD_FLGROUP | OBD_MD_FLUID | 
+                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
+                                            OBD_MD_FLGROUP | OBD_MD_FLUID |
                                             OBD_MD_FLGID | OBD_MD_FLINLINE |
                                             OBD_MD_FLFID | OBD_MD_FLGENER)));
         lov = &exp->exp_obd->u.lov;
@@ -1242,13 +1237,13 @@ static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
         list_for_each (pos, &set->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp, 
+                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
                                  &req->rq_oi, NULL);
                 err = lov_update_setattr_set(set, req, rc);
                 if (err) {
                         CERROR("error: setattr objid "LPX64" subobj "
                                LPX64" on OST idx %d: rc = %d\n",
-                               set->set_oi->oi_oa->o_id, 
+                               set->set_oi->oi_oa->o_id,
                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);
                         if (!rc)
                                 rc = err;
@@ -1409,7 +1404,8 @@ static int lov_punch(struct obd_export *exp, struct obd_info *oinfo,
 }
 
 static int lov_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *lsm, obd_off start, obd_off end)
+                    struct lov_stripe_md *lsm, obd_off start, obd_off end,
+                    void *capa)
 {
         struct lov_request_set *set;
         struct obd_info oinfo;
@@ -1432,10 +1428,10 @@ static int lov_sync(struct obd_export *exp, struct obdo *oa,
         list_for_each (pos, &set->set_list) {
                 req = list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp, 
-                              req->rq_oi.oi_oa, NULL, 
+                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
+                              req->rq_oi.oi_oa, NULL,
                               req->rq_oi.oi_policy.l_extent.start,
-                              req->rq_oi.oi_policy.l_extent.end);
+                              req->rq_oi.oi_policy.l_extent.end, capa);
                 err = lov_update_common_set(set, req, rc);
                 if (err) {
                         CERROR("error: fsync objid "LPX64" subobj "LPX64
@@ -1464,7 +1460,7 @@ static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
          * I/O can succeed */
         for (i = 0; i < oa_bufs; i++) {
                 int stripe = lov_stripe_number(lov_oinfo->oi_md, pga[i].off);
-                int ost = lov_oinfo->oi_md->lsm_oinfo[stripe].loi_ost_idx;
+                int ost = lov_oinfo->oi_md->lsm_oinfo[stripe]->loi_ost_idx;
                 obd_off start, end;
 
                 if (!lov_stripe_intersects(lov_oinfo->oi_md, i, pga[i].off,
@@ -1613,6 +1609,8 @@ static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
         /* XXX woah, shouldn't we be altering more here?  size? */
         oa->o_id = lap->lap_loi_id;
+        oa->o_gr = lap->lap_loi_gr;
+        oa->o_valid |= OBD_MD_FLGROUP;
         oa->o_stripe_idx = lap->lap_stripe;
 }
 
@@ -1635,12 +1633,19 @@ static int lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
         return rc;
 }
 
+static struct obd_capa *lov_ap_lookup_capa(void *data, int cmd)
+{
+        struct lov_async_page *lap = LAP_FROM_COOKIE(data);
+        return lap->lap_caller_ops->ap_lookup_capa(lap->lap_caller_data, cmd);
+}
+
 static struct obd_async_page_ops lov_async_page_ops = {
         .ap_make_ready =        lov_ap_make_ready,
         .ap_refresh_count =     lov_ap_refresh_count,
         .ap_fill_obdo =         lov_ap_fill_obdo,
         .ap_update_obdo =       lov_ap_update_obdo,
         .ap_completion =        lov_ap_completion,
+        .ap_lookup_capa =       lov_ap_lookup_capa,
 };
 
 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
@@ -1656,12 +1661,12 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (!page) {
                 int i = 0;
                 /* Find an existing osc so we can get it's stupid sizeof(*oap).
-                   Only because of this layering limitation will a client 
+                   Only because of this layering limitation will a client
                    mount with no osts fail */
-                while (!lov->lov_tgts || !lov->lov_tgts[i] || 
+                while (!lov->lov_tgts || !lov->lov_tgts[i] ||
                        !lov->lov_tgts[i]->ltd_exp) {
                         i++;
-                        if (i >= lov->desc.ld_tgt_count) 
+                        if (i >= lov->desc.ld_tgt_count)
                                 RETURN(-ENOMEDIUM);
                 }
                 rc = size_round(sizeof(*lap)) +
@@ -1680,11 +1685,13 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         /* for now only raid 0 which passes through */
         lap->lap_stripe = lov_stripe_number(lsm, offset);
         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
-        loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        loi = lsm->lsm_oinfo[lap->lap_stripe];
 
         /* so the callback doesn't need the lsm */
         lap->lap_loi_id = loi->loi_id;
-
+        lap->lap_loi_gr = lsm->lsm_object_gr;
+        LASSERT(lsm->lsm_object_gr > 0);
+        
         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
 
         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
@@ -1714,7 +1721,7 @@ static int lov_queue_async_io(struct obd_export *exp,
 
         lap = LAP_FROM_COOKIE(cookie);
 
-        loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        loi = lsm->lsm_oinfo[lap->lap_stripe];
 
         rc = obd_queue_async_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
                                 loi, lap->lap_sub_cookie, cmd, off, count,
@@ -1737,7 +1744,7 @@ static int lov_set_async_flags(struct obd_export *exp,
 
         lap = LAP_FROM_COOKIE(cookie);
 
-        loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        loi = lsm->lsm_oinfo[lap->lap_stripe];
 
         rc = obd_set_async_flags(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                  lsm, loi, lap->lap_sub_cookie, async_flags);
@@ -1761,7 +1768,7 @@ static int lov_queue_group_io(struct obd_export *exp,
 
         lap = LAP_FROM_COOKIE(cookie);
 
-        loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        loi = lsm->lsm_oinfo[lap->lap_stripe];
 
         rc = obd_queue_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
                                 loi, oig, lap->lap_sub_cookie, cmd, off, count,
@@ -1784,9 +1791,9 @@ static int lov_trigger_group_io(struct obd_export *exp,
 
         ASSERT_LSM_MAGIC(lsm);
 
-        loi = lsm->lsm_oinfo;
-        for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                loi = lsm->lsm_oinfo[i];
+                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
@@ -1814,7 +1821,7 @@ static int lov_teardown_async_page(struct obd_export *exp,
 
         lap = LAP_FROM_COOKIE(cookie);
 
-        loi = &lsm->lsm_oinfo[lap->lap_stripe];
+        loi = lsm->lsm_oinfo[lap->lap_stripe];
 
         rc = obd_teardown_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                      lsm, loi, lap->lap_sub_cookie);
@@ -1831,13 +1838,15 @@ static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
         ENTRY;
-        rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc);
+        rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
         RETURN(rc);
 }
 
 static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
-                       struct obd_enqueue_info *einfo)
+                       struct ldlm_enqueue_info *einfo,
+                       struct ptlrpc_request_set *rqset)
 {
+        ldlm_mode_t mode = einfo->ei_mode;
         struct lov_request_set *set;
         struct lov_request *req;
         struct list_head *pos;
@@ -1847,9 +1856,10 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
 
         LASSERT(oinfo);
         ASSERT_LSM_MAGIC(oinfo->oi_md);
+        LASSERT(mode == (mode & -mode));
 
         /* we should never be asked to replay a lock this way. */
-        LASSERT((einfo->ei_flags & LDLM_FL_REPLAY) == 0);
+        LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
 
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
@@ -1863,20 +1873,20 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                                 &req->rq_oi, einfo);
+                                 &req->rq_oi, einfo, rqset);
                 if (rc != ELDLM_OK)
                         GOTO(out, rc);
         }
 
-        if (einfo->ei_rqset && !list_empty(&einfo->ei_rqset->set_requests)) {
+        if (rqset && !list_empty(&rqset->set_requests)) {
                 LASSERT(rc == 0);
-                LASSERT(einfo->ei_rqset->set_interpret == NULL);
-                einfo->ei_rqset->set_interpret = lov_enqueue_interpret;
-                einfo->ei_rqset->set_arg = (void *)set;
+                LASSERT(rqset->set_interpret == NULL);
+                rqset->set_interpret = lov_enqueue_interpret;
+                rqset->set_arg = (void *)set;
                 RETURN(rc);
         }
 out:
-        rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc);
+        rc = lov_fini_enqueue_set(set, mode, rc, rqset);
         RETURN(rc);
 }
 
@@ -1894,6 +1904,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         ENTRY;
 
         ASSERT_LSM_MAGIC(lsm);
+        LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
 
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
@@ -1916,7 +1927,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                                req->rq_oi.oi_md, type, &sub_policy,
                                mode, &lov_flags, data, lov_lockhp);
                 rc = lov_update_match_set(set, req, rc);
-                if (rc != 1)
+                if (rc <= 0)
                         break;
         }
         lov_fini_match_set(set, mode, *flags);
@@ -1928,7 +1939,6 @@ static int lov_change_cbdata(struct obd_export *exp,
                              void *data)
 {
         struct lov_obd *lov;
-        struct lov_oinfo *loi;
         int rc = 0, i;
         ENTRY;
 
@@ -1937,11 +1947,20 @@ static int lov_change_cbdata(struct obd_export *exp,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
+        LASSERT(lsm->lsm_object_gr > 0);
+
         lov = &exp->exp_obd->u.lov;
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
                 struct lov_stripe_md submd;
+                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
 
+                if (!lov->lov_tgts[loi->loi_ost_idx]) {
+                        CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
+                        continue;
+                }
+                
                 submd.lsm_object_id = loi->loi_id;
+                submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
                 rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                        &submd, it, data);
@@ -1966,6 +1985,7 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
+        LASSERT(lsm->lsm_object_gr > 0);
         LASSERT(lockh);
         lov = &exp->exp_obd->u.lov;
         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
@@ -1994,10 +2014,10 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
 }
 
 static int lov_cancel_unused(struct obd_export *exp,
-                             struct lov_stripe_md *lsm, int flags, void *opaque)
+                             struct lov_stripe_md *lsm,
+                             int flags, void *opaque)
 {
         struct lov_obd *lov;
-        struct lov_oinfo *loi;
         int rc = 0, i;
         ENTRY;
 
@@ -2021,15 +2041,22 @@ static int lov_cancel_unused(struct obd_export *exp,
 
         ASSERT_LSM_MAGIC(lsm);
 
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        LASSERT(lsm->lsm_object_gr > 0);
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
                 struct lov_stripe_md submd;
+                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
                 int err;
 
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
-                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
+                if (!lov->lov_tgts[loi->loi_ost_idx]) {
+                        CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
 
                 submd.lsm_object_id = loi->loi_id;
+                submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
                 err = obd_cancel_unused(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                         &submd, flags, opaque);
@@ -2048,7 +2075,6 @@ static int lov_join_lru(struct obd_export *exp,
                         struct lov_stripe_md *lsm, int join)
 {
         struct lov_obd *lov;
-        struct lov_oinfo *loi;
         int i, count = 0;
         ENTRY;
 
@@ -2057,15 +2083,21 @@ static int lov_join_lru(struct obd_export *exp,
                 RETURN(-ENODEV);
 
         lov = &exp->exp_obd->u.lov;
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
                 struct lov_stripe_md submd;
+                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
                 int rc = 0;
 
-                if (!lov->lov_tgts[loi->loi_ost_idx] || 
-                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
+                if (!lov->lov_tgts[loi->loi_ost_idx]) {
+                        CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
 
                 submd.lsm_object_id = loi->loi_id;
+                submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
                 rc = obd_join_lru(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                   &submd, join);
@@ -2104,7 +2136,7 @@ static int lov_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
         struct lov_obd *lov;
         int rc = 0;
         ENTRY;
-
+        
         LASSERT(oinfo != NULL);
         LASSERT(oinfo->oi_osfs != NULL);
 
@@ -2155,7 +2187,7 @@ static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                         continue;
                 }
 
-                err = obd_statfs(class_exp2obd(lov->lov_tgts[i]->ltd_exp), 
+                err = obd_statfs(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
                                  &lov_sfs, max_age);
                 if (err) {
                         if (lov->lov_tgts[i]->ltd_active && !rc)
@@ -2217,7 +2249,7 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 genp = (__u32 *)data->ioc_inlbuf3;
                 /* the uuid will be empty for deleted OSTs */
                 for (i = 0; i < count; i++, uuidp++, genp++) {
-                        if (!lov->lov_tgts[i]) 
+                        if (!lov->lov_tgts[i])
                                 continue;
                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
                         *genp = lov->lov_tgts[i]->ltd_gen;
@@ -2312,15 +2344,14 @@ static int lov_get_info(struct obd_export *exp, __u32 keylen,
                 /* XXX - it's assumed all the locks for deleted OSTs have
                  * been cancelled. Also, the export for deleted OSTs will
                  * be NULL and won't match the lock's export. */
-                for (i = 0, loi = data->lsm->lsm_oinfo;
-                     i < data->lsm->lsm_stripe_count;
-                     i++, loi++) {
+                for (i = 0; i < data->lsm->lsm_stripe_count; i++) {
+                        loi = data->lsm->lsm_oinfo[i];
                         if (!lov->lov_tgts[loi->loi_ost_idx])
                                 continue;
                         if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
                             data->lock->l_conn_export &&
                             loi->loi_id == res_id->name[0] &&
-                            loi->loi_gr == res_id->name[1]) {
+                            loi->loi_gr == res_id->name[2]) {
                                 *stripe = i;
                                 GOTO(out, rc = 0);
                         }
@@ -2359,8 +2390,9 @@ static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
 {
         struct obd_device *obddev = class_exp2obd(exp);
         struct lov_obd *lov = &obddev->u.lov;
-        int i, rc = 0, err, incr = 0, check_uuid = 0, do_inactive = 0;
+        int i, rc = 0, err;
         int no_set = !set;
+        int incr = 0, check_uuid = 0, do_inactive = 0;
         ENTRY;
 
         if (no_set) {
@@ -2377,12 +2409,10 @@ static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
                 do_inactive = 1;
         } else if (KEY_IS("checksum")) {
                 do_inactive = 1;
-        } else if (KEY_IS("mds_conn") || KEY_IS("unlinked")) {
+        } else if (KEY_IS("unlinked")) {
                 check_uuid = val ? 1 : 0;
         } else if (KEY_IS("evict_by_nid")) {
-                /* use defaults:
-                do_inactive = incr = 0;
-                 */
+                /* use defaults:  do_inactive = incr = 0; */
         }
 
         lov_getref(obddev);
@@ -2396,16 +2426,34 @@ static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
                 if (!lov->lov_tgts[i]->ltd_active && !do_inactive)
                         continue;
 
-                /* Only want a specific OSC */
-                if (check_uuid &&
-                    !obd_uuid_equals(val, &lov->lov_tgts[i]->ltd_uuid))
-                        continue;
+                if (KEY_IS(KEY_MDS_CONN)) {
+                        struct mds_group_info *mgi;
 
-                err = obd_set_info_async(lov->lov_tgts[i]->ltd_exp,
+                        LASSERT(vallen == sizeof(*mgi));
+                        mgi = (struct mds_group_info *)val;
+                        
+                        /* Only want a specific OSC */
+                        if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
+                                                &lov->lov_tgts[i]->ltd_uuid))
+                                continue;
+
+                        err = obd_set_info_async(lov->lov_tgts[i]->ltd_exp,
+                                         keylen, key, sizeof(int), 
+                                         &mgi->group, set);
+                } else {
+                        /* Only want a specific OSC */
+                        if (check_uuid &&
+                            !obd_uuid_equals(val, &lov->lov_tgts[i]->ltd_uuid))
+                                continue;
+                
+                        err = obd_set_info_async(lov->lov_tgts[i]->ltd_exp,
                                          keylen, key, vallen, val, set);
+                }
+                
                 if (!rc)
                         rc = err;
         }
+
         lov_putref(obddev);
         if (no_set) {
                 err = ptlrpc_set_wait(set);
@@ -2433,12 +2481,11 @@ static int lov_checkmd(struct obd_export *exp, struct obd_export *md_exp,
 
 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
 {
-        struct lov_oinfo *loi;
         int i, rc = 0;
         ENTRY;
 
-        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
-             i++, loi++) {
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
                 if (loi->loi_ar.ar_rc && !rc)
                         rc = loi->loi_ar.ar_rc;
                 loi->loi_ar.ar_rc = 0;
@@ -2451,15 +2498,16 @@ EXPORT_SYMBOL(lov_test_and_clear_async_rc);
 static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
                            int cmd, __u64 *offset)
 {
+        __u32 ssize = lsm->lsm_stripe_size;
         __u64 start;
-        __u32 ssize  = lsm->lsm_stripe_size;
 
         start = *offset;
         do_div(start, ssize);
         start = start * ssize;
 
         CDEBUG(D_DLMTRACE, "offset "LPU64", stripe %u, start "LPU64
-               ", end "LPU64"\n", *offset, ssize, start, start + ssize - 1);
+                           ", end "LPU64"\n", *offset, ssize, start,
+                           start + ssize - 1);
         if (cmd == OBD_CALC_STRIPE_END) {
                 *offset = start + ssize - 1;
         } else if (cmd == OBD_CALC_STRIPE_START) {
@@ -2626,22 +2674,33 @@ struct obd_ops lov_obd_ops = {
 static quota_interface_t *quota_interface;
 extern quota_interface_t lov_quota_interface;
 
+cfs_mem_cache_t *lov_oinfo_slab;
+
 int __init lov_init(void)
 {
         struct lprocfs_static_vars lvars;
-        int rc;
+        int rc, rc2;
         ENTRY;
 
+        lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
+                                              sizeof(struct lov_oinfo), 
+                                              0, SLAB_HWCACHE_ALIGN);
+        if (lov_oinfo_slab == NULL)
+                return -ENOMEM;
         lprocfs_init_vars(lov, &lvars);
 
         request_module("lquota");
         quota_interface = PORTAL_SYMBOL_GET(lov_quota_interface);
         init_obd_quota_ops(quota_interface, &lov_obd_ops);
 
-        rc = class_register_type(&lov_obd_ops, lvars.module_vars,
-                                 LUSTRE_LOV_NAME);
-        if (rc && quota_interface)
-                PORTAL_SYMBOL_PUT(lov_quota_interface);
+        rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
+                                 LUSTRE_LOV_NAME, NULL);
+        if (rc) {
+                if (quota_interface)
+                        PORTAL_SYMBOL_PUT(lov_quota_interface);
+                rc2 = cfs_mem_cache_destroy(lov_oinfo_slab);
+                LASSERT(rc2 == 0);
+        }
 
         RETURN(rc);
 }
@@ -2649,10 +2708,14 @@ int __init lov_init(void)
 #ifdef __KERNEL__
 static void /*__exit*/ lov_exit(void)
 {
+        int rc;
+        
         if (quota_interface)
                 PORTAL_SYMBOL_PUT(lov_quota_interface);
 
         class_unregister_type(LUSTRE_LOV_NAME);
+        rc = cfs_mem_cache_destroy(lov_oinfo_slab);
+        LASSERT(rc == 0);
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");