From a497ceddc22e86692bd1064b1d2b3a2a127722a5 Mon Sep 17 00:00:00 2001 From: lsy Date: Thu, 5 Oct 2006 12:59:47 +0000 Subject: [PATCH] revised capability code to not break md stack according to nikita's advice. several other capability code fixes. --- lustre/cmm/cmm_device.c | 10 +- lustre/cmm/cmm_object.c | 2 +- lustre/cmm/cmm_split.c | 6 +- lustre/fid/fid_store.c | 5 +- lustre/fld/fld_index.c | 8 +- lustre/include/dt_object.h | 45 +++-- lustre/include/lu_object.h | 45 +---- lustre/include/lustre/lustre_idl.h | 6 +- lustre/include/lustre_capa.h | 9 +- lustre/include/lustre_req_layout.h | 1 - lustre/include/md_object.h | 10 +- lustre/include/obd_support.h | 2 - lustre/llite/file.c | 1 - lustre/llite/llite_capa.c | 15 +- lustre/llite/llite_lib.c | 10 +- lustre/lmv/lmv_obd.c | 6 +- lustre/mdc/mdc_request.c | 51 ++++- lustre/mdd/mdd_handler.c | 207 +++++++++++-------- lustre/mdd/mdd_internal.h | 16 ++ lustre/mdd/mdd_lov.c | 6 +- lustre/mdd/mdd_orphans.c | 6 +- lustre/mdt/mdt_capa.c | 6 +- lustre/mdt/mdt_handler.c | 159 +++++++++------ lustre/mdt/mdt_internal.h | 52 ++--- lustre/mdt/mdt_lib.c | 38 ++-- lustre/mdt/mdt_lproc.c | 64 ++++++ lustre/mdt/mdt_open.c | 28 ++- lustre/mdt/mdt_recovery.c | 14 +- lustre/mdt/mdt_reint.c | 46 ++--- lustre/obdclass/capa.c | 7 +- lustre/obdclass/dt_object.c | 6 +- lustre/obdclass/lu_object.c | 45 +---- lustre/osd/osd_handler.c | 402 +++++++++++++++++++++++-------------- lustre/osd/osd_oi.c | 9 +- lustre/ptlrpc/layout.c | 13 +- lustre/ptlrpc/lproc_ptlrpc.c | 1 - 36 files changed, 799 insertions(+), 558 deletions(-) diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c index fe6d349..43fe651 100644 --- a/lustre/cmm/cmm_device.c +++ b/lustre/cmm/cmm_device.c @@ -86,14 +86,16 @@ static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md, RETURN(rc); } -static int cmm_init_capa_keys(struct md_device *md, +static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md, + __u32 valid, unsigned long timeout, __u32 alg, struct lustre_capa_key *keys) { struct cmm_device *cmm_dev = md2cmm_dev(md); int rc; ENTRY; - LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_keys); - rc = cmm_child_ops(cmm_dev)->mdo_init_capa_keys(cmm_dev->cmm_child, + LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt); + rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child, + valid, timeout, alg, keys); RETURN(rc); } @@ -115,7 +117,7 @@ static struct md_device_operations cmm_md_ops = { .mdo_statfs = cmm_statfs, .mdo_root_get = cmm_root_get, .mdo_maxsize_get = cmm_maxsize_get, - .mdo_init_capa_keys = cmm_init_capa_keys, + .mdo_init_capa_ctxt = cmm_init_capa_ctxt, .mdo_update_capa_key= cmm_update_capa_key, }; diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index cd6191e..953845e 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -434,7 +434,7 @@ struct md_object *md_object_find(const struct lu_env *env, struct md_object *m; ENTRY; - o = lu_object_find(env, md2lu_dev(md)->ld_site, f, BYPASS_CAPA); + o = lu_object_find(env, md2lu_dev(md)->ld_site, f); if (IS_ERR(o)) m = (struct md_object *)o; else { diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index e5a37b6..a9553f0 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -127,15 +127,13 @@ static int cmm_alloc_fid(const struct lu_env *env, struct cmm_device *cmm, struct cmm_object *cmm_object_find(const struct lu_env *env, struct cmm_device *d, - const struct lu_fid *f, - struct lustre_capa *capa) + const struct lu_fid *f) { struct lu_object *o; struct cmm_object *m; ENTRY; - o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f, - capa); + o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f); if (IS_ERR(o)) m = (struct cmm_object *)o; else diff --git a/lustre/fid/fid_store.c b/lustre/fid/fid_store.c index 3d72f1f..f396633 100644 --- a/lustre/fid/fid_store.c +++ b/lustre/fid/fid_store.c @@ -89,7 +89,7 @@ int seq_store_write(struct lu_server_seq *seq, rc = dt_obj->do_body_ops->dbo_write(env, dt_obj, seq_record_buf(info), - &pos, th); + &pos, th, BYPASS_CAPA); if (rc == sizeof(info->sti_record)) { CDEBUG(D_INFO|D_WARNING, "%s: Store ranges: Space - " DRANGE", Super - "DRANGE"\n", seq->lss_name, @@ -122,7 +122,8 @@ int seq_store_read(struct lu_server_seq *seq, LASSERT(info != NULL); rc = dt_obj->do_body_ops->dbo_read(env, dt_obj, - seq_record_buf(info), &pos); + seq_record_buf(info), &pos, + BYPASS_CAPA); if (rc == sizeof(info->sti_record)) { range_le_to_cpu(&seq->lss_space, &info->sti_record.ssr_space); diff --git a/lustre/fld/fld_index.c b/lustre/fld/fld_index.c index 8517976..267dbfa 100644 --- a/lustre/fld/fld_index.c +++ b/lustre/fld/fld_index.c @@ -117,7 +117,8 @@ int fld_index_create(struct lu_server_fld *fld, if (!IS_ERR(th)) { rc = dt_obj->do_index_ops->dio_insert(env, dt_obj, fld_rec(env, mds), - fld_key(env, seq), th); + fld_key(env, seq), th, + BYPASS_CAPA); dt_dev->dd_ops->dt_trans_stop(env, th); } else rc = PTR_ERR(th); @@ -140,7 +141,8 @@ int fld_index_delete(struct lu_server_fld *fld, th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn); if (!IS_ERR(th)) { rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, - fld_key(env, seq), th); + fld_key(env, seq), th, + BYPASS_CAPA); dt_dev->dd_ops->dt_trans_stop(env, th); } else rc = PTR_ERR(th); @@ -157,7 +159,7 @@ int fld_index_lookup(struct lu_server_fld *fld, ENTRY; rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec, - fld_key(env, seq)); + fld_key(env, seq), BYPASS_CAPA); if (rc == 0) *mds = be64_to_cpu(*(__u64 *)rec); RETURN(rc); diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 639b865..95dd462 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -95,6 +95,13 @@ struct dt_device_operations { */ int (*dt_sync)(const struct lu_env *env, struct dt_device *dev); void (*dt_ro)(const struct lu_env *env, struct dt_device *dev); + /* + * Initialize capability context. + */ + int (*dt_init_capa_ctxt)(const struct lu_env *env, + struct dt_device *dev, + __u32 valid, unsigned long timeout, + __u32 alg, struct lustre_capa_key *keys); /* * dt get credits from osd @@ -161,7 +168,8 @@ struct dt_object_operations { * precondition: lu_object_exists(&dt->do_lu); */ int (*do_attr_get)(const struct lu_env *env, - struct dt_object *dt, struct lu_attr *attr); + struct dt_object *dt, struct lu_attr *attr, + struct lustre_capa *capa); /* * Set standard attributes. * @@ -170,14 +178,16 @@ struct dt_object_operations { int (*do_attr_set)(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle); + struct thandle *handle, + struct lustre_capa *capa); /* * Return a value of an extended attribute. * * precondition: dt_object_exists(dt); */ int (*do_xattr_get)(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name); + struct lu_buf *buf, const char *name, + struct lustre_capa *capa); /* * Set value of an extended attribute. * @@ -187,7 +197,8 @@ struct dt_object_operations { */ int (*do_xattr_set)(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *handle); + const char *name, int fl, struct thandle *handle, + struct lustre_capa *capa); /* * Delete existing extended attribute. * @@ -195,7 +206,8 @@ struct dt_object_operations { */ int (*do_xattr_del)(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *handle); + const char *name, struct thandle *handle, + struct lustre_capa *capa); /* * Place list of existing extended attributes into @buf (which has * length len). @@ -203,7 +215,8 @@ struct dt_object_operations { * precondition: dt_object_exists(dt); */ int (*do_xattr_list)(const struct lu_env *env, - struct dt_object *dt, struct lu_buf *buf); + struct dt_object *dt, struct lu_buf *buf, + struct lustre_capa *capa); /* * Create new object on this device. * @@ -236,8 +249,11 @@ struct dt_object_operations { void (*do_ref_del)(const struct lu_env *env, struct dt_object *dt, struct thandle *th); - int (*do_readpage)(const struct lu_env *env, - struct dt_object *dt, const struct lu_rdpg *rdpg); + int (*do_readpage)(const struct lu_env *env, + struct dt_object *dt, const struct lu_rdpg *rdpg, + struct lustre_capa *capa); + int (*do_capa_get)(const struct lu_env *env, + struct dt_object *dt, struct lustre_capa *capa); }; /* @@ -248,13 +264,14 @@ struct dt_body_operations { * precondition: dt_object_exists(dt); */ ssize_t (*dbo_read)(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos); + struct lu_buf *buf, loff_t *pos, + struct lustre_capa *capa); /* * precondition: dt_object_exists(dt); */ ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle); + struct thandle *handle, struct lustre_capa *capa); }; /* @@ -280,18 +297,20 @@ struct dt_index_operations { * precondition: dt_object_exists(dt); */ int (*dio_lookup)(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key); + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa); /* * precondition: dt_object_exists(dt); */ int (*dio_insert)(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *handle); + struct thandle *handle, struct lustre_capa *capa); /* * precondition: dt_object_exists(dt); */ int (*dio_delete)(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle); + const struct dt_key *key, struct thandle *handle, + struct lustre_capa *capa); /* * Iterator interface */ diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index c9b4fc6..ca5e426 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -209,13 +209,6 @@ struct lu_object_operations { * consistent. */ int (*loo_object_invariant)(const struct lu_object *o); - /* - * Called to authorize action by capability. - */ - int (*loo_object_auth)(const struct lu_env *env, - const struct lu_object *o, - struct lustre_capa *capa, - __u64 opc); }; /* @@ -456,11 +449,6 @@ struct lu_object_header { */ struct lu_fid loh_fid; /* - * Fid capability. - */ - unsigned int loh_capa_bypass:1; /* bypass capability check */ - struct lustre_capa loh_capa; /* capability sent by client */ - /* * Common object attributes, cached for efficiency. From enum * lu_object_header_attr. */ @@ -581,11 +569,6 @@ struct lu_site { __u32 s_cache_race; __u32 s_lru_purged; } ls_stats; - - /* Capability */ - struct lustre_capa_key *ls_capa_keys; - unsigned long ls_capa_timeout; - __u32 ls_capa_alg; }; /* @@ -699,14 +682,7 @@ void lu_site_purge(const struct lu_env *env, * any case, additional reference is acquired on the returned object. */ struct lu_object *lu_object_find(const struct lu_env *env, - struct lu_site *s, const struct lu_fid *f, - struct lustre_capa *c); - -/* - * Auth lu_object capability. - */ -int lu_object_auth(const struct lu_env *env, const struct lu_object *o, - struct lustre_capa *capa, __u64 opc); + struct lu_site *s, const struct lu_fid *f); /* * Helpers. @@ -738,20 +714,6 @@ static inline const struct lu_fid *lu_object_fid(const struct lu_object *o) } /* - * Pointer to the fid capability of this object. - */ -static inline struct lustre_capa * -lu_object_capa(const struct lu_object *o) -{ - return &o->lo_header->loh_capa; -} - -static inline int lu_object_capa_bypass(const struct lu_object *o) -{ - return o->lo_header->loh_capa_bypass; -} - -/* * return device operations vector for this object */ static inline struct lu_device_operations * @@ -844,11 +806,6 @@ static inline const __u32 lu_object_attr(const struct lu_object *o) return o->lo_header->loh_attr; } -static inline void lu_object_bypass_capa(struct lu_object *o) -{ - o->lo_header->loh_capa_bypass = 1; -} - struct lu_rdpg { /* input params, should be filled out by mdt */ __u32 rp_hash; /* hash */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index e605aa5..c3a1666 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -847,7 +847,6 @@ typedef enum { MDS_SETXATTR = 50, MDS_WRITEPAGE = 51, MDS_IS_SUBDIR = 52, - MDS_RENEW_CAPA = 53, MDS_LAST_OPC } mds_cmd_t; @@ -1941,18 +1940,17 @@ enum { static inline int capa_for_mds(struct lustre_capa *c) { - return (c->lc_opc & CAPA_OPC_MDS_ONLY) != 0; + return (c->lc_opc & CAPA_OPC_INDEX_INSERT) != 0; } static inline int capa_for_oss(struct lustre_capa *c) { - return (c->lc_opc & CAPA_OPC_OSS_ONLY) != 0; + return (c->lc_opc & CAPA_OPC_INDEX_INSERT) == 0; } /* lustre_capa.lc_flags */ enum { CAPA_FL_SHORT_EXPIRY = 1, /* short capa expiry */ - CAPA_FL_ROOT = 2, /* root fid capa, will always renew */ }; /* lustre_capa.lc_hmac_alg */ diff --git a/lustre/include/lustre_capa.h b/lustre/include/lustre_capa.h index dc36383..3123fbd 100644 --- a/lustre/include/lustre_capa.h +++ b/lustre/include/lustre_capa.h @@ -152,7 +152,7 @@ extern spinlock_t capa_lock; extern int capa_count[]; extern cfs_mem_cache_t *capa_cachep; -struct obd_capa *capa_add(struct lustre_capa *capa); +void capa_add(struct lustre_capa *capa); struct obd_capa *capa_lookup(struct lustre_capa *capa); int capa_hmac(__u8 *hmac, struct lustre_capa *capa, __u8 *key); @@ -343,4 +343,11 @@ struct filter_capa_key { #define BYPASS_CAPA (struct lustre_capa *)ERR_PTR(-ENOENT) +enum { + CAPA_CTX_ON = 1, + CAPA_CTX_TIMEOUT = 1<<1, + CAPA_CTX_KEY_TIMEOUT = 1<<2, + CAPA_CTX_ALG = 1<<3, + CAPA_CTX_KEYS = 1<<4, +}; #endif /* __LINUX_CAPA_H_ */ diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index dd548ac..57a8bbf 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -112,7 +112,6 @@ extern const struct req_format RQF_MDS_READPAGE; extern const struct req_format RQF_MDS_WRITEPAGE; extern const struct req_format RQF_MDS_IS_SUBDIR; extern const struct req_format RQF_MDS_DONE_WRITING; -extern const struct req_format RQF_MDS_RENEW_CAPA; /* * This is format of direct (non-intent) MDS_GETATTR_NAME request. diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 7bad122..3c61d10 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -71,6 +71,12 @@ struct md_ucred { struct mdt_identity *mu_identity; }; +/* there are at most 4 fid in one operation, see rename */ +struct md_capainfo { + const struct lu_fid *mc_fid[4]; + struct lustre_capa *mc_capa[4]; +}; + /* * Implemented in mdd/mdd_handler.c. * @@ -78,6 +84,7 @@ struct md_ucred { * related definitions. */ struct md_ucred *md_ucred(const struct lu_env *env); +struct md_capainfo *md_capainfo(const struct lu_env *env); /* metadata attributes */ enum ma_valid { @@ -231,7 +238,8 @@ struct md_device_operations { int (*mdo_statfs)(const struct lu_env *env, struct md_device *m, struct kstatfs *sfs); - int (*mdo_init_capa_keys)(struct md_device *m, + int (*mdo_init_capa_ctxt)(const struct lu_env *env, struct md_device *m, + __u32 valid, unsigned long timeout, __u32 alg, struct lustre_capa_key *keys); int (*mdo_update_capa_key)(const struct lu_env *env, diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 6a93eda..2ec7f9b 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -97,8 +97,6 @@ extern int obd_race_state; #define OBD_FAIL_MDS_WRITEPAGE_PACK 0x136 #define OBD_FAIL_MDS_IS_SUBDIR_NET 0x137 #define OBD_FAIL_MDS_IS_SUBDIR_PACK 0x138 -#define OBD_FAIL_MDS_RENEW_CAPA_NET 0x139 -#define OBD_FAIL_MDS_RENEW_CAPA_PACK 0x13a #define OBD_FAIL_OST 0x200 #define OBD_FAIL_OST_CONNECT_NET 0x201 diff --git a/lustre/llite/file.c b/lustre/llite/file.c index ed0e62e..8536d9e 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -52,7 +52,6 @@ void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data, struct lustre_handle *fh) { op_data->fid1 = ll_i2info(inode)->lli_fid; - op_data->mod_capa1 = ll_i2mdscapa(inode); op_data->attr.ia_mode = inode->i_mode; op_data->attr.ia_atime = inode->i_atime; op_data->attr.ia_mtime = inode->i_mtime; diff --git a/lustre/llite/llite_capa.c b/lustre/llite/llite_capa.c index 1957886..552012d 100644 --- a/lustre/llite/llite_capa.c +++ b/lustre/llite/llite_capa.c @@ -292,7 +292,7 @@ static struct obd_capa *do_lookup_oss_capa(struct inode *inode, int opc) list_for_each_entry(ocapa, &lli->lli_oss_capas, u.cli.lli_list) { if (!obd_capa_is_valid(ocapa)) continue; - if ((capa_opc(&ocapa->c_capa) & opc) == opc) + if ((capa_opc(&ocapa->c_capa) & opc) != opc) continue; LASSERT(lu_fid_eq(capa_fid(&ocapa->c_capa), @@ -315,6 +315,7 @@ struct obd_capa *ll_lookup_oss_capa(struct inode *inode, __u64 opc) if ((ll_i2sbi(inode)->ll_flags & LL_SBI_OSS_CAPA) == 0) return NULL; ENTRY; + LASSERT(opc == CAPA_OPC_OSS_WRITE || opc == (CAPA_OPC_OSS_WRITE | CAPA_OPC_OSS_READ) || opc == CAPA_OPC_OSS_TRUNC); @@ -357,11 +358,11 @@ struct obd_capa *ll_i2mdscapa(struct inode *inode) { struct obd_capa *ocapa; struct ll_inode_info *lli = ll_i2info(inode); - ENTRY; LASSERT(inode); if ((ll_i2sbi(inode)->ll_flags & LL_SBI_MDS_CAPA) == 0) - RETURN(NULL); + return NULL; + ENTRY; spin_lock(&capa_lock); ocapa = capa_get(lli->lli_mds_capa); @@ -374,8 +375,8 @@ struct obd_capa *ll_i2mdscapa(struct inode *inode) if (!ocapa && atomic_read(&ll_capa_debug)) { CDEBUG(S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode) ? - D_ERROR : D_SEC, "no MDS capa for (ino %lu)\n", - inode->i_ino); + D_ERROR : D_SEC, "no MDS capability for fid "DFID"\n", + PFID(ll_inode2fid(inode))); if (inode_have_md_lock(inode, MDS_INODELOCK_LOOKUP)) LBUG(); atomic_set(&ll_capa_debug, 0); @@ -573,8 +574,8 @@ void ll_oss_capa_open(struct inode *inode, struct file *file) ocapa = do_lookup_oss_capa(inode, opc); if (!ocapa) { if (atomic_read(&ll_capa_debug)) { - CDEBUG(D_ERROR, "no capa for (uid %u op %d ino %lu)\n", - (unsigned)current->uid, opc, inode->i_ino); + CDEBUG(D_ERROR, "no opc %x capability for fid "DFID"\n", + opc, PFID(ll_inode2fid(inode))); atomic_set(&ll_capa_debug, 0); } spin_unlock(&capa_lock); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 7b2b8ff..9e20768 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -258,7 +258,8 @@ static int client_common_fill_super(struct super_block *sb, sb->s_flags |= MS_POSIXACL; #endif sbi->ll_flags |= LL_SBI_ACL; - } else { + } else if (sbi->ll_flags & LL_SBI_ACL) { + LCONSOLE_INFO("client wants to enable acl, but mdt not!\n"); sbi->ll_flags &= ~LL_SBI_ACL; } @@ -284,12 +285,12 @@ static int client_common_fill_super(struct super_block *sb, } if (data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) { - CDEBUG(D_SEC, "client enabled MDS capability!\n"); + LCONSOLE_INFO("client enabled MDS capability!\n"); sbi->ll_flags |= LL_SBI_MDS_CAPA; } if (data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA) { - CDEBUG(D_SEC, "client enabled OSS capability!\n"); + LCONSOLE_INFO("client enabled OSS capability!\n"); sbi->ll_flags |= LL_SBI_OSS_CAPA; } @@ -778,6 +779,7 @@ void ll_lli_init(struct ll_inode_info *lli) lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0; lli->lli_open_fd_exec_count = 0; INIT_LIST_HEAD(&lli->lli_dead_list); + INIT_LIST_HEAD(&lli->lli_oss_capas); sema_init(&lli->lli_rmtperm_sem, 1); } @@ -1248,6 +1250,8 @@ void ll_clear_inode(struct inode *inode) list_del_init(&lli->lli_dead_list); spin_unlock(&sbi->ll_deathrow_lock); + ll_clear_inode_capas(inode); + EXIT; } diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 1e6617a..b470c2b 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -2502,7 +2502,7 @@ static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *ocapa, +static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc, renew_capa_cb_t cb) { struct obd_device *obd = exp->exp_obd; @@ -2515,11 +2515,11 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *ocapa, if (rc) RETURN(rc); - tgt_exp = lmv_get_export(lmv, &ocapa->c_capa.lc_fid); + tgt_exp = lmv_get_export(lmv, &oc->c_capa.lc_fid); if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); - rc = md_renew_capa(tgt_exp, ocapa, cb); + rc = md_renew_capa(tgt_exp, oc, cb); RETURN(rc); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index a118851..6f98ef9 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1612,26 +1612,61 @@ int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, RETURN(0); } +static int mdc_interpret_renew_capa(struct ptlrpc_request *req, void *unused, + int status) +{ + struct obd_capa *oc = req->rq_async_args.pointer_arg[0]; + renew_capa_cb_t *cb = req->rq_async_args.pointer_arg[1]; + struct mds_body *body = NULL; + struct lustre_capa *capa; + ENTRY; + + if (status) + DEBUG_CAPA(D_ERROR, &oc->c_capa, "renew failed: %d for", + status); + + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); + if (body == NULL) + GOTO(out, capa = ERR_PTR(-EFAULT)); + + if (body->flags) + GOTO(out, capa = ERR_PTR((long)body->flags)); + + if ((body->valid & OBD_MD_FLOSSCAPA) == 0) + GOTO(out, capa = ERR_PTR(-EFAULT)); + + capa = lustre_unpack_capa(req->rq_repmsg, REPLY_REC_OFF); + if (!capa) + capa = ERR_PTR(-EFAULT); + + EXIT; +out: + (*cb)(oc, capa); + return 0; +} + static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, renew_capa_cb_t cb) { struct ptlrpc_request *req; - int size[2] = { sizeof(struct ptlrpc_body), + int size[5] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_body), sizeof(struct lustre_capa) }; - int repsize[3] = { sizeof(struct ptlrpc_body), - sizeof(struct mdt_body), - sizeof(struct lustre_capa) }; ENTRY; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_RENEW_CAPA, 2, size, NULL); + MDS_GETATTR, 3, size, NULL); if (!req) RETURN(-ENOMEM); - mdc_pack_capa(req, REQ_REC_OFF, oc); + mdc_pack_req_body(req, REQ_REC_OFF, OBD_MD_FLOSSCAPA, + &oc->c_capa.lc_fid, oc, 0, 0); - ptlrpc_req_set_repsize(req, 3, repsize); - req->rq_interpret_reply = cb; + ptlrpc_req_set_repsize(req, 5, size); + req->rq_async_args.pointer_arg[0] = oc; + req->rq_async_args.pointer_arg[1] = cb; + req->rq_interpret_reply = mdc_interpret_renew_capa; ptlrpcd_add_req(req); RETURN(0); diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index 4866110..b541f3b 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -469,7 +469,7 @@ struct mdd_object *mdd_object_find(const struct lu_env *env, struct mdd_object *m; ENTRY; - o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f, BYPASS_CAPA); + o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f); if (IS_ERR(o)) m = (struct mdd_object *)o; else { @@ -525,12 +525,12 @@ static int mdd_may_create(const struct lu_env *env, RETURN(rc); } -static inline int __mdd_la_get(const struct lu_env *env, - struct mdd_object *obj, struct lu_attr *la) +static inline int __mdd_la_get(const struct lu_env *env, struct mdd_object *obj, + struct lu_attr *la, struct lustre_capa *capa) { - struct dt_object *next = mdd_object_child(obj); + struct dt_object *next = mdd_object_child(obj); LASSERT(lu_object_exists(mdd2lu_obj(obj))); - return next->do_ops->do_attr_get(env, next, la); + return next->do_ops->do_attr_get(env, next, la, capa); } static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags) @@ -551,7 +551,7 @@ static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj) ENTRY; mdd_read_lock(env, obj); - rc = __mdd_la_get(env, obj, la); + rc = __mdd_la_get(env, obj, la, BYPASS_CAPA); mdd_read_unlock(env, obj); if (rc == 0) mdd_flags_xlate(obj, la->la_flags); @@ -584,13 +584,13 @@ static inline int mdd_is_sticky(const struct lu_env *env, struct md_ucred *uc = md_ucred(env); int rc; - rc = __mdd_la_get(env, cobj, tmp_la); + rc = __mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); if (rc) { return rc; } else if (tmp_la->la_uid == uc->mu_fsuid) { return 0; } else { - rc = __mdd_la_get(env, pobj, tmp_la); + rc = __mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); if (rc) return rc; else if (!(tmp_la->la_mode & S_ISVTX)) @@ -652,7 +652,8 @@ static int __mdd_iattr_get(const struct lu_env *env, int rc = 0; ENTRY; - rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr); + rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr, + mdd_object_capa(env, mdd_obj)); if (rc == 0) ma->ma_valid = MA_INODE; RETURN(rc); @@ -756,7 +757,8 @@ static int mdd_xattr_get(const struct lu_env *env, next = mdd_object_child(mdd_obj); mdd_read_lock(env, mdd_obj); - rc = next->do_ops->do_xattr_get(env, next, buf, name); + rc = next->do_ops->do_xattr_get(env, next, buf, name, + mdd_object_capa(env, mdd_obj)); mdd_read_unlock(env, mdd_obj); RETURN(rc); @@ -779,7 +781,8 @@ static int mdd_readlink(const struct lu_env *env, struct md_object *obj, next = mdd_object_child(mdd_obj); mdd_read_lock(env, mdd_obj); - rc = next->do_body_ops->dbo_read(env, next, buf, &pos); + rc = next->do_body_ops->dbo_read(env, next, buf, &pos, + mdd_object_capa(env, mdd_obj)); mdd_read_unlock(env, mdd_obj); RETURN(rc); } @@ -797,7 +800,8 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj, next = mdd_object_child(mdd_obj); mdd_read_lock(env, mdd_obj); - rc = next->do_ops->do_xattr_list(env, next, buf); + rc = next->do_ops->do_xattr_list(env, next, buf, + mdd_object_capa(env, mdd_obj)); mdd_read_unlock(env, mdd_obj); RETURN(rc); @@ -1077,7 +1081,8 @@ int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o, LASSERT(lu_object_exists(mdd2lu_obj(o))); next = mdd_object_child(o); - return next->do_ops->do_attr_set(env, next, attr, handle); + return next->do_ops->do_attr_set(env, next, attr, handle, + mdd_object_capa(env, o)); } int mdd_attr_set_internal_locked(const struct lu_env *env, @@ -1097,16 +1102,17 @@ static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o, int fl, struct thandle *handle) { struct dt_object *next; + struct lustre_capa *capa = mdd_object_capa(env, o); int rc = 0; ENTRY; LASSERT(lu_object_exists(mdd2lu_obj(o))); next = mdd_object_child(o); if (buf->lb_buf && buf->lb_len > 0) { - rc = next->do_ops->do_xattr_set(env, next, buf, name, - 0, handle); + rc = next->do_ops->do_xattr_set(env, next, buf, name, 0, handle, + capa); } else if (buf->lb_buf == NULL && buf->lb_len == 0) { - rc = next->do_ops->do_xattr_del(env, next, name, handle); + rc = next->do_ops->do_xattr_del(env, next, name, handle, capa); } RETURN(rc); } @@ -1138,7 +1144,7 @@ int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE)) RETURN(-EPERM); - rc = __mdd_la_get(env, obj, tmp_la); + rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); if (rc) RETURN(rc); @@ -1396,7 +1402,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env, RETURN(-EPERM); mdd_read_lock(env, obj); - rc = __mdd_la_get(env, obj, tmp_la); + rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); mdd_read_unlock(env, obj); if (rc) RETURN(rc); @@ -1452,7 +1458,8 @@ static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd, LASSERT(lu_object_exists(mdd2lu_obj(obj))); next = mdd_object_child(obj); - return next->do_ops->do_xattr_del(env, next, name, handle); + return next->do_ops->do_xattr_del(env, next, name, handle, + mdd_object_capa(env, obj)); } int mdd_xattr_del(const struct lu_env *env, struct md_object *obj, @@ -1485,7 +1492,8 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj, static int __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj, const struct lu_fid *lf, - const char *name, struct thandle *th) + const char *name, struct thandle *th, + struct lustre_capa *capa) { int rc; struct dt_object *next = mdd_object_child(pobj); @@ -1494,7 +1502,7 @@ static int __mdd_index_insert_only(const struct lu_env *env, if (dt_try_as_dir(env, next)) rc = next->do_index_ops->dio_insert(env, next, (struct dt_rec *)lf, - (struct dt_key *)name, th); + (struct dt_key *)name, th, capa); else rc = -ENOTDIR; RETURN(rc); @@ -1503,7 +1511,8 @@ static int __mdd_index_insert_only(const struct lu_env *env, /* insert new index, add reference if isdir, update times */ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, const struct lu_fid *lf, - const char *name, int isdir, struct thandle *th) + const char *name, int isdir, struct thandle *th, + struct lustre_capa *capa) { int rc; struct dt_object *next = mdd_object_child(pobj); @@ -1517,7 +1526,7 @@ static int __mdd_index_insert(const struct lu_env *env, rc = next->do_index_ops->dio_insert(env, next, (struct dt_rec *)lf, (struct dt_key *)name, - th); + th, capa); else rc = -ENOTDIR; @@ -1536,7 +1545,8 @@ static int __mdd_index_insert(const struct lu_env *env, static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, const char *name, - int is_dir, struct thandle *handle) + int is_dir, struct thandle *handle, + struct lustre_capa *capa) { int rc; struct dt_object *next = mdd_object_child(pobj); @@ -1545,7 +1555,7 @@ static int __mdd_index_delete(const struct lu_env *env, if (dt_try_as_dir(env, next)) { rc = next->do_index_ops->dio_delete(env, next, (struct dt_key *)name, - handle); + handle, capa); if (rc == 0 && is_dir) __mdd_ref_del(env, pobj, handle); } else @@ -1599,7 +1609,8 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, GOTO(out, rc); rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj), - name, handle); + name, handle, + mdd_object_capa(env, mdd_tobj)); if (rc == 0) __mdd_ref_add(env, mdd_sobj, handle); @@ -1753,7 +1764,8 @@ static int mdd_unlink(const struct lu_env *env, GOTO(cleanup, rc); is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu)); - rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle); + rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle, + mdd_object_capa(env, mdd_pobj)); if (rc) GOTO(cleanup, rc); @@ -2008,17 +2020,20 @@ static int mdd_rename(const struct lu_env *env, if (rc) GOTO(cleanup, rc); - rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle); + rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, + mdd_object_capa(env, mdd_spobj)); if (rc) GOTO(cleanup, rc); /* tobj can be remote one, * so we do index_delete unconditionally and -ENOENT is allowed */ - rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle); + rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); if (rc != 0 && rc != -ENOENT) GOTO(cleanup, rc); - rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle); + rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); if (rc) GOTO(cleanup, rc); @@ -2090,7 +2105,8 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, RETURN(rc); if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) - rc = dir->do_index_ops->dio_lookup(env, dir, rec, key); + rc = dir->do_index_ops->dio_lookup(env, dir, rec, key, + mdd_object_capa(env, mdd_obj)); else rc = -ENOTDIR; @@ -2170,15 +2186,16 @@ static int __mdd_object_initialize(const struct lu_env *env, /* add . and .. for newly created dir */ __mdd_ref_add(env, child, handle); rc = __mdd_index_insert_only(env, child, mdo2fid(child), - dot, handle); + dot, handle, BYPASS_CAPA); if (rc == 0) { rc = __mdd_index_insert_only(env, child, pfid, - dotdot, handle); + dotdot, handle, + BYPASS_CAPA); if (rc != 0) { int rc2; - rc2 = __mdd_index_delete(env, - child, dot, 0, handle); + rc2 = __mdd_index_delete(env, child, dot, 0, + handle, BYPASS_CAPA); if (rc2 != 0) CERROR("Failure to cleanup after dotdot" " creation: %d (%d)\n", rc2, rc); @@ -2297,7 +2314,7 @@ static int mdd_create_sanity_check(const struct lu_env *env, /* sgid check */ mdd_read_lock(env, obj); - rc = __mdd_la_get(env, obj, la); + rc = __mdd_la_get(env, obj, la, BYPASS_CAPA); mdd_read_unlock(env, obj); if (rc != 0) RETURN(rc); @@ -2429,7 +2446,8 @@ static int mdd_create(const struct lu_env *env, GOTO(cleanup, rc); rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son), - name, S_ISDIR(attr->la_mode), handle); + name, S_ISDIR(attr->la_mode), handle, + mdd_object_capa(env, mdd_pobj)); if (rc) GOTO(cleanup, rc); @@ -2453,13 +2471,12 @@ static int mdd_create(const struct lu_env *env, struct dt_object *dt = mdd_object_child(son); const char *target_name = spec->u.sp_symname; int sym_len = strlen(target_name); + const struct lu_buf *buf; loff_t pos = 0; - rc = dt->do_body_ops->dbo_write(env, dt, - mdd_buf_get_const(env, - target_name, - sym_len), - &pos, handle); + buf = mdd_buf_get_const(env, target_name, sym_len); + rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, + mdd_object_capa(env, son)); if (rc == sym_len) rc = 0; else @@ -2481,7 +2498,7 @@ cleanup: if (inserted) { rc2 = __mdd_index_delete(env, mdd_pobj, name, S_ISDIR(attr->la_mode), - handle); + handle, BYPASS_CAPA); if (rc2) CERROR("error can not cleanup destroy %d\n", rc2); @@ -2620,7 +2637,8 @@ static int mdd_name_insert(const struct lu_env *env, if (rc) GOTO(out_unlock, rc); - rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle); + rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle, + BYPASS_CAPA); out_unlock: mdd_write_unlock(env, mdd_obj); @@ -2677,7 +2695,8 @@ static int mdd_name_remove(const struct lu_env *env, if (rc) GOTO(out_unlock, rc); - rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle); + rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle, + BYPASS_CAPA); out_unlock: mdd_write_unlock(env, mdd_obj); @@ -2746,11 +2765,12 @@ static int mdd_rename_tgt(const struct lu_env *env, /* if rename_tgt is called then we should just re-insert name with * correct fid, no need to dec/inc parent nlink if obj is dir */ - rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle); + rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA); if (rc) GOTO(cleanup, rc); - rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle); + rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle, + BYPASS_CAPA); if (rc) GOTO(cleanup, rc); @@ -2809,15 +2829,21 @@ static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m, RETURN(0); } -static int mdd_init_capa_keys(struct md_device *m, +static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m, + __u32 valid, unsigned long timeout, __u32 alg, struct lustre_capa_key *keys) { struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev); struct mds_obd *mds = &mdd2obd_dev(mdd)->u.mds; + int rc; ENTRY; - mds->mds_capa_keys = keys; - RETURN(0); + if (valid & CAPA_CTX_KEYS) + mds->mds_capa_keys = keys; + + rc = mdd_child_ops(mdd)->dt_init_capa_ctxt(env, mdd->mdd_child, valid, + timeout, alg, keys); + RETURN(rc); } static int mdd_update_capa_key(const struct lu_env *env, @@ -2918,7 +2944,7 @@ static int mdd_open_sanity_check(const struct lu_env *env, if (mdd_is_dead_obj(obj)) RETURN(-ENOENT); - rc = __mdd_la_get(env, obj, tmp_la); + rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); if (rc) RETURN(rc); @@ -3043,7 +3069,8 @@ static int mdd_readpage(const struct lu_env *env, struct md_object *obj, if (rc) GOTO(out_unlock, rc); - rc = next->do_ops->do_readpage(env, next, rdpg); + rc = next->do_ops->do_readpage(env, next, rdpg, + mdd_object_capa(env, mdd_obj)); out_unlock: mdd_read_unlock(env, mdd_obj); @@ -3140,7 +3167,8 @@ static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj, buf->lb_buf = mdd_env_info(env)->mti_xattr_buf; buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf); rc = next->do_ops->do_xattr_get(env, next, buf, - XATTR_NAME_ACL_ACCESS); + XATTR_NAME_ACL_ACCESS, + mdd_object_capa(env, obj)); if (rc <= 0) RETURN(rc ? : -EACCES); @@ -3172,7 +3200,7 @@ static int mdd_exec_permission_lite(const struct lu_env *env, if (uc->mu_valid == UCRED_INVALID) RETURN(-EACCES); - rc = __mdd_la_get(env, obj, la); + rc = __mdd_la_get(env, obj, la, BYPASS_CAPA); if (rc) RETURN(rc); @@ -3224,7 +3252,7 @@ static int __mdd_permission_internal(const struct lu_env *env, RETURN(-EACCES); if (getattr) { - rc = __mdd_la_get(env, obj, la); + rc = __mdd_la_get(env, obj, la, BYPASS_CAPA); if (rc) RETURN(rc); } @@ -3302,47 +3330,24 @@ static int mdd_permission(const struct lu_env *env, struct md_object *obj, static int mdd_capa_get(const struct lu_env *env, struct md_object *obj, struct lustre_capa *capa) { + struct dt_object *next; struct mdd_object *mdd_obj = md2mdd_obj(obj); - struct mdd_device *mdd = mdo2mdd(obj); - struct lu_site *ls = mdd->mdd_md_dev.md_lu_dev.ld_site; - struct lustre_capa_key *key = &ls->ls_capa_keys[1]; - struct obd_capa *ocapa; int rc; ENTRY; LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj))); + next = mdd_object_child(mdd_obj); - capa->lc_fid = *mdo2fid(mdd_obj); - if (ls->ls_capa_timeout < CAPA_TIMEOUT) - capa->lc_flags |= CAPA_FL_SHORT_EXPIRY; - if (lu_fid_eq(&capa->lc_fid, &mdd->mdd_root_fid)) - capa->lc_flags |= CAPA_FL_ROOT; - capa->lc_flags = ls->ls_capa_alg << 24; - - /* TODO: get right permission here after remote uid landing */ - ocapa = capa_lookup(capa); - if (ocapa) { - LASSERT(!capa_is_expired(ocapa)); - capa_cpy(capa, ocapa); - capa_put(ocapa); - RETURN(0); - } - - capa->lc_keyid = key->lk_keyid; - capa->lc_expiry = CURRENT_SECONDS + ls->ls_capa_timeout; - rc = capa_hmac(capa->lc_hmac, capa, key->lk_key); - if (rc) - RETURN(rc); + rc = next->do_ops->do_capa_get(env, next, capa); - capa_add(capa); - RETURN(0); + RETURN(rc); } struct md_device_operations mdd_ops = { .mdo_statfs = mdd_statfs, .mdo_root_get = mdd_root_get, .mdo_maxsize_get = mdd_maxsize_get, - .mdo_init_capa_keys = mdd_init_capa_keys, + .mdo_init_capa_ctxt = mdd_init_capa_ctxt, .mdo_update_capa_key= mdd_update_capa_key, }; @@ -3442,6 +3447,39 @@ struct md_ucred *md_ucred(const struct lu_env *env) } EXPORT_SYMBOL(md_ucred); +static void *mdd_capainfo_key_init(const struct lu_context *ctx, + struct lu_context_key *key) +{ + struct md_capainfo *ci; + + OBD_ALLOC_PTR(ci); + if (ci == NULL) + ci = ERR_PTR(-ENOMEM); + return ci; +} + +static void mdd_capainfo_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void *data) +{ + struct md_capainfo *ci = data; + OBD_FREE_PTR(ci); +} + +struct lu_context_key mdd_capainfo_key = { + .lct_tags = LCT_SESSION, + .lct_init = mdd_capainfo_key_init, + .lct_fini = mdd_capainfo_key_fini +}; + +struct md_capainfo *md_capainfo(const struct lu_env *env) +{ + /* NB, in mdt_init0 */ + if (env->le_ses == NULL) + return NULL; + return lu_context_key_get(env->le_ses, &mdd_capainfo_key); +} +EXPORT_SYMBOL(md_capainfo); + static int mdd_type_init(struct lu_device_type *t) { int result; @@ -3449,11 +3487,14 @@ static int mdd_type_init(struct lu_device_type *t) result = lu_context_key_register(&mdd_thread_key); if (result == 0) result = lu_context_key_register(&mdd_ucred_key); + if (result == 0) + result = lu_context_key_register(&mdd_capainfo_key); return result; } static void mdd_type_fini(struct lu_device_type *t) { + lu_context_key_degister(&mdd_capainfo_key); lu_context_key_degister(&mdd_ucred_key); lu_context_key_degister(&mdd_thread_key); } diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index ab22611..bc112fd 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -225,4 +225,20 @@ static inline int mdd_lov_cookiesize(const struct lu_env *env, return obd->u.mds.mds_max_cookiesize; } +static inline struct lustre_capa *mdd_object_capa(const struct lu_env *env, + const struct mdd_object *obj) +{ + struct md_capainfo *ci = md_capainfo(env); + const struct lu_fid *fid = mdo2fid(obj); + int i; + + /* NB: in mdt_init0 */ + if (!ci) + return BYPASS_CAPA; + for (i = 0; i < 4; i++) + if (ci->mc_fid[i] && lu_fid_eq(ci->mc_fid[i], fid)) + return ci->mc_capa[i]; + return NULL; +} + #endif diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 85c040e..00a3043 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -193,7 +193,8 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, next = mdd_object_child(obj); rc = next->do_ops->do_xattr_get(env, next, - mdd_buf_get(env, md, *md_size), name); + mdd_buf_get(env, md, *md_size), name, + mdd_object_capa(env, obj)); /* * XXX: handling of -ENODATA, the right way is to have ->do_md_get() * exported by dt layer. @@ -569,7 +570,8 @@ int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, int rc = 0; ENTRY; - rc = next->do_ops->do_attr_get(env, next, tmp_la); + rc = next->do_ops->do_attr_get(env, next, tmp_la, + mdd_object_capa(env, obj)); if (rc) RETURN(rc); diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 64e3bbc..dfa05ab 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -76,7 +76,8 @@ static int orph_index_insert(const struct lu_env *env, ENTRY; rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset, - (struct dt_key *)key, th); + (struct dt_key *)key, th, + BYPASS_CAPA); RETURN(rc); } @@ -91,7 +92,8 @@ static int orph_index_delete(const struct lu_env *env, ENTRY; LASSERT(dor); rc = dor->do_index_ops->dio_delete(env, dor, - (struct dt_key *)key, th); + (struct dt_key *)key, th, + BYPASS_CAPA); RETURN(rc); } diff --git a/lustre/mdt/mdt_capa.c b/lustre/mdt/mdt_capa.c index 89f169b..384db5a 100644 --- a/lustre/mdt/mdt_capa.c +++ b/lustre/mdt/mdt_capa.c @@ -147,7 +147,7 @@ int mdt_capa_keys_init(const struct lu_env *env, struct mdt_device *mdt) obj = mdt->mdt_ck_obj; obj->do_ops->do_read_lock(env, obj); - rc = obj->do_ops->do_attr_get(env, mdt->mdt_ck_obj, la); + rc = obj->do_ops->do_attr_get(env, mdt->mdt_ck_obj, la, BYPASS_CAPA); obj->do_ops->do_read_unlock(env, obj); if (rc) RETURN(rc); @@ -194,7 +194,7 @@ static int mdt_ck_thread_main(void *args) { struct mdt_device *mdt = args; struct ptlrpc_thread *thread = &mdt->mdt_ck_thread; - struct lustre_capa_key *tmp, *key = red_capa_key(mdt); + struct lustre_capa_key *tmp, *key = &mdt->mdt_capa_keys[1]; struct lu_env env; struct mdt_thread_info *info; struct md_device *next; @@ -295,3 +295,5 @@ void mdt_ck_thread_stop(struct mdt_device *mdt) cfs_waitq_signal(&thread->t_ctl_waitq); wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED); } + + diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 4c36945..eaa427f 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -157,19 +157,36 @@ void mdt_set_disposition(struct mdt_thread_info *info, static int mdt_getstatus(struct mdt_thread_info *info) { struct md_device *next = info->mti_mdt->mdt_child; - int rc; struct mdt_body *body; + int rc; ENTRY; - if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) { - rc = -ENOMEM; - } else { - body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - rc = next->md_ops->mdo_root_get(info->mti_env, next, - &body->fid1); - if (rc == 0) - body->valid |= OBD_MD_FLID; + if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) + RETURN(-ENOMEM); + + body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + rc = next->md_ops->mdo_root_get(info->mti_env, next, &body->fid1); + if (rc == 0) + body->valid |= OBD_MD_FLID; + + if (info->mti_mdt->mdt_opts.mo_mds_capa) { + struct mdt_object *root; + struct lustre_capa *capa; + + root = mdt_object_find(info->mti_env, info->mti_mdt, &body->fid1); + if (IS_ERR(root)) + RETURN(PTR_ERR(root)); + + capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1); + LASSERT(capa); + capa->lc_opc = CAPA_OPC_MDS_DEFAULT; + + rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa); + mdt_object_put(info->mti_env, root); + if (rc) + RETURN(rc); + body->valid |= OBD_MD_FLMDSCAPA; } RETURN(rc); @@ -390,10 +407,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) { struct lustre_capa *capa; - spin_lock(&capa_lock); - info->mti_capa_key = *red_capa_key(mdt); - spin_unlock(&capa_lock); - capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1); LASSERT(capa); capa->lc_opc = CAPA_OPC_MDS_DEFAULT; @@ -406,6 +419,32 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, RETURN(rc); } +static int mdt_renew_capa(struct mdt_thread_info *info) +{ + struct mdt_object *obj = info->mti_object; + struct mdt_body *body; + struct lustre_capa *capa, *c; + int rc; + ENTRY; + + c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1); + LASSERT(c); + + capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1); + LASSERT(capa); + + *capa = *c; + rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa); + + body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + LASSERT(body); + + body->valid |= OBD_MD_FLOSSCAPA; + body->flags = (__u32)rc; + + RETURN(0); +} + static int mdt_getattr(struct mdt_thread_info *info) { struct mdt_object *obj = info->mti_object; @@ -420,6 +459,11 @@ static int mdt_getattr(struct mdt_thread_info *info) if (reqbody == NULL) GOTO(out, rc = -EFAULT); + if (reqbody->valid & OBD_MD_FLOSSCAPA) { + rc = mdt_renew_capa(info); + GOTO(out, rc); + } + if (reqbody->valid & OBD_MD_FLRMTPERM) { rc = mdt_init_ucred(info, reqbody); if (rc) @@ -541,6 +585,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } if (rc == 0) { /* Finally, we can get attr for child. */ + mdt_set_capainfo(info, 0, mdt_object_fid(child), + BYPASS_CAPA); rc = mdt_getattr_internal(info, child); if (rc != 0) mdt_object_unlock(info, child, lhc, 1); @@ -567,8 +613,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, *step 3: find the child object by fid & lock it. * regardless if it is local or remote. */ - child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid, - BYPASS_CAPA); + child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); if (IS_ERR(child)) GOTO(out_parent, rc = PTR_ERR(child)); if (is_resent) { @@ -591,6 +636,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } /* finally, we can get attr for child. */ + mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); rc = mdt_getattr_internal(info, child); if (rc != 0) { mdt_object_unlock(info, child, lhc, 1); @@ -1150,34 +1196,6 @@ static int mdt_quotactl_handle(struct mdt_thread_info *info) return -EOPNOTSUPP; } -static int mdt_renew_capa(struct mdt_thread_info *info) -{ - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *obj = info->mti_object; - struct mdt_body *body; - struct lustre_capa *capa; - int rc; - ENTRY; - - body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - LASSERT(body); - - capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1); - LASSERT(capa); - - spin_lock(&capa_lock); - info->mti_capa_key = *red_capa_key(mdt); - spin_unlock(&capa_lock); - - *capa = obj->mot_header.loh_capa; - /* TODO: add capa check */ - rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa); - if (rc) - RETURN(rc); - - RETURN(rc); -} - /* * OBD PING and other handlers. */ @@ -1274,17 +1292,13 @@ static struct mdt_object *mdt_obj(struct lu_object *o) struct mdt_object *mdt_object_find(const struct lu_env *env, struct mdt_device *d, - const struct lu_fid *f, - struct lustre_capa *c) + const struct lu_fid *f) { struct lu_object *o; struct mdt_object *m; ENTRY; - if (!d->mdt_opts.mo_mds_capa) - c = BYPASS_CAPA; - - o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f, c); + o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f); if (IS_ERR(o)) m = (struct mdt_object *)o; else @@ -1352,12 +1366,11 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info, const struct lu_fid *f, struct mdt_lock_handle *lh, - __u64 ibits, - struct lustre_capa *capa) + __u64 ibits) { struct mdt_object *o; - o = mdt_object_find(info->mti_env, info->mti_mdt, f, capa); + o = mdt_object_find(info->mti_env, info->mti_mdt, f); if (!IS_ERR(o)) { int rc; @@ -1436,7 +1449,6 @@ static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep) */ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) { - struct lustre_capa *capa = NULL; const struct mdt_body *body; struct mdt_object *obj; const struct lu_env *env; @@ -1461,13 +1473,11 @@ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) * instance MDS_IS_SUBDIR. */ if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) && - req_capsule_field_present(pill, &RMF_CAPA1, RCL_CLIENT)) { - int len = req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT); - if (len == sizeof(struct lustre_capa)) - capa = req_capsule_client_get(pill, &RMF_CAPA1); - } + req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) + mdt_set_capainfo(info, 0, &body->fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); - obj = mdt_object_find(env, info->mti_mdt, &body->fid1, capa); + obj = mdt_object_find(env, info->mti_mdt, &body->fid1); if (!IS_ERR(obj)) { if ((flags & HABEO_CORPUS) && !lu_object_exists(&obj->mot_obj.mo_lu)) { @@ -3089,6 +3099,7 @@ out: static void mdt_fini(const struct lu_env *env, struct mdt_device *m) { + struct md_device *next = m->mdt_child; struct lu_device *d = &m->mdt_md_dev.md_lu_dev; struct lu_site *ls = d->ld_site; @@ -3124,6 +3135,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) m->mdt_rootsquash_info = NULL; } + next->md_ops->mdo_init_capa_ctxt(env, next, CAPA_CTX_KEYS, 0, 0, NULL); + cleanup_capas(CAPA_SITE_SERVER); + del_timer(&m->mdt_ck_timer); + mdt_ck_thread_stop(m); + /* finish the stack */ mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); @@ -3140,6 +3156,21 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) int mdt_postrecov(const struct lu_env *, struct mdt_device *); +static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m) +{ + struct md_device *next = m->mdt_child; + __u32 valid = CAPA_CTX_TIMEOUT | CAPA_CTX_ALG | CAPA_CTX_KEYS; + int rc; + + if (m->mdt_opts.mo_mds_capa) + valid |= CAPA_CTX_ON; + rc = next->md_ops->mdo_init_capa_ctxt(env, next, valid, + m->mdt_capa_timeout, + m->mdt_capa_alg, + m->mdt_capa_keys); + return rc; +} + static int mdt_init0(const struct lu_env *env, struct mdt_device *m, struct lu_device_type *ldt, struct lustre_cfg *cfg) { @@ -3170,8 +3201,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_opts.mo_compat_resname = 0; m->mdt_opts.mo_mds_capa = 0; m->mdt_opts.mo_oss_capa = 0; - m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1; m->mdt_capa_timeout = CAPA_TIMEOUT; + m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1; m->mdt_ck_timeout = CAPA_KEY_TIMEOUT; obd->obd_replayable = 1; spin_lock_init(&m->mdt_client_bitmap_lock); @@ -3254,10 +3285,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, m->mdt_ck_timer.data = (unsigned long)m; init_timer(&m->mdt_ck_timer); - s->ls_capa_keys = m->mdt_capa_keys; - s->ls_capa_timeout = m->mdt_capa_timeout; - s->ls_capa_alg = m->mdt_capa_alg; - rc = mdt_start_ptlrpc_service(m); if (rc) GOTO(err_capa, rc); @@ -3271,6 +3298,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (obd->obd_recovering == 0) mdt_postrecov(env, m); + mdt_init_capa_ctxt(env, m); RETURN(0); err_stop_service: @@ -4062,8 +4090,7 @@ DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin), DEF_MDT_HNDL_0(0, SYNC, mdt_sync), DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir), DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle), -DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle), -DEF_MDT_HNDL_0(0 |HABEO_REFERO, RENEW_CAPA, mdt_renew_capa) +DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle) }; #define DEF_OBD_HNDL(flags, name, fn) \ diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 16bcc91..b8db60c 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -170,11 +170,11 @@ struct mdt_device { /* root squash */ struct rootsquash_info *mdt_rootsquash_info; - /* capability */ - __u32 mdt_capa_alg; + /* capability keys */ unsigned long mdt_capa_timeout; - unsigned long mdt_ck_timeout; + __u32 mdt_capa_alg; struct dt_object *mdt_ck_obj; + unsigned long mdt_ck_timeout; unsigned long mdt_ck_expiry; struct timer_list mdt_ck_timer; struct ptlrpc_thread mdt_ck_thread; @@ -220,8 +220,6 @@ struct mdt_reint_record { int rr_logcookielen; const struct llog_cookie *rr_logcookies; __u32 rr_flags; - struct lustre_capa *rr_capa1; - struct lustre_capa *rr_capa2; }; enum mdt_reint_flag { @@ -326,8 +324,8 @@ struct mdt_thread_info { struct mdt_client_data mti_mcd; loff_t mti_off; struct txn_param mti_txn_param; - struct lustre_capa_key mti_capa_key; struct lu_buf mti_buf; + struct lustre_capa_key mti_capa_key; }; /* * Info allocated per-transaction. @@ -392,13 +390,11 @@ void mdt_object_unlock(struct mdt_thread_info *, struct mdt_object *mdt_object_find(const struct lu_env *, struct mdt_device *, - const struct lu_fid *, - struct lustre_capa *); + const struct lu_fid *); struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *, const struct lu_fid *, struct mdt_lock_handle *, - __u64 ibits, - struct lustre_capa *); + __u64 ibits); void mdt_object_unlock_put(struct mdt_thread_info *, struct mdt_object *, struct mdt_lock_handle *, @@ -507,6 +503,12 @@ int mdt_remote_perm_reverse_idmap(struct ptlrpc_request *, int mdt_fix_attr_ucred(struct mdt_thread_info *, __u32); +static inline struct mdt_device *mdt_dev(struct lu_device *d) +{ +// LASSERT(lu_device_is_mdt(d)); + return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev); +} + /* mdt/mdt_identity.c */ #define MDT_IDENTITY_UPCALL_PATH "/usr/sbin/l_getidentity" @@ -577,29 +579,33 @@ do { \ struct md_ucred *mdt_ucred(const struct mdt_thread_info *info); +static inline int is_identity_get_disabled(struct upcall_cache *cache) +{ + return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1; +} + /* - * fid Capability + * Capability */ int mdt_ck_thread_start(struct mdt_device *mdt); void mdt_ck_thread_stop(struct mdt_device *mdt); void mdt_ck_timer_callback(unsigned long castmeharder); int mdt_capa_keys_init(const struct lu_env *env, struct mdt_device *mdt); -static inline struct mdt_device *mdt_dev(struct lu_device *d) +static inline void mdt_set_capainfo(struct mdt_thread_info *info, int offset, + const struct lu_fid *fid, + struct lustre_capa *capa) { -// LASSERT(lu_device_is_mdt(d)); - return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev); -} + struct mdt_device *dev = info->mti_mdt; + struct md_capainfo *ci; -static inline struct lustre_capa_key *red_capa_key(struct mdt_device *mdt) -{ - return &mdt->mdt_capa_keys[1]; -} + if (!dev->mdt_opts.mo_mds_capa) + return; -static inline int is_identity_get_disabled(struct upcall_cache *cache) -{ - return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1; + ci = md_capainfo(info->mti_env); + LASSERT(ci); + ci->mc_fid[offset] = fid; + ci->mc_capa[offset] = capa; } - #endif /* __KERNEL__ */ #endif /* _MDT_H */ diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index 0e56a58..f36ca46 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -502,7 +502,7 @@ void mdt_shrink_reply(struct mdt_thread_info *info, int offset, lustre_shrink_reply(req, offset, acl_size, 1); offset += !!acl_size; if (mdscapa && !(body->valid & OBD_MD_FLMDSCAPA)) - lustre_shrink_reply(req, offset, 0, 0); + lustre_shrink_reply(req, offset, 0, 1); offset += mdscapa; if (osscapa && !(body->valid & OBD_MD_FLOSSCAPA)) lustre_shrink_reply(req, offset, 0, 0); @@ -635,7 +635,8 @@ static int mdt_setattr_unpack_rec(struct mdt_thread_info *info) ma->ma_valid = MA_INODE; if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) - rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1); + mdt_set_capainfo(info, 0, rr->rr_fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); RETURN(0); } @@ -732,8 +733,11 @@ static int mdt_create_unpack(struct mdt_thread_info *info) LA_CTIME | LA_MTIME | LA_ATIME; info->mti_spec.sp_cr_flags = rec->cr_flags; - if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) - rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1); + if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) { + mdt_set_capainfo(info, 0, rr->rr_fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); + mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); + } rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (S_ISDIR(attr->la_mode)) { @@ -797,9 +801,11 @@ static int mdt_link_unpack(struct mdt_thread_info *info) attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME; if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) - rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1); + mdt_set_capainfo(info, 0, rr->rr_fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT)) - rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2); + mdt_set_capainfo(info, 1, rr->rr_fid2, + req_capsule_client_get(pill, &RMF_CAPA2)); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) @@ -837,7 +843,8 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE; if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) - rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1); + mdt_set_capainfo(info, 0, rr->rr_fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) @@ -876,9 +883,11 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE; if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) - rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1); + mdt_set_capainfo(info, 0, rr->rr_fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT)) - rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2); + mdt_set_capainfo(info, 1, rr->rr_fid2, + req_capsule_client_get(pill, &RMF_CAPA2)); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT); @@ -895,6 +904,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info) struct lu_attr *attr = &info->mti_attr.ma_attr; struct req_capsule *pill = &info->mti_pill; struct mdt_reint_record *rr = &info->mti_rr; + struct ptlrpc_request *req = mdt_info_req(info); ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_CREATE); @@ -922,9 +932,12 @@ static int mdt_open_unpack(struct mdt_thread_info *info) info->mti_replayepoch = rec->cr_ioepoch; if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) - rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1); - if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT)) - rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2); + mdt_set_capainfo(info, 0, rr->rr_fid1, + req_capsule_client_get(pill, &RMF_CAPA1)); + if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) && + (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))) + mdt_set_capainfo(info, 1, rr->rr_fid2, + req_capsule_client_get(pill, &RMF_CAPA2)); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) @@ -932,7 +945,6 @@ static int mdt_open_unpack(struct mdt_thread_info *info) if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { struct md_create_spec *sp = &info->mti_spec; - struct ptlrpc_request *req = mdt_info_req(info); sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA); sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, diff --git a/lustre/mdt/mdt_lproc.c b/lustre/mdt/mdt_lproc.c index 0ee67a9..c01170f 100644 --- a/lustre/mdt/mdt_lproc.c +++ b/lustre/mdt/mdt_lproc.c @@ -416,6 +416,66 @@ static int lprocfs_rd_rootsquash_skips(char *page, char **start, off_t off, return ret; } +/* for debug only */ +static int lprocfs_rd_capa(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + + return snprintf(page, count, "capability on: %s %s\n", + mdt->mdt_opts.mo_oss_capa ? "oss" : "", + mdt->mdt_opts.mo_mds_capa ? "mds" : ""); +} + +static int lprocfs_wr_capa(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val & ~0x3) { + CERROR("invalid value %u: only 0/1/2/3 is accepted.\n", val); + CERROR("\t0: disable capability\n" + "\t1: enable mds capability\n" + "\t2: enable oss capability\n" + "\t3: enable both mds and oss capability\n"); + return -EINVAL; + } + +// mds_capa_onoff(obd, val); + return count; +} + +static int lprocfs_rd_capa_count(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + return snprintf(page, count, "%d %d\n", + capa_count[CAPA_SITE_CLIENT], + capa_count[CAPA_SITE_SERVER]); +} + +static int lprocfs_rd_capa_timeout(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + + return snprintf(page, count, "%lu\n", mdt->mdt_capa_timeout); +} + +static int lprocfs_rd_ck_timeout(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct obd_device *obd = data; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + + return snprintf(page, count, "%lu\n", mdt->mdt_ck_timeout); +} + static struct lprocfs_vars lprocfs_mdt_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 }, @@ -438,6 +498,10 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = { { "rootsquash_uid", lprocfs_rd_rootsquash_uid, 0, 0 }, { "rootsquash_gid", lprocfs_rd_rootsquash_gid, 0, 0 }, { "rootsquash_skips", lprocfs_rd_rootsquash_skips, 0, 0 }, + { "capa", lprocfs_rd_capa, lprocfs_wr_capa, 0 }, + { "capa_timeout", lprocfs_rd_capa_timeout, 0, 0 }, + { "capa_key_timeout", lprocfs_rd_ck_timeout, 0, 0 }, + { "capa_count", lprocfs_rd_capa_count, 0, 0 }, { 0 } }; diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 89dd16e..5bd3caf 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -80,8 +80,8 @@ static int mdt_create_data(struct mdt_thread_info *info, int rc; ENTRY; - if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE || - !(spec->sp_cr_flags & FMODE_WRITE)) + if ((spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) || + !(spec->sp_cr_flags & FMODE_WRITE)) RETURN(0); ma->ma_need = MA_INODE | MA_LOV; @@ -346,10 +346,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info, } } - spin_lock(&capa_lock); - info->mti_capa_key = *red_capa_key(mdt); - spin_unlock(&capa_lock); - if (mdt->mdt_opts.mo_mds_capa) { struct lustre_capa *capa; @@ -361,12 +357,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info, RETURN(rc); repbody->valid |= OBD_MD_FLMDSCAPA; } - if (mdt->mdt_opts.mo_oss_capa) { + if (mdt->mdt_opts.mo_oss_capa && + S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) { struct lustre_capa *capa; capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA2); LASSERT(capa); - capa->lc_opc = CAPA_OPC_OSS_DEFAULT; + capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags); rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa); if (rc) RETURN(rc); @@ -540,10 +537,10 @@ void mdt_reconstruct_open(struct mdt_thread_info *info, * We failed after creation, but we do not know in which step * we failed. So try to check the child object. */ - parent = mdt_object_find(env, mdt, rr->rr_fid1, rr->rr_capa1); + parent = mdt_object_find(env, mdt, rr->rr_fid1); LASSERT(!IS_ERR(parent)); - child = mdt_object_find(env, mdt, rr->rr_fid2, rr->rr_capa2); + child = mdt_object_find(env, mdt, rr->rr_fid2); LASSERT(!IS_ERR(child)); rc = lu_object_exists(&child->mot_obj.mo_lu); @@ -591,8 +588,7 @@ static int mdt_open_by_fid(struct mdt_thread_info* info, int rc; ENTRY; - o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2, - rr->rr_capa2); + o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2); if (IS_ERR(o)) RETURN(rc = PTR_ERR(o)); @@ -637,12 +633,13 @@ static int mdt_cross_open(struct mdt_thread_info* info, int rc; ENTRY; - o = mdt_object_find(info->mti_env, info->mti_mdt, fid, BYPASS_CAPA); + o = mdt_object_find(info->mti_env, info->mti_mdt, fid); if (IS_ERR(o)) RETURN(rc = PTR_ERR(o)); rc = lu_object_exists(&o->mot_obj.mo_lu); if (rc > 0) { + mdt_set_capainfo(info, 0, fid, BYPASS_CAPA); rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma); if (rc == 0) rc = mdt_mfd_open(info, NULL, o, flags, 0, rep); @@ -747,7 +744,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) else lh->mlh_mode = LCK_EX; parent = mdt_object_find_lock(info, rr->rr_fid1, lh, - MDS_INODELOCK_UPDATE, rr->rr_capa1); + MDS_INODELOCK_UPDATE); if (IS_ERR(parent)) GOTO(out, result = PTR_ERR(parent)); @@ -778,10 +775,11 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); } - child = mdt_object_find(info->mti_env, mdt, child_fid, BYPASS_CAPA); + child = mdt_object_find(info->mti_env, mdt, child_fid); if (IS_ERR(child)) GOTO(out_parent, result = PTR_ERR(child)); + mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); if (result == -ENOENT) { /* Not found and with MDS_OPEN_CREAT: let's create it. */ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE); diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 02e4b36..dd63f2d 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -70,7 +70,7 @@ int mdt_record_read(const struct lu_env *env, LASSERTF(dt != NULL, "dt is NULL when we want to read record\n"); - rc = dt->do_body_ops->dbo_read(env, dt, buf, pos); + rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA); if (rc == buf->lb_len) rc = 0; @@ -87,7 +87,7 @@ int mdt_record_write(const struct lu_env *env, LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); LASSERT(th != NULL); - rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th); + rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA); if (rc == buf->lb_len) rc = 0; else if (rc >= 0) @@ -435,7 +435,7 @@ static int mdt_server_data_init(const struct lu_env *env, obj = mdt->mdt_last_rcvd; obj->do_ops->do_read_lock(env, obj); - rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la); + rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la, BYPASS_CAPA); obj->do_ops->do_read_unlock(env, obj); if (rc) RETURN(rc); @@ -944,7 +944,6 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid); if(!IS_ERR(o)) { - struct md_device *next = mdt->mdt_child; mdt->mdt_ck_obj = o; rc = mdt_capa_keys_init(env, mdt); if (rc) { @@ -952,7 +951,6 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_ck_obj = NULL; RETURN(rc); } - rc = next->md_ops->mdo_init_capa_keys(next, mdt->mdt_capa_keys); } else { rc = PTR_ERR(o); CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc); @@ -1032,8 +1030,7 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti, return; /* if no error, so child was created with requested fid */ - child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2, - mti->mti_rr.rr_capa2); + child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2); LASSERT(!IS_ERR(child)); body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY); @@ -1062,8 +1059,7 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti, return; body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY); - obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1, - mti->mti_rr.rr_capa1); + obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1); LASSERT(!IS_ERR(obj)); mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr); mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(obj)); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 3774e54..eb40fe9 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -53,13 +53,12 @@ static int mdt_md_create(struct mdt_thread_info *info) lh = &info->mti_lh[MDT_LH_PARENT]; lh->mlh_mode = LCK_EX; - parent = mdt_object_find_lock(info, rr->rr_fid1, - lh, MDS_INODELOCK_UPDATE, - rr->rr_capa1); + parent = mdt_object_find_lock(info, rr->rr_fid1, lh, + MDS_INODELOCK_UPDATE); if (IS_ERR(parent)) RETURN(PTR_ERR(parent)); - child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2, BYPASS_CAPA); + child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2); if (!IS_ERR(child)) { struct md_object *next = mdt_object_child(parent); @@ -67,6 +66,7 @@ static int mdt_md_create(struct mdt_thread_info *info) mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_CREATE_WRITE); + mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); rc = mdo_create(info->mti_env, next, rr->rr_name, mdt_object_child(child), &info->mti_spec, ma); @@ -96,8 +96,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info) repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2, - BYPASS_CAPA); + o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2); if (!IS_ERR(o)) { struct md_object *next = mdt_object_child(o); @@ -211,8 +210,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, (unsigned int)ma->ma_attr.la_valid); repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1, - rr->rr_capa1); + mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(mo)) RETURN(rc = PTR_ERR(mo)); @@ -277,7 +275,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo)); - if (mdt->mdt_opts.mo_oss_capa) { + if (mdt->mdt_opts.mo_oss_capa && + S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu))) { struct lustre_capa *capa; capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1); @@ -354,7 +353,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, parent_lh = &info->mti_lh[MDT_LH_PARENT]; parent_lh->mlh_mode = LCK_EX; mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh, - MDS_INODELOCK_UPDATE, rr->rr_capa1); + MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) GOTO(out, rc = PTR_ERR(mp)); @@ -387,8 +386,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, GOTO(out_unlock_parent, rc); /* we will lock the child regardless it is local or remote. No harm. */ - mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid, - BYPASS_CAPA); + mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); if (IS_ERR(mc)) GOTO(out_unlock_parent, rc = PTR_ERR(mc)); child_lh = &info->mti_lh[MDT_LH_CHILD]; @@ -405,6 +403,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, * whether need MA_LOV and MA_COOKIE. */ ma->ma_need = MA_INODE; + mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); rc = mdo_unlink(info->mti_env, mdt_object_child(mp), mdt_object_child(mc), rr->rr_name, ma); if (rc) @@ -448,12 +447,13 @@ static int mdt_reint_link(struct mdt_thread_info *info, lhs = &info->mti_lh[MDT_LH_PARENT]; lhs->mlh_mode = LCK_EX; ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, - MDS_INODELOCK_UPDATE, rr->rr_capa1); + MDS_INODELOCK_UPDATE); if (IS_ERR(ms)) RETURN(PTR_ERR(ms)); if (strlen(rr->rr_name) == 0) { /* remote partial operation */ + mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); rc = mo_ref_add(info->mti_env, mdt_object_child(ms)); GOTO(out_unlock_source, rc); } @@ -461,7 +461,7 @@ static int mdt_reint_link(struct mdt_thread_info *info, lhp = &info->mti_lh[MDT_LH_CHILD]; lhp->mlh_mode = LCK_EX; mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, - MDS_INODELOCK_UPDATE, rr->rr_capa2); + MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) GOTO(out_unlock_source, rc = PTR_ERR(mp)); @@ -505,7 +505,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) lh_tgtdir = &info->mti_lh[MDT_LH_PARENT]; lh_tgtdir->mlh_mode = LCK_EX; mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir, - MDS_INODELOCK_UPDATE, rr->rr_capa1); + MDS_INODELOCK_UPDATE); if (IS_ERR(mtgtdir)) GOTO(out, rc = PTR_ERR(mtgtdir)); @@ -518,7 +518,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) lh_tgt->mlh_mode = LCK_EX; mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt, - MDS_INODELOCK_LOOKUP, BYPASS_CAPA); + MDS_INODELOCK_LOOKUP); if (IS_ERR(mtgt)) GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt)); @@ -604,8 +604,7 @@ static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid) ENTRY; do { - dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid, - BYPASS_CAPA); + dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid); if (!IS_ERR(dst)) { rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst), fid, &dst_fid); @@ -664,7 +663,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; lh_srcdirp->mlh_mode = LCK_EX; msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp, - MDS_INODELOCK_UPDATE, rr->rr_capa1); + MDS_INODELOCK_UPDATE); if (IS_ERR(msrcdir)) GOTO(out_rename_lock, rc = PTR_ERR(msrcdir)); @@ -676,7 +675,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mtgtdir = msrcdir; } else { mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt, - rr->rr_fid2, rr->rr_capa2); + rr->rr_fid2); if (IS_ERR(mtgtdir)) GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir)); @@ -701,7 +700,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, lh_oldp = &info->mti_lh[MDT_LH_OLD]; lh_oldp->mlh_mode = LCK_EX; mold = mdt_object_find_lock(info, old_fid, lh_oldp, - MDS_INODELOCK_LOOKUP, BYPASS_CAPA); + MDS_INODELOCK_LOOKUP); if (IS_ERR(mold)) GOTO(out_unlock_target, rc = PTR_ERR(mold)); @@ -719,8 +718,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, GOTO(out_unlock_old, rc = -EINVAL); lh_newp->mlh_mode = LCK_EX; - mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid, - BYPASS_CAPA); + mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid); if (IS_ERR(mnew)) GOTO(out_unlock_old, rc = PTR_ERR(mnew)); @@ -752,6 +750,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_RENAME_WRITE); + mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA); + mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA); /* Check if @dst is subdir of @src. */ rc = mdt_rename_check(info, old_fid); if (rc) diff --git a/lustre/obdclass/capa.c b/lustre/obdclass/capa.c index ebc97b7..bdc4a76 100644 --- a/lustre/obdclass/capa.c +++ b/lustre/obdclass/capa.c @@ -174,14 +174,14 @@ static inline void free_capa_lru(struct list_head *head) } /* add or update */ -struct obd_capa *capa_add(struct lustre_capa *capa) +void capa_add(struct lustre_capa *capa) { struct hlist_head *head = capa_hash + capa_hashfn(&capa->lc_fid); struct obd_capa *ocapa, *old = NULL; ocapa = alloc_capa(CAPA_SITE_SERVER); if (!ocapa) - return NULL; + return; spin_lock(&capa_lock); @@ -198,7 +198,7 @@ struct obd_capa *capa_add(struct lustre_capa *capa) DEBUG_CAPA(D_SEC, &ocapa->c_capa, "new"); spin_unlock(&capa_lock); - return ocapa; + return; } spin_lock(&old->c_lock); @@ -213,7 +213,6 @@ struct obd_capa *capa_add(struct lustre_capa *capa) DEBUG_CAPA(D_SEC, &old->c_capa, "update"); free_capa(ocapa); - return old; } struct obd_capa *capa_lookup(struct lustre_capa *capa) diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 9171d86..a6111ac 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -149,7 +149,8 @@ static int dt_lookup(const struct lu_env *env, struct dt_object *dir, int result; if (dt_try_as_dir(env, dir)) - result = dir->do_index_ops->dio_lookup(env, dir, rec, key); + result = dir->do_index_ops->dio_lookup(env, dir, rec, key, + BYPASS_CAPA); else result = -ENOTDIR; return result; @@ -162,7 +163,7 @@ static struct dt_object *dt_locate(const struct lu_env *env, struct lu_object *obj; struct dt_object *dt; - obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid, BYPASS_CAPA); + obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid); if (!IS_ERR(obj)) { obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type); LASSERT(obj != NULL); @@ -185,7 +186,6 @@ struct dt_object *dt_store_open(const struct lu_env *env, if (result == 0) { root = dt_locate(env, dt, fid); if (!IS_ERR(root)) { - lu_object_bypass_capa(&root->do_lu); result = dt_lookup(env, root, name, fid); if (result == 0) child = dt_locate(env, dt, fid); diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index b0a4b77..8f977cb 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -107,8 +107,7 @@ EXPORT_SYMBOL(lu_object_put); */ static struct lu_object *lu_object_alloc(const struct lu_env *env, struct lu_site *s, - const struct lu_fid *f, - const struct lustre_capa *capa) + const struct lu_fid *f) { struct lu_object *scan; struct lu_object *top; @@ -130,11 +129,6 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, * after this point. */ top->lo_header->loh_fid = *f; - if (capa == BYPASS_CAPA) - lu_object_bypass_capa(top); - else if (capa) - top->lo_header->loh_capa = *capa; - layers = &top->lo_header->loh_layers; do { /* @@ -428,13 +422,11 @@ static __u32 fid_hash(const struct lu_fid *f) * any case, additional reference is acquired on the returned object. */ struct lu_object *lu_object_find(const struct lu_env *env, - struct lu_site *s, const struct lu_fid *f, - struct lustre_capa *capa) + struct lu_site *s, const struct lu_fid *f) { struct lu_object *o; struct lu_object *shadow; struct hlist_head *bucket; - int rc; /* * This uses standard index maintenance protocol: @@ -455,25 +447,14 @@ struct lu_object *lu_object_find(const struct lu_env *env, o = htable_lookup(s, bucket, f); spin_unlock(&s->ls_guard); - if (o != NULL) { - if (capa == BYPASS_CAPA) { - o->lo_header->loh_capa_bypass = 1; - } else { - rc = lu_object_auth(env, o, capa, - CAPA_OPC_INDEX_LOOKUP); - if (rc) - return ERR_PTR(rc); - if (capa) - o->lo_header->loh_capa = *capa; - } + if (o != NULL) return o; - } /* * Allocate new object. This may result in rather complicated * operations, including fld queries, inode loading, etc. */ - o = lu_object_alloc(env, s, f, capa); + o = lu_object_alloc(env, s, f); if (IS_ERR(o)) return o; @@ -496,24 +477,6 @@ struct lu_object *lu_object_find(const struct lu_env *env, } EXPORT_SYMBOL(lu_object_find); -int lu_object_auth(const struct lu_env *env, const struct lu_object *o, - struct lustre_capa *capa, __u64 opc) -{ - struct lu_object_header *top = o->lo_header; - int rc; - - list_for_each_entry(o, &top->loh_layers, lo_linkage) { - if (o->lo_ops->loo_object_auth) { - rc = o->lo_ops->loo_object_auth(env, o, capa, opc); - if (rc) - return rc; - } - } - - return 0; -} -EXPORT_SYMBOL(lu_object_auth); - enum { LU_SITE_HTABLE_BITS = 8, LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS), diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index a0cae5d..55706c0 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -110,6 +110,13 @@ struct osd_device { * This means that it's enough to have _one_ lu_context. */ struct lu_env od_env_for_commit; + /* + * Capability + */ + unsigned int od_fl_capa:1; + unsigned long od_capa_timeout; + __u32 od_capa_alg; + struct lustre_capa_key *od_capa_keys; }; static int osd_root_get (const struct lu_env *env, @@ -151,15 +158,18 @@ static int osd_param_is_sane (const struct osd_device *dev, const struct txn_param *param); static int osd_index_lookup (const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key); + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa); static int osd_index_insert (const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *handle); + struct thandle *handle, + struct lustre_capa *capa); static int osd_index_delete (const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *handle); + struct thandle *handle, + struct lustre_capa *capa); static int osd_index_probe (const struct lu_env *env, struct osd_object *o, const struct dt_index_features *feat); @@ -666,6 +676,29 @@ static void osd_ro(const struct lu_env *env, struct dt_device *d) EXIT; } +static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d, + __u32 valid, unsigned long timeout, __u32 alg, + struct lustre_capa_key *keys) +{ + struct osd_device *dev = osd_dt_dev(d); + ENTRY; + + if (valid & CAPA_CTX_ON) + dev->od_fl_capa = 1; + else + dev->od_fl_capa = 0; + + if (valid & CAPA_CTX_TIMEOUT) + dev->od_capa_timeout = timeout; + + if (valid & CAPA_CTX_ALG) + dev->od_capa_alg = alg; + + if (valid & CAPA_CTX_KEYS) + dev->od_capa_keys = keys; + RETURN(0); +} + /* Note: we did not count into QUOTA here, If we mount with --data_journal * we may need more*/ enum { @@ -720,14 +753,15 @@ static int osd_credit_get(const struct lu_env *env, struct dt_device *d, } static struct dt_device_operations osd_dt_ops = { - .dt_root_get = osd_root_get, - .dt_statfs = osd_statfs, - .dt_trans_start = osd_trans_start, - .dt_trans_stop = osd_trans_stop, - .dt_conf_get = osd_conf_get, - .dt_sync = osd_sync, - .dt_ro = osd_ro, - .dt_credit_get = osd_credit_get + .dt_root_get = osd_root_get, + .dt_statfs = osd_statfs, + .dt_trans_start = osd_trans_start, + .dt_trans_stop = osd_trans_stop, + .dt_conf_get = osd_conf_get, + .dt_sync = osd_sync, + .dt_ro = osd_ro, + .dt_credit_get = osd_credit_get, + .dt_init_capa_ctxt = osd_init_capa_ctxt }; static void osd_object_read_lock(const struct lu_env *env, @@ -785,16 +819,99 @@ static void osd_object_write_unlock(const struct lu_env *env, up_write(&obj->oo_sem); } -static inline int osd_object_auth(const struct lu_env *env, - const struct lu_object *o, - __u64 opc) +static int capa_is_sane(const struct lu_env *env, + struct lustre_capa *capa, + struct lustre_capa_key *keys) +{ + struct obd_capa *c; + struct osd_thread_info *oti = lu_context_key_get(&env->le_ctx, &osd_key); + int i, rc = 1; + ENTRY; + + c = capa_lookup(capa); + if (c) { + spin_lock(&c->c_lock); + if (memcmp(&c->c_capa, capa, sizeof(*capa))) { + DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch"); + rc = -EACCES; + } else if (capa_is_expired(c)) { + DEBUG_CAPA(D_ERROR, capa, "expired"); + rc = -ESTALE; + } + spin_unlock(&c->c_lock); + + capa_put(c); + RETURN(rc); + } + + spin_lock(&capa_lock); + for (i = 0; i < 2; i++) { + if (keys[i].lk_keyid == capa->lc_keyid) { + oti->oti_capa_key = keys[i]; + break; + } + } + spin_unlock(&capa_lock); + + if (i == 2) { + DEBUG_CAPA(D_ERROR, capa, "no matched capa key"); + RETURN(-ESTALE); + } + + rc = capa_hmac(oti->oti_capa_hmac, capa, oti->oti_capa_key.lk_key); + if (rc) + RETURN(rc); + if (memcmp(oti->oti_capa_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) { + DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch"); + RETURN(-EACCES); + } + + capa_add(capa); + + RETURN(1); +} + +static int osd_object_auth(const struct lu_env *env, struct dt_object *dt, + struct lustre_capa *capa, __u64 opc) { - return o->lo_ops->loo_object_auth(env, o, lu_object_capa(o), opc); + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); + + if (!dev->od_fl_capa) + return 0; + + if (capa == BYPASS_CAPA) + return 0; + + if (!capa) { + CERROR("no capability is provided for fid "DFID"\n", PFID(fid)); + LBUG(); + return -EACCES; + } + + if (!lu_fid_eq(fid, &capa->lc_fid)) { + DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with", + PFID(fid)); + return -EACCES; + } + + if (!capa_opc_supported(capa, opc)) { + DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc); + return -EACCES; + } + + if (!capa_is_sane(env, capa, dev->od_capa_keys)) { + DEBUG_CAPA(D_ERROR, capa, "insane"); + return -EACCES; + } + + return 0; } static int osd_attr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr) + struct lu_attr *attr, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -802,7 +919,7 @@ static int osd_attr_get(const struct lu_env *env, LASSERT(osd_invariant(obj)); LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; return osd_inode_getattr(env, obj->oo_inode, attr); @@ -811,7 +928,8 @@ static int osd_attr_get(const struct lu_env *env, static int osd_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle) + struct thandle *handle, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); LASSERT(handle != NULL); @@ -819,7 +937,7 @@ static int osd_attr_set(const struct lu_env *env, LASSERT(osd_invariant(obj)); LASSERT(osd_write_locked(env, obj)); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; return osd_inode_setattr(env, obj->oo_inode, attr); @@ -1074,8 +1192,6 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, /* * XXX missing: permission checks. */ - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_INSERT)) - RETURN(-EACCES); /* * XXX missing: sanity checks (valid ->la_mode, etc.) @@ -1111,7 +1227,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, } static void osd_object_ref_add(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) + struct dt_object *dt, + struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -1121,12 +1238,6 @@ static void osd_object_ref_add(const struct lu_env *env, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) { - LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu, - "no capability to link!\n"); - return; - } - if (inode->i_nlink < LDISKFS_LINK_MAX) { inode->i_nlink ++; mark_inode_dirty(inode); @@ -1137,7 +1248,8 @@ static void osd_object_ref_add(const struct lu_env *env, } static void osd_object_ref_del(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) + struct dt_object *dt, + struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -1147,12 +1259,6 @@ static void osd_object_ref_del(const struct lu_env *env, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) { - LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu, - "no capability to unlink!\n"); - return; - } - if (inode->i_nlink > 0) { inode->i_nlink --; mark_inode_dirty(inode); @@ -1162,8 +1268,11 @@ static void osd_object_ref_del(const struct lu_env *env, LASSERT(osd_invariant(obj)); } -static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name) +static int osd_xattr_get(const struct lu_env *env, + struct dt_object *dt, + struct lu_buf *buf, + const char *name, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -1174,7 +1283,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; dentry->d_inode = inode; @@ -1183,7 +1292,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, - struct thandle *handle) + struct thandle *handle, struct lustre_capa *capa) { int fs_flags; @@ -1197,7 +1306,7 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(handle != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; dentry->d_inode = inode; @@ -1213,8 +1322,10 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, buf->lb_buf, buf->lb_len, fs_flags); } -static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf) +static int osd_xattr_list(const struct lu_env *env, + struct dt_object *dt, + struct lu_buf *buf, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -1225,15 +1336,18 @@ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt, LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL); LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj)); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ)) return -EACCES; dentry->d_inode = inode; return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len); } -static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *handle) +static int osd_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *handle, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; @@ -1245,7 +1359,7 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(handle != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; dentry->d_inode = inode; @@ -1258,7 +1372,8 @@ static int osd_dir_page_build(const struct lu_env *env, int first, __u32 *start, __u32 *end, struct lu_dirent **last) { int result; - struct osd_thread_info *info = lu_context_key_get(&env->le_ctx, &osd_key); + struct osd_thread_info *info = lu_context_key_get(&env->le_ctx, + &osd_key); struct lu_fid *fid = &info->oti_fid; struct lu_dirent *ent; @@ -1315,7 +1430,9 @@ static int osd_dir_page_build(const struct lu_env *env, int first, } static int osd_readpage(const struct lu_env *env, - struct dt_object *dt, const struct lu_rdpg *rdpg) + struct dt_object *dt, + const struct lu_rdpg *rdpg, + struct lustre_capa *capa) { struct dt_it *it; struct osd_object *obj = osd_dt_obj(dt); @@ -1329,7 +1446,7 @@ static int osd_readpage(const struct lu_env *env, LASSERT(rdpg->rp_pages != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_BODY_READ)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) return -EACCES; if (rdpg->rp_count <= 0) @@ -1400,6 +1517,51 @@ static int osd_readpage(const struct lu_env *env, return rc ? rc : rc1; } +static int osd_capa_get(const struct lu_env *env, + struct dt_object *dt, struct lustre_capa *capa) +{ + struct osd_thread_info *info = lu_context_key_get(&env->le_ctx, + &osd_key); + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *dev = osd_obj2dev(obj); + struct lustre_capa_key *key = &info->oti_capa_key; + struct obd_capa *oc; + int rc; + ENTRY; + + LASSERT(dt_object_exists(dt)); + LASSERT(osd_invariant(obj)); + + capa->lc_fid = *fid; + if (dev->od_capa_timeout < CAPA_TIMEOUT) + capa->lc_flags |= CAPA_FL_SHORT_EXPIRY; + + capa->lc_flags = dev->od_capa_alg << 24; + + /* TODO: get right permission here */ + oc = capa_lookup(capa); + if (oc) { + LASSERT(!capa_is_expired(oc)); + capa_cpy(capa, oc); + capa_put(oc); + RETURN(0); + } + + spin_lock(&capa_lock); + *key = dev->od_capa_keys[1]; + capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout; + spin_unlock(&capa_lock); + + capa->lc_keyid = key->lk_keyid; + rc = capa_hmac(capa->lc_hmac, capa, key->lk_key); + if (rc) + RETURN(rc); + + capa_add(capa); + RETURN(0); +} + static struct dt_object_operations osd_obj_ops = { .do_read_lock = osd_object_read_lock, .do_write_lock = osd_object_write_lock, @@ -1416,6 +1578,7 @@ static struct dt_object_operations osd_obj_ops = { .do_xattr_del = osd_xattr_del, .do_xattr_list = osd_xattr_list, .do_readpage = osd_readpage, + .do_capa_get = osd_capa_get, }; /* @@ -1423,13 +1586,17 @@ static struct dt_object_operations osd_obj_ops = { */ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos) + struct lu_buf *buf, loff_t *pos, + struct lustre_capa *capa) { struct inode *inode = osd_dt_obj(dt)->oo_inode; struct file *file; mm_segment_t seg; ssize_t result; + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) + RETURN(-EACCES); + file = osd_rw_init(env, inode, &seg); /* * We'd like to use vfs_read() here, but it messes with @@ -1448,7 +1615,7 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle) + struct thandle *handle, struct lustre_capa *capa) { struct inode *inode = osd_dt_obj(dt)->oo_inode; struct file *file; @@ -1457,6 +1624,9 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, LASSERT(handle != NULL); + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE)) + RETURN(-EACCES); + file = osd_rw_init(env, inode, &seg); if (file->f_op->write) result = file->f_op->write(file, buf->lb_buf, buf->lb_len, pos); @@ -1514,9 +1684,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_LOOKUP)) - RETURN(-EACCES); - if (osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode) { dt->do_index_ops = &osd_index_compat_ops; result = 0; @@ -1554,7 +1721,8 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, } static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle) + const struct dt_key *key, struct thandle *handle, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; @@ -1568,7 +1736,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, LASSERT(obj->oo_ipd != NULL); LASSERT(handle != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_DELETE)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); oh = container_of0(handle, struct osd_thandle, ot_super); @@ -1582,7 +1750,8 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, } static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key) + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); int rc; @@ -1594,7 +1763,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, LASSERT(obj->oo_container.ic_object == obj->oo_inode); LASSERT(obj->oo_ipd != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_LOOKUP)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) return -EACCES; rc = iam_lookup(&obj->oo_container, (const struct iam_key *)key, @@ -1607,7 +1776,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th) + struct thandle *th, struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -1622,7 +1791,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, LASSERT(obj->oo_ipd != NULL); LASSERT(th != NULL); - if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_INSERT)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) return -EACCES; oh = container_of0(th, struct osd_thandle, ot_super); @@ -1769,7 +1938,8 @@ static struct dt_index_operations osd_index_ops = { static int osd_index_compat_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *handle) + struct thandle *handle, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -1777,6 +1947,11 @@ static int osd_index_compat_delete(const struct lu_env *env, LASSERT(S_ISDIR(obj->oo_inode->i_mode)); ENTRY; +#if 0 + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) + RETURN(-EACCES); +#endif + RETURN(-EOPNOTSUPP); } @@ -1796,7 +1971,8 @@ static int osd_build_fid(struct osd_device *osd, static int osd_index_compat_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key) + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -1816,6 +1992,9 @@ static int osd_index_compat_lookup(const struct lu_env *env, LASSERT(S_ISDIR(obj->oo_inode->i_mode)); LASSERT(osd_has_index(obj)); + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) + return -EACCES; + info->oti_str.name = (const char *)key; info->oti_str.len = strlen((const char *)key); @@ -1907,7 +2086,8 @@ static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev, static int osd_index_compat_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, - const struct dt_key *key, struct thandle *th) + const struct dt_key *key, struct thandle *th, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); @@ -1925,7 +2105,10 @@ static int osd_index_compat_insert(const struct lu_env *env, LASSERT(osd_invariant(obj)); LASSERT(th != NULL); - luch = lu_object_find(env, ludev->ld_site, fid, BYPASS_CAPA); + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) + return -EACCES; + + luch = lu_object_find(env, ludev->ld_site, fid); if (!IS_ERR(luch)) { if (lu_object_exists(luch)) { struct osd_object *child; @@ -2375,102 +2558,13 @@ static int osd_object_invariant(const struct lu_object *l) return osd_invariant(osd_obj(l)); } -static int capa_is_sane(const struct lu_env *env, - struct lustre_capa *capa, - struct lustre_capa_key *keys) -{ - struct obd_capa *c; - struct osd_thread_info *oti = lu_context_key_get(&env->le_ctx, &osd_key); - int i, rc = 0; - ENTRY; - - c = capa_lookup(capa); - if (c) { - spin_lock(&c->c_lock); - if (memcmp(&c->c_capa, capa, sizeof(*capa))) { - DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch"); - rc = -EACCES; - } else if (capa_is_expired(c)) { - DEBUG_CAPA(D_ERROR, capa, "expired"); - rc = -ESTALE; - } - spin_unlock(&c->c_lock); - - capa_put(c); - RETURN(rc); - } - - spin_lock(&capa_lock); - for (i = 0; i < 2; i++) { - if (keys[i].lk_keyid == capa->lc_keyid) { - oti->oti_capa_key = keys[i]; - break; - } - } - spin_unlock(&capa_lock); - - if (i == 2) { - DEBUG_CAPA(D_ERROR, capa, "no matched capa key"); - RETURN(-ESTALE); - } - - rc = capa_hmac(oti->oti_capa_hmac, capa, oti->oti_capa_key.lk_key); - if (rc) - RETURN(rc); - if (memcmp(oti->oti_capa_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) { - DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch"); - RETURN(-EACCES); - } - - capa_add(capa); - - RETURN(0); -} - -static int osd_object_capa_auth(const struct lu_env *env, - const struct lu_object *obj, - struct lustre_capa *capa, - __u64 opc) -{ - const struct lu_fid *fid = lu_object_fid(obj); - - return 0; - - if (lu_object_capa_bypass(obj)) - return 0; - - if (!capa) { - CERROR("no capability is provided for fid "DFID"\n", PFID(fid)); - return -EACCES; - } - - if (!lu_fid_eq(fid, &capa->lc_fid)) { - DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with", - PFID(fid)); - return -EACCES; - } - - if (!capa_opc_supported(capa, opc)) { - DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc); - return -EACCES; - } - - if (!capa_is_sane(env, capa, obj->lo_dev->ld_site->ls_capa_keys)) { - DEBUG_CAPA(D_ERROR, capa, "insane"); - return -EACCES; - } - - return 0; -} - static struct lu_object_operations osd_lu_obj_ops = { .loo_object_init = osd_object_init, .loo_object_delete = osd_object_delete, .loo_object_release = osd_object_release, .loo_object_free = osd_object_free, .loo_object_print = osd_object_print, - .loo_object_invariant = osd_object_invariant, - .loo_object_auth = osd_object_capa_auth + .loo_object_invariant = osd_object_invariant }; static struct lu_device_operations osd_lu_ops = { diff --git a/lustre/osd/osd_oi.c b/lustre/osd/osd_oi.c index 1b3cb1e..9fda0f8 100644 --- a/lustre/osd/osd_oi.c +++ b/lustre/osd/osd_oi.c @@ -164,7 +164,8 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, } else { rc = oi->oi_dir->do_index_ops->dio_lookup (info->oti_env, oi->oi_dir, - (struct dt_rec *)id, oi_fid_key(info, fid)); + (struct dt_rec *)id, oi_fid_key(info, fid), + BYPASS_CAPA); osd_inode_id_init(id, id->oii_ino, id->oii_gen); } return rc; @@ -190,7 +191,8 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi, osd_inode_id_init(id, id0->oii_ino, id0->oii_gen); return idx->do_index_ops->dio_insert(info->oti_env, idx, (const struct dt_rec *)id, - oi_fid_key(info, fid), th); + oi_fid_key(info, fid), th, + BYPASS_CAPA); } /* @@ -209,6 +211,7 @@ int osd_oi_delete(struct osd_thread_info *info, idx = oi->oi_dir; dev = lu2dt_dev(idx->do_lu.lo_dev); return idx->do_index_ops->dio_delete(info->oti_env, idx, - oi_fid_key(info, fid), th); + oi_fid_key(info, fid), th, + BYPASS_CAPA); } diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index b4db7ba..9261e2c 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -73,11 +73,6 @@ static const struct req_msg_field *mdt_body_only[] = { &RMF_MDT_BODY }; -static const struct req_msg_field *mdt_renew_capa_client[] = { - &RMF_PTLRPC_BODY, - &RMF_CAPA1 -}; - static const struct req_msg_field *mdt_body_capa[] = { &RMF_PTLRPC_BODY, &RMF_MDT_BODY, @@ -366,8 +361,7 @@ static const struct req_format *req_formats[] = { &RQF_MDS_READPAGE, &RQF_MDS_WRITEPAGE, &RQF_MDS_IS_SUBDIR, - &RQF_MDS_DONE_WRITING, - &RQF_MDS_RENEW_CAPA + &RQF_MDS_DONE_WRITING }; struct req_msg_field { @@ -725,11 +719,6 @@ const struct req_format RQF_MDS_IS_SUBDIR = mdt_body_only, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_IS_SUBDIR); -const struct req_format RQF_MDS_RENEW_CAPA = - DEFINE_REQ_FMT0("MDS_RENEW_CAPA", - mdt_renew_capa_client, mdt_body_capa); -EXPORT_SYMBOL(RQF_MDS_RENEW_CAPA); - #if !defined(__REQ_LAYOUT_USER__) int req_layout_init(void) diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 5b44000..a430307 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -77,7 +77,6 @@ struct ll_rpc_opcode { { MDS_SETXATTR, "mds_setxattr" }, { MDS_WRITEPAGE, "mds_writepage" }, { MDS_IS_SUBDIR, "mds_is_subdir" }, - { MDS_RENEW_CAPA, "mds_renew_capa" }, { LDLM_ENQUEUE, "ldlm_enqueue" }, { LDLM_CONVERT, "ldlm_convert" }, { LDLM_CANCEL, "ldlm_cancel" }, -- 1.8.3.1