several other capability code fixes.
RETURN(rc);
}
-static int cmm_init_capa_keys(struct md_device *md,
+static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
+ __u32 valid, unsigned long timeout, __u32 alg,
struct lustre_capa_key *keys)
{
struct cmm_device *cmm_dev = md2cmm_dev(md);
int rc;
ENTRY;
- LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_keys);
- rc = cmm_child_ops(cmm_dev)->mdo_init_capa_keys(cmm_dev->cmm_child,
+ LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
+ rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
+ valid, timeout, alg,
keys);
RETURN(rc);
}
.mdo_statfs = cmm_statfs,
.mdo_root_get = cmm_root_get,
.mdo_maxsize_get = cmm_maxsize_get,
- .mdo_init_capa_keys = cmm_init_capa_keys,
+ .mdo_init_capa_ctxt = cmm_init_capa_ctxt,
.mdo_update_capa_key= cmm_update_capa_key,
};
struct md_object *m;
ENTRY;
- o = lu_object_find(env, md2lu_dev(md)->ld_site, f, BYPASS_CAPA);
+ o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
if (IS_ERR(o))
m = (struct md_object *)o;
else {
struct cmm_object *cmm_object_find(const struct lu_env *env,
struct cmm_device *d,
- const struct lu_fid *f,
- struct lustre_capa *capa)
+ const struct lu_fid *f)
{
struct lu_object *o;
struct cmm_object *m;
ENTRY;
- o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f,
- capa);
+ o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
if (IS_ERR(o))
m = (struct cmm_object *)o;
else
rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
seq_record_buf(info),
- &pos, th);
+ &pos, th, BYPASS_CAPA);
if (rc == sizeof(info->sti_record)) {
CDEBUG(D_INFO|D_WARNING, "%s: Store ranges: Space - "
DRANGE", Super - "DRANGE"\n", seq->lss_name,
LASSERT(info != NULL);
rc = dt_obj->do_body_ops->dbo_read(env, dt_obj,
- seq_record_buf(info), &pos);
+ seq_record_buf(info), &pos,
+ BYPASS_CAPA);
if (rc == sizeof(info->sti_record)) {
range_le_to_cpu(&seq->lss_space, &info->sti_record.ssr_space);
if (!IS_ERR(th)) {
rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
fld_rec(env, mds),
- fld_key(env, seq), th);
+ fld_key(env, seq), th,
+ BYPASS_CAPA);
dt_dev->dd_ops->dt_trans_stop(env, th);
} else
rc = PTR_ERR(th);
th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
if (!IS_ERR(th)) {
rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
- fld_key(env, seq), th);
+ fld_key(env, seq), th,
+ BYPASS_CAPA);
dt_dev->dd_ops->dt_trans_stop(env, th);
} else
rc = PTR_ERR(th);
ENTRY;
rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
- fld_key(env, seq));
+ fld_key(env, seq), BYPASS_CAPA);
if (rc == 0)
*mds = be64_to_cpu(*(__u64 *)rec);
RETURN(rc);
*/
int (*dt_sync)(const struct lu_env *env, struct dt_device *dev);
void (*dt_ro)(const struct lu_env *env, struct dt_device *dev);
+ /*
+ * Initialize capability context.
+ */
+ int (*dt_init_capa_ctxt)(const struct lu_env *env,
+ struct dt_device *dev,
+ __u32 valid, unsigned long timeout,
+ __u32 alg, struct lustre_capa_key *keys);
/*
* dt get credits from osd
* precondition: lu_object_exists(&dt->do_lu);
*/
int (*do_attr_get)(const struct lu_env *env,
- struct dt_object *dt, struct lu_attr *attr);
+ struct dt_object *dt, struct lu_attr *attr,
+ struct lustre_capa *capa);
/*
* Set standard attributes.
*
int (*do_attr_set)(const struct lu_env *env,
struct dt_object *dt,
const struct lu_attr *attr,
- struct thandle *handle);
+ struct thandle *handle,
+ struct lustre_capa *capa);
/*
* Return a value of an extended attribute.
*
* precondition: dt_object_exists(dt);
*/
int (*do_xattr_get)(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, const char *name);
+ struct lu_buf *buf, const char *name,
+ struct lustre_capa *capa);
/*
* Set value of an extended attribute.
*
*/
int (*do_xattr_set)(const struct lu_env *env,
struct dt_object *dt, const struct lu_buf *buf,
- const char *name, int fl, struct thandle *handle);
+ const char *name, int fl, struct thandle *handle,
+ struct lustre_capa *capa);
/*
* Delete existing extended attribute.
*
*/
int (*do_xattr_del)(const struct lu_env *env,
struct dt_object *dt,
- const char *name, struct thandle *handle);
+ const char *name, struct thandle *handle,
+ struct lustre_capa *capa);
/*
* Place list of existing extended attributes into @buf (which has
* length len).
* precondition: dt_object_exists(dt);
*/
int (*do_xattr_list)(const struct lu_env *env,
- struct dt_object *dt, struct lu_buf *buf);
+ struct dt_object *dt, struct lu_buf *buf,
+ struct lustre_capa *capa);
/*
* Create new object on this device.
*
void (*do_ref_del)(const struct lu_env *env,
struct dt_object *dt, struct thandle *th);
- int (*do_readpage)(const struct lu_env *env,
- struct dt_object *dt, const struct lu_rdpg *rdpg);
+ int (*do_readpage)(const struct lu_env *env,
+ struct dt_object *dt, const struct lu_rdpg *rdpg,
+ struct lustre_capa *capa);
+ int (*do_capa_get)(const struct lu_env *env,
+ struct dt_object *dt, struct lustre_capa *capa);
};
/*
* precondition: dt_object_exists(dt);
*/
ssize_t (*dbo_read)(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, loff_t *pos);
+ struct lu_buf *buf, loff_t *pos,
+ struct lustre_capa *capa);
/*
* precondition: dt_object_exists(dt);
*/
ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
- struct thandle *handle);
+ struct thandle *handle, struct lustre_capa *capa);
};
/*
* precondition: dt_object_exists(dt);
*/
int (*dio_lookup)(const struct lu_env *env, struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key);
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa);
/*
* precondition: dt_object_exists(dt);
*/
int (*dio_insert)(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *handle);
+ struct thandle *handle, struct lustre_capa *capa);
/*
* precondition: dt_object_exists(dt);
*/
int (*dio_delete)(const struct lu_env *env, struct dt_object *dt,
- const struct dt_key *key, struct thandle *handle);
+ const struct dt_key *key, struct thandle *handle,
+ struct lustre_capa *capa);
/*
* Iterator interface
*/
* consistent.
*/
int (*loo_object_invariant)(const struct lu_object *o);
- /*
- * Called to authorize action by capability.
- */
- int (*loo_object_auth)(const struct lu_env *env,
- const struct lu_object *o,
- struct lustre_capa *capa,
- __u64 opc);
};
/*
*/
struct lu_fid loh_fid;
/*
- * Fid capability.
- */
- unsigned int loh_capa_bypass:1; /* bypass capability check */
- struct lustre_capa loh_capa; /* capability sent by client */
- /*
* Common object attributes, cached for efficiency. From enum
* lu_object_header_attr.
*/
__u32 s_cache_race;
__u32 s_lru_purged;
} ls_stats;
-
- /* Capability */
- struct lustre_capa_key *ls_capa_keys;
- unsigned long ls_capa_timeout;
- __u32 ls_capa_alg;
};
/*
* any case, additional reference is acquired on the returned object.
*/
struct lu_object *lu_object_find(const struct lu_env *env,
- struct lu_site *s, const struct lu_fid *f,
- struct lustre_capa *c);
-
-/*
- * Auth lu_object capability.
- */
-int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
- struct lustre_capa *capa, __u64 opc);
+ struct lu_site *s, const struct lu_fid *f);
/*
* Helpers.
}
/*
- * Pointer to the fid capability of this object.
- */
-static inline struct lustre_capa *
-lu_object_capa(const struct lu_object *o)
-{
- return &o->lo_header->loh_capa;
-}
-
-static inline int lu_object_capa_bypass(const struct lu_object *o)
-{
- return o->lo_header->loh_capa_bypass;
-}
-
-/*
* return device operations vector for this object
*/
static inline struct lu_device_operations *
return o->lo_header->loh_attr;
}
-static inline void lu_object_bypass_capa(struct lu_object *o)
-{
- o->lo_header->loh_capa_bypass = 1;
-}
-
struct lu_rdpg {
/* input params, should be filled out by mdt */
__u32 rp_hash; /* hash */
MDS_SETXATTR = 50,
MDS_WRITEPAGE = 51,
MDS_IS_SUBDIR = 52,
- MDS_RENEW_CAPA = 53,
MDS_LAST_OPC
} mds_cmd_t;
static inline int capa_for_mds(struct lustre_capa *c)
{
- return (c->lc_opc & CAPA_OPC_MDS_ONLY) != 0;
+ return (c->lc_opc & CAPA_OPC_INDEX_INSERT) != 0;
}
static inline int capa_for_oss(struct lustre_capa *c)
{
- return (c->lc_opc & CAPA_OPC_OSS_ONLY) != 0;
+ return (c->lc_opc & CAPA_OPC_INDEX_INSERT) == 0;
}
/* lustre_capa.lc_flags */
enum {
CAPA_FL_SHORT_EXPIRY = 1, /* short capa expiry */
- CAPA_FL_ROOT = 2, /* root fid capa, will always renew */
};
/* lustre_capa.lc_hmac_alg */
extern int capa_count[];
extern cfs_mem_cache_t *capa_cachep;
-struct obd_capa *capa_add(struct lustre_capa *capa);
+void capa_add(struct lustre_capa *capa);
struct obd_capa *capa_lookup(struct lustre_capa *capa);
int capa_hmac(__u8 *hmac, struct lustre_capa *capa, __u8 *key);
#define BYPASS_CAPA (struct lustre_capa *)ERR_PTR(-ENOENT)
+/*
+ * Bits for the @valid mask passed to mdo_init_capa_ctxt()/dt_init_capa_ctxt():
+ * each bit marks which capability-context argument the callee should apply
+ * (e.g. mdd_init_capa_ctxt() stores @keys only when CAPA_CTX_KEYS is set).
+ * Names suggest the remaining semantics; confirm against the dt layer users.
+ */
+enum {
+ CAPA_CTX_ON = 1, /* capability support on/off — presumably; verify */
+ CAPA_CTX_TIMEOUT = 1<<1, /* @timeout argument is valid */
+ CAPA_CTX_KEY_TIMEOUT = 1<<2, /* key timeout is valid — TODO confirm carrier */
+ CAPA_CTX_ALG = 1<<3, /* @alg (HMAC algorithm) argument is valid */
+ CAPA_CTX_KEYS = 1<<4, /* @keys argument is valid */
+};
#endif /* __LINUX_CAPA_H_ */
extern const struct req_format RQF_MDS_WRITEPAGE;
extern const struct req_format RQF_MDS_IS_SUBDIR;
extern const struct req_format RQF_MDS_DONE_WRITING;
-extern const struct req_format RQF_MDS_RENEW_CAPA;
/*
* This is format of direct (non-intent) MDS_GETATTR_NAME request.
struct mdt_identity *mu_identity;
};
+/* there are at most 4 fid in one operation, see rename */
+/*
+ * Per-request capability cache kept in the session context: maps each fid
+ * involved in the current operation to the capability the client sent for
+ * it.  Filled by the mdt layer (mdt_set_capainfo) and consumed by mdd via
+ * md_capainfo()/mdd_object_capa().
+ */
+struct md_capainfo {
+ const struct lu_fid *mc_fid[4]; /* fids touched by this operation */
+ struct lustre_capa *mc_capa[4]; /* capability matching mc_fid[i] */
+};
+
/*
* Implemented in mdd/mdd_handler.c.
*
* related definitions.
*/
struct md_ucred *md_ucred(const struct lu_env *env);
+struct md_capainfo *md_capainfo(const struct lu_env *env);
/* metadata attributes */
enum ma_valid {
int (*mdo_statfs)(const struct lu_env *env, struct md_device *m,
struct kstatfs *sfs);
- int (*mdo_init_capa_keys)(struct md_device *m,
+ int (*mdo_init_capa_ctxt)(const struct lu_env *env, struct md_device *m,
+ __u32 valid, unsigned long timeout, __u32 alg,
struct lustre_capa_key *keys);
int (*mdo_update_capa_key)(const struct lu_env *env,
#define OBD_FAIL_MDS_WRITEPAGE_PACK 0x136
#define OBD_FAIL_MDS_IS_SUBDIR_NET 0x137
#define OBD_FAIL_MDS_IS_SUBDIR_PACK 0x138
-#define OBD_FAIL_MDS_RENEW_CAPA_NET 0x139
-#define OBD_FAIL_MDS_RENEW_CAPA_PACK 0x13a
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
struct lustre_handle *fh)
{
op_data->fid1 = ll_i2info(inode)->lli_fid;
- op_data->mod_capa1 = ll_i2mdscapa(inode);
op_data->attr.ia_mode = inode->i_mode;
op_data->attr.ia_atime = inode->i_atime;
op_data->attr.ia_mtime = inode->i_mtime;
list_for_each_entry(ocapa, &lli->lli_oss_capas, u.cli.lli_list) {
if (!obd_capa_is_valid(ocapa))
continue;
- if ((capa_opc(&ocapa->c_capa) & opc) == opc)
+ if ((capa_opc(&ocapa->c_capa) & opc) != opc)
continue;
LASSERT(lu_fid_eq(capa_fid(&ocapa->c_capa),
if ((ll_i2sbi(inode)->ll_flags & LL_SBI_OSS_CAPA) == 0)
return NULL;
ENTRY;
+
LASSERT(opc == CAPA_OPC_OSS_WRITE ||
opc == (CAPA_OPC_OSS_WRITE | CAPA_OPC_OSS_READ) ||
opc == CAPA_OPC_OSS_TRUNC);
{
struct obd_capa *ocapa;
struct ll_inode_info *lli = ll_i2info(inode);
- ENTRY;
LASSERT(inode);
if ((ll_i2sbi(inode)->ll_flags & LL_SBI_MDS_CAPA) == 0)
- RETURN(NULL);
+ return NULL;
+ ENTRY;
spin_lock(&capa_lock);
ocapa = capa_get(lli->lli_mds_capa);
if (!ocapa && atomic_read(&ll_capa_debug)) {
CDEBUG(S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode) ?
- D_ERROR : D_SEC, "no MDS capa for (ino %lu)\n",
- inode->i_ino);
+ D_ERROR : D_SEC, "no MDS capability for fid "DFID"\n",
+ PFID(ll_inode2fid(inode)));
if (inode_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
LBUG();
atomic_set(&ll_capa_debug, 0);
ocapa = do_lookup_oss_capa(inode, opc);
if (!ocapa) {
if (atomic_read(&ll_capa_debug)) {
- CDEBUG(D_ERROR, "no capa for (uid %u op %d ino %lu)\n",
- (unsigned)current->uid, opc, inode->i_ino);
+ CDEBUG(D_ERROR, "no opc %x capability for fid "DFID"\n",
+ opc, PFID(ll_inode2fid(inode)));
atomic_set(&ll_capa_debug, 0);
}
spin_unlock(&capa_lock);
sb->s_flags |= MS_POSIXACL;
#endif
sbi->ll_flags |= LL_SBI_ACL;
- } else {
+ } else if (sbi->ll_flags & LL_SBI_ACL) {
+ LCONSOLE_INFO("client wants to enable acl, but mdt not!\n");
sbi->ll_flags &= ~LL_SBI_ACL;
}
}
if (data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) {
- CDEBUG(D_SEC, "client enabled MDS capability!\n");
+ LCONSOLE_INFO("client enabled MDS capability!\n");
sbi->ll_flags |= LL_SBI_MDS_CAPA;
}
if (data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA) {
- CDEBUG(D_SEC, "client enabled OSS capability!\n");
+ LCONSOLE_INFO("client enabled OSS capability!\n");
sbi->ll_flags |= LL_SBI_OSS_CAPA;
}
lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0;
lli->lli_open_fd_exec_count = 0;
INIT_LIST_HEAD(&lli->lli_dead_list);
+ INIT_LIST_HEAD(&lli->lli_oss_capas);
sema_init(&lli->lli_rmtperm_sem, 1);
}
list_del_init(&lli->lli_dead_list);
spin_unlock(&sbi->ll_deathrow_lock);
+ ll_clear_inode_capas(inode);
+
EXIT;
}
RETURN(rc);
}
-static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *ocapa,
+static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
renew_capa_cb_t cb)
{
struct obd_device *obd = exp->exp_obd;
if (rc)
RETURN(rc);
- tgt_exp = lmv_get_export(lmv, &ocapa->c_capa.lc_fid);
+ tgt_exp = lmv_get_export(lmv, &oc->c_capa.lc_fid);
if (IS_ERR(tgt_exp))
RETURN(PTR_ERR(tgt_exp));
- rc = md_renew_capa(tgt_exp, ocapa, cb);
+ rc = md_renew_capa(tgt_exp, oc, cb);
RETURN(rc);
}
RETURN(0);
}
+/*
+ * Reply interpreter for the async capability-renewal request sent by
+ * mdc_renew_capa().  Unpacks the renewed OSS capability from the reply and
+ * hands it (or an ERR_PTR on failure) to the renew callback stashed in
+ * rq_async_args; the callback must check IS_ERR() on @capa.
+ *
+ * The server reports per-capa failure in body->flags (see mdt_renew_capa),
+ * which is turned back into an ERR_PTR here.  Always returns 0.
+ *
+ * NOTE(review): @body is declared struct mds_body but swabbed with
+ * lustre_swab_mdt_body — confirm the two layouts are identical.
+ * NOTE(review): the reply is still parsed when @status != 0 — confirm the
+ * repbuf is guaranteed valid in that case, or bail out early.
+ */
+static int mdc_interpret_renew_capa(struct ptlrpc_request *req, void *unused,
+ int status)
+{
+ struct obd_capa *oc = req->rq_async_args.pointer_arg[0];
+ renew_capa_cb_t *cb = req->rq_async_args.pointer_arg[1];
+ struct mds_body *body = NULL;
+ struct lustre_capa *capa;
+ ENTRY;
+
+ if (status)
+ DEBUG_CAPA(D_ERROR, &oc->c_capa, "renew failed: %d for",
+ status);
+
+ body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+ lustre_swab_mdt_body);
+ if (body == NULL)
+ GOTO(out, capa = ERR_PTR(-EFAULT));
+
+ /* server encodes its mo_capa_get() rc in body->flags */
+ if (body->flags)
+ GOTO(out, capa = ERR_PTR((long)body->flags));
+
+ if ((body->valid & OBD_MD_FLOSSCAPA) == 0)
+ GOTO(out, capa = ERR_PTR(-EFAULT));
+
+ capa = lustre_unpack_capa(req->rq_repmsg, REPLY_REC_OFF);
+ if (!capa)
+ capa = ERR_PTR(-EFAULT);
+
+ EXIT;
+out:
+ (*cb)(oc, capa);
+ return 0;
+}
+
static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
renew_capa_cb_t cb)
{
struct ptlrpc_request *req;
- int size[2] = { sizeof(struct ptlrpc_body),
+ int size[5] = { sizeof(struct ptlrpc_body),
+ sizeof(struct mdt_body),
sizeof(struct lustre_capa) };
- int repsize[3] = { sizeof(struct ptlrpc_body),
- sizeof(struct mdt_body),
- sizeof(struct lustre_capa) };
ENTRY;
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_RENEW_CAPA, 2, size, NULL);
+ MDS_GETATTR, 3, size, NULL);
if (!req)
RETURN(-ENOMEM);
- mdc_pack_capa(req, REQ_REC_OFF, oc);
+ mdc_pack_req_body(req, REQ_REC_OFF, OBD_MD_FLOSSCAPA,
+ &oc->c_capa.lc_fid, oc, 0, 0);
- ptlrpc_req_set_repsize(req, 3, repsize);
- req->rq_interpret_reply = cb;
+ ptlrpc_req_set_repsize(req, 5, size);
+ req->rq_async_args.pointer_arg[0] = oc;
+ req->rq_async_args.pointer_arg[1] = cb;
+ req->rq_interpret_reply = mdc_interpret_renew_capa;
ptlrpcd_add_req(req);
RETURN(0);
struct mdd_object *m;
ENTRY;
- o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f, BYPASS_CAPA);
+ o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
if (IS_ERR(o))
m = (struct mdd_object *)o;
else {
RETURN(rc);
}
-static inline int __mdd_la_get(const struct lu_env *env,
- struct mdd_object *obj, struct lu_attr *la)
+static inline int __mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
+ struct lu_attr *la, struct lustre_capa *capa)
{
- struct dt_object *next = mdd_object_child(obj);
+ struct dt_object *next = mdd_object_child(obj);
LASSERT(lu_object_exists(mdd2lu_obj(obj)));
- return next->do_ops->do_attr_get(env, next, la);
+ return next->do_ops->do_attr_get(env, next, la, capa);
}
static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
ENTRY;
mdd_read_lock(env, obj);
- rc = __mdd_la_get(env, obj, la);
+ rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
mdd_read_unlock(env, obj);
if (rc == 0)
mdd_flags_xlate(obj, la->la_flags);
struct md_ucred *uc = md_ucred(env);
int rc;
- rc = __mdd_la_get(env, cobj, tmp_la);
+ rc = __mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
if (rc) {
return rc;
} else if (tmp_la->la_uid == uc->mu_fsuid) {
return 0;
} else {
- rc = __mdd_la_get(env, pobj, tmp_la);
+ rc = __mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
if (rc)
return rc;
else if (!(tmp_la->la_mode & S_ISVTX))
int rc = 0;
ENTRY;
- rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr);
+ rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr,
+ mdd_object_capa(env, mdd_obj));
if (rc == 0)
ma->ma_valid = MA_INODE;
RETURN(rc);
next = mdd_object_child(mdd_obj);
mdd_read_lock(env, mdd_obj);
- rc = next->do_ops->do_xattr_get(env, next, buf, name);
+ rc = next->do_ops->do_xattr_get(env, next, buf, name,
+ mdd_object_capa(env, mdd_obj));
mdd_read_unlock(env, mdd_obj);
RETURN(rc);
next = mdd_object_child(mdd_obj);
mdd_read_lock(env, mdd_obj);
- rc = next->do_body_ops->dbo_read(env, next, buf, &pos);
+ rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
+ mdd_object_capa(env, mdd_obj));
mdd_read_unlock(env, mdd_obj);
RETURN(rc);
}
next = mdd_object_child(mdd_obj);
mdd_read_lock(env, mdd_obj);
- rc = next->do_ops->do_xattr_list(env, next, buf);
+ rc = next->do_ops->do_xattr_list(env, next, buf,
+ mdd_object_capa(env, mdd_obj));
mdd_read_unlock(env, mdd_obj);
RETURN(rc);
LASSERT(lu_object_exists(mdd2lu_obj(o)));
next = mdd_object_child(o);
- return next->do_ops->do_attr_set(env, next, attr, handle);
+ return next->do_ops->do_attr_set(env, next, attr, handle,
+ mdd_object_capa(env, o));
}
int mdd_attr_set_internal_locked(const struct lu_env *env,
int fl, struct thandle *handle)
{
struct dt_object *next;
+ struct lustre_capa *capa = mdd_object_capa(env, o);
int rc = 0;
ENTRY;
LASSERT(lu_object_exists(mdd2lu_obj(o)));
next = mdd_object_child(o);
if (buf->lb_buf && buf->lb_len > 0) {
- rc = next->do_ops->do_xattr_set(env, next, buf, name,
- 0, handle);
+ rc = next->do_ops->do_xattr_set(env, next, buf, name, 0, handle,
+ capa);
} else if (buf->lb_buf == NULL && buf->lb_len == 0) {
- rc = next->do_ops->do_xattr_del(env, next, name, handle);
+ rc = next->do_ops->do_xattr_del(env, next, name, handle, capa);
}
RETURN(rc);
}
if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
RETURN(-EPERM);
- rc = __mdd_la_get(env, obj, tmp_la);
+ rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
if (rc)
RETURN(rc);
RETURN(-EPERM);
mdd_read_lock(env, obj);
- rc = __mdd_la_get(env, obj, tmp_la);
+ rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
mdd_read_unlock(env, obj);
if (rc)
RETURN(rc);
LASSERT(lu_object_exists(mdd2lu_obj(obj)));
next = mdd_object_child(obj);
- return next->do_ops->do_xattr_del(env, next, name, handle);
+ return next->do_ops->do_xattr_del(env, next, name, handle,
+ mdd_object_capa(env, obj));
}
int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
static int __mdd_index_insert_only(const struct lu_env *env,
struct mdd_object *pobj,
const struct lu_fid *lf,
- const char *name, struct thandle *th)
+ const char *name, struct thandle *th,
+ struct lustre_capa *capa)
{
int rc;
struct dt_object *next = mdd_object_child(pobj);
if (dt_try_as_dir(env, next))
rc = next->do_index_ops->dio_insert(env, next,
(struct dt_rec *)lf,
- (struct dt_key *)name, th);
+ (struct dt_key *)name, th, capa);
else
rc = -ENOTDIR;
RETURN(rc);
/* insert new index, add reference if isdir, update times */
static int __mdd_index_insert(const struct lu_env *env,
struct mdd_object *pobj, const struct lu_fid *lf,
- const char *name, int isdir, struct thandle *th)
+ const char *name, int isdir, struct thandle *th,
+ struct lustre_capa *capa)
{
int rc;
struct dt_object *next = mdd_object_child(pobj);
rc = next->do_index_ops->dio_insert(env, next,
(struct dt_rec *)lf,
(struct dt_key *)name,
- th);
+ th, capa);
else
rc = -ENOTDIR;
static int __mdd_index_delete(const struct lu_env *env,
struct mdd_object *pobj, const char *name,
- int is_dir, struct thandle *handle)
+ int is_dir, struct thandle *handle,
+ struct lustre_capa *capa)
{
int rc;
struct dt_object *next = mdd_object_child(pobj);
if (dt_try_as_dir(env, next)) {
rc = next->do_index_ops->dio_delete(env, next,
(struct dt_key *)name,
- handle);
+ handle, capa);
if (rc == 0 && is_dir)
__mdd_ref_del(env, pobj, handle);
} else
GOTO(out, rc);
rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
- name, handle);
+ name, handle,
+ mdd_object_capa(env, mdd_tobj));
if (rc == 0)
__mdd_ref_add(env, mdd_sobj, handle);
GOTO(cleanup, rc);
is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
- rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
+ rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
+ mdd_object_capa(env, mdd_pobj));
if (rc)
GOTO(cleanup, rc);
if (rc)
GOTO(cleanup, rc);
- rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
+ rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
+ mdd_object_capa(env, mdd_spobj));
if (rc)
GOTO(cleanup, rc);
/* tobj can be remote one,
* so we do index_delete unconditionally and -ENOENT is allowed */
- rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
+ rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
+ mdd_object_capa(env, mdd_tpobj));
if (rc != 0 && rc != -ENOENT)
GOTO(cleanup, rc);
- rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle);
+ rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
+ mdd_object_capa(env, mdd_tpobj));
if (rc)
GOTO(cleanup, rc);
RETURN(rc);
if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
- rc = dir->do_index_ops->dio_lookup(env, dir, rec, key);
+ rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
+ mdd_object_capa(env, mdd_obj));
else
rc = -ENOTDIR;
/* add . and .. for newly created dir */
__mdd_ref_add(env, child, handle);
rc = __mdd_index_insert_only(env, child, mdo2fid(child),
- dot, handle);
+ dot, handle, BYPASS_CAPA);
if (rc == 0) {
rc = __mdd_index_insert_only(env, child, pfid,
- dotdot, handle);
+ dotdot, handle,
+ BYPASS_CAPA);
if (rc != 0) {
int rc2;
- rc2 = __mdd_index_delete(env,
- child, dot, 0, handle);
+ rc2 = __mdd_index_delete(env, child, dot, 0,
+ handle, BYPASS_CAPA);
if (rc2 != 0)
CERROR("Failure to cleanup after dotdot"
" creation: %d (%d)\n", rc2, rc);
/* sgid check */
mdd_read_lock(env, obj);
- rc = __mdd_la_get(env, obj, la);
+ rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
mdd_read_unlock(env, obj);
if (rc != 0)
RETURN(rc);
GOTO(cleanup, rc);
rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
- name, S_ISDIR(attr->la_mode), handle);
+ name, S_ISDIR(attr->la_mode), handle,
+ mdd_object_capa(env, mdd_pobj));
if (rc)
GOTO(cleanup, rc);
struct dt_object *dt = mdd_object_child(son);
const char *target_name = spec->u.sp_symname;
int sym_len = strlen(target_name);
+ const struct lu_buf *buf;
loff_t pos = 0;
- rc = dt->do_body_ops->dbo_write(env, dt,
- mdd_buf_get_const(env,
- target_name,
- sym_len),
- &pos, handle);
+ buf = mdd_buf_get_const(env, target_name, sym_len);
+ rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
+ mdd_object_capa(env, son));
if (rc == sym_len)
rc = 0;
else
if (inserted) {
rc2 = __mdd_index_delete(env, mdd_pobj, name,
S_ISDIR(attr->la_mode),
- handle);
+ handle, BYPASS_CAPA);
if (rc2)
CERROR("error can not cleanup destroy %d\n",
rc2);
if (rc)
GOTO(out_unlock, rc);
- rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle);
+ rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
+ BYPASS_CAPA);
out_unlock:
mdd_write_unlock(env, mdd_obj);
if (rc)
GOTO(out_unlock, rc);
- rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle);
+ rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
+ BYPASS_CAPA);
out_unlock:
mdd_write_unlock(env, mdd_obj);
/* if rename_tgt is called then we should just re-insert name with
* correct fid, no need to dec/inc parent nlink if obj is dir */
- rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle);
+ rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
if (rc)
GOTO(cleanup, rc);
- rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle);
+ rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
+ BYPASS_CAPA);
if (rc)
GOTO(cleanup, rc);
RETURN(0);
}
-static int mdd_init_capa_keys(struct md_device *m,
+static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m,
+ __u32 valid, unsigned long timeout, __u32 alg,
struct lustre_capa_key *keys)
{
struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
struct mds_obd *mds = &mdd2obd_dev(mdd)->u.mds;
+ int rc;
ENTRY;
- mds->mds_capa_keys = keys;
- RETURN(0);
+ if (valid & CAPA_CTX_KEYS)
+ mds->mds_capa_keys = keys;
+
+ rc = mdd_child_ops(mdd)->dt_init_capa_ctxt(env, mdd->mdd_child, valid,
+ timeout, alg, keys);
+ RETURN(rc);
}
static int mdd_update_capa_key(const struct lu_env *env,
if (mdd_is_dead_obj(obj))
RETURN(-ENOENT);
- rc = __mdd_la_get(env, obj, tmp_la);
+ rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
if (rc)
RETURN(rc);
if (rc)
GOTO(out_unlock, rc);
- rc = next->do_ops->do_readpage(env, next, rdpg);
+ rc = next->do_ops->do_readpage(env, next, rdpg,
+ mdd_object_capa(env, mdd_obj));
out_unlock:
mdd_read_unlock(env, mdd_obj);
buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
rc = next->do_ops->do_xattr_get(env, next, buf,
- XATTR_NAME_ACL_ACCESS);
+ XATTR_NAME_ACL_ACCESS,
+ mdd_object_capa(env, obj));
if (rc <= 0)
RETURN(rc ? : -EACCES);
if (uc->mu_valid == UCRED_INVALID)
RETURN(-EACCES);
- rc = __mdd_la_get(env, obj, la);
+ rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
if (rc)
RETURN(rc);
RETURN(-EACCES);
if (getattr) {
- rc = __mdd_la_get(env, obj, la);
+ rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
if (rc)
RETURN(rc);
}
static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
struct lustre_capa *capa)
{
+ struct dt_object *next;
struct mdd_object *mdd_obj = md2mdd_obj(obj);
- struct mdd_device *mdd = mdo2mdd(obj);
- struct lu_site *ls = mdd->mdd_md_dev.md_lu_dev.ld_site;
- struct lustre_capa_key *key = &ls->ls_capa_keys[1];
- struct obd_capa *ocapa;
int rc;
ENTRY;
LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
+ next = mdd_object_child(mdd_obj);
- capa->lc_fid = *mdo2fid(mdd_obj);
- if (ls->ls_capa_timeout < CAPA_TIMEOUT)
- capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
- if (lu_fid_eq(&capa->lc_fid, &mdd->mdd_root_fid))
- capa->lc_flags |= CAPA_FL_ROOT;
- capa->lc_flags = ls->ls_capa_alg << 24;
-
- /* TODO: get right permission here after remote uid landing */
- ocapa = capa_lookup(capa);
- if (ocapa) {
- LASSERT(!capa_is_expired(ocapa));
- capa_cpy(capa, ocapa);
- capa_put(ocapa);
- RETURN(0);
- }
-
- capa->lc_keyid = key->lk_keyid;
- capa->lc_expiry = CURRENT_SECONDS + ls->ls_capa_timeout;
- rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
- if (rc)
- RETURN(rc);
+ rc = next->do_ops->do_capa_get(env, next, capa);
- capa_add(capa);
- RETURN(0);
+ RETURN(rc);
}
struct md_device_operations mdd_ops = {
.mdo_statfs = mdd_statfs,
.mdo_root_get = mdd_root_get,
.mdo_maxsize_get = mdd_maxsize_get,
- .mdo_init_capa_keys = mdd_init_capa_keys,
+ .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
.mdo_update_capa_key= mdd_update_capa_key,
};
}
EXPORT_SYMBOL(md_ucred);
+/*
+ * lu_context_key allocator: creates the per-session md_capainfo slot.
+ * Returns ERR_PTR(-ENOMEM) on allocation failure, per lu_context_key
+ * convention.
+ */
+static void *mdd_capainfo_key_init(const struct lu_context *ctx,
+ struct lu_context_key *key)
+{
+ struct md_capainfo *ci;
+
+ OBD_ALLOC_PTR(ci);
+ if (ci == NULL)
+ ci = ERR_PTR(-ENOMEM);
+ return ci;
+}
+
+/* lu_context_key destructor: frees the per-session md_capainfo slot. */
+static void mdd_capainfo_key_fini(const struct lu_context *ctx,
+ struct lu_context_key *key, void *data)
+{
+ struct md_capainfo *ci = data;
+ OBD_FREE_PTR(ci);
+}
+
+/*
+ * Session-scoped (LCT_SESSION) context key under which the per-request
+ * struct md_capainfo lives; registered in mdd_type_init().
+ */
+struct lu_context_key mdd_capainfo_key = {
+ .lct_tags = LCT_SESSION,
+ .lct_init = mdd_capainfo_key_init,
+ .lct_fini = mdd_capainfo_key_fini
+};
+
+/*
+ * Fetch the per-session capability info for @env, or NULL when the env has
+ * no session context attached (which is the case during mdt_init0).
+ */
+struct md_capainfo *md_capainfo(const struct lu_env *env)
+{
+ /* NB: during mdt_init0 the env carries no session yet */
+ if (env->le_ses == NULL)
+ return NULL;
+ return lu_context_key_get(env->le_ses, &mdd_capainfo_key);
+}
+EXPORT_SYMBOL(md_capainfo);
+
static int mdd_type_init(struct lu_device_type *t)
{
int result;
result = lu_context_key_register(&mdd_thread_key);
if (result == 0)
result = lu_context_key_register(&mdd_ucred_key);
+ if (result == 0)
+ result = lu_context_key_register(&mdd_capainfo_key);
return result;
}
static void mdd_type_fini(struct lu_device_type *t)
{
+ lu_context_key_degister(&mdd_capainfo_key);
lu_context_key_degister(&mdd_ucred_key);
lu_context_key_degister(&mdd_thread_key);
}
return obd->u.mds.mds_max_cookiesize;
}
+/*
+ * Return the capability the client supplied for @obj's fid in the current
+ * operation, by searching the per-session md_capainfo (at most 4 entries,
+ * see struct md_capainfo).
+ *
+ * Distinct sentinels: BYPASS_CAPA when there is no session info at all
+ * (e.g. internal ops during mdt_init0) so checks are skipped entirely,
+ * vs. NULL when session info exists but carries no capa for this fid.
+ */
+static inline struct lustre_capa *mdd_object_capa(const struct lu_env *env,
+ const struct mdd_object *obj)
+{
+ struct md_capainfo *ci = md_capainfo(env);
+ const struct lu_fid *fid = mdo2fid(obj);
+ int i;
+
+ /* NB: in mdt_init0 */
+ if (!ci)
+ return BYPASS_CAPA;
+ for (i = 0; i < 4; i++)
+ if (ci->mc_fid[i] && lu_fid_eq(ci->mc_fid[i], fid))
+ return ci->mc_capa[i];
+ return NULL;
+}
+
#endif
next = mdd_object_child(obj);
rc = next->do_ops->do_xattr_get(env, next,
- mdd_buf_get(env, md, *md_size), name);
+ mdd_buf_get(env, md, *md_size), name,
+ mdd_object_capa(env, obj));
/*
* XXX: handling of -ENODATA, the right way is to have ->do_md_get()
* exported by dt layer.
int rc = 0;
ENTRY;
- rc = next->do_ops->do_attr_get(env, next, tmp_la);
+ rc = next->do_ops->do_attr_get(env, next, tmp_la,
+ mdd_object_capa(env, obj));
if (rc)
RETURN(rc);
ENTRY;
rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset,
- (struct dt_key *)key, th);
+ (struct dt_key *)key, th,
+ BYPASS_CAPA);
RETURN(rc);
}
ENTRY;
LASSERT(dor);
rc = dor->do_index_ops->dio_delete(env, dor,
- (struct dt_key *)key, th);
+ (struct dt_key *)key, th,
+ BYPASS_CAPA);
RETURN(rc);
}
obj = mdt->mdt_ck_obj;
obj->do_ops->do_read_lock(env, obj);
- rc = obj->do_ops->do_attr_get(env, mdt->mdt_ck_obj, la);
+ rc = obj->do_ops->do_attr_get(env, mdt->mdt_ck_obj, la, BYPASS_CAPA);
obj->do_ops->do_read_unlock(env, obj);
if (rc)
RETURN(rc);
{
struct mdt_device *mdt = args;
struct ptlrpc_thread *thread = &mdt->mdt_ck_thread;
- struct lustre_capa_key *tmp, *key = red_capa_key(mdt);
+ struct lustre_capa_key *tmp, *key = &mdt->mdt_capa_keys[1];
struct lu_env env;
struct mdt_thread_info *info;
struct md_device *next;
cfs_waitq_signal(&thread->t_ctl_waitq);
wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
}
+
+
static int mdt_getstatus(struct mdt_thread_info *info)
{
struct md_device *next = info->mti_mdt->mdt_child;
- int rc;
struct mdt_body *body;
+ int rc;
ENTRY;
- if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
- rc = -ENOMEM;
- } else {
- body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
- rc = next->md_ops->mdo_root_get(info->mti_env, next,
- &body->fid1);
- if (rc == 0)
- body->valid |= OBD_MD_FLID;
+ if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+ RETURN(-ENOMEM);
+
+ body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ rc = next->md_ops->mdo_root_get(info->mti_env, next, &body->fid1);
+ if (rc == 0)
+ body->valid |= OBD_MD_FLID;
+
+ if (info->mti_mdt->mdt_opts.mo_mds_capa) {
+ struct mdt_object *root;
+ struct lustre_capa *capa;
+
+ root = mdt_object_find(info->mti_env, info->mti_mdt, &body->fid1);
+ if (IS_ERR(root))
+ RETURN(PTR_ERR(root));
+
+ capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+ LASSERT(capa);
+ capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
+
+ rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa);
+ mdt_object_put(info->mti_env, root);
+ if (rc)
+ RETURN(rc);
+ body->valid |= OBD_MD_FLMDSCAPA;
}
RETURN(rc);
if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) {
struct lustre_capa *capa;
- spin_lock(&capa_lock);
- info->mti_capa_key = *red_capa_key(mdt);
- spin_unlock(&capa_lock);
-
capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
LASSERT(capa);
capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
RETURN(rc);
}
+/*
+ * Renew an OSS capability for the client.  Invoked from mdt_getattr() when
+ * the request body has OBD_MD_FLOSSCAPA set (renewal now rides on
+ * MDS_GETATTR instead of the removed MDS_RENEW_CAPA opcode).
+ *
+ * The client's capa is copied into the reply buffer and refreshed via
+ * mo_capa_get().  Always returns 0: the per-capa result @rc travels back
+ * in body->flags, which the client decodes in mdc_interpret_renew_capa().
+ * NOTE(review): OBD_MD_FLOSSCAPA is set in body->valid even when @rc is
+ * nonzero — harmless since the client checks body->flags first; confirm.
+ */
+static int mdt_renew_capa(struct mdt_thread_info *info)
+{
+ struct mdt_object *obj = info->mti_object;
+ struct mdt_body *body;
+ struct lustre_capa *capa, *c;
+ int rc;
+ ENTRY;
+
+ c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1);
+ LASSERT(c);
+
+ capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+ LASSERT(capa);
+
+ *capa = *c;
+ rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa);
+
+ body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ LASSERT(body);
+
+ body->valid |= OBD_MD_FLOSSCAPA;
+ body->flags = (__u32)rc;
+
+ RETURN(0);
+}
+
static int mdt_getattr(struct mdt_thread_info *info)
{
struct mdt_object *obj = info->mti_object;
if (reqbody == NULL)
GOTO(out, rc = -EFAULT);
+ if (reqbody->valid & OBD_MD_FLOSSCAPA) {
+ rc = mdt_renew_capa(info);
+ GOTO(out, rc);
+ }
+
if (reqbody->valid & OBD_MD_FLRMTPERM) {
rc = mdt_init_ucred(info, reqbody);
if (rc)
}
if (rc == 0) {
/* Finally, we can get attr for child. */
+ mdt_set_capainfo(info, 0, mdt_object_fid(child),
+ BYPASS_CAPA);
rc = mdt_getattr_internal(info, child);
if (rc != 0)
mdt_object_unlock(info, child, lhc, 1);
*step 3: find the child object by fid & lock it.
* regardless if it is local or remote.
*/
- child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
- BYPASS_CAPA);
+ child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
if (IS_ERR(child))
GOTO(out_parent, rc = PTR_ERR(child));
if (is_resent) {
}
/* finally, we can get attr for child. */
+ mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
rc = mdt_getattr_internal(info, child);
if (rc != 0) {
mdt_object_unlock(info, child, lhc, 1);
return -EOPNOTSUPP;
}
-static int mdt_renew_capa(struct mdt_thread_info *info)
-{
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *obj = info->mti_object;
- struct mdt_body *body;
- struct lustre_capa *capa;
- int rc;
- ENTRY;
-
- body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
- LASSERT(body);
-
- capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
- LASSERT(capa);
-
- spin_lock(&capa_lock);
- info->mti_capa_key = *red_capa_key(mdt);
- spin_unlock(&capa_lock);
-
- *capa = obj->mot_header.loh_capa;
- /* TODO: add capa check */
- rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa);
- if (rc)
- RETURN(rc);
-
- RETURN(rc);
-}
-
/*
* OBD PING and other handlers.
*/
struct mdt_object *mdt_object_find(const struct lu_env *env,
struct mdt_device *d,
- const struct lu_fid *f,
- struct lustre_capa *c)
+ const struct lu_fid *f)
{
struct lu_object *o;
struct mdt_object *m;
ENTRY;
- if (!d->mdt_opts.mo_mds_capa)
- c = BYPASS_CAPA;
-
- o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f, c);
+ o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
if (IS_ERR(o))
m = (struct mdt_object *)o;
else
struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
const struct lu_fid *f,
struct mdt_lock_handle *lh,
- __u64 ibits,
- struct lustre_capa *capa)
+ __u64 ibits)
{
struct mdt_object *o;
- o = mdt_object_find(info->mti_env, info->mti_mdt, f, capa);
+ o = mdt_object_find(info->mti_env, info->mti_mdt, f);
if (!IS_ERR(o)) {
int rc;
*/
static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
{
- struct lustre_capa *capa = NULL;
const struct mdt_body *body;
struct mdt_object *obj;
const struct lu_env *env;
* instance MDS_IS_SUBDIR.
*/
if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) &&
- req_capsule_field_present(pill, &RMF_CAPA1, RCL_CLIENT)) {
- int len = req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT);
- if (len == sizeof(struct lustre_capa))
- capa = req_capsule_client_get(pill, &RMF_CAPA1);
- }
+ req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
+ mdt_set_capainfo(info, 0, &body->fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
- obj = mdt_object_find(env, info->mti_mdt, &body->fid1, capa);
+ obj = mdt_object_find(env, info->mti_mdt, &body->fid1);
if (!IS_ERR(obj)) {
if ((flags & HABEO_CORPUS) &&
!lu_object_exists(&obj->mot_obj.mo_lu)) {
static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
{
+ struct md_device *next = m->mdt_child;
struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
struct lu_site *ls = d->ld_site;
m->mdt_rootsquash_info = NULL;
}
+ next->md_ops->mdo_init_capa_ctxt(env, next, CAPA_CTX_KEYS, 0, 0, NULL);
+ cleanup_capas(CAPA_SITE_SERVER);
+ del_timer(&m->mdt_ck_timer);
+ mdt_ck_thread_stop(m);
+
/* finish the stack */
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
int mdt_postrecov(const struct lu_env *, struct mdt_device *);
+static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m)
+{
+ struct md_device *next = m->mdt_child;
+ __u32 valid = CAPA_CTX_TIMEOUT | CAPA_CTX_ALG | CAPA_CTX_KEYS;
+ int rc;
+
+ if (m->mdt_opts.mo_mds_capa)
+ valid |= CAPA_CTX_ON;
+ rc = next->md_ops->mdo_init_capa_ctxt(env, next, valid,
+ m->mdt_capa_timeout,
+ m->mdt_capa_alg,
+ m->mdt_capa_keys);
+ return rc;
+}
+
static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
struct lu_device_type *ldt, struct lustre_cfg *cfg)
{
m->mdt_opts.mo_compat_resname = 0;
m->mdt_opts.mo_mds_capa = 0;
m->mdt_opts.mo_oss_capa = 0;
- m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
m->mdt_capa_timeout = CAPA_TIMEOUT;
+ m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
obd->obd_replayable = 1;
spin_lock_init(&m->mdt_client_bitmap_lock);
m->mdt_ck_timer.data = (unsigned long)m;
init_timer(&m->mdt_ck_timer);
- s->ls_capa_keys = m->mdt_capa_keys;
- s->ls_capa_timeout = m->mdt_capa_timeout;
- s->ls_capa_alg = m->mdt_capa_alg;
-
rc = mdt_start_ptlrpc_service(m);
if (rc)
GOTO(err_capa, rc);
if (obd->obd_recovering == 0)
mdt_postrecov(env, m);
+ mdt_init_capa_ctxt(env, m);
RETURN(0);
err_stop_service:
DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir),
DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle),
-DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle),
-DEF_MDT_HNDL_0(0 |HABEO_REFERO, RENEW_CAPA, mdt_renew_capa)
+DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle)
};
#define DEF_OBD_HNDL(flags, name, fn) \
/* root squash */
struct rootsquash_info *mdt_rootsquash_info;
- /* capability */
- __u32 mdt_capa_alg;
+ /* capability keys */
unsigned long mdt_capa_timeout;
- unsigned long mdt_ck_timeout;
+ __u32 mdt_capa_alg;
struct dt_object *mdt_ck_obj;
+ unsigned long mdt_ck_timeout;
unsigned long mdt_ck_expiry;
struct timer_list mdt_ck_timer;
struct ptlrpc_thread mdt_ck_thread;
int rr_logcookielen;
const struct llog_cookie *rr_logcookies;
__u32 rr_flags;
- struct lustre_capa *rr_capa1;
- struct lustre_capa *rr_capa2;
};
enum mdt_reint_flag {
struct mdt_client_data mti_mcd;
loff_t mti_off;
struct txn_param mti_txn_param;
- struct lustre_capa_key mti_capa_key;
struct lu_buf mti_buf;
+ struct lustre_capa_key mti_capa_key;
};
/*
* Info allocated per-transaction.
struct mdt_object *mdt_object_find(const struct lu_env *,
struct mdt_device *,
- const struct lu_fid *,
- struct lustre_capa *);
+ const struct lu_fid *);
struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
const struct lu_fid *,
struct mdt_lock_handle *,
- __u64 ibits,
- struct lustre_capa *);
+ __u64 ibits);
void mdt_object_unlock_put(struct mdt_thread_info *,
struct mdt_object *,
struct mdt_lock_handle *,
int mdt_fix_attr_ucred(struct mdt_thread_info *, __u32);
+static inline struct mdt_device *mdt_dev(struct lu_device *d)
+{
+// LASSERT(lu_device_is_mdt(d));
+ return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+}
+
/* mdt/mdt_identity.c */
#define MDT_IDENTITY_UPCALL_PATH "/usr/sbin/l_getidentity"
struct md_ucred *mdt_ucred(const struct mdt_thread_info *info);
+static inline int is_identity_get_disabled(struct upcall_cache *cache)
+{
+ return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
+}
+
/*
- * fid Capability
+ * Capability
*/
int mdt_ck_thread_start(struct mdt_device *mdt);
void mdt_ck_thread_stop(struct mdt_device *mdt);
void mdt_ck_timer_callback(unsigned long castmeharder);
int mdt_capa_keys_init(const struct lu_env *env, struct mdt_device *mdt);
-static inline struct mdt_device *mdt_dev(struct lu_device *d)
+static inline void mdt_set_capainfo(struct mdt_thread_info *info, int offset,
+ const struct lu_fid *fid,
+ struct lustre_capa *capa)
{
-// LASSERT(lu_device_is_mdt(d));
- return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
-}
+ struct mdt_device *dev = info->mti_mdt;
+ struct md_capainfo *ci;
-static inline struct lustre_capa_key *red_capa_key(struct mdt_device *mdt)
-{
- return &mdt->mdt_capa_keys[1];
-}
+ if (!dev->mdt_opts.mo_mds_capa)
+ return;
-static inline int is_identity_get_disabled(struct upcall_cache *cache)
-{
- return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
+ ci = md_capainfo(info->mti_env);
+ LASSERT(ci);
+ ci->mc_fid[offset] = fid;
+ ci->mc_capa[offset] = capa;
}
-
#endif /* __KERNEL__ */
#endif /* _MDT_H */
lustre_shrink_reply(req, offset, acl_size, 1);
offset += !!acl_size;
if (mdscapa && !(body->valid & OBD_MD_FLMDSCAPA))
- lustre_shrink_reply(req, offset, 0, 0);
+ lustre_shrink_reply(req, offset, 0, 1);
offset += mdscapa;
if (osscapa && !(body->valid & OBD_MD_FLOSSCAPA))
lustre_shrink_reply(req, offset, 0, 0);
ma->ma_valid = MA_INODE;
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
RETURN(0);
}
LA_CTIME | LA_MTIME | LA_ATIME;
info->mti_spec.sp_cr_flags = rec->cr_flags;
- if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+ if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) {
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
+ mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
+ }
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (S_ISDIR(attr->la_mode)) {
attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME;
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))
- rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2);
+ mdt_set_capainfo(info, 1, rr->rr_fid2,
+ req_capsule_client_get(pill, &RMF_CAPA2));
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE;
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE;
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))
- rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2);
+ mdt_set_capainfo(info, 1, rr->rr_fid2,
+ req_capsule_client_get(pill, &RMF_CAPA2));
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
struct lu_attr *attr = &info->mti_attr.ma_attr;
struct req_capsule *pill = &info->mti_pill;
struct mdt_reint_record *rr = &info->mti_rr;
+ struct ptlrpc_request *req = mdt_info_req(info);
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
info->mti_replayepoch = rec->cr_ioepoch;
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
- rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
- if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))
- rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2);
+ mdt_set_capainfo(info, 0, rr->rr_fid1,
+ req_capsule_client_get(pill, &RMF_CAPA1));
+ if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
+ (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT)))
+ mdt_set_capainfo(info, 1, rr->rr_fid2,
+ req_capsule_client_get(pill, &RMF_CAPA2));
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
struct md_create_spec *sp = &info->mti_spec;
- struct ptlrpc_request *req = mdt_info_req(info);
sp->u.sp_ea.eadata = req_capsule_client_get(pill,
&RMF_EADATA);
sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
return ret;
}
+/* for debug only */
+static int lprocfs_rd_capa(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *obd = data;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+ return snprintf(page, count, "capability on: %s %s\n",
+ mdt->mdt_opts.mo_oss_capa ? "oss" : "",
+ mdt->mdt_opts.mo_mds_capa ? "mds" : "");
+}
+
+static int lprocfs_wr_capa(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ int val, rc;
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc)
+ return rc;
+
+ if (val & ~0x3) {
+ CERROR("invalid value %u: only 0/1/2/3 is accepted.\n", val);
+ CERROR("\t0: disable capability\n"
+ "\t1: enable mds capability\n"
+ "\t2: enable oss capability\n"
+ "\t3: enable both mds and oss capability\n");
+ return -EINVAL;
+ }
+
+// mds_capa_onoff(obd, val);
+ return count;
+}
+
+static int lprocfs_rd_capa_count(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ return snprintf(page, count, "%d %d\n",
+ capa_count[CAPA_SITE_CLIENT],
+ capa_count[CAPA_SITE_SERVER]);
+}
+
+static int lprocfs_rd_capa_timeout(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *obd = data;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+ return snprintf(page, count, "%lu\n", mdt->mdt_capa_timeout);
+}
+
+static int lprocfs_rd_ck_timeout(char *page, char **start, off_t off, int count,
+ int *eof, void *data)
+{
+ struct obd_device *obd = data;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+ return snprintf(page, count, "%lu\n", mdt->mdt_ck_timeout);
+}
+
static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 },
{ "rootsquash_uid", lprocfs_rd_rootsquash_uid, 0, 0 },
{ "rootsquash_gid", lprocfs_rd_rootsquash_gid, 0, 0 },
{ "rootsquash_skips", lprocfs_rd_rootsquash_skips, 0, 0 },
+ { "capa", lprocfs_rd_capa, lprocfs_wr_capa, 0 },
+ { "capa_timeout", lprocfs_rd_capa_timeout, 0, 0 },
+ { "capa_key_timeout", lprocfs_rd_ck_timeout, 0, 0 },
+ { "capa_count", lprocfs_rd_capa_count, 0, 0 },
{ 0 }
};
int rc;
ENTRY;
- if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
- !(spec->sp_cr_flags & FMODE_WRITE))
+ if ((spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) ||
+ !(spec->sp_cr_flags & FMODE_WRITE))
RETURN(0);
ma->ma_need = MA_INODE | MA_LOV;
}
}
- spin_lock(&capa_lock);
- info->mti_capa_key = *red_capa_key(mdt);
- spin_unlock(&capa_lock);
-
if (mdt->mdt_opts.mo_mds_capa) {
struct lustre_capa *capa;
RETURN(rc);
repbody->valid |= OBD_MD_FLMDSCAPA;
}
- if (mdt->mdt_opts.mo_oss_capa) {
+ if (mdt->mdt_opts.mo_oss_capa &&
+ S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
struct lustre_capa *capa;
capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA2);
LASSERT(capa);
- capa->lc_opc = CAPA_OPC_OSS_DEFAULT;
+ capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa);
if (rc)
RETURN(rc);
* We failed after creation, but we do not know in which step
* we failed. So try to check the child object.
*/
- parent = mdt_object_find(env, mdt, rr->rr_fid1, rr->rr_capa1);
+ parent = mdt_object_find(env, mdt, rr->rr_fid1);
LASSERT(!IS_ERR(parent));
- child = mdt_object_find(env, mdt, rr->rr_fid2, rr->rr_capa2);
+ child = mdt_object_find(env, mdt, rr->rr_fid2);
LASSERT(!IS_ERR(child));
rc = lu_object_exists(&child->mot_obj.mo_lu);
int rc;
ENTRY;
- o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2,
- rr->rr_capa2);
+ o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
int rc;
ENTRY;
- o = mdt_object_find(info->mti_env, info->mti_mdt, fid, BYPASS_CAPA);
+ o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
rc = lu_object_exists(&o->mot_obj.mo_lu);
if (rc > 0) {
+ mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
if (rc == 0)
rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
else
lh->mlh_mode = LCK_EX;
parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
- MDS_INODELOCK_UPDATE, rr->rr_capa1);
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(parent))
GOTO(out, result = PTR_ERR(parent));
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
}
- child = mdt_object_find(info->mti_env, mdt, child_fid, BYPASS_CAPA);
+ child = mdt_object_find(info->mti_env, mdt, child_fid);
if (IS_ERR(child))
GOTO(out_parent, result = PTR_ERR(child));
+ mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
if (result == -ENOENT) {
/* Not found and with MDS_OPEN_CREAT: let's create it. */
mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
- rc = dt->do_body_ops->dbo_read(env, dt, buf, pos);
+ rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
if (rc == buf->lb_len)
rc = 0;
LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
LASSERT(th != NULL);
- rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th);
+ rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA);
if (rc == buf->lb_len)
rc = 0;
else if (rc >= 0)
obj = mdt->mdt_last_rcvd;
obj->do_ops->do_read_lock(env, obj);
- rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la);
+ rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la, BYPASS_CAPA);
obj->do_ops->do_read_unlock(env, obj);
if (rc)
RETURN(rc);
o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid);
if(!IS_ERR(o)) {
- struct md_device *next = mdt->mdt_child;
mdt->mdt_ck_obj = o;
rc = mdt_capa_keys_init(env, mdt);
if (rc) {
mdt->mdt_ck_obj = NULL;
RETURN(rc);
}
- rc = next->md_ops->mdo_init_capa_keys(next, mdt->mdt_capa_keys);
} else {
rc = PTR_ERR(o);
CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
return;
/* if no error, so child was created with requested fid */
- child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2,
- mti->mti_rr.rr_capa2);
+ child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2);
LASSERT(!IS_ERR(child));
body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
return;
body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
- obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1,
- mti->mti_rr.rr_capa1);
+ obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1);
LASSERT(!IS_ERR(obj));
mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(obj));
lh = &info->mti_lh[MDT_LH_PARENT];
lh->mlh_mode = LCK_EX;
- parent = mdt_object_find_lock(info, rr->rr_fid1,
- lh, MDS_INODELOCK_UPDATE,
- rr->rr_capa1);
+ parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(parent))
RETURN(PTR_ERR(parent));
- child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2, BYPASS_CAPA);
+ child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
if (!IS_ERR(child)) {
struct md_object *next = mdt_object_child(parent);
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_CREATE_WRITE);
+ mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
rc = mdo_create(info->mti_env, next, rr->rr_name,
mdt_object_child(child),
&info->mti_spec, ma);
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
- o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
- BYPASS_CAPA);
+ o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
if (!IS_ERR(o)) {
struct md_object *next = mdt_object_child(o);
(unsigned int)ma->ma_attr.la_valid);
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
- mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
- rr->rr_capa1);
+ mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(mo))
RETURN(rc = PTR_ERR(mo));
mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo));
- if (mdt->mdt_opts.mo_oss_capa) {
+ if (mdt->mdt_opts.mo_oss_capa &&
+ S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu))) {
struct lustre_capa *capa;
capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
parent_lh = &info->mti_lh[MDT_LH_PARENT];
parent_lh->mlh_mode = LCK_EX;
mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
- MDS_INODELOCK_UPDATE, rr->rr_capa1);
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(mp))
GOTO(out, rc = PTR_ERR(mp));
GOTO(out_unlock_parent, rc);
/* we will lock the child regardless it is local or remote. No harm. */
- mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
- BYPASS_CAPA);
+ mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
if (IS_ERR(mc))
GOTO(out_unlock_parent, rc = PTR_ERR(mc));
child_lh = &info->mti_lh[MDT_LH_CHILD];
* whether need MA_LOV and MA_COOKIE.
*/
ma->ma_need = MA_INODE;
+ mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
mdt_object_child(mc), rr->rr_name, ma);
if (rc)
lhs = &info->mti_lh[MDT_LH_PARENT];
lhs->mlh_mode = LCK_EX;
ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
- MDS_INODELOCK_UPDATE, rr->rr_capa1);
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(ms))
RETURN(PTR_ERR(ms));
if (strlen(rr->rr_name) == 0) {
/* remote partial operation */
+ mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
GOTO(out_unlock_source, rc);
}
lhp = &info->mti_lh[MDT_LH_CHILD];
lhp->mlh_mode = LCK_EX;
mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
- MDS_INODELOCK_UPDATE, rr->rr_capa2);
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(mp))
GOTO(out_unlock_source, rc = PTR_ERR(mp));
lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
lh_tgtdir->mlh_mode = LCK_EX;
mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
- MDS_INODELOCK_UPDATE, rr->rr_capa1);
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(mtgtdir))
GOTO(out, rc = PTR_ERR(mtgtdir));
lh_tgt->mlh_mode = LCK_EX;
mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
- MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
+ MDS_INODELOCK_LOOKUP);
if (IS_ERR(mtgt))
GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
ENTRY;
do {
- dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
- BYPASS_CAPA);
+ dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
if (!IS_ERR(dst)) {
rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst),
fid, &dst_fid);
lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
lh_srcdirp->mlh_mode = LCK_EX;
msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
- MDS_INODELOCK_UPDATE, rr->rr_capa1);
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(msrcdir))
GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
mtgtdir = msrcdir;
} else {
mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
- rr->rr_fid2, rr->rr_capa2);
+ rr->rr_fid2);
if (IS_ERR(mtgtdir))
GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
lh_oldp = &info->mti_lh[MDT_LH_OLD];
lh_oldp->mlh_mode = LCK_EX;
mold = mdt_object_find_lock(info, old_fid, lh_oldp,
- MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
+ MDS_INODELOCK_LOOKUP);
if (IS_ERR(mold))
GOTO(out_unlock_target, rc = PTR_ERR(mold));
GOTO(out_unlock_old, rc = -EINVAL);
lh_newp->mlh_mode = LCK_EX;
- mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
- BYPASS_CAPA);
+ mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
if (IS_ERR(mnew))
GOTO(out_unlock_old, rc = PTR_ERR(mnew));
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_RENAME_WRITE);
+ mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
+ mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
/* Check if @dst is subdir of @src. */
rc = mdt_rename_check(info, old_fid);
if (rc)
}
/* add or update */
-struct obd_capa *capa_add(struct lustre_capa *capa)
+void capa_add(struct lustre_capa *capa)
{
struct hlist_head *head = capa_hash + capa_hashfn(&capa->lc_fid);
struct obd_capa *ocapa, *old = NULL;
ocapa = alloc_capa(CAPA_SITE_SERVER);
if (!ocapa)
- return NULL;
+ return;
spin_lock(&capa_lock);
DEBUG_CAPA(D_SEC, &ocapa->c_capa, "new");
spin_unlock(&capa_lock);
- return ocapa;
+ return;
}
spin_lock(&old->c_lock);
DEBUG_CAPA(D_SEC, &old->c_capa, "update");
free_capa(ocapa);
- return old;
}
struct obd_capa *capa_lookup(struct lustre_capa *capa)
int result;
if (dt_try_as_dir(env, dir))
- result = dir->do_index_ops->dio_lookup(env, dir, rec, key);
+ result = dir->do_index_ops->dio_lookup(env, dir, rec, key,
+ BYPASS_CAPA);
else
result = -ENOTDIR;
return result;
struct lu_object *obj;
struct dt_object *dt;
- obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid, BYPASS_CAPA);
+ obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid);
if (!IS_ERR(obj)) {
obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
LASSERT(obj != NULL);
if (result == 0) {
root = dt_locate(env, dt, fid);
if (!IS_ERR(root)) {
- lu_object_bypass_capa(&root->do_lu);
result = dt_lookup(env, root, name, fid);
if (result == 0)
child = dt_locate(env, dt, fid);
*/
static struct lu_object *lu_object_alloc(const struct lu_env *env,
struct lu_site *s,
- const struct lu_fid *f,
- const struct lustre_capa *capa)
+ const struct lu_fid *f)
{
struct lu_object *scan;
struct lu_object *top;
* after this point.
*/
top->lo_header->loh_fid = *f;
- if (capa == BYPASS_CAPA)
- lu_object_bypass_capa(top);
- else if (capa)
- top->lo_header->loh_capa = *capa;
-
layers = &top->lo_header->loh_layers;
do {
/*
* any case, additional reference is acquired on the returned object.
*/
struct lu_object *lu_object_find(const struct lu_env *env,
- struct lu_site *s, const struct lu_fid *f,
- struct lustre_capa *capa)
+ struct lu_site *s, const struct lu_fid *f)
{
struct lu_object *o;
struct lu_object *shadow;
struct hlist_head *bucket;
- int rc;
/*
* This uses standard index maintenance protocol:
o = htable_lookup(s, bucket, f);
spin_unlock(&s->ls_guard);
- if (o != NULL) {
- if (capa == BYPASS_CAPA) {
- o->lo_header->loh_capa_bypass = 1;
- } else {
- rc = lu_object_auth(env, o, capa,
- CAPA_OPC_INDEX_LOOKUP);
- if (rc)
- return ERR_PTR(rc);
- if (capa)
- o->lo_header->loh_capa = *capa;
- }
+ if (o != NULL)
return o;
- }
/*
* Allocate new object. This may result in rather complicated
* operations, including fld queries, inode loading, etc.
*/
- o = lu_object_alloc(env, s, f, capa);
+ o = lu_object_alloc(env, s, f);
if (IS_ERR(o))
return o;
}
EXPORT_SYMBOL(lu_object_find);
-int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
- struct lustre_capa *capa, __u64 opc)
-{
- struct lu_object_header *top = o->lo_header;
- int rc;
-
- list_for_each_entry(o, &top->loh_layers, lo_linkage) {
- if (o->lo_ops->loo_object_auth) {
- rc = o->lo_ops->loo_object_auth(env, o, capa, opc);
- if (rc)
- return rc;
- }
- }
-
- return 0;
-}
-EXPORT_SYMBOL(lu_object_auth);
-
enum {
LU_SITE_HTABLE_BITS = 8,
LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
* This means that it's enough to have _one_ lu_context.
*/
struct lu_env od_env_for_commit;
+ /*
+ * Capability
+ */
+ unsigned int od_fl_capa:1;
+ unsigned long od_capa_timeout;
+ __u32 od_capa_alg;
+ struct lustre_capa_key *od_capa_keys;
};
static int osd_root_get (const struct lu_env *env,
const struct txn_param *param);
static int osd_index_lookup (const struct lu_env *env,
struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key);
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa);
static int osd_index_insert (const struct lu_env *env,
struct dt_object *dt,
const struct dt_rec *rec,
const struct dt_key *key,
- struct thandle *handle);
+ struct thandle *handle,
+ struct lustre_capa *capa);
static int osd_index_delete (const struct lu_env *env,
struct dt_object *dt, const struct dt_key *key,
- struct thandle *handle);
+ struct thandle *handle,
+ struct lustre_capa *capa);
static int osd_index_probe (const struct lu_env *env,
struct osd_object *o,
const struct dt_index_features *feat);
EXIT;
}
+static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
+ __u32 valid, unsigned long timeout, __u32 alg,
+ struct lustre_capa_key *keys)
+{
+ struct osd_device *dev = osd_dt_dev(d);
+ ENTRY;
+
+ if (valid & CAPA_CTX_ON)
+ dev->od_fl_capa = 1;
+ else
+ dev->od_fl_capa = 0;
+
+ if (valid & CAPA_CTX_TIMEOUT)
+ dev->od_capa_timeout = timeout;
+
+ if (valid & CAPA_CTX_ALG)
+ dev->od_capa_alg = alg;
+
+ if (valid & CAPA_CTX_KEYS)
+ dev->od_capa_keys = keys;
+ RETURN(0);
+}
+
/* Note: we did not count into QUOTA here, If we mount with --data_journal
* we may need more*/
enum {
}
static struct dt_device_operations osd_dt_ops = {
- .dt_root_get = osd_root_get,
- .dt_statfs = osd_statfs,
- .dt_trans_start = osd_trans_start,
- .dt_trans_stop = osd_trans_stop,
- .dt_conf_get = osd_conf_get,
- .dt_sync = osd_sync,
- .dt_ro = osd_ro,
- .dt_credit_get = osd_credit_get
+ .dt_root_get = osd_root_get,
+ .dt_statfs = osd_statfs,
+ .dt_trans_start = osd_trans_start,
+ .dt_trans_stop = osd_trans_stop,
+ .dt_conf_get = osd_conf_get,
+ .dt_sync = osd_sync,
+ .dt_ro = osd_ro,
+ .dt_credit_get = osd_credit_get,
+ .dt_init_capa_ctxt = osd_init_capa_ctxt
};
static void osd_object_read_lock(const struct lu_env *env,
up_write(&obj->oo_sem);
}
-static inline int osd_object_auth(const struct lu_env *env,
- const struct lu_object *o,
- __u64 opc)
+static int capa_is_sane(const struct lu_env *env,
+ struct lustre_capa *capa,
+ struct lustre_capa_key *keys)
+{
+ struct obd_capa *c;
+ struct osd_thread_info *oti = lu_context_key_get(&env->le_ctx, &osd_key);
+ int i, rc = 1;
+ ENTRY;
+
+ c = capa_lookup(capa);
+ if (c) {
+ spin_lock(&c->c_lock);
+ if (memcmp(&c->c_capa, capa, sizeof(*capa))) {
+ DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
+ rc = -EACCES;
+ } else if (capa_is_expired(c)) {
+ DEBUG_CAPA(D_ERROR, capa, "expired");
+ rc = -ESTALE;
+ }
+ spin_unlock(&c->c_lock);
+
+ capa_put(c);
+ RETURN(rc);
+ }
+
+ spin_lock(&capa_lock);
+ for (i = 0; i < 2; i++) {
+ if (keys[i].lk_keyid == capa->lc_keyid) {
+ oti->oti_capa_key = keys[i];
+ break;
+ }
+ }
+ spin_unlock(&capa_lock);
+
+ if (i == 2) {
+ DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
+ RETURN(-ESTALE);
+ }
+
+ rc = capa_hmac(oti->oti_capa_hmac, capa, oti->oti_capa_key.lk_key);
+ if (rc)
+ RETURN(rc);
+ if (memcmp(oti->oti_capa_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
+ DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
+ RETURN(-EACCES);
+ }
+
+ capa_add(capa);
+
+ RETURN(1);
+}
+
+static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
+ struct lustre_capa *capa, __u64 opc)
{
- return o->lo_ops->loo_object_auth(env, o, lu_object_capa(o), opc);
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+ struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
+
+ if (!dev->od_fl_capa)
+ return 0;
+
+ if (capa == BYPASS_CAPA)
+ return 0;
+
+ if (!capa) {
+ CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
+ LBUG();
+ return -EACCES;
+ }
+
+ if (!lu_fid_eq(fid, &capa->lc_fid)) {
+ DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
+ PFID(fid));
+ return -EACCES;
+ }
+
+ if (!capa_opc_supported(capa, opc)) {
+ DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
+ return -EACCES;
+ }
+
+ if (capa_is_sane(env, capa, dev->od_capa_keys) <= 0) {
+ DEBUG_CAPA(D_ERROR, capa, "insane");
+ return -EACCES;
+ }
+
+ return 0;
}
static int osd_attr_get(const struct lu_env *env,
struct dt_object *dt,
- struct lu_attr *attr)
+ struct lu_attr *attr,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(osd_invariant(obj));
LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
return -EACCES;
return osd_inode_getattr(env, obj->oo_inode, attr);
static int osd_attr_set(const struct lu_env *env,
struct dt_object *dt,
const struct lu_attr *attr,
- struct thandle *handle)
+ struct thandle *handle,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(handle != NULL);
LASSERT(osd_invariant(obj));
LASSERT(osd_write_locked(env, obj));
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
return osd_inode_setattr(env, obj->oo_inode, attr);
/*
* XXX missing: permission checks.
*/
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_INSERT))
- RETURN(-EACCES);
/*
* XXX missing: sanity checks (valid ->la_mode, etc.)
}
static void osd_object_ref_add(const struct lu_env *env,
- struct dt_object *dt, struct thandle *th)
+ struct dt_object *dt,
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) {
- LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
- "no capability to link!\n");
- return;
- }
-
if (inode->i_nlink < LDISKFS_LINK_MAX) {
inode->i_nlink ++;
mark_inode_dirty(inode);
}
static void osd_object_ref_del(const struct lu_env *env,
- struct dt_object *dt, struct thandle *th)
+ struct dt_object *dt,
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) {
- LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
- "no capability to unlink!\n");
- return;
- }
-
if (inode->i_nlink > 0) {
inode->i_nlink --;
mark_inode_dirty(inode);
LASSERT(osd_invariant(obj));
}
-static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, const char *name)
+static int osd_xattr_get(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_buf *buf,
+ const char *name,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
return -EACCES;
dentry->d_inode = inode;
static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, const char *name, int fl,
- struct thandle *handle)
+ struct thandle *handle, struct lustre_capa *capa)
{
int fs_flags;
LASSERT(osd_write_locked(env, obj));
LASSERT(handle != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
dentry->d_inode = inode;
buf->lb_buf, buf->lb_len, fs_flags);
}
-static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf)
+static int osd_xattr_list(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_buf *buf,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
return -EACCES;
dentry->d_inode = inode;
return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
}
-static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
- const char *name, struct thandle *handle)
+static int osd_xattr_del(const struct lu_env *env,
+ struct dt_object *dt,
+ const char *name,
+ struct thandle *handle,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(handle != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
dentry->d_inode = inode;
__u32 *start, __u32 *end, struct lu_dirent **last)
{
int result;
- struct osd_thread_info *info = lu_context_key_get(&env->le_ctx, &osd_key);
+ struct osd_thread_info *info = lu_context_key_get(&env->le_ctx,
+ &osd_key);
struct lu_fid *fid = &info->oti_fid;
struct lu_dirent *ent;
}
static int osd_readpage(const struct lu_env *env,
- struct dt_object *dt, const struct lu_rdpg *rdpg)
+ struct dt_object *dt,
+ const struct lu_rdpg *rdpg,
+ struct lustre_capa *capa)
{
struct dt_it *it;
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(rdpg->rp_pages != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_BODY_READ))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
return -EACCES;
if (rdpg->rp_count <= 0)
return rc ? rc : rc1;
}
+/*
+ * Return a capability for object @dt in @capa: either a cached one
+ * (capa_lookup) or a freshly built one signed with the device's current
+ * capability key and added to the cache.
+ *
+ * FIX(review): the original code OR-ed CAPA_FL_SHORT_EXPIRY into
+ * lc_flags and THEN did a plain assignment of the algorithm bits
+ * (lc_flags = od_capa_alg << 24), silently clobbering the flag.
+ * The algorithm bits are now assigned first, then flags are OR-ed in.
+ */
+static int osd_capa_get(const struct lu_env *env,
+ struct dt_object *dt, struct lustre_capa *capa)
+{
+ struct osd_thread_info *info = lu_context_key_get(&env->le_ctx,
+ &osd_key);
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *dev = osd_obj2dev(obj);
+ struct lustre_capa_key *key = &info->oti_capa_key;
+ struct obd_capa *oc;
+ int rc;
+ ENTRY;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(osd_invariant(obj));
+
+ capa->lc_fid = *fid;
+ /* algorithm id lives in the top byte of lc_flags; set it first so
+ * the flag bits OR-ed below are not overwritten */
+ capa->lc_flags = dev->od_capa_alg << 24;
+ if (dev->od_capa_timeout < CAPA_TIMEOUT)
+ capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
+
+ /* TODO: get right permission here */
+ oc = capa_lookup(capa);
+ if (oc) {
+ /* cache hit: reuse the already-signed capability */
+ LASSERT(!capa_is_expired(oc));
+ capa_cpy(capa, oc);
+ capa_put(oc);
+ RETURN(0);
+ }
+
+ /* snapshot the current key and expiry under capa_lock so key and
+ * keyid stay consistent with each other */
+ spin_lock(&capa_lock);
+ *key = dev->od_capa_keys[1];
+ capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout;
+ spin_unlock(&capa_lock);
+
+ capa->lc_keyid = key->lk_keyid;
+ rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
+ if (rc)
+ RETURN(rc);
+
+ capa_add(capa);
+ RETURN(0);
+}
+
static struct dt_object_operations osd_obj_ops = {
.do_read_lock = osd_object_read_lock,
.do_write_lock = osd_object_write_lock,
.do_xattr_del = osd_xattr_del,
.do_xattr_list = osd_xattr_list,
.do_readpage = osd_readpage,
+ .do_capa_get = osd_capa_get,
};
/*
*/
static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, loff_t *pos)
+ struct lu_buf *buf, loff_t *pos,
+ struct lustre_capa *capa)
{
struct inode *inode = osd_dt_obj(dt)->oo_inode;
struct file *file;
mm_segment_t seg;
ssize_t result;
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
+ RETURN(-EACCES);
+
file = osd_rw_init(env, inode, &seg);
/*
* We'd like to use vfs_read() here, but it messes with
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
- struct thandle *handle)
+ struct thandle *handle, struct lustre_capa *capa)
{
struct inode *inode = osd_dt_obj(dt)->oo_inode;
struct file *file;
LASSERT(handle != NULL);
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
+ RETURN(-EACCES);
+
file = osd_rw_init(env, inode, &seg);
if (file->f_op->write)
result = file->f_op->write(file, buf->lb_buf, buf->lb_len, pos);
LASSERT(osd_invariant(obj));
LASSERT(dt_object_exists(dt));
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_LOOKUP))
- RETURN(-EACCES);
-
if (osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode) {
dt->do_index_ops = &osd_index_compat_ops;
result = 0;
}
static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
- const struct dt_key *key, struct thandle *handle)
+ const struct dt_key *key, struct thandle *handle,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_thandle *oh;
LASSERT(obj->oo_ipd != NULL);
LASSERT(handle != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_DELETE))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
RETURN(-EACCES);
oh = container_of0(handle, struct osd_thandle, ot_super);
}
static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key)
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
int rc;
LASSERT(obj->oo_container.ic_object == obj->oo_inode);
LASSERT(obj->oo_ipd != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_LOOKUP))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
return -EACCES;
rc = iam_lookup(&obj->oo_container, (const struct iam_key *)key,
static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *th)
+ struct thandle *th, struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(obj->oo_ipd != NULL);
LASSERT(th != NULL);
- if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_INSERT))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
return -EACCES;
oh = container_of0(th, struct osd_thandle, ot_super);
static int osd_index_compat_delete(const struct lu_env *env,
struct dt_object *dt,
const struct dt_key *key,
- struct thandle *handle)
+ struct thandle *handle,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(S_ISDIR(obj->oo_inode->i_mode));
ENTRY;
+#if 0
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+ RETURN(-EACCES);
+#endif
+
RETURN(-EOPNOTSUPP);
}
static int osd_index_compat_lookup(const struct lu_env *env,
struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key)
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(S_ISDIR(obj->oo_inode->i_mode));
LASSERT(osd_has_index(obj));
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
+ return -EACCES;
+
info->oti_str.name = (const char *)key;
info->oti_str.len = strlen((const char *)key);
static int osd_index_compat_insert(const struct lu_env *env,
struct dt_object *dt,
const struct dt_rec *rec,
- const struct dt_key *key, struct thandle *th)
+ const struct dt_key *key, struct thandle *th,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
LASSERT(osd_invariant(obj));
LASSERT(th != NULL);
- luch = lu_object_find(env, ludev->ld_site, fid, BYPASS_CAPA);
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+ return -EACCES;
+
+ luch = lu_object_find(env, ludev->ld_site, fid);
if (!IS_ERR(luch)) {
if (lu_object_exists(luch)) {
struct osd_object *child;
return osd_invariant(osd_obj(l));
}
-static int capa_is_sane(const struct lu_env *env,
- struct lustre_capa *capa,
- struct lustre_capa_key *keys)
-{
- struct obd_capa *c;
- struct osd_thread_info *oti = lu_context_key_get(&env->le_ctx, &osd_key);
- int i, rc = 0;
- ENTRY;
-
- c = capa_lookup(capa);
- if (c) {
- spin_lock(&c->c_lock);
- if (memcmp(&c->c_capa, capa, sizeof(*capa))) {
- DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
- rc = -EACCES;
- } else if (capa_is_expired(c)) {
- DEBUG_CAPA(D_ERROR, capa, "expired");
- rc = -ESTALE;
- }
- spin_unlock(&c->c_lock);
-
- capa_put(c);
- RETURN(rc);
- }
-
- spin_lock(&capa_lock);
- for (i = 0; i < 2; i++) {
- if (keys[i].lk_keyid == capa->lc_keyid) {
- oti->oti_capa_key = keys[i];
- break;
- }
- }
- spin_unlock(&capa_lock);
-
- if (i == 2) {
- DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
- RETURN(-ESTALE);
- }
-
- rc = capa_hmac(oti->oti_capa_hmac, capa, oti->oti_capa_key.lk_key);
- if (rc)
- RETURN(rc);
- if (memcmp(oti->oti_capa_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
- DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
- RETURN(-EACCES);
- }
-
- capa_add(capa);
-
- RETURN(0);
-}
-
-static int osd_object_capa_auth(const struct lu_env *env,
- const struct lu_object *obj,
- struct lustre_capa *capa,
- __u64 opc)
-{
- const struct lu_fid *fid = lu_object_fid(obj);
-
- return 0;
-
- if (lu_object_capa_bypass(obj))
- return 0;
-
- if (!capa) {
- CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
- return -EACCES;
- }
-
- if (!lu_fid_eq(fid, &capa->lc_fid)) {
- DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
- PFID(fid));
- return -EACCES;
- }
-
- if (!capa_opc_supported(capa, opc)) {
- DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
- return -EACCES;
- }
-
- if (!capa_is_sane(env, capa, obj->lo_dev->ld_site->ls_capa_keys)) {
- DEBUG_CAPA(D_ERROR, capa, "insane");
- return -EACCES;
- }
-
- return 0;
-}
-
static struct lu_object_operations osd_lu_obj_ops = {
.loo_object_init = osd_object_init,
.loo_object_delete = osd_object_delete,
.loo_object_release = osd_object_release,
.loo_object_free = osd_object_free,
.loo_object_print = osd_object_print,
- .loo_object_invariant = osd_object_invariant,
- .loo_object_auth = osd_object_capa_auth
+ .loo_object_invariant = osd_object_invariant
};
static struct lu_device_operations osd_lu_ops = {
} else {
rc = oi->oi_dir->do_index_ops->dio_lookup
(info->oti_env, oi->oi_dir,
- (struct dt_rec *)id, oi_fid_key(info, fid));
+ (struct dt_rec *)id, oi_fid_key(info, fid),
+ BYPASS_CAPA);
osd_inode_id_init(id, id->oii_ino, id->oii_gen);
}
return rc;
osd_inode_id_init(id, id0->oii_ino, id0->oii_gen);
return idx->do_index_ops->dio_insert(info->oti_env, idx,
(const struct dt_rec *)id,
- oi_fid_key(info, fid), th);
+ oi_fid_key(info, fid), th,
+ BYPASS_CAPA);
}
/*
idx = oi->oi_dir;
dev = lu2dt_dev(idx->do_lu.lo_dev);
return idx->do_index_ops->dio_delete(info->oti_env, idx,
- oi_fid_key(info, fid), th);
+ oi_fid_key(info, fid), th,
+ BYPASS_CAPA);
}
&RMF_MDT_BODY
};
-static const struct req_msg_field *mdt_renew_capa_client[] = {
- &RMF_PTLRPC_BODY,
- &RMF_CAPA1
-};
-
static const struct req_msg_field *mdt_body_capa[] = {
&RMF_PTLRPC_BODY,
&RMF_MDT_BODY,
&RQF_MDS_READPAGE,
&RQF_MDS_WRITEPAGE,
&RQF_MDS_IS_SUBDIR,
- &RQF_MDS_DONE_WRITING,
- &RQF_MDS_RENEW_CAPA
+ &RQF_MDS_DONE_WRITING
};
struct req_msg_field {
mdt_body_only, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_IS_SUBDIR);
-const struct req_format RQF_MDS_RENEW_CAPA =
- DEFINE_REQ_FMT0("MDS_RENEW_CAPA",
- mdt_renew_capa_client, mdt_body_capa);
-EXPORT_SYMBOL(RQF_MDS_RENEW_CAPA);
-
#if !defined(__REQ_LAYOUT_USER__)
int req_layout_init(void)
{ MDS_SETXATTR, "mds_setxattr" },
{ MDS_WRITEPAGE, "mds_writepage" },
{ MDS_IS_SUBDIR, "mds_is_subdir" },
- { MDS_RENEW_CAPA, "mds_renew_capa" },
{ LDLM_ENQUEUE, "ldlm_enqueue" },
{ LDLM_CONVERT, "ldlm_convert" },
{ LDLM_CANCEL, "ldlm_cancel" },