Whamcloud - gitweb
b=20131
authorjxiong <jxiong>
Wed, 9 Sep 2009 03:29:03 +0000 (03:29 +0000)
committerjxiong <jxiong>
Wed, 9 Sep 2009 03:29:03 +0000 (03:29 +0000)
r=vitaly,wangdi

- fixed the access of free echo objects
- cleanup the code to identify non mdt objects

lustre/include/obd.h
lustre/lov/lov_request.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_capa.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c

index 5550d89..2b7396b 100644 (file)
@@ -966,13 +966,30 @@ enum filter_groups {
         FILTER_GROUP_MDS1_N_BASE = 3
 };
 
-static inline __u64 obdo_mdsno(struct obdo *oa)
+/**
+  * In HEAD for CMD, the object is created in group number which is 3>=
+  * or indexing starts from 3. To test this assertions are added to disallow
+  * group 0. But to run 2.0 mds server on 1.8.x disk format (i.e. interop_mode)
+  * object in group 0 needs to be allowed.
+  * So for interop mode following changes needs to be done:
+  * 1. No need to assert on group 0 or allow group 0
+  * 2. The group number indexing starts from 0 instead of 3
+  */
+
+static inline int filter_group_is_mds(obd_gr group)
 {
-        if (oa->o_gr) {
-                LASSERT(oa->o_gr >= FILTER_GROUP_MDS1_N_BASE);
-                return oa->o_gr - FILTER_GROUP_MDS1_N_BASE + 1;
-        }
-        return 0;
+        return (group == FILTER_GROUP_MDS0 ||
+                group >= FILTER_GROUP_MDS1_N_BASE);
+}
+
+#define LASSERT_MDS_GROUP(group) LASSERT(filter_group_is_mds(group))
+
+static inline __u64 objgrp_to_mdsno(obd_gr group)
+{
+        LASSERT(filter_group_is_mds(group));
+        if (group == FILTER_GROUP_MDS0)
+                return 0;
+        return group - FILTER_GROUP_MDS1_N_BASE + 1;
 }
 
 static inline int mdt_to_obd_objgrp(int mdtid)
@@ -986,19 +1003,17 @@ static inline int mdt_to_obd_objgrp(int mdtid)
         return 0;
 }
 
-/**
-  * In HEAD for CMD, the object is created in group number which is 3>=
-  * or indexing starts from 3. To test this assertions are added to disallow
-  * group 0. But to run 2.0 mds server on 1.8.x disk format (i.e. interop_mode)
-  * object in group 0 needs to be allowed.
-  * So for interop mode following changes needs to be done:
-  * 1. No need to assert on group 0 or allow group 0
-  * 2. The group number indexing starts from 0 instead of 3
-  */
+static inline __u64 obdo_mdsno(struct obdo *oa)
+{
+        LASSERT((oa->o_valid & OBD_MD_FLGROUP));
+        return objgrp_to_mdsno(oa->o_gr);
+}
 
