From 2b87d9a6d9eefab2209030b167df7998c3d4c1d0 Mon Sep 17 00:00:00 2001 From: jxiong Date: Wed, 9 Sep 2009 03:29:03 +0000 Subject: [PATCH] b=20131 r=vitaly,wangdi - fixed the access of free echo objects - cleanup the code to identify non mdt objects --- lustre/include/obd.h | 56 +++++++++++++--------- lustre/lov/lov_request.c | 5 -- lustre/obdecho/echo_client.c | 98 +++++++++++++++++++++++++------------- lustre/obdfilter/filter.c | 52 +++++++------------- lustre/obdfilter/filter_capa.c | 16 ++++++- lustre/obdfilter/filter_internal.h | 4 +- lustre/obdfilter/filter_io.c | 11 +++-- lustre/obdfilter/lproc_obdfilter.c | 2 +- lustre/osc/osc_create.c | 6 +-- lustre/osc/osc_request.c | 5 +- 10 files changed, 144 insertions(+), 111 deletions(-) diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 5550d89..2b7396b 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -966,13 +966,30 @@ enum filter_groups { FILTER_GROUP_MDS1_N_BASE = 3 }; -static inline __u64 obdo_mdsno(struct obdo *oa) +/** + * In HEAD for CMD, the object is created in group number which is 3>= + * or indexing starts from 3. To test this assertions are added to disallow + * group 0. But to run 2.0 mds server on 1.8.x disk format (i.e. interop_mode) + * object in group 0 needs to be allowed. + * So for interop mode following changes needs to be done: + * 1. No need to assert on group 0 or allow group 0 + * 2. The group number indexing starts from 0 instead of 3 + */ + +static inline int filter_group_is_mds(obd_gr group) { - if (oa->o_gr) { - LASSERT(oa->o_gr >= FILTER_GROUP_MDS1_N_BASE); - return oa->o_gr - FILTER_GROUP_MDS1_N_BASE + 1; - } - return 0; + return (group == FILTER_GROUP_MDS0 || + group >= FILTER_GROUP_MDS1_N_BASE); +} + +#define LASSERT_MDS_GROUP(group) LASSERT(filter_group_is_mds(group)) + +static inline __u64 objgrp_to_mdsno(obd_gr group) +{ + LASSERT(filter_group_is_mds(group)); + if (group == FILTER_GROUP_MDS0) + return 0; + return group - FILTER_GROUP_MDS1_N_BASE + 1; } static inline int mdt_to_obd_objgrp(int mdtid) @@ -986,19 +1003,17 @@ static inline int mdt_to_obd_objgrp(int mdtid) return 0; } -/** - * In HEAD for CMD, the object is created in group number which is 3>= - * or indexing starts from 3. To test this assertions are added to disallow - * group 0. But to run 2.0 mds server on 1.8.x disk format (i.e. interop_mode) - * object in group 0 needs to be allowed. - * So for interop mode following changes needs to be done: - * 1. No need to assert on group 0 or allow group 0 - * 2. The group number indexing starts from 0 instead of 3 - */ +static inline __u64 obdo_mdsno(struct obdo *oa) +{ + LASSERT((oa->o_valid & OBD_MD_FLGROUP)); + return objgrp_to_mdsno(oa->o_gr); +} -#define CHECK_MDS_GROUP(group) (group == FILTER_GROUP_MDS0 || \ - group > FILTER_GROUP_MDS1_N_BASE) -#define LASSERT_MDS_GROUP(group) LASSERT(CHECK_MDS_GROUP(group)) +static inline int obdo_is_mds(struct obdo *oa) +{ + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + return filter_group_is_mds(oa->o_gr); +} struct obd_llog_group { struct list_head olg_list; @@ -1650,11 +1665,6 @@ static inline void init_obd_quota_ops(quota_interface_t *interface, obd_ops->o_quota_adjust_qunit = QUOTA_OP(interface, adjust_qunit); } -static inline __u64 oinfo_mdsno(struct obd_info *oinfo) -{ - return obdo_mdsno(oinfo->oi_oa); -} - static inline struct lustre_capa *oinfo_capa(struct obd_info *oinfo) { return oinfo->oi_capa; diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 9e898e3..3fa33a3 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1219,11 +1219,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; - LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) || - CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr), - "req->rq_oi.oi_oa->o_valid="LPX64" " - "req->rq_oi.oi_oa->o_gr="LPU64"\n", - req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr); req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index f84459f..cd61720 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -764,9 +764,10 @@ static struct lu_device *echo_device_free(const struct lu_env *env, { struct echo_device *ed = cl2echo_dev(lu2cl_dev(d)); struct echo_client_obd *ec = ed->ed_ec; + struct echo_object *eco; struct lu_device *next = ed->ed_next; - printk("ed = %p, ec = %p, next = %p\n", ed, ec, next); + CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", ed, next); /* destroy locks */ spin_lock(&ec->ec_lock); @@ -797,21 +798,34 @@ static struct lu_device *echo_device_free(const struct lu_env *env, LASSERT(ed->ed_site); lu_site_purge(env, &ed->ed_site->cs_lu, -1); - /* check if there are objects still alive, assume only one reference */ + /* check if there are objects still alive. + * It shouldn't have any object because lu_site_purge would cleanup + * all of cached objects. Anyway, probably the echo device is being + * parallelly accessed. + */ spin_lock(&ec->ec_lock); - while (!list_empty(&ec->ec_objects)) { - struct echo_object *eco; - eco = list_entry(ec->ec_objects.next, struct echo_object, - eo_obj_chain); - spin_unlock(&ec->ec_lock); - + list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) eco->eo_deleted = 1; - cl_echo_object_put(eco); + spin_unlock(&ec->ec_lock); + /* purge again */ + lu_site_purge(env, &ed->ed_site->cs_lu, -1); + + CDEBUG(D_INFO, "Waiting for the reference of echo object to be dropped\n"); + + /* Wait for the last reference to be dropped. */ + spin_lock(&ec->ec_lock); + while (!list_empty(&ec->ec_objects)) { + spin_unlock(&ec->ec_lock); + CERROR("echo_client still has objects at cleanup time, " + "wait for 1 second\n"); + cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(1)); spin_lock(&ec->ec_lock); } spin_unlock(&ec->ec_lock); + CDEBUG(D_INFO, "No object exists, exiting...\n"); + echo_client_cleanup(d->ld_obd); while (next) @@ -872,6 +886,10 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d, LASSERT(lsm); LASSERT(lsm->lsm_object_id); + /* Never return an object if the obd is to be freed. */ + if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping) + RETURN(ERR_PTR(-ENODEV)); + env = cl_env_get(&refcheck); if (IS_ERR(env)) RETURN((void *)env); @@ -1250,9 +1268,9 @@ static int echo_create_object(struct echo_device *ed, int on_target, rc = 0; if (on_target) { - oa->o_gr = FILTER_GROUP_ECHO; - oa->o_valid |= OBD_MD_FLGROUP; - + /* Only echo objects are allowed to be created */ + LASSERT((oa->o_valid & OBD_MD_FLGROUP) && + (oa->o_gr == FILTER_GROUP_ECHO)); rc = obd_create(ec->ec_exp, oa, &lsm, oti); if (rc != 0) { CERROR("Cannot create objects, rc = %d\n", rc); @@ -1595,6 +1613,9 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret); if (ret != 0) GOTO(out, ret); + + /* Reset oti otherwise it would confuse ldiskfs. */ + memset(oti, 0, sizeof(*oti)); } out: @@ -1612,30 +1633,31 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, struct echo_device *ed = obd2echo_dev(obd); struct echo_client_obd *ec = ed->ed_ec; struct obd_trans_info dummy_oti = { .oti_thread = NULL }; + struct obdo *oa = &data->ioc_obdo1; struct echo_object *eco; int rc; int async = 1; ENTRY; - rc = echo_get_object(&eco, ed, &data->ioc_obdo1); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + rc = echo_get_object(&eco, ed, oa); if (rc) RETURN(rc); - data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE; - data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP; - data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO; + oa->o_valid &= ~OBD_MD_FLHANDLE; switch((long)data->ioc_pbuf1) { case 1: async = 0; /* fall through */ case 2: - rc = echo_client_kbrw(ed, rw, &data->ioc_obdo1, + rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset, data->ioc_count, async, &dummy_oti); break; case 3: - rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1, + rc = echo_client_prep_commit(ec->ec_exp, rw, oa, eco, data->ioc_offset, data->ioc_count, data->ioc_plen1, &dummy_oti); @@ -1716,12 +1738,20 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, memset(&dummy_oti, 0, sizeof(dummy_oti)); + oa = &data->ioc_obdo1; + if (!(oa->o_valid & OBD_MD_FLGROUP)) { + oa->o_valid |= OBD_MD_FLGROUP; + oa->o_gr = FILTER_GROUP_ECHO; + } + /* assume we can touch filter native objects with echo device. */ + /* LASSERT(oa->o_gr == FILTER_GROUP_ECHO); */ + switch (cmd) { case OBD_IOC_CREATE: /* may create echo object */ if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_create_object (ed, 1, &data->ioc_obdo1, + rc = echo_create_object (ed, 1, oa, data->ioc_pbuf1, data->ioc_plen1, &dummy_oti); GOTO(out, rc); @@ -1730,11 +1760,8 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_get_object (&eco, ed, &data->ioc_obdo1); + rc = echo_get_object (&eco, ed, oa); if (rc == 0) { - oa = &data->ioc_obdo1; - oa->o_gr = FILTER_GROUP_ECHO; - oa->o_valid |= OBD_MD_FLGROUP; rc = obd_destroy(ec->ec_exp, oa, eco->eo_lsm, &dummy_oti, NULL, NULL); if (rc == 0) @@ -1744,11 +1771,11 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, GOTO(out, rc); case OBD_IOC_GETATTR: - rc = echo_get_object (&eco, ed, &data->ioc_obdo1); + rc = echo_get_object (&eco, ed, oa); if (rc == 0) { struct obd_info oinfo = { { { 0 } } }; oinfo.oi_md = eco->eo_lsm; - oinfo.oi_oa = &data->ioc_obdo1; + oinfo.oi_oa = oa; rc = obd_getattr(ec->ec_exp, &oinfo); echo_put_object(eco); } @@ -1758,10 +1785,10 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_get_object (&eco, ed, &data->ioc_obdo1); + rc = echo_get_object (&eco, ed, oa); if (rc == 0) { struct obd_info oinfo = { { { 0 } } }; - oinfo.oi_oa = &data->ioc_obdo1; + oinfo.oi_oa = oa; oinfo.oi_md = eco->eo_lsm; rc = obd_setattr(ec->ec_exp, &oinfo, NULL); @@ -1780,7 +1807,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, GOTO(out, rc); case ECHO_IOC_GET_STRIPE: - rc = echo_get_object(&eco, ed, &data->ioc_obdo1); + rc = echo_get_object(&eco, ed, oa); if (rc == 0) { rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1, data->ioc_plen1); @@ -1793,13 +1820,13 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, GOTO (out, rc = -EPERM); if (data->ioc_pbuf1 == NULL) { /* unset */ - rc = echo_get_object(&eco, ed, &data->ioc_obdo1); + rc = echo_get_object(&eco, ed, oa); if (rc == 0) { eco->eo_deleted = 1; echo_put_object(eco); } } else { - rc = echo_create_object(ed, 0, &data->ioc_obdo1, + rc = echo_create_object(ed, 0, oa, data->ioc_pbuf1, data->ioc_plen1, &dummy_oti); } @@ -1809,14 +1836,14 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, if (!cfs_capable(CFS_CAP_SYS_ADMIN)) GOTO (out, rc = -EPERM); - rc = echo_client_enqueue(exp, &data->ioc_obdo1, + rc = echo_client_enqueue(exp, oa, data->ioc_conn1, /* lock mode */ data->ioc_offset, data->ioc_count);/*extent*/ GOTO (out, rc); case ECHO_IOC_CANCEL: - rc = echo_client_cancel(exp, &data->ioc_obdo1); + rc = echo_client_cancel(exp, oa); GOTO (out, rc); default: @@ -1880,6 +1907,12 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) ocd->ocd_group = FILTER_GROUP_ECHO; rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL); + if (rc == 0) { + /* Turn off pinger because it connects to tgt obd directly. */ + spin_lock(&tgt->obd_dev_lock); + list_del_init(&ec->ec_exp->exp_obd_chain_timed); + spin_unlock(&tgt->obd_dev_lock); + } OBD_FREE(ocd, sizeof(*ocd)); @@ -1903,6 +1936,7 @@ static int echo_client_cleanup(struct obd_device *obddev) RETURN(-EBUSY); } + LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0); rc = obd_disconnect(ec->ec_exp); if (rc != 0) CERROR("fail to disconnect device: %d\n", rc); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 13786ce..8ef247e 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1003,8 +1003,6 @@ static int filter_update_last_group(struct obd_device *obd, int group) CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc); GOTO(cleanup, rc); } - LASSERTF(off == 0 || CHECK_MDS_GROUP(last_group), - "off = %llu and last_group = %d\n", off, last_group); CDEBUG(D_INODE, "%s: previous %d, new %d\n", obd->obd_name, last_group, group); @@ -1078,8 +1076,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group, GOTO(cleanup, rc); } - if (filter->fo_subdir_count && - !(group == FILTER_GROUP_LLOG || group == FILTER_GROUP_ECHO)) { + if (filter->fo_subdir_count && filter_group_is_mds(group)) { OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs)); if (tmp_subdirs == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -1143,7 +1140,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group, filter->fo_dentry_O_groups[group] = dentry; filter->fo_last_objid_files[group] = filp; - if (filter->fo_subdir_count) { + if (filter->fo_subdir_count && filter_group_is_mds(group)) { filter->fo_dentry_O_sub[group] = *tmp_subdirs; OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs)); } @@ -1172,7 +1169,7 @@ static int filter_read_group_internal(struct obd_device *obd, int group, if (new_files != NULL) OBD_FREE(new_files, len * sizeof(*new_files)); case 3: - if (filter->fo_subdir_count) { + if (filter->fo_subdir_count && filter_group_is_mds(group)) { for (i = 0; i < filter->fo_subdir_count; i++) { if (tmp_subdirs->dentry[i] != NULL) dput(tmp_subdirs->dentry[i]); @@ -1196,7 +1193,6 @@ static int filter_read_groups(struct obd_device *obd, int last_group, down(&filter->fo_init_lock); old_count = filter->fo_group_count; for (group = old_count; group <= last_group; group++) { - rc = filter_read_group_internal(obd, group, create); if (rc != 0) break; @@ -1240,11 +1236,9 @@ static int filter_prep_groups(struct obd_device *obd) CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc); GOTO(cleanup, rc); } - if (off == 0) { + + if (off == 0) last_group = FILTER_GROUP_MDS0; - } else { - LASSERT_MDS_GROUP(last_group); - } CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name, FILTER_GROUP_MDS0, last_group); @@ -1435,8 +1429,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) struct filter_subdirs *subdirs; LASSERT(group < filter->fo_group_count); /* FIXME: object groups */ - if ((group > FILTER_GROUP_MDS0 && group < FILTER_GROUP_MDS1_N_BASE) || - filter->fo_subdir_count == 0) + if (!filter_group_is_mds(group) || filter->fo_subdir_count == 0) return filter->fo_dentry_O_groups[group]; subdirs = &filter->fo_dentry_O_sub[group]; @@ -2934,10 +2927,6 @@ static void filter_grant_discard(struct obd_export *exp) struct filter_export_data *fed = &exp->exp_filter_data; spin_lock(&obd->obd_osfs_lock); - spin_lock(&obd->obd_dev_lock); - list_del_init(&exp->exp_obd_chain); - spin_unlock(&obd->obd_dev_lock); - LASSERTF(filter->fo_tot_granted >= fed->fed_grant, "%s: tot_granted "LPU64" cli %s/%p fed_grant %ld\n", obd->obd_name, filter->fo_tot_granted, @@ -3138,7 +3127,8 @@ static int filter_getattr(struct obd_export *exp, struct obd_info *oinfo) int rc = 0; ENTRY; - rc = filter_auth_capa(exp, NULL, oinfo_mdsno(oinfo), + LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oinfo->oi_oa->o_gr, oinfo_capa(oinfo), CAPA_OPC_META_READ); if (rc) RETURN(rc); @@ -3386,12 +3376,14 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, if (oa->o_valid & OBD_FL_TRUNC) opc |= CAPA_OPC_OSS_TRUNC; - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, opc); + + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, opc); if (rc) RETURN(rc); if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID)) { - rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa); + rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa); if (rc) RETURN(rc); } @@ -3642,8 +3634,7 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, GOTO(out, rc = 0); } /* only precreate if group == 0 and o_id is specfied */ - if (group == FILTER_GROUP_LLOG || - group == FILTER_GROUP_ECHO || oa->o_id == 0) + if (!filter_group_is_mds(group) || oa->o_id == 0) diff = 1; else diff = oa->o_id - filter_last_id(filter, group); @@ -4030,8 +4021,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, ENTRY; LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), + rc = filter_auth_capa(exp, NULL, oa->o_gr, (struct lustre_capa *)capa, CAPA_OPC_OSS_DESTROY); if (rc) RETURN(rc); @@ -4239,7 +4229,8 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, int rc, rc2; ENTRY; - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, (struct lustre_capa *)capa, CAPA_OPC_OSS_WRITE); if (rc) RETURN(rc); @@ -4636,17 +4627,10 @@ static int __init obdfilter_init(void) { struct lprocfs_static_vars lvars; int rc, i; - struct obdo *oa; /** sanity check for group<->mdsno conversion */ - OBD_ALLOC_PTR(oa); - if (oa == NULL) - return -ENOMEM; - for (i = 0; i < 32; i++) { - oa->o_gr = mdt_to_obd_objgrp(i); - LASSERT(obdo_mdsno(oa) == i); - } - OBD_FREE_PTR(oa); + for (i = 0; i < 32; i++) + LASSERT(objgrp_to_mdsno(mdt_to_obd_objgrp(i)) == i); lprocfs_filter_init_vars(&lvars); diff --git a/lustre/obdfilter/filter_capa.c b/lustre/obdfilter/filter_capa.c index bc43a6a..a93f115 100644 --- a/lustre/obdfilter/filter_capa.c +++ b/lustre/obdfilter/filter_capa.c @@ -112,7 +112,7 @@ int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *new) RETURN(0); } -int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid, +int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, obd_gr group, struct lustre_capa *capa, __u64 opc) { struct obd_device *obd = exp->exp_obd; @@ -120,10 +120,15 @@ int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid, struct filter_capa_key *k; struct lustre_capa_key key; struct obd_capa *oc; + __u64 mdsid; __u8 *hmac; int keys_ready = 0, key_found = 0, rc = 0; ENTRY; + /* skip capa check for llog and obdecho */ + if (!filter_group_is_mds(group)) + RETURN(0); + /* capability is disabled */ if (!filter->fo_fl_oss_capa) RETURN(0); @@ -131,6 +136,7 @@ int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid, if (!(exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA)) RETURN(0); + mdsid = objgrp_to_mdsno(group); if (capa == NULL) { if (fid) CERROR("mdsno/fid/opc "LPU64"/"DFID"/"LPX64 @@ -221,18 +227,24 @@ int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid, RETURN(0); } -int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, __u64 mdsid, +int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, obd_gr group, struct lustre_capa *capa) { + __u64 mdsid; int rc = 0; ENTRY; + /* skip capa check for llog and obdecho */ + if (!filter_group_is_mds(group)) + RETURN(0); + if (!(exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA)) RETURN(0); if (unlikely(!capa)) RETURN(-EACCES); + mdsid = objgrp_to_mdsno(group); if (capa_flags(capa) == LC_ID_CONVERT) { struct obd_device *obd = exp->exp_obd; struct filter_obd *filter = &obd->u.filter; diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 7ef4a6f..c93f44f 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -231,9 +231,9 @@ static void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars) extern quota_interface_t *filter_quota_interface_ref; int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *key); -int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid, +int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, obd_gr group, struct lustre_capa *capa, __u64 opc); -int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, __u64 mdsid, +int filter_capa_fixoa(struct obd_export *exp, struct obdo *oa, obd_gr group, struct lustre_capa *capa); void filter_free_capa_keys(struct filter_obd *filter); diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index e6d289a..30b8b71 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -419,7 +419,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, LASSERTF(objcount == 1, "%d\n", objcount); LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt); - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, CAPA_OPC_OSS_READ); if (rc) RETURN(rc); @@ -686,12 +687,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); - rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, CAPA_OPC_OSS_WRITE); if (rc) RETURN(rc); - if (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) + if (exp->exp_connection && + exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) localreq = 1; push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); @@ -714,7 +717,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) && dentry->d_inode->i_mode & (S_ISUID | S_ISGID)) { - rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa); + rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa); if (rc) GOTO(cleanup, rc); } diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index fdd0eba..e18d6e9 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -110,7 +110,7 @@ static int lprocfs_filter_rd_last_id(char *page, char **start, off_t off, count -= rc; retval += rc; - for (i = FILTER_GROUP_MDS1_N_BASE + 1; i < filter->fo_group_count; i++) { + for (i = FILTER_GROUP_MDS1_N_BASE; i < filter->fo_group_count; i++) { rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, i)); if (rc < 0) { retval = rc; diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index a8578e6..48af0a1 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -451,8 +451,7 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo, struct obdo *oa = oinfo->oi_oa; ENTRY; - if ((oa->o_valid & OBD_MD_FLGROUP) && - (oa->o_gr == FILTER_GROUP_ECHO || oa->o_gr == FILTER_GROUP_LLOG)){ + if ((oa->o_valid & OBD_MD_FLGROUP) && !filter_group_is_mds(oa->o_gr)) { rc = osc_real_create(exp, oinfo->oi_oa, ea, oti); rc = oinfo->oi_cb_up(oinfo, rc); RETURN(rc); @@ -519,7 +518,6 @@ int osc_create(struct obd_export *exp, struct obdo *oa, LASSERT(oa); LASSERT(ea); - LASSERT_MDS_GROUP(oa->o_gr); LASSERT(oa->o_valid & OBD_MD_FLGROUP); if ((oa->o_valid & OBD_MD_FLFLAGS) && @@ -527,7 +525,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa, RETURN(osc_real_create(exp, oa, ea, oti)); } - if (oa->o_gr == FILTER_GROUP_LLOG || oa->o_gr == FILTER_GROUP_ECHO) + if (!filter_group_is_mds(oa->o_gr)) RETURN(osc_real_create(exp, oa, ea, oti)); /* this is the special case where create removes orphans */ diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f8fd889..81d45d5 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -312,10 +312,7 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo, int rc; ENTRY; - LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || - CHECK_MDS_GROUP(oinfo->oi_oa->o_gr), - "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n", - oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr); + LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); if (req == NULL) -- 1.8.3.1