From: wangdi Date: Thu, 7 Sep 2006 06:41:05 +0000 (+0000) Subject: 1)merge filter_group support from HEAD to cmd_new X-Git-Tag: v1_8_0_110~486^2~999 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=6ebb8d9c24282b0ca7f08808924083b732a564c9;p=fs%2Flustre-release.git 1)merge filter_group support from HEAD to cmd_new 2)some fixes after this merge --- diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index b297be5..c4f7eff 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -487,6 +487,7 @@ struct obd_connect_data { __u32 ocd_version; /* lustre release version number */ __u32 ocd_grant; /* initial cache grant amount (bytes) */ __u32 ocd_index; /* LOV index to connect to */ + __u32 ocd_group; /* MDS group on OST */ __u32 ocd_brw_size; /* Maximum BRW size in bytes */ __u64 ocd_ibits_known; /* inode bits this client understands */ __u32 ocd_nllu; /* non-local-lustre-user */ diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 511ba12..ca5cabb 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -63,6 +63,7 @@ struct filter_export_data { struct list_head fed_mod_list; /* files being modified */ int fed_mod_count;/* items in fed_writing list */ long fed_pending; /* bytes just being written */ + __u32 fed_group; }; struct obd_export { diff --git a/lustre/include/obd.h b/lustre/include/obd.h index d89889c..6192e9a 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -264,8 +264,16 @@ struct obd_device_target { struct lustre_quota_ctxt obt_qctxt; }; +#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */ + #define FILTER_GROUP_LLOG 1 #define FILTER_GROUP_ECHO 2 +#define FILTER_GROUP_MDS0 3 + +struct filter_subdirs { + cfs_dentry_t *dentry[FILTER_SUBDIR_COUNT]; +}; + struct filter_ext { __u64 fe_start; @@ -277,9 +285,15 @@ struct filter_obd { struct obd_device_target fo_obt; const char *fo_fstype; struct vfsmount *fo_vfsmnt; + + int fo_group_count; cfs_dentry_t *fo_dentry_O; cfs_dentry_t **fo_dentry_O_groups; - cfs_dentry_t **fo_dentry_O_sub; + struct filter_subdirs *fo_dentry_O_sub; + struct semaphore fo_init_lock; /* group initialization lock */ + int fo_committed_group; + + spinlock_t fo_objidlock; /* protect fo_lastobjid */ spinlock_t fo_translock; /* protect fsd_last_transno */ struct file *fo_rcvd_filp; @@ -485,6 +499,7 @@ struct mds_obd { char *mds_profile; struct obd_export *mds_osc_exp; /* XXX lov_exp */ struct lov_desc mds_lov_desc; + __u32 mds_id; obd_id *mds_lov_objids; int mds_lov_objids_size; __u32 mds_lov_objids_in_file; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 171210c..5d682fa 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -562,7 +562,7 @@ int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm, oa->o_mode = S_IFREG; oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME; + OBD_MD_FLCTIME | OBD_MD_FLGROUP; set = ptlrpc_prep_set(); if (set == NULL) { @@ -623,7 +623,7 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock) check: if (lsm->lsm_oinfo[stripe].loi_id != lock->l_resource->lr_name.name[0]|| - lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[1]){ + lsm->lsm_oinfo[stripe].loi_gr != lock->l_resource->lr_name.name[2]){ LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64, lsm->lsm_oinfo[stripe].loi_id, lsm->lsm_oinfo[stripe].loi_gr); @@ -2114,9 +2114,10 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) RETURN(rc ? rc : -ENOMEM); oa->o_id = lsm->lsm_object_id; - oa->o_valid = OBD_MD_FLID; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); + OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLGROUP); err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm, 0, OBD_OBJECT_EOF); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 3d24735..09a9120 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1380,11 +1380,13 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) if (oa) { oa->o_id = lsm->lsm_object_id; - oa->o_valid = OBD_MD_FLID; + oa->o_gr = lsm->lsm_object_gr; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; flags = OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | - OBD_MD_FLFID | OBD_MD_FLGENER; + OBD_MD_FLFID | OBD_MD_FLGENER | + OBD_MD_FLGROUP; obdo_from_inode(oa, inode, flags); @@ -1807,8 +1809,10 @@ int ll_iocontrol(struct inode *inode, struct file *file, } oinfo.oi_oa->o_id = lsm->lsm_object_id; + oinfo.oi_oa->o_gr = lsm->lsm_object_gr; oinfo.oi_oa->o_flags = flags; - oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS; + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | + OBD_MD_FLGROUP; obdo_from_inode(oinfo.oi_oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 4a111b8..7ecf181 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -505,6 +505,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, RETURN(ERR_PTR(-ENOMEM)); /* prepare operatoin hint first */ + ll_prepare_md_op_data(op_data, parent, NULL, dentry->d_name.name, dentry->d_name.len, lookup_flags); @@ -1114,8 +1115,9 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir) GOTO(out_free_memmd, rc = -ENOMEM); oa->o_id = lsm->lsm_object_id; + oa->o_gr = lsm->lsm_object_gr; oa->o_mode = body->mode & S_IFMT; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP; if (body->valid & OBD_MD_FLCOOKIE) { oa->o_valid |= OBD_MD_FLCOOKIE; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 57c8a45..8ad18e5 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -174,7 +174,7 @@ void ll_truncate(struct inode *inode) oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; oinfo.oi_oa = &oa; oa.o_id = lli->lli_smd->lsm_object_id; - oa.o_valid = OBD_MD_FLID; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | @@ -220,7 +220,9 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, oa.o_mode = inode->i_mode; oa.o_id = lsm->lsm_object_id; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE; + oa.o_gr = lsm->lsm_object_gr; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | + OBD_MD_FLTYPE | OBD_MD_FLGROUP; obdo_from_inode(&oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER); oinfo.oi_oa = &oa; @@ -363,7 +365,8 @@ void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa) lsm = ll_i2info(inode)->lli_smd; oa->o_id = lsm->lsm_object_id; - oa->o_valid = OBD_MD_FLID; + oa->o_gr = lsm->lsm_object_gr; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME; if (cmd & OBD_BRW_WRITE) { oa->o_valid |= OBD_MD_FLEPOCH; diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index f7b41f7..5b07c30 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -89,7 +89,7 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, default: break; } - + LASSERT(lsm->lsm_object_gr == loi->loi_gr); rc += llog_add(cctxt, rec, NULL, logcookies + rc, numcookies - rc); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 0ff9f57..f4c9ceb 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1972,12 +1972,15 @@ static int lov_change_cbdata(struct obd_export *exp, if (!exp || !exp->exp_obd) RETURN(-ENODEV); - + + LASSERT(lsm->lsm_object_gr > 0); + lov = &exp->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct lov_stripe_md submd; submd.lsm_object_id = loi->loi_id; + submd.lsm_object_gr = lsm->lsm_object_gr; submd.lsm_stripe_count = 0; rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, &submd, it, data); @@ -2002,6 +2005,7 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, if (!exp || !exp->exp_obd) RETURN(-ENODEV); + LASSERT(lsm->lsm_object_gr > 0); LASSERT(lockh); lov = &exp->exp_obd->u.lov; rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set); @@ -2058,6 +2062,7 @@ static int lov_cancel_unused(struct obd_export *exp, ASSERT_LSM_MAGIC(lsm); + LASSERT(lsm->lsm_object_gr > 0); for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct lov_stripe_md submd; int err; @@ -2067,6 +2072,7 @@ static int lov_cancel_unused(struct obd_export *exp, CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); submd.lsm_object_id = loi->loi_id; + submd.lsm_object_gr = lsm->lsm_object_gr; submd.lsm_stripe_count = 0; err = obd_cancel_unused(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, &submd, flags, opaque); @@ -2445,7 +2451,7 @@ static int lov_set_info_async(struct obd_export *exp, obd_count keylen, } if (KEY_IS(KEY_MDS_CONN) || KEY_IS("unlinked")) { - if (vallen != 0) + if (vallen != 0 && KEY_IS("unlinked")) GOTO(out, rc = -EINVAL); } else { GOTO(out, rc = -EINVAL); diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c index 2622590..1d1a056 100644 --- a/lustre/lov/lov_qos.c +++ b/lustre/lov/lov_qos.c @@ -770,6 +770,8 @@ int qos_prep_create(struct obd_export *exp, struct lov_request_set *set) lsm = set->set_oi->oi_md; lsm->lsm_object_id = src_oa->o_id; + lsm->lsm_object_gr = src_oa->o_gr; + if (!lsm->lsm_stripe_size) lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size; if (!lsm->lsm_pattern) { diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index f441f25..2107483 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -326,6 +326,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr; req->rq_oi.oi_md->lsm_stripe_count = 0; req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid; @@ -437,6 +438,7 @@ int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); @@ -520,6 +522,7 @@ int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; + req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); @@ -595,6 +598,8 @@ static int create_done(struct obd_export *exp, struct lov_request_set *set, LBUG(); } ret_oa->o_id = src_oa->o_id; + ret_oa->o_gr = src_oa->o_gr; + ret_oa->o_valid |= OBD_MD_FLGROUP; memcpy(src_oa, ret_oa, sizeof(*src_oa)); obdo_free(ret_oa); @@ -680,6 +685,7 @@ int lov_update_create_set(struct lov_request_set *set, oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id; loi->loi_id = req->rq_oi.oi_oa->o_id; + loi->loi_gr = req->rq_oi.oi_oa->o_gr; loi->loi_ost_idx = req->rq_idx; CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n", lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); @@ -1184,6 +1190,8 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) + || req->rq_oi.oi_oa->o_gr>0); req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; req->rq_rqset = set; @@ -1289,6 +1297,9 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; + req->rq_oi.oi_oa->o_gr = loi->loi_gr; + req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP; + req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_update_punch; req->rq_rqset = set; diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 846f669..fa7663e 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -123,7 +123,7 @@ int mdd_init_obd(const struct lu_context *ctxt, struct mdd_device *mdd, CERROR("can not find obd %s \n", MDD_OBD_NAME); LBUG(); } - + obd->u.mds.mds_id = index; rc = class_setup(obd, lcfg); if (rc) GOTO(class_detach, rc); @@ -347,10 +347,11 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, oa->o_uid = 0; /* must have 0 uid / gid on OST */ oa->o_gid = 0; + oa->o_gr = FILTER_GROUP_MDS0 + mdd2lu_dev(mdd)->ld_site->ls_node_id; oa->o_mode = S_IFREG | 0600; oa->o_id = mdd_lov_create_id(lu_object_fid(mdd2lu_obj(child))); oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS | - OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID; + OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP; oa->o_size = 0; if (!(create_flags & MDS_OPEN_HAS_OBJS)) { @@ -360,6 +361,8 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, 0, &lsm, (void*)eadata); if (rc) GOTO(out_oa, rc); + lsm->lsm_object_id = oa->o_id; + lsm->lsm_object_gr = oa->o_gr; } else if (parent != NULL) { /* get lov ea from parent and set to lov */ struct lov_mds_md *__lmm; @@ -389,6 +392,7 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, } GOTO(out_oa, rc); } + LASSERT(lsm->lsm_object_gr >= FILTER_GROUP_MDS0); } else { LASSERT(eadata != NULL); rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm, @@ -396,6 +400,7 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, if (rc) GOTO(out_oa, rc); lsm->lsm_object_id = oa->o_id; + lsm->lsm_object_gr = oa->o_gr; } /*Sometimes, we may truncate some object(without lsm) *then open (with write flags)it, so creating lsm above. @@ -408,6 +413,8 @@ int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd, memset(oinfo, 0, sizeof(*oinfo)); oa->o_size = la->la_size; + /* when setting attr to ost, FLBKSZ is not needed */ + oa->o_valid &= ~OBD_MD_FLBLKSZ; obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE); diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index d65b366..d7e7645 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -432,6 +432,7 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, GOTO(out_oa, rc); } oinfo.oi_md->lsm_object_id = oinfo.oi_oa->o_id; + oinfo.oi_md->lsm_object_gr = oinfo.oi_oa->o_gr; } if (inode->i_size) { oinfo.oi_oa->o_size = inode->i_size; diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 4bdc262..65da4b8 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -63,6 +63,8 @@ #include "filter_internal.h" +/* Group 0 is no longer a legal group, to catch uninitialized IDs */ +#define FILTER_MIN_GROUPS 3 static struct lvfs_callback_ops filter_lvfs_ops; kmem_cache_t *ll_fmd_cachep; @@ -678,9 +680,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) * need to be set up like real exports as filter_connect() does. */ exp = class_new_export(obd, (struct obd_uuid *)fcd->fcd_uuid); + CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx, - last_rcvd, le64_to_cpu(fsd->lsd_last_transno)); + " srv lr: "LPU64" fcd_group %d\n", fcd->fcd_uuid, cl_idx, + last_rcvd, le64_to_cpu(fsd->lsd_last_transno), + le32_to_cpu(fcd->fcd_group)); if (IS_ERR(exp)) { if (PTR_ERR(exp) == -EALREADY) { /* export already exists, zero out this one */ @@ -693,6 +697,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) } else { fed = &exp->exp_filter_data; fed->fed_fcd = fcd; + fed->fed_group = le32_to_cpu(fcd->fcd_group); rc = filter_client_add(obd, filter, fed, cl_idx); LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */ @@ -753,34 +758,38 @@ static int filter_cleanup_groups(struct obd_device *obd) struct filter_obd *filter = &obd->u.filter; struct file *filp; struct dentry *dentry; - int i; + int i, j; ENTRY; if (filter->fo_dentry_O_groups != NULL) { - for (i = 0; i < FILTER_GROUPS; i++) { + for (i = 0; i < filter->fo_group_count; i++) { dentry = filter->fo_dentry_O_groups[i]; if (dentry != NULL) f_dput(dentry); } OBD_FREE(filter->fo_dentry_O_groups, - FILTER_GROUPS * sizeof(*filter->fo_dentry_O_groups)); + filter->fo_group_count * + sizeof(*filter->fo_dentry_O_groups)); filter->fo_dentry_O_groups = NULL; } if (filter->fo_last_objid_files != NULL) { - for (i = 0; i < FILTER_GROUPS; i++) { + for (i = 0; i < filter->fo_group_count; i++) { filp = filter->fo_last_objid_files[i]; if (filp != NULL) filp_close(filp, 0); } OBD_FREE(filter->fo_last_objid_files, - FILTER_GROUPS * sizeof(*filter->fo_last_objid_files)); + filter->fo_group_count * + sizeof(*filter->fo_last_objid_files)); filter->fo_last_objid_files = NULL; } if (filter->fo_dentry_O_sub != NULL) { - for (i = 0; i < filter->fo_subdir_count; i++) { - dentry = filter->fo_dentry_O_sub[i]; - if (dentry != NULL) - f_dput(dentry); + for (i = 0; i < filter->fo_group_count; i++) { + for (j = 0; j < filter->fo_subdir_count; j++) { + dentry = filter->fo_dentry_O_sub[i].dentry[j]; + if (dentry != NULL) + f_dput(dentry); + } } OBD_FREE(filter->fo_dentry_O_sub, filter->fo_subdir_count * @@ -789,7 +798,8 @@ static int filter_cleanup_groups(struct obd_device *obd) } if (filter->fo_last_objids != NULL) { OBD_FREE(filter->fo_last_objids, - FILTER_GROUPS * sizeof(*filter->fo_last_objids)); + filter->fo_group_count * + sizeof(*filter->fo_last_objids)); filter->fo_last_objids = NULL; } if (filter->fo_dentry_O != NULL) { @@ -799,13 +809,236 @@ static int filter_cleanup_groups(struct obd_device *obd) RETURN(0); } +static int filter_update_last_group(struct obd_device *obd, int group) +{ + struct filter_obd *filter = &obd->u.filter; + struct file *filp = NULL; + int last_group = 0, rc; + loff_t off = 0; + ENTRY; + + if (group <= filter->fo_committed_group) + RETURN(0); + + filp = filp_open("LAST_GROUP", O_RDWR, 0700); + if (IS_ERR(filp)) { + rc = PTR_ERR(filp); + filp = NULL; + CERROR("cannot open LAST_GROUP: rc = %d\n", rc); + GOTO(cleanup, rc); + } + + rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off); + if (rc) { + CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc); + GOTO(cleanup, rc); + } + LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS); + CDEBUG(D_INODE, "%s: previous %d, new %d\n", + obd->obd_name, last_group, group); + + off = 0; + last_group = group; + /* must be sync: bXXXX */ + rc = fsfilt_write_record(obd, filp, &last_group, sizeof(__u32), &off, 1); + if (rc) { + CDEBUG(D_INODE, "error updating LAST_GROUP: rc %d\n", rc); + GOTO(cleanup, rc); + } + + filter->fo_committed_group = group; +cleanup: + if (filp) + filp_close(filp, 0); + RETURN(rc); +} + +static int filter_read_group_internal(struct obd_device *obd, int group, + int create) +{ + struct filter_obd *filter = &obd->u.filter; + __u64 *new_objids = NULL; + struct filter_subdirs *new_subdirs = NULL, *tmp_subdirs = NULL; + struct dentry **new_groups = NULL; + struct file **new_files = NULL; + struct dentry *dentry; + struct file *filp; + int old_count = filter->fo_group_count, rc, stage = 0, i; + char name[25]; + __u64 last_objid; + loff_t off = 0; + + snprintf(name, 24, "%d", group); + name[24] = '\0'; + + if (!create) { + dentry = ll_lookup_one_len(name, filter->fo_dentry_O, + strlen(name)); + if (IS_ERR(dentry)) { + CERROR("Cannot lookup expected object group %d: %ld\n", + group, PTR_ERR(dentry)); + RETURN(PTR_ERR(dentry)); + } + } else { + dentry = simple_mkdir(filter->fo_dentry_O, name, 0700, 1); + if (IS_ERR(dentry)) { + CERROR("cannot lookup/create O/%s: rc = %ld\n", name, + PTR_ERR(dentry)); + RETURN(PTR_ERR(dentry)); + } + } + stage = 1; + + snprintf(name, 24, "O/%d/LAST_ID", group); + name[24] = '\0'; + filp = filp_open(name, O_CREAT | O_RDWR, 0700); + if (IS_ERR(filp)) { + CERROR("cannot create %s: rc = %ld\n", name, PTR_ERR(filp)); + GOTO(cleanup, rc = PTR_ERR(filp)); + } + stage = 2; + + rc = fsfilt_read_record(obd, filp, &last_objid, sizeof(__u64), &off); + if (rc) { + CDEBUG(D_INODE, "error reading %s: rc %d\n", name, rc); + GOTO(cleanup, rc); + } + + if (filter->fo_subdir_count) { + OBD_ALLOC(tmp_subdirs, sizeof(*tmp_subdirs)); + if (tmp_subdirs == NULL) + GOTO(cleanup, rc = -ENOMEM); + stage = 3; + + for (i = 0; i < filter->fo_subdir_count; i++) { + char dir[20]; + snprintf(dir, sizeof(dir), "d%u", i); + + tmp_subdirs->dentry[i] = simple_mkdir(dentry, dir, 0700, 1); + if (IS_ERR(tmp_subdirs->dentry[i])) { + rc = PTR_ERR(tmp_subdirs->dentry[i]); + CERROR("can't lookup/create O/%d/%s: rc = %d\n", + group, dir, rc); + GOTO(cleanup, rc); + } + + CDEBUG(D_INODE, "got/created O/%d/%s: %p\n", group, dir, + tmp_subdirs->dentry[i]); + } + } + + /* 'group' is an index; we need an array of length 'group + 1' */ + if (group + 1 > old_count) { + int len = group + 1; + OBD_ALLOC(new_objids, len * sizeof(*new_objids)); + OBD_ALLOC(new_subdirs, len * sizeof(*new_subdirs)); + OBD_ALLOC(new_groups, len * sizeof(*new_groups)); + OBD_ALLOC(new_files, len * sizeof(*new_files)); + stage = 4; + if (new_objids == NULL || new_subdirs == NULL || + new_groups == NULL || new_files == NULL) + GOTO(cleanup, rc = -ENOMEM); + + memcpy(new_objids, filter->fo_last_objids, + old_count * sizeof(*new_objids)); + memcpy(new_subdirs, filter->fo_dentry_O_sub, + old_count * sizeof(*new_subdirs)); + memcpy(new_groups, filter->fo_dentry_O_groups, + old_count * sizeof(*new_groups)); + memcpy(new_files, filter->fo_last_objid_files, + old_count * sizeof(*new_files)); + + if (old_count) { + OBD_FREE(filter->fo_last_objids, + old_count * sizeof(*new_objids)); + OBD_FREE(filter->fo_dentry_O_sub, + old_count * sizeof(*new_subdirs)); + OBD_FREE(filter->fo_dentry_O_groups, + old_count * sizeof(*new_groups)); + OBD_FREE(filter->fo_last_objid_files, + old_count * sizeof(*new_files)); + } + filter->fo_last_objids = new_objids; + filter->fo_dentry_O_sub = new_subdirs; + filter->fo_dentry_O_groups = new_groups; + filter->fo_last_objid_files = new_files; + filter->fo_group_count = len; + } + + filter->fo_dentry_O_groups[group] = dentry; + filter->fo_last_objid_files[group] = filp; + if (filter->fo_subdir_count) { + filter->fo_dentry_O_sub[group] = *tmp_subdirs; + OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs)); + } + + filter_update_last_group(obd, group); + + if (filp->f_dentry->d_inode->i_size == 0) { + filter->fo_last_objids[group] = FILTER_INIT_OBJID; + rc = filter_update_last_objid(obd, group, 1); + RETURN(rc); + } + + filter->fo_last_objids[group] = le64_to_cpu(last_objid); + CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n", + obd->obd_name, group, last_objid); + RETURN(0); + cleanup: + switch (stage) { + case 4: + if (new_objids != NULL) + OBD_FREE(new_objids, group * sizeof(*new_objids)); + if (new_subdirs != NULL) + OBD_FREE(new_subdirs, group * sizeof(*new_subdirs)); + if (new_groups != NULL) + OBD_FREE(new_groups, group * sizeof(*new_groups)); + if (new_files != NULL) + OBD_FREE(new_files, group * sizeof(*new_files)); + case 3: + if (filter->fo_subdir_count) { + for (i = 0; i < filter->fo_subdir_count; i++) { + if (tmp_subdirs->dentry[i] != NULL) + dput(tmp_subdirs->dentry[i]); + } + OBD_FREE(tmp_subdirs, sizeof(*tmp_subdirs)); + } + case 2: + filp_close(filp, 0); + case 1: + dput(dentry); + } + RETURN(rc); +} + +static int filter_read_groups(struct obd_device *obd, int last_group, + int create) +{ + struct filter_obd *filter = &obd->u.filter; + int old_count, group, rc = 0; + + down(&filter->fo_init_lock); + old_count = filter->fo_group_count; + for (group = old_count; group <= last_group; group++) { + if (group == 0) + continue; /* no group zero */ + + rc = filter_read_group_internal(obd, group, create); + if (rc != 0) + break; + } + up(&filter->fo_init_lock); + return rc; +} + /* FIXME: object groups */ static int filter_prep_groups(struct obd_device *obd) { struct filter_obd *filter = &obd->u.filter; struct dentry *dentry, *O_dentry; struct file *filp; - int i, rc = 0, cleanup_phase = 0; + int last_group, rc = 0, cleanup_phase = 0; + loff_t off = 0; ENTRY; O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1); @@ -866,6 +1099,53 @@ static int filter_prep_groups(struct obd_device *obd) f_dput(dentry); } + cleanup_phase = 2; /* groups */ + + /* we have to initialize all groups before first connections from + * clients because they may send create/destroy for any group -bzzz */ + filp = filp_open("LAST_GROUP", O_CREAT | O_RDWR, 0700); + if (IS_ERR(filp)) { + CERROR("cannot create LAST_GROUP: rc = %ld\n", PTR_ERR(filp)); + GOTO(cleanup, rc = PTR_ERR(filp)); + } + cleanup_phase = 3; /* filp */ + + rc = fsfilt_read_record(obd, filp, &last_group, sizeof(__u32), &off); + if (rc) { + CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n", rc); + GOTO(cleanup, rc); + } + if (off == 0) { + last_group = FILTER_MIN_GROUPS; + } else { + LASSERT(last_group >= FILTER_MIN_GROUPS); + } + + CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name, + FILTER_MIN_GROUPS, last_group); + filter->fo_committed_group = last_group; + rc = filter_read_groups(obd, last_group, 1); + if (rc) + GOTO(cleanup, rc); + + filp_close(filp, 0); + RETURN(0); + + cleanup: + switch (cleanup_phase) { + case 3: + filp_close(filp, 0); + case 2: + filter_cleanup_groups(obd); + case 1: + f_dput(filter->fo_dentry_O); + filter->fo_dentry_O = NULL; + default: + break; + } + return rc; + +#if 0 OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64)); if (filter->fo_last_objids == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -950,6 +1230,7 @@ static int filter_prep_groups(struct obd_device *obd) cleanup: filter_cleanup_groups(obd); return rc; +#endif } /* setup the object store with correct subdirectories */ @@ -1047,8 +1328,9 @@ static void filter_post(struct obd_device *obd) if (rc) CERROR("error writing server data: rc = %d\n", rc); - for (i = 0; i < FILTER_GROUPS; i++) { - rc = filter_update_last_objid(obd, i, (i == FILTER_GROUPS - 1)); + for (i = 0; i < filter->fo_group_count; i++) { + rc = filter_update_last_objid(obd, i, + (i == filter->fo_group_count - 1)); if (rc) CERROR("error writing group %d lastobjid: rc = %d\n", i, rc); @@ -1073,7 +1355,7 @@ static void filter_set_last_id(struct filter_obd *filter, obd_id id, obd_gr group) { LASSERT(filter->fo_fsd != NULL); - LASSERT(group <= FILTER_GROUPS); + LASSERT(group <= filter->fo_group_count); spin_lock(&filter->fo_objidlock); filter->fo_last_objids[group] = id; @@ -1084,7 +1366,7 @@ obd_id filter_last_id(struct filter_obd *filter, obd_gr group) { obd_id id; LASSERT(filter->fo_fsd != NULL); - LASSERT(group <= FILTER_GROUPS); + LASSERT(group <= filter->fo_group_count); /* FIXME: object groups */ spin_lock(&filter->fo_objidlock); @@ -1104,12 +1386,14 @@ static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent) struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) { struct filter_obd *filter = &obd->u.filter; - LASSERT(group < FILTER_GROUPS); /* FIXME: object groups */ + struct filter_subdirs *subdirs; + LASSERT(group < filter->fo_group_count); /* FIXME: object groups */ if (group > 0 || filter->fo_subdir_count == 0) return filter->fo_dentry_O_groups[group]; - return filter->fo_dentry_O_sub[objid & (filter->fo_subdir_count - 1)]; + subdirs = &filter->fo_dentry_O_sub[group]; + return subdirs->dentry[objid & (filter->fo_subdir_count - 1)]; } /* We never dget the object parent, so DON'T dput it either */ @@ -1195,11 +1479,12 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, RETURN(dchild); } -static int filter_prepare_destroy(struct obd_device *obd, obd_id objid) +static int filter_prepare_destroy(struct obd_device *obd, obd_id objid, + obd_id group) { struct lustre_handle lockh; int flags = LDLM_AST_DISCARD_DATA, rc; - struct ldlm_res_id res_id = { .name = { objid } }; + struct ldlm_res_id res_id = { .name = { objid, 0, group, 0} }; ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; ENTRY; @@ -1640,7 +1925,10 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; obd->obd_lvfs_ctxt.fs = get_ds(); obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops; - + + sema_init(&filter->fo_init_lock, 1); + filter->fo_committed_group = 0; + rc = filter_prep(obd); if (rc) GOTO(err_ops, rc); @@ -2009,10 +2297,12 @@ static int filter_connect(const struct lu_context *ctx, struct obd_uuid *cluuid, struct obd_connect_data *data) { + struct lvfs_run_ctxt saved; struct obd_export *exp; struct filter_export_data *fed; struct filter_client_data *fcd = NULL; struct filter_obd *filter = &obd->u.filter; + __u32 group; int rc; ENTRY; @@ -2031,19 +2321,42 @@ static int filter_connect(const struct lu_context *ctx, if (rc) GOTO(cleanup, rc); - if (!obd->obd_replayable) - GOTO(cleanup, rc = 0); + group = data->ocd_group; + if (obd->obd_replayable) { + OBD_ALLOC(fcd, sizeof(*fcd)); + if (!fcd) { + CERROR("filter: out of memory for client data\n"); + GOTO(cleanup, rc = -ENOMEM); + } - OBD_ALLOC(fcd, sizeof(*fcd)); - if (!fcd) { - CERROR("filter: out of memory for client data\n"); - GOTO(cleanup, rc = -ENOMEM); + memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid)); + fed->fed_fcd = fcd; + fed->fed_fcd->fcd_group = group; + rc = filter_client_add(obd, filter, fed, -1); + if (rc) + GOTO(cleanup, rc); + } + CWARN("%s: Received MDS connection ("LPX64"); group %d\n", + obd->obd_name, exp->exp_handle.h_cookie, group); + if (group == 0) + GOTO(cleanup, rc); + + if (fed->fed_group != 0 && fed->fed_group != group) { + CERROR("!!! This export (nid %s) used object group %d " + "earlier; now it's trying to use group %d! This could " + "be a bug in the MDS. Tell CFS.\n", + obd_export_nid2str(exp), fed->fed_group, group); + GOTO(cleanup, rc = -EPROTO); } + fed->fed_group = group; - memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid)); - fed->fed_fcd = fcd; - - rc = filter_client_add(obd, filter, fed, -1); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + rc = filter_read_groups(obd, group, 1); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (rc != 0) { + CERROR("can't read group %u\n", group); + GOTO(cleanup, rc); + } GOTO(cleanup, rc); @@ -2239,11 +2552,12 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa, const char *what, int quiet) { struct dentry *dchild = NULL; + obd_gr group = 0; - if (!(oa->o_valid & OBD_MD_FLGROUP)) - oa->o_gr = 0; + if (oa->o_valid & OBD_MD_FLGROUP) + group = oa->o_gr; - dchild = filter_fid2dentry(obd, NULL, oa->o_gr, oa->o_id); + dchild = filter_fid2dentry(obd, NULL, group, oa->o_id); if (IS_ERR(dchild)) { CERROR("%s error looking up object: "LPU64"\n", @@ -2301,7 +2615,6 @@ int filter_update_fidea(struct obd_export *exp, struct inode *inode, if (!(oa->o_valid & OBD_MD_FLGROUP)) oa->o_gr = 0; - /* packing fid and converting it to LE for storing into EA. * Here ->o_stripe_idx should be filled by LOV and rest of * fields - by client. */ @@ -2462,7 +2775,8 @@ out_unlock: int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti) { - struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id } }; + struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0, + oinfo->oi_oa->o_gr, 0 } }; struct ldlm_valblock_ops *ns_lvbo; struct filter_mod_data *fmd; struct lvfs_run_ctxt saved; @@ -2581,12 +2895,12 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, LASSERT(down_trylock(&filter->fo_create_lock) != 0); memset(&doa, 0, sizeof(doa)); - if (oa->o_valid & OBD_MD_FLGROUP) { - doa.o_valid |= OBD_MD_FLGROUP; - doa.o_gr = oa->o_gr; - } else { - doa.o_gr = 0; - } + + LASSERT(oa->o_gr != 0); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + doa.o_valid |= OBD_MD_FLGROUP; + doa.o_gr = oa->o_gr; doa.o_mode = S_IFREG; if (!filter->fo_destroy_in_progress) { @@ -2893,15 +3207,36 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, static int filter_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { + struct filter_export_data *fed; struct obd_device *obd = NULL; + struct filter_obd *filter; struct lvfs_run_ctxt saved; struct lov_stripe_md *lsm = NULL; - int rc = 0, diff; + int rc = 0, diff, group = oa->o_gr; ENTRY; + if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) { + CERROR("!!! nid %s sent invalid object group %d\n", + obd_export_nid2str(exp), group); + RETURN(-EINVAL); + } + + obd = exp->exp_obd; + fed = &exp->exp_filter_data; + filter = &obd->u.filter; + + if (fed->fed_group != group) { + CERROR("!!! this export (nid %s) used object group %d " + "earlier; now it's trying to use group %d! This could " + "be a bug in the MDS. Tell CFS.\n", + obd_export_nid2str(exp), fed->fed_group, group); + RETURN(-ENOTUNIQ); + } + +#if 0 if (!(oa->o_valid & OBD_MD_FLGROUP)) oa->o_gr = 0; - +#endif CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n", oa->o_gr, oa->o_id); if (ea != NULL) { @@ -2918,14 +3253,12 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { - if (oa->o_id > filter_last_id(&obd->u.filter, oa->o_gr)) { + if (oa->o_id > filter_last_id(filter, oa->o_gr)) { CERROR("recreate objid "LPU64" > last id "LPU64"\n", - oa->o_id, filter_last_id(&obd->u.filter, + oa->o_id, filter_last_id(filter, oa->o_gr)); rc = -EINVAL; } else { - struct filter_obd *filter = &obd->u.filter; - diff = 1; down(&filter->fo_create_lock); rc = filter_precreate(obd, oa, oa->o_gr, &diff); @@ -2965,9 +3298,11 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, struct iattr iattr; ENTRY; + LASSERT(oa->o_valid & OBD_MD_FLGROUP); +#if 0 if (!(oa->o_valid & OBD_MD_FLGROUP)) oa->o_gr = 0; - +#endif obd = exp->exp_obd; filter = &obd->u.filter; @@ -2992,7 +3327,7 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, GOTO(cleanup, rc = -ENOENT); } - filter_prepare_destroy(obd, oa->o_id); + filter_prepare_destroy(obd, oa->o_id, oa->o_gr); /* Our MDC connection is established by the MDS to us */ if (oa->o_valid & OBD_MD_FLCOOKIE) { diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 685fdf0..f8ff9d5 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -17,7 +17,6 @@ #define FILTER_INIT_OBJID 0 -#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */ #define FILTER_GROUPS 3 /* must be at least 3; not dynamic yet */ #define FILTER_ROCOMPAT_SUPP (0) @@ -35,7 +34,8 @@ struct filter_client_data { __u8 fcd_uuid[40]; /* client UUID */ __u64 fcd_last_rcvd; /* last completed transaction ID */ __u64 fcd_last_xid; /* client RPC xid for the last transaction */ - __u8 fcd_padding[LR_CLIENT_SIZE - 56]; + __u32 fcd_group; /* mds group */ + __u8 fcd_padding[LR_CLIENT_SIZE - 60]; }; /* Limit the returned fields marked valid to those that we actually might set */ diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index 001a0ad..d8cebef 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -136,6 +136,7 @@ static int filter_recov_log_unlink_cb(struct llog_ctxt *ctxt, oa->o_valid |= OBD_MD_FLCOOKIE; oa->o_id = lur->lur_oid; oa->o_gr = lur->lur_ogen; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; memcpy(obdo_logcookie(oa), cookie, sizeof(*cookie)); oid = oa->o_id; @@ -174,6 +175,7 @@ static int filter_recov_log_setattr_cb(struct llog_ctxt *ctxt, OBD_MD_FLCOOKIE); oinfo.oi_oa->o_id = lsr->lsr_oid; oinfo.oi_oa->o_gr = lsr->lsr_ogen; + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; oinfo.oi_oa->o_uid = lsr->lsr_uid; oinfo.oi_oa->o_gid = lsr->lsr_gid; memcpy(obdo_logcookie(oinfo.oi_oa), cookie, sizeof(*cookie)); diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c index 06946fe..59e4226 100644 --- a/lustre/obdfilter/filter_lvb.c +++ b/lustre/obdfilter/filter_lvb.c @@ -69,7 +69,8 @@ static int filter_lvbo_init(struct ldlm_resource *res) obd = res->lr_namespace->ns_lvbp; LASSERT(obd != NULL); - dentry = filter_fid2dentry(obd, NULL, 0, res->lr_name.name[0]); + dentry = filter_fid2dentry(obd, NULL, res->lr_name.name[2], + res->lr_name.name[0]); if (IS_ERR(dentry)) { rc = PTR_ERR(dentry); CERROR("%s: bad object "LPU64"/"LPU64": rc %d\n", obd->obd_name, @@ -164,9 +165,9 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, disk_update: /* Update the LVB from the disk inode */ obd = res->lr_namespace->ns_lvbp; - LASSERT(obd); - - dentry = filter_fid2dentry(obd, NULL, 0, res->lr_name.name[0]); + + dentry = filter_fid2dentry(obd, NULL, res->lr_name.name[2], + res->lr_name.name[0]); if (IS_ERR(dentry)) GOTO(out, rc = PTR_ERR(dentry)); diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index 4f165a8..37ae2e9 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -36,8 +36,9 @@ static int lprocfs_filter_rd_groups(char *page, char **start, off_t off, int count, int *eof, void *data) { + struct obd_device *obd = (struct obd_device *)data; *eof = 1; - return snprintf(page, count, "%u\n", FILTER_GROUPS); + return snprintf(page, count, "%u\n", obd->u.filter.fo_group_count); } static int lprocfs_filter_rd_tot_dirty(char *page, char **start, off_t off, diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index 5665c88..3f6c15b 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -239,10 +239,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa, ENTRY; LASSERT(oa); LASSERT(ea); - + + LASSERT(oa->o_gr > 0); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + /* + * FIXME: Now we can only create the object without cache, + * since we have only 1 oscc, only support one cache, will + * fix it soon Wangdi + */ if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)) RETURN(osc_real_create(exp, oa, ea, oti)); - if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_RECREATE_OBJS) { RETURN(osc_real_create(exp, oa, ea, oti)); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index c53dff4..39fc297 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -88,7 +88,9 @@ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, if (lsm) { LASSERT(lsm->lsm_object_id); + LASSERT(lsm->lsm_object_gr); (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id); + (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr); } RETURN(lmm_size); @@ -135,7 +137,9 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, if (lmm != NULL) { /* XXX zero *lsmp? */ (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id); + (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr); LASSERT((*lsmp)->lsm_object_id); + LASSERT((*lsmp)->lsm_object_gr); } (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; @@ -250,6 +254,8 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo, int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; + LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || + oinfo->oi_oa->o_gr > 0); req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, OST_SETATTR, 2, size, NULL); if (!req) @@ -400,6 +406,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, * This needs to be fixed in a big way. */ lsm->lsm_object_id = oa->o_id; + lsm->lsm_object_gr = oa->o_gr; *ea = lsm; if (oti != NULL) { @@ -2800,9 +2807,12 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, ldlm_iterator_t replace, void *data) { - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = class_exp2obd(exp); + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; + ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); return 0; } @@ -2882,7 +2892,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, struct obd_enqueue_info *einfo) { - struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = exp->exp_obd; struct ldlm_reply *rep; struct ptlrpc_request *req = NULL; @@ -2890,6 +2900,9 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, int rc; ENTRY; + res_id.name[0] = oinfo->oi_md->lsm_object_id; + res_id.name[2] = oinfo->oi_md->lsm_object_gr; + /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ oinfo->oi_policy.l_extent.start -= @@ -3010,10 +3023,13 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = exp->exp_obd; int rc; ENTRY; + + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO); @@ -3065,20 +3081,32 @@ static int osc_cancel_unused(struct obd_export *exp, struct lov_stripe_md *lsm, int flags, void *opaque) { - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; struct obd_device *obd = class_exp2obd(exp); + struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL; + + if (lsm != NULL) { + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; + resp = &res_id; + } - return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, - flags, opaque); + return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, + opaque); } static int osc_join_lru(struct obd_export *exp, struct lov_stripe_md *lsm, int join) { struct obd_device *obd = class_exp2obd(exp); - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL; + + if (lsm != NULL) { + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; + resp = &res_id; + } - return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join); + return ldlm_cli_join_lru(obd->obd_namespace, resp, join); } static int osc_statfs_interpret(struct ptlrpc_request *req, @@ -3203,12 +3231,14 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) RETURN(-ENOMEM); lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id; + lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr; } else { lum_size = sizeof(lum); lumk = &lum; } lumk->lmm_object_id = lsm->lsm_object_id; + lumk->lmm_object_gr = lsm->lsm_object_gr; lumk->lmm_stripe_count = 1; if (copy_to_user(lump, lumk, lum_size))