-#define CHECK_MDS_GROUP(group)          (group == FILTER_GROUP_MDS0 || \
-                                         group > FILTER_GROUP_MDS1_N_BASE)
-#define LASSERT_MDS_GROUP(group)        LASSERT(CHECK_MDS_GROUP(group))
+static inline int obdo_is_mds(struct obdo *oa)
+{
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        return filter_group_is_mds(oa->o_gr);
+}
 
 struct obd_llog_group {
         struct list_head   olg_list;
@@ -1650,11 +1665,6 @@ static inline void init_obd_quota_ops(quota_interface_t *interface,
         obd_ops->o_quota_adjust_qunit = QUOTA_OP(interface, adjust_qunit);
 }
 
-static inline __u64 oinfo_mdsno(struct obd_info *oinfo)
-{
-        return obdo_mdsno(oinfo->oi_oa);
-}
-
 static inline struct lustre_capa *oinfo_capa(struct obd_info *oinfo)
 {
         return oinfo->oi_capa;
index 9e898e3..3fa33a3 100644 (file)
@@ -1219,11 +1219,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
-                LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
-                         CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
-                         "req->rq_oi.oi_oa->o_valid="LPX64" "
-                         "req->rq_oi.oi_oa->o_gr="LPU64"\n",
-                         req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
index f84459f..cd61720 100644 (file)
@@ -764,9 +764,10 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
 {
         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
         struct echo_client_obd *ec   = ed->ed_ec;
+        struct echo_object     *eco;
         struct lu_device       *next = ed->ed_next;
 
-        printk("ed = %p, ec = %p, next = %p\n", ed, ec, next);
+        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", ed, next);
 
         /* destroy locks */
         spin_lock(&ec->ec_lock);
@@ -797,21 +798,34 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
         LASSERT(ed->ed_site);
         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
 
-        /* check if there are objects still alive, assume only one reference */
+        /* check if there are objects still alive.
+         * It shouldn't have any object because lu_site_purge would cleanup
+         * all of cached objects. Anyway, probably the echo device is being
+         * parallelly accessed.
+         */
         spin_lock(&ec->ec_lock);
-        while (!list_empty(&ec->ec_objects)) {
-                struct echo_object *eco;
-                eco = list_entry(ec->ec_objects.next, struct echo_object,
-                                 eo_obj_chain);
-                spin_unlock(&ec->ec_lock);
-
+        list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
                 eco->eo_deleted = 1;
-                cl_echo_object_put(eco);
+        spin_unlock(&ec->ec_lock);
 
+        /* purge again */
+        lu_site_purge(env, &ed->ed_site->cs_lu, -1);
+
+        CDEBUG(D_INFO, "Waiting for the reference of echo object to be dropped\n");
+
+        /* Wait for the last reference to be dropped. */
+        spin_lock(&ec->ec_lock);
+        while (!list_empty(&ec->ec_objects)) {
+                spin_unlock(&ec->ec_lock);
+                CERROR("echo_client still has objects at cleanup time, "
+                       "wait for 1 second\n");
+                cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(1));
                 spin_lock(&ec->ec_lock);
         }
         spin_unlock(&ec->ec_lock);
 
+        CDEBUG(D_INFO, "No object exists, exiting...\n");
+
         echo_client_cleanup(d->ld_obd);
 
         while (next)
@@ -872,6 +886,10 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d,
         LASSERT(lsm);
         LASSERT(lsm->lsm_object_id);
 
+        /* Never return an object if the obd is to be freed. */
+        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
+                RETURN(ERR_PTR(-ENODEV));
+
         env = cl_env_get(&refcheck);
         if (IS_ERR(env))
                 RETURN((void *)env);
@@ -1250,9 +1268,9 @@ static int echo_create_object(struct echo_device *ed, int on_target,
 
         rc = 0;
         if (on_target) {
-                oa->o_gr = FILTER_GROUP_ECHO;
-                oa->o_valid |= OBD_MD_FLGROUP;
-
+                /* Only echo objects are allowed to be created */
+                LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
+                        (oa->o_gr == FILTER_GROUP_ECHO));
                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
                 if (rc != 0) {
                         CERROR("Cannot create objects, rc = %d\n", rc);
@@ -1595,6 +1613,9 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                 ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret);
                 if (ret != 0)
                         GOTO(out, ret);
+
+                /* Reset oti otherwise it would confuse ldiskfs. */
+                memset(oti, 0, sizeof(*oti));
         }
 
 out:
@@ -1612,30 +1633,31 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
         struct echo_device *ed = obd2echo_dev(obd);
         struct echo_client_obd *ec = ed->ed_ec;
         struct obd_trans_info dummy_oti = { .oti_thread = NULL };
+        struct obdo *oa = &data->ioc_obdo1;
         struct echo_object *eco;
         int rc;
         int async = 1;
         ENTRY;
 
-        rc = echo_get_object(&eco, ed, &data->ioc_obdo1);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+        rc = echo_get_object(&eco, ed, oa);
         if (rc)
                 RETURN(rc);
 
-        data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
-        data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
-        data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
+        oa->o_valid &= ~OBD_MD_FLHANDLE;
 
         switch((long)data->ioc_pbuf1) {
         case 1:
                 async = 0;
                 /* fall through */
         case 2:
-                rc = echo_client_kbrw(ed, rw, &data->ioc_obdo1,
+                rc = echo_client_kbrw(ed, rw, oa,
                                       eco, data->ioc_offset,
                                       data->ioc_count, async, &dummy_oti);
                 break;
         case 3:
-                rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1,
+                rc = echo_client_prep_commit(ec->ec_exp, rw, oa,
                                             eco, data->ioc_offset,
                                             data->ioc_count, data->ioc_plen1,
                                             &dummy_oti);
@@ -1716,12 +1738,20 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
 
         memset(&dummy_oti, 0, sizeof(dummy_oti));
 
+        oa = &data->ioc_obdo1;
+        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
+                oa->o_valid |= OBD_MD_FLGROUP;
+                oa->o_gr = FILTER_GROUP_ECHO;
+        }
+        /* assume we can touch filter native objects with echo device. */
+        /* LASSERT(oa->o_gr == FILTER_GROUP_ECHO); */
+
         switch (cmd) {
         case OBD_IOC_CREATE:                    /* may create echo object */
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, rc = -EPERM);
 
-                rc = echo_create_object (ed, 1, &data->ioc_obdo1,
+                rc = echo_create_object (ed, 1, oa,
                                          data->ioc_pbuf1, data->ioc_plen1,
                                          &dummy_oti);
                 GOTO(out, rc);
@@ -1730,11 +1760,8 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, rc = -EPERM);
 
-                rc = echo_get_object (&eco, ed, &data->ioc_obdo1);
+                rc = echo_get_object (&eco, ed, oa);
                 if (rc == 0) {
-                        oa = &data->ioc_obdo1;
-                        oa->o_gr = FILTER_GROUP_ECHO;
-                        oa->o_valid |= OBD_MD_FLGROUP;
                         rc = obd_destroy(ec->ec_exp, oa, eco->eo_lsm,
                                          &dummy_oti, NULL, NULL);
                         if (rc == 0)
@@ -1744,11 +1771,11 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 GOTO(out, rc);
 
         case OBD_IOC_GETATTR:
-                rc = echo_get_object (&eco, ed, &data->ioc_obdo1);
+                rc = echo_get_object (&eco, ed, oa);
                 if (rc == 0) {
                         struct obd_info oinfo = { { { 0 } } };
                         oinfo.oi_md = eco->eo_lsm;
-                        oinfo.oi_oa = &data->ioc_obdo1;
+                        oinfo.oi_oa = oa;
                         rc = obd_getattr(ec->ec_exp, &oinfo);
                         echo_put_object(eco);
                 }
@@ -1758,10 +1785,10 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, rc = -EPERM);
 
-                rc = echo_get_object (&eco, ed, &data->ioc_obdo1);
+                rc = echo_get_object (&eco, ed, oa);
                 if (rc == 0) {
                         struct obd_info oinfo = { { { 0 } } };
-                        oinfo.oi_oa = &data->ioc_obdo1;
+                        oinfo.oi_oa = oa;
                         oinfo.oi_md = eco->eo_lsm;
 
                         rc = obd_setattr(ec->ec_exp, &oinfo, NULL);
@@ -1780,7 +1807,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 GOTO(out, rc);
 
         case ECHO_IOC_GET_STRIPE:
-                rc = echo_get_object(&eco, ed, &data->ioc_obdo1);
+                rc = echo_get_object(&eco, ed, oa);
                 if (rc == 0) {
                         rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
                                               data->ioc_plen1);
@@ -1793,13 +1820,13 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                         GOTO (out, rc = -EPERM);
 
                 if (data->ioc_pbuf1 == NULL) {  /* unset */
-                        rc = echo_get_object(&eco, ed, &data->ioc_obdo1);
+                        rc = echo_get_object(&eco, ed, oa);
                         if (rc == 0) {
                                 eco->eo_deleted = 1;
                                 echo_put_object(eco);
                         }
                 } else {
-                        rc = echo_create_object(ed, 0, &data->ioc_obdo1,
+                        rc = echo_create_object(ed, 0, oa,
                                                 data->ioc_pbuf1,
                                                 data->ioc_plen1, &dummy_oti);
                 }
@@ -1809,14 +1836,14 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, rc = -EPERM);
 
-                rc = echo_client_enqueue(exp, &data->ioc_obdo1,
+                rc = echo_client_enqueue(exp, oa,
                                          data->ioc_conn1, /* lock mode */
                                          data->ioc_offset,
                                          data->ioc_count);/*extent*/
                 GOTO (out, rc);
 
         case ECHO_IOC_CANCEL:
-                rc = echo_client_cancel(exp, &data->ioc_obdo1);
+                rc = echo_client_cancel(exp, oa);
                 GOTO (out, rc);
 
         default:
@@ -1880,6 +1907,12 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         ocd->ocd_group = FILTER_GROUP_ECHO;
 
         rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
+        if (rc == 0) {
+                /* Turn off pinger because it connects to tgt obd directly. */
+                spin_lock(&tgt->obd_dev_lock);
+                list_del_init(&ec->ec_exp->exp_obd_chain_timed);
+                spin_unlock(&tgt->obd_dev_lock);
+        }
 
         OBD_FREE(ocd, sizeof(*ocd));
 
@@ -1903,6 +1936,7 @@ static int echo_client_cleanup(struct obd_device *obddev)
                 RETURN(-EBUSY);
         }
 
+        LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
         rc = obd_disconnect(ec->ec_exp);
         if (rc != 0)
                 CERROR("fail to disconnect device: %d\n", rc);
index 13786ce..8ef247e 100644 (file)
@@ -1003,8 +1003,6 @@ static int filter_update_last_group(struct obd_device *obd, int group)
                 CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
                 GOTO(cleanup, rc);
         }
-        LASSERTF(off == 0 || CHECK_MDS_GROUP(last_group),
-                 "off = %llu and last_group = %d\n", off, last_group);
 
         CDEBUG(D_INODE, "%s: previous %d, new %d\n",
                obd->obd_name, last_group, group);
@@ -1078,8 +1076,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group,
                 GOTO(cleanup, rc);
         }
 
-        if (filter->fo_subdir_count &&
-            !(group == FILTER_GROUP_LLOG || group == FILTER_GROUP_ECHO)) {
+        if (filter->fo_subdir_count && filter_group_is_mds(group)) {
                 OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs));
                 if (tmp_subdirs == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
@@ -1143,7 +1140,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group,
 
         filter->fo_dentry_O_groups[group] = dentry;
         filter->fo_last_objid_files[group] = filp;
-        if (filter->fo_subdir_count) {
+        if (filter->fo_subdir_count && filter_group_is_mds(group)) {
                 filter->fo_dentry_O_sub[group] = *tmp_subdirs;
                 OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs));
         }
@@ -1172,7 +1169,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group,
                 if (new_files != NULL)
                         OBD_FREE(new_files, len * sizeof(*new_files));
         case 3:
-                if (filter->fo_subdir_count) {
+                if (filter->fo_subdir_count && filter_group_is_mds(group)) {
                         for (i = 0; i < filter->fo_subdir_count; i++) {
                                 if (tmp_subdirs->dentry[i] != NULL)
                                         dput(tmp_subdirs->dentry[i]);
@@ -1196,7 +1193,6 @@ static int filter_read_groups(struct obd_device *obd, int last_group,
         down(&filter->fo_init_lock);
         old_count = filter->fo_group_count;
         for (group = old_count; group <= last_group; group++) {
-
                 rc = filter_read_group_internal(obd, group, create);
                 if (rc != 0)
                         break;
@@ -1240,11 +1236,9 @@ static int filter_prep_groups(struct obd_device *obd)
                 CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc);
                 GOTO(cleanup, rc);
         }
-        if (off == 0) {
+
+        if (off == 0)
                 last_group = FILTER_GROUP_MDS0;
-        } else {
-                LASSERT_MDS_GROUP(last_group);
-        }
 
         CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
               FILTER_GROUP_MDS0, last_group);
@@ -1435,8 +1429,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
         struct filter_subdirs *subdirs;
         LASSERT(group < filter->fo_group_count); /* FIXME: object groups */
 
-        if ((group > FILTER_GROUP_MDS0 && group < FILTER_GROUP_MDS1_N_BASE) ||
-             filter->fo_subdir_count == 0)
+        if (!filter_group_is_mds(group) || filter->fo_subdir_count == 0)
                 return filter->fo_dentry_O_groups[group];
 
         subdirs = &filter->fo_dentry_O_sub[group];
@@ -2934,10 +2927,6 @@ static void filter_grant_discard(struct obd_export *exp)
         struct filter_export_data *fed = &exp->exp_filter_data;
 
         spin_lock(&obd->obd_osfs_lock);
-        spin_lock(&obd->obd_dev_lock);
-        list_del_init(&exp->exp_obd_chain);
-        spin_unlock(&obd->obd_dev_lock);
-
         LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
                  "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n",
                  obd->obd_name, filter->fo_tot_granted,
@@ -3138,7 +3127,8 @@ static int filter_getattr(struct obd_export *exp, struct obd_info *oinfo)
         int rc = 0;
         ENTRY;
 
-        rc = filter_auth_capa(exp, NULL, oinfo_mdsno(oinfo),
+        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oinfo->oi_oa->o_gr,
                               oinfo_capa(oinfo), CAPA_OPC_META_READ);
         if (rc)
                 RETURN(rc);
@@ -3386,12 +3376,14 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
 
         if (oa->o_valid & OBD_FL_TRUNC)
                 opc |= CAPA_OPC_OSS_TRUNC;
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, opc);
+
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, opc);
         if (rc)
                 RETURN(rc);
 
         if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID)) {
-                rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa);
+                rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa);
                 if (rc)
                         RETURN(rc);
         }
