Whamcloud - gitweb
1)merge filter_group support from HEAD to cmd_new
authorwangdi <wangdi>
Thu, 7 Sep 2006 06:41:05 +0000 (06:41 +0000)
committerwangdi <wangdi>
Thu, 7 Sep 2006 06:41:05 +0000 (06:41 +0000)
2)some fixes after this merge

20 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_export.h
lustre/include/obd.h
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/llite/rw.c
lustre/lov/lov_log.c
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/mdd/mdd_lov.c
lustre/mds/mds_open.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_log.c
lustre/obdfilter/filter_lvb.c
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c

index b297be5..c4f7eff 100644 (file)
@@ -487,6 +487,7 @@ struct obd_connect_data {
         __u32 ocd_version;              /* lustre release version number */
         __u32 ocd_grant;                /* initial cache grant amount (bytes) */
         __u32 ocd_index;                /* LOV index to connect to */
+        __u32 ocd_group;                /* MDS group on OST */
         __u32 ocd_brw_size;             /* Maximum BRW size in bytes */
         __u64 ocd_ibits_known;          /* inode bits this client understands */
         __u32 ocd_nllu;                 /* non-local-lustre-user */
index 511ba12..ca5cabb 100644 (file)
@@ -63,6 +63,7 @@ struct filter_export_data {
         struct list_head           fed_mod_list; /* files being modified */
         int                        fed_mod_count;/* items in fed_writing list */
         long                       fed_pending;  /* bytes just being written */
+        __u32                      fed_group;
 };
 
 struct obd_export {
index d89889c..6192e9a 100644 (file)
@@ -264,8 +264,16 @@ struct obd_device_target {
         struct lustre_quota_ctxt  obt_qctxt;
 };
 
+#define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
+
 #define FILTER_GROUP_LLOG 1
 #define FILTER_GROUP_ECHO 2
+#define FILTER_GROUP_MDS0 3
+
+struct filter_subdirs {
+       cfs_dentry_t *dentry[FILTER_SUBDIR_COUNT];
+};
+
 
 struct filter_ext {
         __u64                fe_start;
@@ -277,9 +285,15 @@ struct filter_obd {
         struct obd_device_target fo_obt;
         const char          *fo_fstype;
         struct vfsmount     *fo_vfsmnt;
+
+        int                  fo_group_count;
         cfs_dentry_t        *fo_dentry_O;
         cfs_dentry_t       **fo_dentry_O_groups;
-        cfs_dentry_t       **fo_dentry_O_sub;
+        struct filter_subdirs   *fo_dentry_O_sub;
+        struct semaphore     fo_init_lock;      /* group initialization lock */
+        int                  fo_committed_group;
+
+        
         spinlock_t           fo_objidlock;      /* protect fo_lastobjid */
         spinlock_t           fo_translock;      /* protect fsd_last_transno */
         struct file         *fo_rcvd_filp;
@@ -485,6 +499,7 @@ struct mds_obd {
         char                            *mds_profile;
         struct obd_export               *mds_osc_exp; /* XXX lov_exp */
         struct lov_desc                  mds_lov_desc;
+        __u32                            mds_id;
         obd_id                          *mds_lov_objids;
         int                              mds_lov_objids_size;
         __u32                            mds_lov_objids_in_file;
index 171210c..5d682fa 100644 (file)
@@ -562,7 +562,7 @@ int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
         oa->o_mode = S_IFREG;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
                 OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
-                OBD_MD_FLCTIME;
+                OBD_MD_FLCTIME | OBD_MD_FLGROUP;
 
         set = ptlrpc_prep_set();
         if (set == NULL) {
@@ -623,7 +623,7 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
 
 check:
         if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]||
-            lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){
+            lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){
                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
                            lsm->lsm_oinfo[stripe].loi_id,
                            lsm->lsm_oinfo[stripe].loi_gr);
@@ -2114,9 +2114,10 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                         RETURN(rc ? rc : -ENOMEM);
 
                 oa->o_id = lsm->lsm_object_id;
-                oa->o_valid = OBD_MD_FLID;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                                           OBD_MD_FLGROUP);
 
                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                                0, OBD_OBJECT_EOF);
index 3d24735..09a9120 100644 (file)
@@ -1380,11 +1380,13 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
 
                 if (oa) {
                         oa->o_id = lsm->lsm_object_id;
-                        oa->o_valid = OBD_MD_FLID;
+                        oa->o_gr = lsm->lsm_object_gr;
+                        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
 
                         flags = OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                                OBD_MD_FLFID | OBD_MD_FLGENER;
+                                OBD_MD_FLFID | OBD_MD_FLGENER | 
+                                OBD_MD_FLGROUP;
 
                         obdo_from_inode(oa, inode, flags);
 
@@ -1807,8 +1809,10 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                 }
 
                 oinfo.oi_oa->o_id = lsm->lsm_object_id;
+                oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
                 oinfo.oi_oa->o_flags = flags;
-                oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
+                oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | 
+                                       OBD_MD_FLGROUP;
 
                 obdo_from_inode(oinfo.oi_oa, inode,
                                 OBD_MD_FLFID | OBD_MD_FLGENER);
index 4a111b8..7ecf181 100644 (file)
@@ -505,6 +505,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                 RETURN(ERR_PTR(-ENOMEM));
 
         /* prepare operatoin hint first */
+
         ll_prepare_md_op_data(op_data, parent, NULL, dentry->d_name.name,
                               dentry->d_name.len, lookup_flags);
 
@@ -1114,8 +1115,9 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
                 GOTO(out_free_memmd, rc = -ENOMEM);
 
         oa->o_id = lsm->lsm_object_id;
+        oa->o_gr = lsm->lsm_object_gr;
         oa->o_mode = body->mode & S_IFMT;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
 
         if (body->valid & OBD_MD_FLCOOKIE) {
                 oa->o_valid |= OBD_MD_FLCOOKIE;
index 57c8a45..8ad18e5 100644 (file)
@@ -174,7 +174,7 @@ void ll_truncate(struct inode *inode)
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_oa = &oa;
         oa.o_id = lli->lli_smd->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
 
         obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
                         OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
@@ -220,7 +220,9 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from,
 
         oa.o_mode = inode->i_mode;
         oa.o_id = lsm->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
+        oa.o_gr = lsm->lsm_object_gr;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | 
+                     OBD_MD_FLTYPE | OBD_MD_FLGROUP;
         obdo_from_inode(&oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER);
 
         oinfo.oi_oa = &oa;
@@ -363,7 +365,8 @@ void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
         lsm = ll_i2info(inode)->lli_smd;
 
         oa->o_id = lsm->lsm_object_id;
-        oa->o_valid = OBD_MD_FLID;
+        oa->o_gr = lsm->lsm_object_gr;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
         if (cmd & OBD_BRW_WRITE) {
                 oa->o_valid |= OBD_MD_FLEPOCH;
index f7b41f7..5b07c30 100644 (file)
@@ -89,7 +89,7 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt,
                 default:
                         break;
                 }
-
+                LASSERT(lsm->lsm_object_gr == loi->loi_gr);
                 rc += llog_add(cctxt, rec, NULL, logcookies + rc,
                                 numcookies - rc);
         }
index 0ff9f57..f4c9ceb 100644 (file)
@@ -1972,12 +1972,15 @@ static int lov_change_cbdata(struct obd_export *exp,
 
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
-
+        
+        LASSERT(lsm->lsm_object_gr > 0);
+        
         lov = &exp->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct lov_stripe_md submd;
 
                 submd.lsm_object_id = loi->loi_id;
+                submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
                 rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                        &submd, it, data);
@@ -2002,6 +2005,7 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
+        LASSERT(lsm->lsm_object_gr > 0);
         LASSERT(lockh);
         lov = &exp->exp_obd->u.lov;
         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
@@ -2058,6 +2062,7 @@ static int lov_cancel_unused(struct obd_export *exp,
 
         ASSERT_LSM_MAGIC(lsm);
 
+        LASSERT(lsm->lsm_object_gr > 0);
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct lov_stripe_md submd;
                 int err;
@@ -2067,6 +2072,7 @@ static int lov_cancel_unused(struct obd_export *exp,
                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
 
                 submd.lsm_object_id = loi->loi_id;
+                submd.lsm_object_gr = lsm->lsm_object_gr;
                 submd.lsm_stripe_count = 0;
                 err = obd_cancel_unused(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                         &submd, flags, opaque);
@@ -2445,7 +2451,7 @@ static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
         }
 
         if (KEY_IS(KEY_MDS_CONN) || KEY_IS("unlinked")) {
-                if (vallen != 0)
+                if (vallen != 0 && KEY_IS("unlinked"))
                         GOTO(out, rc = -EINVAL);
         } else {
                 GOTO(out, rc = -EINVAL);
index 2622590..1d1a056 100644 (file)
@@ -770,6 +770,8 @@ int qos_prep_create(struct obd_export *exp, struct lov_request_set *set)
 
         lsm = set->set_oi->oi_md;
         lsm->lsm_object_id = src_oa->o_id;
+        lsm->lsm_object_gr = src_oa->o_gr;
+
         if (!lsm->lsm_stripe_size)
                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
         if (!lsm->lsm_pattern) {
index f441f25..2107483 100644 (file)
@@ -326,6 +326,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
                 req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid =
                         loi->loi_kms_valid;
@@ -437,6 +438,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
 
                 lov_set_add_req(req, set);
@@ -520,6 +522,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 /* XXX LOV STACKING: submd should be from the subobj */
                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
+                req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
                 req->rq_oi.oi_md->lsm_stripe_count = 0;
 
                 lov_set_add_req(req, set);
@@ -595,6 +598,8 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set,
                 LBUG();
         }
         ret_oa->o_id = src_oa->o_id;
+        ret_oa->o_gr = src_oa->o_gr;
+        ret_oa->o_valid |= OBD_MD_FLGROUP;
         memcpy(src_oa, ret_oa, sizeof(*src_oa));
         obdo_free(ret_oa);
 
@@ -680,6 +685,7 @@ int lov_update_create_set(struct lov_request_set *set,
                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
 
         loi->loi_id = req->rq_oi.oi_oa->o_id;
+        loi->loi_gr = req->rq_oi.oi_oa->o_gr;
         loi->loi_ost_idx = req->rq_idx;
         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
@@ -1184,6 +1190,8 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
+                                || req->rq_oi.oi_oa->o_gr>0);
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_rqset = set;
@@ -1289,6 +1297,9 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
+                req->rq_oi.oi_oa->o_gr = loi->loi_gr;
+                req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
+
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_update_punch;
                 req->rq_rqset = set;
index 846f669..fa7663e 100644 (file)
@@ -123,7 +123,7 @@ int mdd_init_obd(const struct lu_context *ctxt, struct mdd_device *mdd,
                 CERROR("can not find obd %s \n", MDD_OBD_NAME);
                 LBUG();
         }
-
+        obd->u.mds.mds_id = index;
         rc = class_setup(obd, lcfg);
         if (rc)
                 GOTO(class_detach, rc);
@@ -347,10 +347,11 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
 
         oa->o_uid = 0; /* must have 0 uid / gid on OST */
         oa->o_gid = 0;
+        oa->o_gr = FILTER_GROUP_MDS0 + mdd2lu_dev(mdd)->ld_site->ls_node_id;
         oa->o_mode = S_IFREG | 0600;
         oa->o_id = mdd_lov_create_id(lu_object_fid(mdd2lu_obj(child)));
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
-                OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
+                OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
         oa->o_size = 0;
 
         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
@@ -360,6 +361,8 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                                            0, &lsm, (void*)eadata);
                         if (rc)
                                 GOTO(out_oa, rc);
+                        lsm->lsm_object_id = oa->o_id;
+                        lsm->lsm_object_gr = oa->o_gr;
                 } else if (parent != NULL) {
                         /* get lov ea from parent and set to lov */
                         struct lov_mds_md *__lmm;
@@ -389,6 +392,7 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                         }
                         GOTO(out_oa, rc);
                 }
+                LASSERT(lsm->lsm_object_gr >= FILTER_GROUP_MDS0);
         } else {
                 LASSERT(eadata != NULL);
                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
@@ -396,6 +400,7 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                 if (rc)
                         GOTO(out_oa, rc);
                 lsm->lsm_object_id = oa->o_id;
+                lsm->lsm_object_gr = oa->o_gr;
         }
         /*Sometimes, we may truncate some object(without lsm)
          *then open (with write flags)it, so creating lsm above.
@@ -408,6 +413,8 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
                 memset(oinfo, 0, sizeof(*oinfo));
 
                 oa->o_size = la->la_size;
+                /* when setting attr to ost, FLBKSZ is not needed */
+                oa->o_valid &= ~OBD_MD_FLBLKSZ; 
                 obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
 