@@ -3642,8 +3634,7 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa,
                         GOTO(out, rc = 0);
                 }
                 /* only precreate if group == 0 and o_id is specfied */
-                if (group == FILTER_GROUP_LLOG ||
-                    group == FILTER_GROUP_ECHO || oa->o_id == 0)
+                if (!filter_group_is_mds(group) || oa->o_id == 0)
                         diff = 1;
                 else
                         diff = oa->o_id - filter_last_id(filter, group);
@@ -4030,8 +4021,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
         ENTRY;
 
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
-
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa),
+        rc = filter_auth_capa(exp, NULL, oa->o_gr,
                               (struct lustre_capa *)capa, CAPA_OPC_OSS_DESTROY);
         if (rc)
                 RETURN(rc);
@@ -4239,7 +4229,8 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         int rc, rc2;
         ENTRY;
 
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa),
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oa->o_gr,
                               (struct lustre_capa *)capa, CAPA_OPC_OSS_WRITE);
         if (rc)
                 RETURN(rc);
@@ -4636,17 +4627,10 @@ static int __init obdfilter_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc, i;
-        struct obdo *oa;
 
         /** sanity check for group<->mdsno conversion */
-        OBD_ALLOC_PTR(oa);
-        if (oa == NULL)
-                return -ENOMEM;
-        for (i = 0; i < 32; i++) {
-                 oa->o_gr = mdt_to_obd_objgrp(i);
-                 LASSERT(obdo_mdsno(oa) == i);
-        }
-        OBD_FREE_PTR(oa);
+        for (i = 0; i < 32; i++)
+                 LASSERT(objgrp_to_mdsno(mdt_to_obd_objgrp(i)) == i);
 
         lprocfs_filter_init_vars(&lvars);
 