index d65b366..d7e7645 100644 (file)
@@ -432,6 +432,7 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                         GOTO(out_oa, rc);
                 }
                 oinfo.oi_md->lsm_object_id = oinfo.oi_oa->o_id;
+                oinfo.oi_md->lsm_object_gr = oinfo.oi_oa->o_gr;
         }
         if (inode->i_size) {
                 oinfo.oi_oa->o_size = inode->i_size;
index 4bdc262..65da4b8 100644 (file)
@@ -63,6 +63,8 @@
 
 #include "filter_internal.h"
 
+/* Group 0 is no longer a legal group, to catch uninitialized IDs */
+#define FILTER_MIN_GROUPS 3
 static struct lvfs_callback_ops filter_lvfs_ops;
 kmem_cache_t *ll_fmd_cachep;
 
@@ -678,9 +680,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                  * need to be set up like real exports as filter_connect() does.
                  */
                 exp = class_new_export(obd, (struct obd_uuid *)fcd->fcd_uuid);
+
                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
-                       last_rcvd, le64_to_cpu(fsd->lsd_last_transno));
+                       " srv lr: "LPU64" fcd_group %d\n", fcd->fcd_uuid, cl_idx,
+                       last_rcvd, le64_to_cpu(fsd->lsd_last_transno), 
+                       le32_to_cpu(fcd->fcd_group));
                 if (IS_ERR(exp)) {
                         if (PTR_ERR(exp) == -EALREADY) {
                                 /* export already exists, zero out this one */
@@ -693,6 +697,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 } else {
                         fed = &exp->exp_filter_data;
                         fed->fed_fcd = fcd;
+                        fed->fed_group = le32_to_cpu(fcd->fcd_group);
                         rc = filter_client_add(obd, filter, fed, cl_idx);
                         LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
 
@@ -753,34 +758,38 @@ static int filter_cleanup_groups(struct obd_device *obd)
         struct filter_obd *filter = &obd->u.filter;
         struct file *filp;
         struct dentry *dentry;
-        int i;
+        int i, j;
         ENTRY;
 
         if (filter->fo_dentry_O_groups != NULL) {
-                for (i = 0; i < FILTER_GROUPS; i++) {
+                for (i = 0; i < filter->fo_group_count; i++) {
                         dentry = filter->fo_dentry_O_groups[i];
                         if (dentry != NULL)
                                 f_dput(dentry);
                 }
                 OBD_FREE(filter->fo_dentry_O_groups,
-                         FILTER_GROUPS * sizeof(*filter->fo_dentry_O_groups));
+                         filter->fo_group_count * 
+                         sizeof(*filter->fo_dentry_O_groups));
                 filter->fo_dentry_O_groups = NULL;
         }
         if (filter->fo_last_objid_files != NULL) {
-                for (i = 0; i < FILTER_GROUPS; i++) {
+                for (i = 0; i < filter->fo_group_count; i++) {
                         filp = filter->fo_last_objid_files[i];
                         if (filp != NULL)
                                 filp_close(filp, 0);
                 }
                 OBD_FREE(filter->fo_last_objid_files,
-                         FILTER_GROUPS * sizeof(*filter->fo_last_objid_files));
+                         filter->fo_group_count * 
+                         sizeof(*filter->fo_last_objid_files));
                 filter->fo_last_objid_files = NULL;
         }
         if (filter->fo_dentry_O_sub != NULL) {
-                for (i = 0; i < filter->fo_subdir_count; i++) {
-                        dentry = filter->fo_dentry_O_sub[i];
-                        if (dentry != NULL)
-                                f_dput(dentry);
+                for (i = 0; i < filter->fo_group_count; i++) {
+                        for (j = 0; j < filter->fo_subdir_count; j++) {
+                                dentry = filter->fo_dentry_O_sub[i].dentry[j];
+                                if (dentry != NULL)
+                                        f_dput(dentry);
+                        }
                 }
                 OBD_FREE(filter->fo_dentry_O_sub,
                          filter->fo_subdir_count *
@@ -789,7 +798,8 @@ static int filter_cleanup_groups(struct obd_device *obd)
         }
         if (filter->fo_last_objids != NULL) {
                 OBD_FREE(filter->fo_last_objids,
-                         FILTER_GROUPS * sizeof(*filter->fo_last_objids));
+                         filter->fo_group_count * 
+                         sizeof(*filter->fo_last_objids));
                 filter->fo_last_objids = NULL;
         }
         if (filter->fo_dentry_O != NULL) {
@@ -799,13 +809,236 @@ static int filter_cleanup_groups(struct obd_device *obd)
         RETURN(0);
 }
 
+static int filter_update_last_group(struct obd_device *obd, int group)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        struct file *filp = NULL;
+        int last_group = 0, rc;
+        loff_t off = 0;
+        ENTRY;
+
+        if (group <= filter->fo_committed_group)
+                RETURN(0);
+
+        filp = filp_open("LAST_GROUP", O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                rc = PTR_ERR(filp);
+                filp = NULL;
+                CERROR("cannot open LAST_GROUP: rc = %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
+                GOTO(cleanup, rc);
+        }
+        LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+        CDEBUG(D_INODE, "%s: previous %d, new %d\n",
+               obd->obd_name, last_group, group);
+
+        off = 0;
+        last_group = group;
+        /* must be sync: bXXXX */
+        rc = fsfilt_write_record(obd, filp, &last_group, sizeof(__u32), &off, 1);
+        if (rc) {
+                CDEBUG(D_INODE, "error updating LAST_GROUP: rc %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        filter->fo_committed_group = group;
+cleanup:
+        if (filp)
+                filp_close(filp, 0);
+        RETURN(rc);
+}
+
+static int filter_read_group_internal(struct obd_device *obd, int group,
+                                      int create)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        __u64 *new_objids = NULL;
+        struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs = NULL;
+        struct dentry **new_groups = NULL;
+        struct file **new_files = NULL;
+        struct dentry *dentry;
+        struct file *filp;
+        int old_count = filter->fo_group_count, rc, stage = 0, i;
+        char name[25];
+        __u64 last_objid;
+        loff_t off = 0;
+
+        snprintf(name, 24, "%d", group);
+        name[24] = '\0';
+
+        if (!create) {
+                dentry = ll_lookup_one_len(name, filter->fo_dentry_O,
+                                           strlen(name));
+                if (IS_ERR(dentry)) {
+                        CERROR("Cannot lookup expected object group %d: %ld\n",
+                               group, PTR_ERR(dentry));
+                        RETURN(PTR_ERR(dentry));
+                }
+        } else {
+                dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1);
+                if (IS_ERR(dentry)) {
+                        CERROR("cannot lookup/create O/%s: rc = %ld\n", name,
+                               PTR_ERR(dentry));
+                        RETURN(PTR_ERR(dentry));
+                }
+        }
+        stage = 1;
+
+        snprintf(name, 24, "O/%d/LAST_ID", group);
+        name[24] = '\0';
+        filp = filp_open(name, O_CREAT | O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp));
+                GOTO(cleanup, rc = PTR_ERR(filp));
+        }
+        stage = 2;
+
+        rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc);
+                GOTO(cleanup, rc);
+        }
+
+        if (filter->fo_subdir_count) {
+                OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
+                if (tmp_subdirs == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+                stage = 3;
+
+                for (i = 0; i < filter->fo_subdir_count; i++) {
+                        char dir[20];
+                        snprintf(dir, sizeof(dir), "d%u", i);
+
+                        tmp_subdirs->dentry[i] = simple_mkdir(dentry, dir, 0700, 1);
+                        if (IS_ERR(tmp_subdirs->dentry[i])) {
+                                rc = PTR_ERR(tmp_subdirs->dentry[i]);
+                                CERROR("can't lookup/create O/%d/%s: rc = %d\n",
+                                       group, dir, rc);
+                                GOTO(cleanup, rc);
+                        }
+
+                        CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir,
+                               tmp_subdirs->dentry[i]);
+                }
+        }
+
+        /* 'group' is an index; we need an array of length 'group + 1' */
+        if (group + 1 > old_count) {
+                int len = group + 1;
+                OBD_ALLOC(new_objids, len * sizeof(*new_objids));
+                OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs));
+                OBD_ALLOC(new_groups, len * sizeof(*new_groups));
+                OBD_ALLOC(new_files, len * sizeof(*new_files));
+                stage = 4;
+                if (new_objids == NULL || new_subdirs == NULL ||
+                    new_groups == NULL || new_files == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+
+                memcpy(new_objids, filter->fo_last_objids,
+                       old_count * sizeof(*new_objids));
+                memcpy(new_subdirs, filter->fo_dentry_O_sub,
+                       old_count * sizeof(*new_subdirs));
+                memcpy(new_groups, filter->fo_dentry_O_groups,
+                       old_count * sizeof(*new_groups));
+                memcpy(new_files, filter->fo_last_objid_files,
+                       old_count * sizeof(*new_files));
+
+                if (old_count) {
+                        OBD_FREE(filter->fo_last_objids,
+                                 old_count * sizeof(*new_objids));
+                        OBD_FREE(filter->fo_dentry_O_sub,
+                                 old_count * sizeof(*new_subdirs));
+                        OBD_FREE(filter->fo_dentry_O_groups,
+                                 old_count * sizeof(*new_groups));
+                        OBD_FREE(filter->fo_last_objid_files,
+                                 old_count * sizeof(*new_files));
+                }
+                filter->fo_last_objids = new_objids;
+                filter->fo_dentry_O_sub = new_subdirs;
+                filter->fo_dentry_O_groups = new_groups;
+                filter->fo_last_objid_files = new_files;
+                filter->fo_group_count = len;
+        }
+
+        filter->fo_dentry_O_groups[group] = dentry;
+        filter->fo_last_objid_files[group] = filp;
+        if (filter->fo_subdir_count) {
+                filter->fo_dentry_O_sub[group] = *tmp_subdirs;
+                OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+        }
+
+        filter_update_last_group(obd, group);
+        
+        if (filp->f_dentry->d_inode->i_size == 0) {
+                filter->fo_last_objids[group] = FILTER_INIT_OBJID;
+                rc = filter_update_last_objid(obd, group, 1);
+                RETURN(rc);
+        }
+
+        filter->fo_last_objids[group] = le64_to_cpu(last_objid);
+        CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+               obd->obd_name, group, last_objid);
+        RETURN(0);
+ cleanup:
+        switch (stage) {
+        case 4:
+                if (new_objids != NULL)
+                        OBD_FREE(new_objids, group * sizeof(*new_objids));
+                if (new_subdirs != NULL)
+                        OBD_FREE(new_subdirs, group * sizeof(*new_subdirs));
+                if (new_groups != NULL)
+                        OBD_FREE(new_groups, group * sizeof(*new_groups));
+                if (new_files != NULL)
+                        OBD_FREE(new_files, group * sizeof(*new_files));
+        case 3:
+                if (filter->fo_subdir_count) {
+                        for (i = 0; i < filter->fo_subdir_count; i++) {
+                                if (tmp_subdirs->dentry[i] != NULL)
+                                        dput(tmp_subdirs->dentry[i]);
+                        }
+                        OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
+                }
+        case 2:
+                filp_close(filp, 0);
+        case 1:
+                dput(dentry);
+        }
+        RETURN(rc);
+}
+
+static int filter_read_groups(struct obd_device *obd, int last_group,
+                              int create)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        int old_count, group, rc = 0;
+
+        down(&filter->fo_init_lock);
+        old_count = filter->fo_group_count;
+        for (group = old_count; group <= last_group; group++) {
+                if (group == 0)
+                        continue; /* no group zero */
+
+                rc = filter_read_group_internal(obd, group, create);
+                if (rc != 0)
+                        break;
+        }
+        up(&filter->fo_init_lock);
+        return rc;
+}
+
 /* FIXME: object groups */
 static int filter_prep_groups(struct obd_device *obd)
 {
         struct filter_obd *filter = &obd->u.filter;
         struct dentry *dentry, *O_dentry;
         struct file *filp;
-        int i, rc = 0, cleanup_phase = 0;
+        int    last_group, rc = 0, cleanup_phase = 0;
+        loff_t off = 0;
         ENTRY;
 
         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
@@ -866,6 +1099,53 @@ static int filter_prep_groups(struct obd_device *obd)
                 f_dput(dentry);
         }
 
+        cleanup_phase = 2; /* groups */
+
+        /* we have to initialize all groups before first connections from
+         * clients because they may send create/destroy for any group -bzzz */
+        filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700);
+        if (IS_ERR(filp)) {
+                CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp));
+                GOTO(cleanup, rc = PTR_ERR(filp));
+        }
+        cleanup_phase = 3; /* filp */
+
+        rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off);
+        if (rc) {
+                CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+        if (off == 0) {
+                last_group = FILTER_MIN_GROUPS;
+        } else {
+                LASSERT(last_group >= FILTER_MIN_GROUPS);
+        }
+
+        CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
+              FILTER_MIN_GROUPS, last_group);
+        filter->fo_committed_group = last_group;
+        rc = filter_read_groups(obd, last_group, 1);
+        if (rc)
+                GOTO(cleanup, rc);
+        
+        filp_close(filp, 0);
+        RETURN(0);
+
+ cleanup:
+        switch (cleanup_phase) {
+        case 3:
+                filp_close(filp, 0);
+        case 2:
+                filter_cleanup_groups(obd);
+        case 1:
+                f_dput(filter->fo_dentry_O);
+                filter->fo_dentry_O = NULL;
+        default:
+                break;
+        }
+        return rc;
+
+#if 0
         OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
         if (filter->fo_last_objids == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
@@ -950,6 +1230,7 @@ static int filter_prep_groups(struct obd_device *obd)
  cleanup:
         filter_cleanup_groups(obd);
         return rc;
+#endif 
 }
 
 /* setup the object store with correct subdirectories */