index bc43a6a..a93f115 100644 (file)
@@ -112,7 +112,7 @@ int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *new)
         RETURN(0);
 }
 
-int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
+int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, obd_gr group,
                      struct lustre_capa *capa, __u64 opc)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -120,10 +120,15 @@ int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
         struct filter_capa_key *k;
         struct lustre_capa_key key;
         struct obd_capa *oc;
+        __u64 mdsid;
         __u8 *hmac;
         int keys_ready = 0, key_found = 0, rc = 0;
         ENTRY;
 
+        /* skip capa check for llog and obdecho */
+        if (!filter_group_is_mds(group))
+                RETURN(0);
+
         /* capability is disabled */
         if (!filter->fo_fl_oss_capa)
                 RETURN(0);
@@ -131,6 +136,7 @@ int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
         if (!(exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA))
                 RETURN(0);
 
+        mdsid = objgrp_to_mdsno(group);
         if (capa == NULL) {
                 if (fid)
                         CERROR("mdsno/fid/opc "LPU64"/"DFID"/"LPX64
@@ -221,18 +227,24 @@ int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
         RETURN(0);
 }
 
-int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, __u64 mdsid,
+int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, obd_gr group,
                       struct lustre_capa *capa)
 {
+        __u64 mdsid;
         int rc = 0;
         ENTRY;
 
+        /* skip capa check for llog and obdecho */
+        if (!filter_group_is_mds(group))
+                RETURN(0);
+
         if (!(exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA))
                 RETURN(0);
 
         if (unlikely(!capa))
                 RETURN(-EACCES);
 
+        mdsid = objgrp_to_mdsno(group);
         if (capa_flags(capa) == LC_ID_CONVERT) {
                 struct obd_device *obd = exp->exp_obd;
                 struct filter_obd *filter = &obd->u.filter;
index 7ef4a6f..c93f44f 100644 (file)
@@ -231,9 +231,9 @@ static void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars)
 extern quota_interface_t *filter_quota_interface_ref;
 
 int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *key);