@@ -1047,8 +1328,9 @@ static void filter_post(struct obd_device *obd)
         if (rc)
                 CERROR("error writing server data: rc = %d\n", rc);
 
-        for (i = 0; i < FILTER_GROUPS; i++) {
-                rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1));
+        for (i = 0; i < filter->fo_group_count; i++) {
+                rc = filter_update_last_objid(obd, i, 
+                                (i == filter->fo_group_count - 1));
                 if (rc)
                         CERROR("error writing group %d lastobjid: rc = %d\n",
                                i, rc);
@@ -1073,7 +1355,7 @@ static void filter_set_last_id(struct filter_obd *filter,
                                obd_id id, obd_gr group)
 {
         LASSERT(filter->fo_fsd != NULL);
-        LASSERT(group <= FILTER_GROUPS);
+        LASSERT(group <= filter->fo_group_count);
 
         spin_lock(&filter->fo_objidlock);
         filter->fo_last_objids[group] = id;
@@ -1084,7 +1366,7 @@ obd_id filter_last_id(struct filter_obd *filter, obd_gr group)
 {
         obd_id id;
         LASSERT(filter->fo_fsd != NULL);
-        LASSERT(group <= FILTER_GROUPS);
+        LASSERT(group <= filter->fo_group_count);
 
         /* FIXME: object groups */
         spin_lock(&filter->fo_objidlock);
@@ -1104,12 +1386,14 @@ static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
 struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
 {
         struct filter_obd *filter = &obd->u.filter;
-        LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */
+        struct filter_subdirs *subdirs;
+        LASSERT(group < filter->fo_group_count); /* FIXME: object groups */
 
         if (group > 0 || filter->fo_subdir_count == 0)
                 return filter->fo_dentry_O_groups[group];
 
-        return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)];
+        subdirs = &filter->fo_dentry_O_sub[group];
+        return subdirs->dentry[objid & (filter->fo_subdir_count - 1)];
 }
 
 /* We never dget the object parent, so DON'T dput it either */
@@ -1195,11 +1479,12 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
         RETURN(dchild);
 }
 