-int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
+int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, obd_gr group,
                      struct lustre_capa *capa, __u64 opc);
-int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, __u64 mdsid,
+int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, obd_gr group,
                       struct lustre_capa *capa);
 void filter_free_capa_keys(struct filter_obd *filter);
 
index e6d289a..30b8b71 100644 (file)
@@ -419,7 +419,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         LASSERTF(objcount == 1, "%d\n", objcount);
         LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
 
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa,
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oa->o_gr, capa,
                               CAPA_OPC_OSS_READ);
         if (rc)
                 RETURN(rc);
@@ -686,12 +687,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa,
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oa->o_gr, capa,
                               CAPA_OPC_OSS_WRITE);
         if (rc)
                 RETURN(rc);
 
-        if (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) 
+        if (exp->exp_connection &&
+            exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) 
                 localreq = 1;
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
@@ -714,7 +717,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 
         if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) &&
             dentry->d_inode->i_mode & (S_ISUID | S_ISGID)) {
-                rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa);
+                rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa);
                 if (rc)
                         GOTO(cleanup, rc);
         }
index fdd0eba..e18d6e9 100644 (file)
@@ -110,7 +110,7 @@ static int lprocfs_filter_rd_last_id(char *page, char **start, off_t off,
         count -= rc;
         retval += rc;
 
-        for (i = FILTER_GROUP_MDS1_N_BASE + 1; i < filter->fo_group_count; i++) {
+        for (i = FILTER_GROUP_MDS1_N_BASE; i < filter->fo_group_count; i++) {
                 rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, i));
                 if (rc < 0) {
                         retval = rc;
index a8578e6..48af0a1 100644 (file)
@@ -451,8 +451,7 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
         struct obdo *oa = oinfo->oi_oa;
         ENTRY;
 
-        if ((oa->o_valid & OBD_MD_FLGROUP) &&
-            (oa->o_gr == FILTER_GROUP_ECHO || oa->o_gr == FILTER_GROUP_LLOG)){
+        if ((oa->o_valid & OBD_MD_FLGROUP) && !filter_group_is_mds(oa->o_gr)) {
                 rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
                 rc = oinfo->oi_cb_up(oinfo, rc);
                 RETURN(rc);
@@ -519,7 +518,6 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
 
         LASSERT(oa);
         LASSERT(ea);
-        LASSERT_MDS_GROUP(oa->o_gr);
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
@@ -527,7 +525,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(osc_real_create(exp, oa, ea, oti));
         }
 
-        if (oa->o_gr == FILTER_GROUP_LLOG || oa->o_gr == FILTER_GROUP_ECHO)
+        if (!filter_group_is_mds(oa->o_gr))
                 RETURN(osc_real_create(exp, oa, ea, oti));
 
         /* this is the special case where create removes orphans */
index f8fd889..81d45d5 100644 (file)
@@ -312,10 +312,7 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
         int                    rc;
         ENTRY;
 
-        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
-                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
-                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
-                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
+        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
 
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
         if (req == NULL)