-static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
+static int filter_prepare_destroy(struct obd_device *obd, obd_id objid, 
+                                  obd_id group)
 {
         struct lustre_handle lockh;
         int flags = LDLM_AST_DISCARD_DATA, rc;
-        struct ldlm_res_id res_id = { .name = { objid } };
+        struct ldlm_res_id res_id = { .name = { objid, 0, group, 0} };
         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
 
         ENTRY;
@@ -1640,7 +1925,10 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
         obd->obd_lvfs_ctxt.fs = get_ds();
         obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
-
+        
+        sema_init(&filter->fo_init_lock, 1);
+        filter->fo_committed_group = 0;
+        
         rc = filter_prep(obd);
         if (rc)
                 GOTO(err_ops, rc);
@@ -2009,10 +2297,12 @@ static int filter_connect(const struct lu_context *ctx,
                           struct obd_uuid *cluuid,
                           struct obd_connect_data *data)
 {
+        struct lvfs_run_ctxt saved;
         struct obd_export *exp;
         struct filter_export_data *fed;
         struct filter_client_data *fcd = NULL;
         struct filter_obd *filter = &obd->u.filter;
+        __u32 group;
         int rc;
         ENTRY;
 
@@ -2031,19 +2321,42 @@ static int filter_connect(const struct lu_context *ctx,
         if (rc)
                 GOTO(cleanup, rc);
 
-        if (!obd->obd_replayable)
-                GOTO(cleanup, rc = 0);
+        group = data->ocd_group;
+        if (obd->obd_replayable) {
+                OBD_ALLOC(fcd, sizeof(*fcd));
+                if (!fcd) {
+                        CERROR("filter: out of memory for client data\n");
+                        GOTO(cleanup, rc = -ENOMEM);
+                }
 
-        OBD_ALLOC(fcd, sizeof(*fcd));
-        if (!fcd) {
-                CERROR("filter: out of memory for client data\n");
-                GOTO(cleanup, rc = -ENOMEM);
+                memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
+                fed->fed_fcd = fcd;
+                fed->fed_fcd->fcd_group = group;
+                rc = filter_client_add(obd, filter, fed, -1);
+                if (rc)
+                        GOTO(cleanup, rc);
+        } 
+        CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
+               obd->obd_name, exp->exp_handle.h_cookie, group);
+        if (group == 0)
+                GOTO(cleanup, rc);
+        
+        if (fed->fed_group != 0 && fed->fed_group != group) {
+                CERROR("!!! This export (nid %s) used object group %d "
+                       "earlier; now it's trying to use group %d!  This could "
+                       "be a bug in the MDS.  Tell CFS.\n",
+                       obd_export_nid2str(exp), fed->fed_group, group);
+                GOTO(cleanup, rc = -EPROTO);
         }
+        fed->fed_group = group;
 
-        memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
-        fed->fed_fcd = fcd;
-
-        rc = filter_client_add(obd, filter, fed, -1);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        rc = filter_read_groups(obd, group, 1);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (rc != 0) {
+                CERROR("can't read group %u\n", group);
+                GOTO(cleanup, rc);
+        }
 
         GOTO(cleanup, rc);
 
@@ -2239,11 +2552,12 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
                                   const char *what, int quiet)
 {
         struct dentry *dchild = NULL;
+        obd_gr group = 0;
 
-        if (!(oa->o_valid & OBD_MD_FLGROUP))
-                oa->o_gr = 0;
+        if (oa->o_valid & OBD_MD_FLGROUP)
+                group = oa->o_gr;
 
-        dchild = filter_fid2dentry(obd, NULL, oa->o_gr, oa->o_id);
+        dchild = filter_fid2dentry(obd, NULL, group, oa->o_id);
 
         if (IS_ERR(dchild)) {
                 CERROR("%s error looking up object: "LPU64"\n",
@@ -2301,7 +2615,6 @@ int filter_update_fidea(struct obd_export *exp, struct inode *inode,
 
                 if (!(oa->o_valid & OBD_MD_FLGROUP))
                         oa->o_gr = 0;
-
                 /* packing fid and converting it to LE for storing into EA.
                  * Here ->o_stripe_idx should be filled by LOV and rest of
                  * fields - by client. */
@@ -2462,7 +2775,8 @@ out_unlock:
 int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
                    struct obd_trans_info *oti)
 {
-        struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id } };
+        struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0, 
+                                                oinfo->oi_oa->o_gr, 0 } };
         struct ldlm_valblock_ops *ns_lvbo;
         struct filter_mod_data *fmd;
         struct lvfs_run_ctxt saved;
@@ -2581,12 +2895,12 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
         LASSERT(down_trylock(&filter->fo_create_lock) != 0);
 
         memset(&doa, 0, sizeof(doa));
-        if (oa->o_valid & OBD_MD_FLGROUP) {
-                doa.o_valid |= OBD_MD_FLGROUP;
-                doa.o_gr = oa->o_gr;
-        } else {
-                doa.o_gr = 0;
-        }
+
+        LASSERT(oa->o_gr != 0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+        doa.o_valid |= OBD_MD_FLGROUP;
+        doa.o_gr = oa->o_gr;
         doa.o_mode = S_IFREG;
 
         if (!filter->fo_destroy_in_progress) {
@@ -2893,15 +3207,36 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
 static int filter_create(struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
+        struct filter_export_data *fed;
         struct obd_device *obd = NULL;
+        struct filter_obd *filter;
         struct lvfs_run_ctxt saved;
         struct lov_stripe_md *lsm = NULL;
-        int rc = 0, diff;
+        int rc = 0, diff, group = oa->o_gr;
         ENTRY;
 
+        if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
+                CERROR("!!! nid %s sent invalid object group %d\n",
+                        obd_export_nid2str(exp), group);
+                RETURN(-EINVAL);
+        }
+
+        obd = exp->exp_obd;
+        fed = &exp->exp_filter_data;
+        filter = &obd->u.filter;
+
+        if (fed->fed_group != group) {
+                CERROR("!!! this export (nid %s) used object group %d "
+                        "earlier; now it's trying to use group %d!  This could "
+                        "be a bug in the MDS.  Tell CFS.\n",
+                        obd_export_nid2str(exp), fed->fed_group, group);
+                RETURN(-ENOTUNIQ);
+        }
+
+#if 0
         if (!(oa->o_valid & OBD_MD_FLGROUP))
                 oa->o_gr = 0;
-
+#endif
         CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
                oa->o_gr, oa->o_id);
         if (ea != NULL) {
@@ -2918,14 +3253,12 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
 
         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                if (oa->o_id > filter_last_id(&obd->u.filter, oa->o_gr)) {
+                if (oa->o_id > filter_last_id(filter, oa->o_gr)) {
                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
-                               oa->o_id, filter_last_id(&obd->u.filter,
+                               oa->o_id, filter_last_id(filter,
                                                         oa->o_gr));
                         rc = -EINVAL;
                 } else {
-                        struct filter_obd *filter = &obd->u.filter;
-
                         diff = 1;
                         down(&filter->fo_create_lock);
                         rc = filter_precreate(obd, oa, oa->o_gr, &diff);
@@ -2965,9 +3298,11 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
         struct iattr iattr;
         ENTRY;
 
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+#if 0
         if (!(oa->o_valid & OBD_MD_FLGROUP))
                 oa->o_gr = 0;
-
+#endif
         obd = exp->exp_obd;
         filter = &obd->u.filter;
 
@@ -2992,7 +3327,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
                 GOTO(cleanup, rc = -ENOENT);
         }
 
-        filter_prepare_destroy(obd, oa->o_id);
+        filter_prepare_destroy(obd, oa->o_id, oa->o_gr);
 
         /* Our MDC connection is established by the MDS to us */
         if (oa->o_valid & OBD_MD_FLCOOKIE) {
index 685fdf0..f8ff9d5 100644 (file)
@@ -17,7 +17,6 @@
 
 #define FILTER_INIT_OBJID 0
 
-#define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
 #define FILTER_GROUPS 3 /* must be at least 3; not dynamic yet */
 
 #define FILTER_ROCOMPAT_SUPP (0)
@@ -35,7 +34,8 @@ struct filter_client_data {
         __u8  fcd_uuid[40];        /* client UUID */
         __u64 fcd_last_rcvd;       /* last completed transaction ID */
         __u64 fcd_last_xid;        /* client RPC xid for the last transaction */
-        __u8  fcd_padding[LR_CLIENT_SIZE - 56];
+        __u32 fcd_group;           /* mds group */
+        __u8  fcd_padding[LR_CLIENT_SIZE - 60];
 };
 
 /* Limit the returned fields marked valid to those that we actually might set */
index 001a0ad..d8cebef 100644 (file)
@@ -136,6 +136,7 @@ static int filter_recov_log_unlink_cb(struct llog_ctxt *ctxt,
         oa->o_valid |= OBD_MD_FLCOOKIE;
         oa->o_id = lur->lur_oid;
         oa->o_gr = lur->lur_ogen;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
         memcpy(obdo_logcookie(oa), cookie, sizeof(*cookie));
         oid = oa->o_id;
 
@@ -174,6 +175,7 @@ static int filter_recov_log_setattr_cb(struct llog_ctxt *ctxt,
                                  OBD_MD_FLCOOKIE);
         oinfo.oi_oa->o_id = lsr->lsr_oid;
         oinfo.oi_oa->o_gr = lsr->lsr_ogen;
+        oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
         oinfo.oi_oa->o_uid = lsr->lsr_uid;
         oinfo.oi_oa->o_gid = lsr->lsr_gid;
         memcpy(obdo_logcookie(oinfo.oi_oa), cookie, sizeof(*cookie));
index 06946fe..59e4226 100644 (file)
@@ -69,7 +69,8 @@ static int filter_lvbo_init(struct ldlm_resource *res)
         obd = res->lr_namespace->ns_lvbp;
         LASSERT(obd != NULL);
 
-        dentry = filter_fid2dentry(obd, NULL, 0, res->lr_name.name[0]);
+        dentry = filter_fid2dentry(obd, NULL, res->lr_name.name[2], 
+                                              res->lr_name.name[0]);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
                 CERROR("%s: bad object "LPU64"/"LPU64": rc %d\n", obd->obd_name,
@@ -164,9 +165,9 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
  disk_update:
         /* Update the LVB from the disk inode */
         obd = res->lr_namespace->ns_lvbp;
-        LASSERT(obd);
-
-        dentry = filter_fid2dentry(obd, NULL, 0, res->lr_name.name[0]);
+        
+        dentry = filter_fid2dentry(obd, NULL, res->lr_name.name[2], 
+                                              res->lr_name.name[0]);
         if (IS_ERR(dentry))
                 GOTO(out, rc = PTR_ERR(dentry));
 
index 4f165a8..37ae2e9 100644 (file)
@@ -36,8 +36,9 @@
 static int lprocfs_filter_rd_groups(char *page, char **start, off_t off,
                                     int count, int *eof, void *data)
 {
+        struct obd_device *obd = (struct obd_device *)data;
         *eof = 1;
-        return snprintf(page, count, "%u\n", FILTER_GROUPS);
+        return snprintf(page, count, "%u\n", obd->u.filter.fo_group_count);
 }
 
 static int lprocfs_filter_rd_tot_dirty(char *page, char **start, off_t off,
index 5665c88..3f6c15b 100644 (file)
@@ -239,10 +239,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
         ENTRY;
         LASSERT(oa);
         LASSERT(ea);
-
+        
+        LASSERT(oa->o_gr > 0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        /* 
+         * FIXME: Now we can only create the object without cache,
+         * since we have only 1 oscc, only support one cache, will
+         * fix it soon Wangdi
+         */
         if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0))
                 RETURN(osc_real_create(exp, oa, ea, oti));
-
         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
             oa->o_flags == OBD_FL_RECREATE_OBJS) {
                 RETURN(osc_real_create(exp, oa, ea, oti));
index c53dff4..39fc297 100644 (file)
@@ -88,7 +88,9 @@ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
 
         if (lsm) {
                 LASSERT(lsm->lsm_object_id);
+                LASSERT(lsm->lsm_object_gr);
                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
+                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
         }
 
         RETURN(lmm_size);
@@ -135,7 +137,9 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         if (lmm != NULL) {
                 /* XXX zero *lsmp? */
                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
+                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                 LASSERT((*lsmp)->lsm_object_id);
+                LASSERT((*lsmp)->lsm_object_gr);
         }
 
         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
@@ -250,6 +254,8 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
+        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || 
+                                        oinfo->oi_oa->o_gr > 0);
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                               OST_SETATTR, 2, size, NULL);
         if (!req)
@@ -400,6 +406,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
          * This needs to be fixed in a big way.
          */
         lsm->lsm_object_id = oa->o_id;
+        lsm->lsm_object_gr = oa->o_gr;
         *ea = lsm;
 
         if (oti != NULL) {
@@ -2800,9 +2807,12 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                              ldlm_iterator_t replace, void *data)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = class_exp2obd(exp);
 
+        res_id.name[0] = lsm->lsm_object_id;
+        res_id.name[2] = lsm->lsm_object_gr;
+
         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
         return 0;
 }
@@ -2882,7 +2892,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                        struct obd_enqueue_info *einfo)
 {
-        struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
@@ -2890,6 +2900,9 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         int rc;
         ENTRY;
 
+        res_id.name[0] = oinfo->oi_md->lsm_object_id;
+        res_id.name[2] = oinfo->oi_md->lsm_object_gr;
+
         /* Filesystem lock extents are extended to page boundaries so that
          * dealing with the page cache is a little smoother.  */
         oinfo->oi_policy.l_extent.start -=
@@ -3010,10 +3023,13 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                      int *flags, void *data, struct lustre_handle *lockh)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
         int rc;
         ENTRY;
+        
+        res_id.name[0] = lsm->lsm_object_id;
+        res_id.name[2] = lsm->lsm_object_gr;
 
         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
 
@@ -3065,20 +3081,32 @@ static int osc_cancel_unused(struct obd_export *exp,
                              struct lov_stripe_md *lsm,
                              int flags, void *opaque)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
         struct obd_device *obd = class_exp2obd(exp);
+        struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
+
+        if (lsm != NULL) {
+                res_id.name[0] = lsm->lsm_object_id;
+                res_id.name[2] = lsm->lsm_object_gr;
+                resp = &res_id;
+        }
 
-        return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id,
-                                      flags, opaque);
+        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, 
+                                      opaque);
 }
 
 static int osc_join_lru(struct obd_export *exp,
                         struct lov_stripe_md *lsm, int join)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
+
+        if (lsm != NULL) {
+                res_id.name[0] = lsm->lsm_object_id;
+                res_id.name[2] = lsm->lsm_object_gr;
+                resp = &res_id;
+        }
 
-        return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
+        return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
 }
 
 static int osc_statfs_interpret(struct ptlrpc_request *req,
@@ -3203,12 +3231,14 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
                         RETURN(-ENOMEM);
 
                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
+                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
         } else {
                 lum_size = sizeof(lum);
                 lumk = &lum;
         }
 
         lumk->lmm_object_id = lsm->lsm_object_id;
+        lumk->lmm_object_gr = lsm->lsm_object_gr;
         lumk->lmm_stripe_count = 1;
 
         if (copy_to_user(lump, lumk, lum_size))