Whamcloud - gitweb
revised capability code to not break md stack according to nikita's advice.
authorlsy <lsy>
Thu, 5 Oct 2006 12:59:47 +0000 (12:59 +0000)
committerlsy <lsy>
Thu, 5 Oct 2006 12:59:47 +0000 (12:59 +0000)
several other capability code fixes.

36 files changed:
lustre/cmm/cmm_device.c
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/fid/fid_store.c
lustre/fld/fld_index.c
lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_capa.h
lustre/include/lustre_req_layout.h
lustre/include/md_object.h
lustre/include/obd_support.h
lustre/llite/file.c
lustre/llite/llite_capa.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_orphans.c
lustre/mdt/mdt_capa.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/obdclass/capa.c
lustre/obdclass/dt_object.c
lustre/obdclass/lu_object.c
lustre/osd/osd_handler.c
lustre/osd/osd_oi.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/lproc_ptlrpc.c

index fe6d349..43fe651 100644 (file)
@@ -86,14 +86,16 @@ static int cmm_maxsize_get(const struct lu_env *env, struct md_device *md,
         RETURN(rc);
 }
 
-static int cmm_init_capa_keys(struct md_device *md,
+static int cmm_init_capa_ctxt(const struct lu_env *env, struct md_device *md,
+                              __u32 valid, unsigned long timeout, __u32 alg,
                               struct lustre_capa_key *keys)
 {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
         int rc;
         ENTRY;
-        LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_keys);
-        rc = cmm_child_ops(cmm_dev)->mdo_init_capa_keys(cmm_dev->cmm_child,
+        LASSERT(cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt);
+        rc = cmm_child_ops(cmm_dev)->mdo_init_capa_ctxt(env, cmm_dev->cmm_child,
+                                                        valid, timeout, alg,
                                                         keys);
         RETURN(rc);
 }
@@ -115,7 +117,7 @@ static struct md_device_operations cmm_md_ops = {
         .mdo_statfs         = cmm_statfs,
         .mdo_root_get       = cmm_root_get,
         .mdo_maxsize_get    = cmm_maxsize_get,
-        .mdo_init_capa_keys = cmm_init_capa_keys,
+        .mdo_init_capa_ctxt = cmm_init_capa_ctxt,
         .mdo_update_capa_key= cmm_update_capa_key,
 };
 
index cd6191e..953845e 100644 (file)
@@ -434,7 +434,7 @@ struct md_object *md_object_find(const struct lu_env *env,
         struct md_object *m;
         ENTRY;
 
-        o = lu_object_find(env, md2lu_dev(md)->ld_site, f, BYPASS_CAPA);
+        o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
         if (IS_ERR(o))
                 m = (struct md_object *)o;
         else {
index e5a37b6..a9553f0 100644 (file)
@@ -127,15 +127,13 @@ static int cmm_alloc_fid(const struct lu_env *env, struct cmm_device *cmm,
 
 struct cmm_object *cmm_object_find(const struct lu_env *env,
                                    struct cmm_device *d,
-                                   const struct lu_fid *f,
-                                   struct lustre_capa *capa)
+                                   const struct lu_fid *f)
 {
         struct lu_object *o;
         struct cmm_object *m;
         ENTRY;
 
-        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f,
-                           capa);
+        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
         if (IS_ERR(o))
                 m = (struct cmm_object *)o;
         else
index 3d72f1f..f396633 100644 (file)
@@ -89,7 +89,7 @@ int seq_store_write(struct lu_server_seq *seq,
 
                 rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
                                                     seq_record_buf(info),
-                                                    &pos, th);
+                                                    &pos, th, BYPASS_CAPA);
                 if (rc == sizeof(info->sti_record)) {
                         CDEBUG(D_INFO|D_WARNING, "%s: Store ranges: Space - "
                                DRANGE", Super - "DRANGE"\n", seq->lss_name,
@@ -122,7 +122,8 @@ int seq_store_read(struct lu_server_seq *seq,
         LASSERT(info != NULL);
 
         rc = dt_obj->do_body_ops->dbo_read(env, dt_obj,
-                                           seq_record_buf(info), &pos);
+                                           seq_record_buf(info), &pos,
+                                           BYPASS_CAPA);
 
         if (rc == sizeof(info->sti_record)) {
                 range_le_to_cpu(&seq->lss_space, &info->sti_record.ssr_space);
index 8517976..267dbfa 100644 (file)
@@ -117,7 +117,8 @@ int fld_index_create(struct lu_server_fld *fld,
         if (!IS_ERR(th)) {
                 rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
                                                       fld_rec(env, mds),
-                                                      fld_key(env, seq), th);
+                                                      fld_key(env, seq), th,
+                                                      BYPASS_CAPA);
                 dt_dev->dd_ops->dt_trans_stop(env, th);
         } else
                 rc = PTR_ERR(th);
@@ -140,7 +141,8 @@ int fld_index_delete(struct lu_server_fld *fld,
         th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
         if (!IS_ERR(th)) {
                 rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
-                                                      fld_key(env, seq), th);
+                                                      fld_key(env, seq), th,
+                                                      BYPASS_CAPA);
                 dt_dev->dd_ops->dt_trans_stop(env, th);
         } else
                 rc = PTR_ERR(th);
@@ -157,7 +159,7 @@ int fld_index_lookup(struct lu_server_fld *fld,
         ENTRY;
 
         rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
-                                              fld_key(env, seq));
+                                              fld_key(env, seq), BYPASS_CAPA);
         if (rc == 0)
                 *mds = be64_to_cpu(*(__u64 *)rec);
         RETURN(rc);
index 639b865..95dd462 100644 (file)
@@ -95,6 +95,13 @@ struct dt_device_operations {
          */
         int   (*dt_sync)(const struct lu_env *env, struct dt_device *dev);
         void  (*dt_ro)(const struct lu_env *env, struct dt_device *dev);
+        /*
+         * Initialize capability context.
+         */
+        int   (*dt_init_capa_ctxt)(const struct lu_env *env,
+                                   struct dt_device *dev,
+                                   __u32 valid, unsigned long timeout,
+                                   __u32 alg, struct lustre_capa_key *keys);
 
         /*
          *  dt get credits from osd 
@@ -161,7 +168,8 @@ struct dt_object_operations {
          * precondition: lu_object_exists(&dt->do_lu);
          */
         int   (*do_attr_get)(const struct lu_env *env,
-                             struct dt_object *dt, struct lu_attr *attr);
+                             struct dt_object *dt, struct lu_attr *attr,
+                             struct lustre_capa *capa);
         /*
          * Set standard attributes.
          *
@@ -170,14 +178,16 @@ struct dt_object_operations {
         int   (*do_attr_set)(const struct lu_env *env,
                              struct dt_object *dt,
                              const struct lu_attr *attr,
-                             struct thandle *handle);
+                             struct thandle *handle,
+                             struct lustre_capa *capa);
         /*
          * Return a value of an extended attribute.
          *
          * precondition: dt_object_exists(dt);
          */
         int   (*do_xattr_get)(const struct lu_env *env, struct dt_object *dt,
-                              struct lu_buf *buf, const char *name);
+                              struct lu_buf *buf, const char *name,
+                              struct lustre_capa *capa);
         /*
          * Set value of an extended attribute.
          *
@@ -187,7 +197,8 @@ struct dt_object_operations {
          */
         int   (*do_xattr_set)(const struct lu_env *env,
                               struct dt_object *dt, const struct lu_buf *buf,
-                              const char *name, int fl, struct thandle *handle);
+                              const char *name, int fl, struct thandle *handle,
+                              struct lustre_capa *capa);
         /*
          * Delete existing extended attribute.
          *
@@ -195,7 +206,8 @@ struct dt_object_operations {
          */
         int   (*do_xattr_del)(const struct lu_env *env,
                               struct dt_object *dt,
-                              const char *name, struct thandle *handle);
+                              const char *name, struct thandle *handle,
+                              struct lustre_capa *capa);
         /*
          * Place list of existing extended attributes into @buf (which has
          * length len).
@@ -203,7 +215,8 @@ struct dt_object_operations {
          * precondition: dt_object_exists(dt);
          */
         int   (*do_xattr_list)(const struct lu_env *env,
-                               struct dt_object *dt, struct lu_buf *buf);
+                               struct dt_object *dt, struct lu_buf *buf,
+                               struct lustre_capa *capa);
         /*
          * Create new object on this device.
          *
@@ -236,8 +249,11 @@ struct dt_object_operations {
         void  (*do_ref_del)(const struct lu_env *env,
                             struct dt_object *dt, struct thandle *th);
 
-        int (*do_readpage)(const struct lu_env *env,
-                           struct dt_object *dt, const struct lu_rdpg *rdpg);
+        int   (*do_readpage)(const struct lu_env *env,
+                             struct dt_object *dt, const struct lu_rdpg *rdpg,
+                             struct lustre_capa *capa);
+        int   (*do_capa_get)(const struct lu_env *env,
+                             struct dt_object *dt, struct lustre_capa *capa);
 };
 
 /*
@@ -248,13 +264,14 @@ struct dt_body_operations {
          * precondition: dt_object_exists(dt);
          */
         ssize_t (*dbo_read)(const struct lu_env *env, struct dt_object *dt,
-                            struct lu_buf *buf, loff_t *pos);
+                            struct lu_buf *buf, loff_t *pos,
+                            struct lustre_capa *capa);
         /*
          * precondition: dt_object_exists(dt);
          */
         ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
                              const struct lu_buf *buf, loff_t *pos,
-                             struct thandle *handle);
+                             struct thandle *handle, struct lustre_capa *capa);
 };
 
 /*
@@ -280,18 +297,20 @@ struct dt_index_operations {
          * precondition: dt_object_exists(dt);
          */
         int (*dio_lookup)(const struct lu_env *env, struct dt_object *dt,
-                          struct dt_rec *rec, const struct dt_key *key);
+                          struct dt_rec *rec, const struct dt_key *key,
+                          struct lustre_capa *capa);
         /*
          * precondition: dt_object_exists(dt);
          */
         int (*dio_insert)(const struct lu_env *env, struct dt_object *dt,
                           const struct dt_rec *rec, const struct dt_key *key,
-                          struct thandle *handle);
+                          struct thandle *handle, struct lustre_capa *capa);
         /*
          * precondition: dt_object_exists(dt);
          */
         int (*dio_delete)(const struct lu_env *env, struct dt_object *dt,
-                          const struct dt_key *key, struct thandle *handle);
+                          const struct dt_key *key, struct thandle *handle,
+                          struct lustre_capa *capa);
         /*
          * Iterator interface
          */
index c9b4fc6..ca5e426 100644 (file)
@@ -209,13 +209,6 @@ struct lu_object_operations {
          * consistent.
          */
         int (*loo_object_invariant)(const struct lu_object *o);
-        /*
-         * Called to authorize action by capability.
-         */
-        int (*loo_object_auth)(const struct lu_env *env,
-                               const struct lu_object *o,
-                               struct lustre_capa *capa,
-                               __u64 opc);
 };
 
 /*
@@ -456,11 +449,6 @@ struct lu_object_header {
          */
         struct lu_fid     loh_fid;
         /*
-         * Fid capability.
-         */
-        unsigned int       loh_capa_bypass:1; /* bypass capability check */
-        struct lustre_capa loh_capa;          /* capability sent by client */
-        /*
          * Common object attributes, cached for efficiency. From enum
          * lu_object_header_attr.
          */
@@ -581,11 +569,6 @@ struct lu_site {
                 __u32 s_cache_race;
                 __u32 s_lru_purged;
         } ls_stats;
-
-        /* Capability */
-        struct lustre_capa_key *ls_capa_keys;
-        unsigned long           ls_capa_timeout;
-        __u32                   ls_capa_alg;
 };
 
 /*
@@ -699,14 +682,7 @@ void lu_site_purge(const struct lu_env *env,
  * any case, additional reference is acquired on the returned object.
  */
 struct lu_object *lu_object_find(const struct lu_env *env,
-                                 struct lu_site *s, const struct lu_fid *f,
-                                 struct lustre_capa *c);
-
-/*
- * Auth lu_object capability.
- */
-int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
-                   struct lustre_capa *capa, __u64 opc);
+                                 struct lu_site *s, const struct lu_fid *f);
 
 /*
  * Helpers.
@@ -738,20 +714,6 @@ static inline const struct lu_fid *lu_object_fid(const struct lu_object *o)
 }
 
 /*
- * Pointer to the fid capability of this object.
- */
-static inline struct lustre_capa *
-lu_object_capa(const struct lu_object *o)
-{
-        return &o->lo_header->loh_capa;
-}
-
-static inline int lu_object_capa_bypass(const struct lu_object *o)
-{
-        return o->lo_header->loh_capa_bypass;
-}
-
-/*
  * return device operations vector for this object
  */
 static inline struct lu_device_operations *
@@ -844,11 +806,6 @@ static inline const __u32 lu_object_attr(const struct lu_object *o)
         return o->lo_header->loh_attr;
 }
 
-static inline void lu_object_bypass_capa(struct lu_object *o)
-{
-        o->lo_header->loh_capa_bypass = 1;
-}
-
 struct lu_rdpg {
         /* input params, should be filled out by mdt */
         __u32                   rp_hash;        /* hash */
index e605aa5..c3a1666 100644 (file)
@@ -847,7 +847,6 @@ typedef enum {
         MDS_SETXATTR     = 50,
         MDS_WRITEPAGE    = 51,
         MDS_IS_SUBDIR    = 52,
-        MDS_RENEW_CAPA   = 53,
         MDS_LAST_OPC
 } mds_cmd_t;
 
@@ -1941,18 +1940,17 @@ enum {
 
 static inline int capa_for_mds(struct lustre_capa *c)
 {
-        return (c->lc_opc & CAPA_OPC_MDS_ONLY) != 0;
+        return (c->lc_opc & CAPA_OPC_INDEX_INSERT) != 0;
 }
 
 static inline int capa_for_oss(struct lustre_capa *c)
 {
-        return (c->lc_opc & CAPA_OPC_OSS_ONLY) != 0;
+        return (c->lc_opc & CAPA_OPC_INDEX_INSERT) == 0;
 }
 
 /* lustre_capa.lc_flags */
 enum {
         CAPA_FL_SHORT_EXPIRY = 1, /* short capa expiry */
-        CAPA_FL_ROOT         = 2, /* root fid capa, will always renew */
 };
 
 /* lustre_capa.lc_hmac_alg */
index dc36383..3123fbd 100644 (file)
@@ -152,7 +152,7 @@ extern spinlock_t capa_lock;
 extern int capa_count[];
 extern cfs_mem_cache_t *capa_cachep;
 
-struct obd_capa *capa_add(struct lustre_capa *capa);
+void capa_add(struct lustre_capa *capa);
 struct obd_capa *capa_lookup(struct lustre_capa *capa);
 
 int capa_hmac(__u8 *hmac, struct lustre_capa *capa, __u8 *key);
@@ -343,4 +343,11 @@ struct filter_capa_key {
 
 #define BYPASS_CAPA (struct lustre_capa *)ERR_PTR(-ENOENT)
 
+enum {
+        CAPA_CTX_ON           = 1,
+        CAPA_CTX_TIMEOUT      = 1<<1,
+        CAPA_CTX_KEY_TIMEOUT  = 1<<2,
+        CAPA_CTX_ALG          = 1<<3,
+        CAPA_CTX_KEYS         = 1<<4,
+};
 #endif /* __LINUX_CAPA_H_ */
index dd548ac..57a8bbf 100644 (file)
@@ -112,7 +112,6 @@ extern const struct req_format RQF_MDS_READPAGE;
 extern const struct req_format RQF_MDS_WRITEPAGE;
 extern const struct req_format RQF_MDS_IS_SUBDIR;
 extern const struct req_format RQF_MDS_DONE_WRITING;
-extern const struct req_format RQF_MDS_RENEW_CAPA;
 
 /*
  * This is format of direct (non-intent) MDS_GETATTR_NAME request.
index 7bad122..3c61d10 100644 (file)
@@ -71,6 +71,12 @@ struct md_ucred {
        struct mdt_identity    *mu_identity;
 };
 
+/* there are at most 4 fid in one operation, see rename */
+struct md_capainfo {
+        const struct lu_fid    *mc_fid[4];
+        struct lustre_capa     *mc_capa[4];
+};
+
 /*
  * Implemented in mdd/mdd_handler.c.
  *
@@ -78,6 +84,7 @@ struct md_ucred {
  * related definitions.
  */
 struct md_ucred *md_ucred(const struct lu_env *env);
+struct md_capainfo *md_capainfo(const struct lu_env *env);
 
 /* metadata attributes */
 enum ma_valid {
@@ -231,7 +238,8 @@ struct md_device_operations {
         int (*mdo_statfs)(const struct lu_env *env, struct md_device *m,
                           struct kstatfs *sfs);
 
-        int (*mdo_init_capa_keys)(struct md_device *m,
+        int (*mdo_init_capa_ctxt)(const struct lu_env *env, struct md_device *m,
+                                  __u32 valid, unsigned long timeout, __u32 alg,
                                   struct lustre_capa_key *keys);
 
         int (*mdo_update_capa_key)(const struct lu_env *env,
index 6a93eda..2ec7f9b 100644 (file)
@@ -97,8 +97,6 @@ extern int obd_race_state;
 #define OBD_FAIL_MDS_WRITEPAGE_PACK      0x136
 #define OBD_FAIL_MDS_IS_SUBDIR_NET       0x137
 #define OBD_FAIL_MDS_IS_SUBDIR_PACK      0x138
-#define OBD_FAIL_MDS_RENEW_CAPA_NET      0x139
-#define OBD_FAIL_MDS_RENEW_CAPA_PACK     0x13a
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index ed0e62e..8536d9e 100644 (file)
@@ -52,7 +52,6 @@ void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
                           struct lustre_handle *fh)
 {
         op_data->fid1 = ll_i2info(inode)->lli_fid;
-        op_data->mod_capa1 = ll_i2mdscapa(inode);
         op_data->attr.ia_mode = inode->i_mode;
         op_data->attr.ia_atime = inode->i_atime;
         op_data->attr.ia_mtime = inode->i_mtime;
index 1957886..552012d 100644 (file)
@@ -292,7 +292,7 @@ static struct obd_capa *do_lookup_oss_capa(struct inode *inode, int opc)
         list_for_each_entry(ocapa, &lli->lli_oss_capas, u.cli.lli_list) {
                 if (!obd_capa_is_valid(ocapa))
                         continue;
-                if ((capa_opc(&ocapa->c_capa) & opc) == opc)
+                if ((capa_opc(&ocapa->c_capa) & opc) != opc)
                         continue;
 
                 LASSERT(lu_fid_eq(capa_fid(&ocapa->c_capa),
@@ -315,6 +315,7 @@ struct obd_capa *ll_lookup_oss_capa(struct inode *inode, __u64 opc)
         if ((ll_i2sbi(inode)->ll_flags & LL_SBI_OSS_CAPA) == 0)
                 return NULL;
         ENTRY;
+
         LASSERT(opc == CAPA_OPC_OSS_WRITE ||
                 opc == (CAPA_OPC_OSS_WRITE | CAPA_OPC_OSS_READ) ||
                 opc == CAPA_OPC_OSS_TRUNC);
@@ -357,11 +358,11 @@ struct obd_capa *ll_i2mdscapa(struct inode *inode)
 {
         struct obd_capa *ocapa;
         struct ll_inode_info *lli = ll_i2info(inode);
-        ENTRY;
 
         LASSERT(inode);
         if ((ll_i2sbi(inode)->ll_flags & LL_SBI_MDS_CAPA) == 0)
-                RETURN(NULL);
+                return NULL;
+        ENTRY;
 
         spin_lock(&capa_lock);
         ocapa = capa_get(lli->lli_mds_capa);
@@ -374,8 +375,8 @@ struct obd_capa *ll_i2mdscapa(struct inode *inode)
 
         if (!ocapa && atomic_read(&ll_capa_debug)) {
                 CDEBUG(S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode) ?
-                       D_ERROR : D_SEC, "no MDS capa for (ino %lu)\n",
-                       inode->i_ino);
+                       D_ERROR : D_SEC, "no MDS capability for fid "DFID"\n",
+                       PFID(ll_inode2fid(inode)));
                 if (inode_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
                         LBUG();
                 atomic_set(&ll_capa_debug, 0);
@@ -573,8 +574,8 @@ void ll_oss_capa_open(struct inode *inode, struct file *file)
         ocapa = do_lookup_oss_capa(inode, opc);
         if (!ocapa) {
                 if (atomic_read(&ll_capa_debug)) {
-                        CDEBUG(D_ERROR, "no capa for (uid %u op %d ino %lu)\n",
-                               (unsigned)current->uid, opc, inode->i_ino);
+                        CDEBUG(D_ERROR, "no opc %x capability for fid "DFID"\n",
+                               opc, PFID(ll_inode2fid(inode)));
                         atomic_set(&ll_capa_debug, 0);
                 }
                 spin_unlock(&capa_lock);
index 7b2b8ff..9e20768 100644 (file)
@@ -258,7 +258,8 @@ static int client_common_fill_super(struct super_block *sb,
                 sb->s_flags |= MS_POSIXACL;
 #endif
                 sbi->ll_flags |= LL_SBI_ACL;
-        } else {
+        } else if (sbi->ll_flags & LL_SBI_ACL) {
+                LCONSOLE_INFO("client wants to enable acl, but mdt not!\n");
                 sbi->ll_flags &= ~LL_SBI_ACL;
         }
 
@@ -284,12 +285,12 @@ static int client_common_fill_super(struct super_block *sb,
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) {
-                CDEBUG(D_SEC, "client enabled MDS capability!\n");
+                LCONSOLE_INFO("client enabled MDS capability!\n");
                 sbi->ll_flags |= LL_SBI_MDS_CAPA;
         }
 
         if (data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA) {
-                CDEBUG(D_SEC, "client enabled OSS capability!\n");
+                LCONSOLE_INFO("client enabled OSS capability!\n");
                 sbi->ll_flags |= LL_SBI_OSS_CAPA;
         }
 
@@ -778,6 +779,7 @@ void ll_lli_init(struct ll_inode_info *lli)
         lli->lli_open_fd_read_count = lli->lli_open_fd_write_count = 0;
         lli->lli_open_fd_exec_count = 0;
         INIT_LIST_HEAD(&lli->lli_dead_list);
+        INIT_LIST_HEAD(&lli->lli_oss_capas);
         sema_init(&lli->lli_rmtperm_sem, 1);
 }
 
@@ -1248,6 +1250,8 @@ void ll_clear_inode(struct inode *inode)
         list_del_init(&lli->lli_dead_list);
         spin_unlock(&sbi->ll_deathrow_lock);
 
+        ll_clear_inode_capas(inode);
+
         EXIT;
 }
 
index 1e6617a..b470c2b 100644 (file)
@@ -2502,7 +2502,7 @@ static int lmv_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
-static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *ocapa,
+static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
                           renew_capa_cb_t cb)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -2515,11 +2515,11 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *ocapa,
         if (rc)
                 RETURN(rc);
 
-        tgt_exp = lmv_get_export(lmv, &ocapa->c_capa.lc_fid);
+        tgt_exp = lmv_get_export(lmv, &oc->c_capa.lc_fid);
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
 
-        rc = md_renew_capa(tgt_exp, ocapa, cb);
+        rc = md_renew_capa(tgt_exp, oc, cb);
         RETURN(rc);
 }
 
index a118851..6f98ef9 100644 (file)
@@ -1612,26 +1612,61 @@ int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(0);
 }
 
+static int mdc_interpret_renew_capa(struct ptlrpc_request *req, void *unused,
+                                    int status)
+{
+        struct obd_capa *oc = req->rq_async_args.pointer_arg[0];
+        renew_capa_cb_t *cb = req->rq_async_args.pointer_arg[1];
+        struct mds_body *body = NULL;
+        struct lustre_capa *capa;
+        ENTRY;
+
+        if (status)
+                DEBUG_CAPA(D_ERROR, &oc->c_capa, "renew failed: %d for",
+                           status);
+
+        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+                                  lustre_swab_mdt_body);
+        if (body == NULL)
+                GOTO(out, capa = ERR_PTR(-EFAULT));
+
+        if (body->flags)
+                GOTO(out, capa = ERR_PTR((long)body->flags));
+
+        if ((body->valid & OBD_MD_FLOSSCAPA) == 0)
+                GOTO(out, capa = ERR_PTR(-EFAULT));
+
+        capa = lustre_unpack_capa(req->rq_repmsg, REPLY_REC_OFF);
+        if (!capa)
+                capa = ERR_PTR(-EFAULT);
+
+        EXIT;
+out:
+        (*cb)(oc, capa);
+        return 0;
+}
+
 static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
                           renew_capa_cb_t cb)
 {
         struct ptlrpc_request *req;
-        int size[2] = { sizeof(struct ptlrpc_body),
+        int size[5] = { sizeof(struct ptlrpc_body),
+                        sizeof(struct mdt_body),
                         sizeof(struct lustre_capa) };
-        int repsize[3] = { sizeof(struct ptlrpc_body),
-                           sizeof(struct mdt_body),
-                           sizeof(struct lustre_capa) };
         ENTRY;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_RENEW_CAPA, 2, size, NULL);
+                              MDS_GETATTR, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        mdc_pack_capa(req, REQ_REC_OFF, oc);
+        mdc_pack_req_body(req, REQ_REC_OFF, OBD_MD_FLOSSCAPA,
+                          &oc->c_capa.lc_fid, oc, 0, 0);
 
-        ptlrpc_req_set_repsize(req, 3, repsize);
-        req->rq_interpret_reply = cb;
+        ptlrpc_req_set_repsize(req, 5, size);
+        req->rq_async_args.pointer_arg[0] = oc;
+        req->rq_async_args.pointer_arg[1] = cb;
+        req->rq_interpret_reply = mdc_interpret_renew_capa;
         ptlrpcd_add_req(req);
 
         RETURN(0);
index 4866110..b541f3b 100644 (file)
@@ -469,7 +469,7 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
         struct mdd_object *m;
         ENTRY;
 
-        o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f, BYPASS_CAPA);
+        o = lu_object_find(env, mdd2lu_dev(d)->ld_site, f);
         if (IS_ERR(o))
                 m = (struct mdd_object *)o;
         else {
@@ -525,12 +525,12 @@ static int mdd_may_create(const struct lu_env *env,
         RETURN(rc);
 }
 
-static inline int __mdd_la_get(const struct lu_env *env,
-                               struct mdd_object *obj, struct lu_attr *la)
+static inline int __mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
+                               struct lu_attr *la, struct lustre_capa *capa)
 {
-        struct dt_object  *next = mdd_object_child(obj);
+        struct dt_object *next = mdd_object_child(obj);
         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
-        return next->do_ops->do_attr_get(env, next, la);
+        return next->do_ops->do_attr_get(env, next, la, capa);
 }
 
 static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
@@ -551,7 +551,7 @@ static int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
 
         ENTRY;
         mdd_read_lock(env, obj);
-        rc = __mdd_la_get(env, obj, la);
+        rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
         mdd_read_unlock(env, obj);
         if (rc == 0)
                 mdd_flags_xlate(obj, la->la_flags);
@@ -584,13 +584,13 @@ static inline int mdd_is_sticky(const struct lu_env *env,
         struct md_ucred *uc = md_ucred(env);
         int rc;
 
-        rc = __mdd_la_get(env, cobj, tmp_la);
+        rc = __mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
         if (rc) {
                 return rc;
         } else if (tmp_la->la_uid == uc->mu_fsuid) {
                 return 0;
         } else {
-                rc = __mdd_la_get(env, pobj, tmp_la);
+                rc = __mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
                 if (rc)
                         return rc;
                 else if (!(tmp_la->la_mode & S_ISVTX))
@@ -652,7 +652,8 @@ static int __mdd_iattr_get(const struct lu_env *env,
         int rc = 0;
         ENTRY;
 
-        rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr);
+        rc = __mdd_la_get(env, mdd_obj, &ma->ma_attr,
+                          mdd_object_capa(env, mdd_obj));
         if (rc == 0)
                 ma->ma_valid = MA_INODE;
         RETURN(rc);
@@ -756,7 +757,8 @@ static int mdd_xattr_get(const struct lu_env *env,
 
         next = mdd_object_child(mdd_obj);
         mdd_read_lock(env, mdd_obj);
-        rc = next->do_ops->do_xattr_get(env, next, buf, name);
+        rc = next->do_ops->do_xattr_get(env, next, buf, name,
+                                        mdd_object_capa(env, mdd_obj));
         mdd_read_unlock(env, mdd_obj);
 
         RETURN(rc);
@@ -779,7 +781,8 @@ static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
 
         next = mdd_object_child(mdd_obj);
         mdd_read_lock(env, mdd_obj);
-        rc = next->do_body_ops->dbo_read(env, next, buf, &pos);
+        rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
+                                         mdd_object_capa(env, mdd_obj));
         mdd_read_unlock(env, mdd_obj);
         RETURN(rc);
 }
@@ -797,7 +800,8 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
 
         next = mdd_object_child(mdd_obj);
         mdd_read_lock(env, mdd_obj);
-        rc = next->do_ops->do_xattr_list(env, next, buf);
+        rc = next->do_ops->do_xattr_list(env, next, buf,
+                                         mdd_object_capa(env, mdd_obj));
         mdd_read_unlock(env, mdd_obj);
 
         RETURN(rc);
@@ -1077,7 +1081,8 @@ int mdd_attr_set_internal(const struct lu_env *env, struct mdd_object *o,
 
         LASSERT(lu_object_exists(mdd2lu_obj(o)));
         next = mdd_object_child(o);
-        return next->do_ops->do_attr_set(env, next, attr, handle);
+        return next->do_ops->do_attr_set(env, next, attr, handle,
+                                         mdd_object_capa(env, o));
 }
 
 int mdd_attr_set_internal_locked(const struct lu_env *env,
@@ -1097,16 +1102,17 @@ static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *o,
                            int fl, struct thandle *handle)
 {
         struct dt_object *next;
+        struct lustre_capa *capa = mdd_object_capa(env, o);
         int rc = 0;
         ENTRY;
 
         LASSERT(lu_object_exists(mdd2lu_obj(o)));
         next = mdd_object_child(o);
         if (buf->lb_buf && buf->lb_len > 0) {
-                rc = next->do_ops->do_xattr_set(env, next, buf, name,
-                                                0, handle);
+                rc = next->do_ops->do_xattr_set(env, next, buf, name, 0, handle,
+                                                capa);
         } else if (buf->lb_buf == NULL && buf->lb_len == 0) {
-                rc = next->do_ops->do_xattr_del(env, next, name, handle);
+                rc = next->do_ops->do_xattr_del(env, next, name, handle, capa);
         }
         RETURN(rc);
 }
@@ -1138,7 +1144,7 @@ int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
                 RETURN(-EPERM);
 
-        rc = __mdd_la_get(env, obj, tmp_la);
+        rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
                 RETURN(rc);
 
@@ -1396,7 +1402,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
                 RETURN(-EPERM);
 
         mdd_read_lock(env, obj);
-        rc = __mdd_la_get(env, obj, tmp_la);
+        rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         mdd_read_unlock(env, obj);
         if (rc)
                 RETURN(rc);
@@ -1452,7 +1458,8 @@ static int __mdd_xattr_del(const struct lu_env *env,struct mdd_device *mdd,
 
         LASSERT(lu_object_exists(mdd2lu_obj(obj)));
         next = mdd_object_child(obj);
-        return next->do_ops->do_xattr_del(env, next, name, handle);
+        return next->do_ops->do_xattr_del(env, next, name, handle,
+                                          mdd_object_capa(env, obj));
 }
 
 int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
@@ -1485,7 +1492,8 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
 static int __mdd_index_insert_only(const struct lu_env *env,
                                    struct mdd_object *pobj,
                                    const struct lu_fid *lf,
-                                   const char *name, struct thandle *th)
+                                   const char *name, struct thandle *th,
+                                   struct lustre_capa *capa)
 {
         int rc;
         struct dt_object *next = mdd_object_child(pobj);
@@ -1494,7 +1502,7 @@ static int __mdd_index_insert_only(const struct lu_env *env,
         if (dt_try_as_dir(env, next))
                 rc = next->do_index_ops->dio_insert(env, next,
                                          (struct dt_rec *)lf,
-                                         (struct dt_key *)name, th);
+                                         (struct dt_key *)name, th, capa);
         else
                 rc = -ENOTDIR;
         RETURN(rc);
@@ -1503,7 +1511,8 @@ static int __mdd_index_insert_only(const struct lu_env *env,
 /* insert new index, add reference if isdir, update times */
 static int __mdd_index_insert(const struct lu_env *env,
                              struct mdd_object *pobj, const struct lu_fid *lf,
-                             const char *name, int isdir, struct thandle *th)
+                             const char *name, int isdir, struct thandle *th,
+                             struct lustre_capa *capa)
 {
         int rc;
         struct dt_object *next = mdd_object_child(pobj);
@@ -1517,7 +1526,7 @@ static int __mdd_index_insert(const struct lu_env *env,
                 rc = next->do_index_ops->dio_insert(env, next,
                                                     (struct dt_rec *)lf,
                                                     (struct dt_key *)name,
-                                                    th);
+                                                    th, capa);
         else
                 rc = -ENOTDIR;
 
@@ -1536,7 +1545,8 @@ static int __mdd_index_insert(const struct lu_env *env,
 
 static int __mdd_index_delete(const struct lu_env *env,
                               struct mdd_object *pobj, const char *name,
-                              int is_dir, struct thandle *handle)
+                              int is_dir, struct thandle *handle,
+                              struct lustre_capa *capa)
 {
         int rc;
         struct dt_object *next = mdd_object_child(pobj);
@@ -1545,7 +1555,7 @@ static int __mdd_index_delete(const struct lu_env *env,
         if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_delete(env, next,
                                                     (struct dt_key *)name,
-                                                    handle);
+                                                    handle, capa);
                 if (rc == 0 && is_dir)
                         __mdd_ref_del(env, pobj, handle);
         } else
@@ -1599,7 +1609,8 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                 GOTO(out, rc);
 
         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
-                                     name, handle);
+                                     name, handle,
+                                     mdd_object_capa(env, mdd_tobj));
         if (rc == 0)
                 __mdd_ref_add(env, mdd_sobj, handle);
 
@@ -1753,7 +1764,8 @@ static int mdd_unlink(const struct lu_env *env,
                 GOTO(cleanup, rc);
 
         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
-        rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
+        rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
+                                mdd_object_capa(env, mdd_pobj));
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -2008,17 +2020,20 @@ static int mdd_rename(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
-        rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle);
+        rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
+                                mdd_object_capa(env, mdd_spobj));
         if (rc)
                 GOTO(cleanup, rc);
 
         /* tobj can be remote one,
          * so we do index_delete unconditionally and -ENOENT is allowed */
-        rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle);
+        rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
+                                mdd_object_capa(env, mdd_tpobj));
         if (rc != 0 && rc != -ENOENT)
                 GOTO(cleanup, rc);
 
-        rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle);
+        rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
+                                mdd_object_capa(env, mdd_tpobj));
         if (rc)
                 GOTO(cleanup, rc);
         
@@ -2090,7 +2105,8 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                 RETURN(rc);
 
         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
-                rc = dir->do_index_ops->dio_lookup(env, dir, rec, key);
+                rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
+                                                 mdd_object_capa(env, mdd_obj));
         else
                 rc = -ENOTDIR;
 
@@ -2170,15 +2186,16 @@ static int __mdd_object_initialize(const struct lu_env *env,
                 /* add . and .. for newly created dir */
                 __mdd_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
-                                             dot, handle);
+                                             dot, handle, BYPASS_CAPA);
                 if (rc == 0) {
                         rc = __mdd_index_insert_only(env, child, pfid,
-                                                     dotdot, handle);
+                                                     dotdot, handle,
+                                                     BYPASS_CAPA);
                         if (rc != 0) {
                                 int rc2;
 
-                                rc2 = __mdd_index_delete(env,
-                                                         child, dot, 0, handle);
+                                rc2 = __mdd_index_delete(env, child, dot, 0,
+                                                         handle, BYPASS_CAPA);
                                 if (rc2 != 0)
                                         CERROR("Failure to cleanup after dotdot"
                                                " creation: %d (%d)\n", rc2, rc);
@@ -2297,7 +2314,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
 
         /* sgid check */
         mdd_read_lock(env, obj);
-        rc = __mdd_la_get(env, obj, la);
+        rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
         mdd_read_unlock(env, obj);
         if (rc != 0)
                 RETURN(rc);
@@ -2429,7 +2446,8 @@ static int mdd_create(const struct lu_env *env,
                 GOTO(cleanup, rc);
 
         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
-                                name, S_ISDIR(attr->la_mode), handle);
+                                name, S_ISDIR(attr->la_mode), handle,
+                                mdd_object_capa(env, mdd_pobj));
 
         if (rc)
                 GOTO(cleanup, rc);
@@ -2453,13 +2471,12 @@ static int mdd_create(const struct lu_env *env,
                 struct dt_object *dt = mdd_object_child(son);
                 const char *target_name = spec->u.sp_symname;
                 int sym_len = strlen(target_name);
+                const struct lu_buf *buf;
                 loff_t pos = 0;
 
-                rc = dt->do_body_ops->dbo_write(env, dt,
-                                                mdd_buf_get_const(env,
-                                                                  target_name,
-                                                                  sym_len),
-                                                &pos, handle);
+                buf = mdd_buf_get_const(env, target_name, sym_len);
+                rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
+                                                mdd_object_capa(env, son));
                 if (rc == sym_len)
                         rc = 0;
                 else
@@ -2481,7 +2498,7 @@ cleanup:
                 if (inserted) {
                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
                                                  S_ISDIR(attr->la_mode),
-                                                 handle);
+                                                 handle, BYPASS_CAPA);
                         if (rc2)
                                 CERROR("error can not cleanup destroy %d\n",
                                        rc2);
@@ -2620,7 +2637,8 @@ static int mdd_name_insert(const struct lu_env *env,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle);
+        rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
+                                BYPASS_CAPA);
 
 out_unlock:
         mdd_write_unlock(env, mdd_obj);
@@ -2677,7 +2695,8 @@ static int mdd_name_remove(const struct lu_env *env,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle);
+        rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
+                                BYPASS_CAPA);
 
 out_unlock:
         mdd_write_unlock(env, mdd_obj);
@@ -2746,11 +2765,12 @@ static int mdd_rename_tgt(const struct lu_env *env,
 
         /* if rename_tgt is called then we should just re-insert name with
          * correct fid, no need to dec/inc parent nlink if obj is dir */
-        rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle);
+        rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
         if (rc)
                 GOTO(cleanup, rc);
 
-        rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle);
+        rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
+                                     BYPASS_CAPA);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -2809,15 +2829,21 @@ static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
         RETURN(0);
 }
 
-static int mdd_init_capa_keys(struct md_device *m,
+static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m,
+                              __u32 valid, unsigned long timeout, __u32 alg,
                               struct lustre_capa_key *keys)
 {
        struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
         struct mds_obd    *mds = &mdd2obd_dev(mdd)->u.mds;
+        int rc;
         ENTRY;
 
-        mds->mds_capa_keys = keys;
-        RETURN(0);
+        if (valid & CAPA_CTX_KEYS)
+                mds->mds_capa_keys = keys;
+
+        rc = mdd_child_ops(mdd)->dt_init_capa_ctxt(env, mdd->mdd_child, valid,
+                                                   timeout, alg, keys);
+        RETURN(rc);
 }
 
 static int mdd_update_capa_key(const struct lu_env *env,
@@ -2918,7 +2944,7 @@ static int mdd_open_sanity_check(const struct lu_env *env,
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-        rc = __mdd_la_get(env, obj, tmp_la);
+        rc = __mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
         if (rc)
                RETURN(rc);
 
@@ -3043,7 +3069,8 @@ static int mdd_readpage(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = next->do_ops->do_readpage(env, next, rdpg);
+        rc = next->do_ops->do_readpage(env, next, rdpg,
+                                       mdd_object_capa(env, mdd_obj));
 
 out_unlock:
         mdd_read_unlock(env, mdd_obj);
@@ -3140,7 +3167,8 @@ static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
         buf->lb_buf = mdd_env_info(env)->mti_xattr_buf;
         buf->lb_len = sizeof(mdd_env_info(env)->mti_xattr_buf);
         rc = next->do_ops->do_xattr_get(env, next, buf,
-                                        XATTR_NAME_ACL_ACCESS);
+                                        XATTR_NAME_ACL_ACCESS,
+                                        mdd_object_capa(env, obj));
         if (rc <= 0)
                 RETURN(rc ? : -EACCES);
 
@@ -3172,7 +3200,7 @@ static int mdd_exec_permission_lite(const struct lu_env *env,
         if (uc->mu_valid == UCRED_INVALID)
                 RETURN(-EACCES);
 
-        rc = __mdd_la_get(env, obj, la);
+        rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
         if (rc)
                 RETURN(rc);
 
@@ -3224,7 +3252,7 @@ static int __mdd_permission_internal(const struct lu_env *env,
                 RETURN(-EACCES);
 
         if (getattr) {
-                rc = __mdd_la_get(env, obj, la);
+                rc = __mdd_la_get(env, obj, la, BYPASS_CAPA);
                 if (rc)
                         RETURN(rc);
         }
@@ -3302,47 +3330,24 @@ static int mdd_permission(const struct lu_env *env, struct md_object *obj,
 static int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
                         struct lustre_capa *capa)
 {
+        struct dt_object *next;
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
-        struct mdd_device *mdd = mdo2mdd(obj);
-        struct lu_site *ls = mdd->mdd_md_dev.md_lu_dev.ld_site;
-        struct lustre_capa_key *key = &ls->ls_capa_keys[1];
-        struct obd_capa *ocapa;
         int rc;
         ENTRY;
 
         LASSERT(lu_object_exists(mdd2lu_obj(mdd_obj)));
+        next = mdd_object_child(mdd_obj);
 
-        capa->lc_fid = *mdo2fid(mdd_obj);
-        if (ls->ls_capa_timeout < CAPA_TIMEOUT)
-                capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
-        if (lu_fid_eq(&capa->lc_fid, &mdd->mdd_root_fid))
-                capa->lc_flags |= CAPA_FL_ROOT;
-        capa->lc_flags = ls->ls_capa_alg << 24;
-
-        /* TODO: get right permission here after remote uid landing */
-        ocapa = capa_lookup(capa);
-        if (ocapa) {
-                LASSERT(!capa_is_expired(ocapa));
-                capa_cpy(capa, ocapa);
-                capa_put(ocapa);
-                RETURN(0);
-        }
-
-        capa->lc_keyid = key->lk_keyid;
-        capa->lc_expiry = CURRENT_SECONDS + ls->ls_capa_timeout;
-        rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
-        if (rc)
-                RETURN(rc);
+        rc = next->do_ops->do_capa_get(env, next, capa);
 
-        capa_add(capa);
-        RETURN(0);
+        RETURN(rc);
 }
 
 struct md_device_operations mdd_ops = {
         .mdo_statfs         = mdd_statfs,
         .mdo_root_get       = mdd_root_get,
         .mdo_maxsize_get    = mdd_maxsize_get,
-        .mdo_init_capa_keys = mdd_init_capa_keys,
+        .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
         .mdo_update_capa_key= mdd_update_capa_key,
 };
 
@@ -3442,6 +3447,39 @@ struct md_ucred *md_ucred(const struct lu_env *env)
 }
 EXPORT_SYMBOL(md_ucred);
 
+static void *mdd_capainfo_key_init(const struct lu_context *ctx,
+                                   struct lu_context_key *key)
+{
+        struct md_capainfo *ci;
+
+        OBD_ALLOC_PTR(ci);
+        if (ci == NULL)
+                ci = ERR_PTR(-ENOMEM);
+        return ci;
+}
+
+static void mdd_capainfo_key_fini(const struct lu_context *ctx,
+                                  struct lu_context_key *key, void *data)
+{
+        struct md_capainfo *ci = data;
+        OBD_FREE_PTR(ci);
+}
+
+struct lu_context_key mdd_capainfo_key = {
+        .lct_tags = LCT_SESSION,
+        .lct_init = mdd_capainfo_key_init,
+        .lct_fini = mdd_capainfo_key_fini
+};
+
+struct md_capainfo *md_capainfo(const struct lu_env *env)
+{
+        /* NB, in mdt_init0 */
+        if (env->le_ses == NULL)
+                return NULL;
+        return lu_context_key_get(env->le_ses, &mdd_capainfo_key);
+}
+EXPORT_SYMBOL(md_capainfo);
+
 static int mdd_type_init(struct lu_device_type *t)
 {
         int result;
@@ -3449,11 +3487,14 @@ static int mdd_type_init(struct lu_device_type *t)
         result = lu_context_key_register(&mdd_thread_key);
         if (result == 0)
                 result = lu_context_key_register(&mdd_ucred_key);
+        if (result == 0)
+                result = lu_context_key_register(&mdd_capainfo_key);
         return result;
 }
 
 static void mdd_type_fini(struct lu_device_type *t)
 {
+        lu_context_key_degister(&mdd_capainfo_key);
         lu_context_key_degister(&mdd_ucred_key);
         lu_context_key_degister(&mdd_thread_key);
 }
index ab22611..bc112fd 100644 (file)
@@ -225,4 +225,20 @@ static inline int mdd_lov_cookiesize(const struct lu_env *env,
         return obd->u.mds.mds_max_cookiesize;
 }
 
+static inline struct lustre_capa *mdd_object_capa(const struct lu_env *env,
+                                                  const struct mdd_object *obj)
+{
+        struct md_capainfo *ci = md_capainfo(env);
+        const struct lu_fid *fid = mdo2fid(obj);
+        int i;
+
+        /* NB: in mdt_init0 */
+        if (!ci)
+                return BYPASS_CAPA;
+        for (i = 0; i < 4; i++)
+                if (ci->mc_fid[i] && lu_fid_eq(ci->mc_fid[i], fid))
+                        return ci->mc_capa[i];
+        return NULL;
+}
+
 #endif
index 85c040e..00a3043 100644 (file)
@@ -193,7 +193,8 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
 
         next = mdd_object_child(obj);
         rc = next->do_ops->do_xattr_get(env, next,
-                                        mdd_buf_get(env, md, *md_size), name);
+                                        mdd_buf_get(env, md, *md_size), name,
+                                        mdd_object_capa(env, obj));
         /*
          * XXX: handling of -ENODATA, the right way is to have ->do_md_get()
          * exported by dt layer.
@@ -569,7 +570,8 @@ int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
         int rc = 0;
         ENTRY;
 
-        rc = next->do_ops->do_attr_get(env, next, tmp_la);
+        rc = next->do_ops->do_attr_get(env, next, tmp_la,
+                                       mdd_object_capa(env, obj));
         if (rc)
                 RETURN(rc);
 
index 64e3bbc..dfa05ab 100644 (file)
@@ -76,7 +76,8 @@ static int orph_index_insert(const struct lu_env *env,
         ENTRY;
 
         rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset,
-                                           (struct dt_key *)key, th);
+                                           (struct dt_key *)key, th,
+                                           BYPASS_CAPA);
         RETURN(rc);
 }
 
@@ -91,7 +92,8 @@ static int orph_index_delete(const struct lu_env *env,
         ENTRY;
         LASSERT(dor);
         rc = dor->do_index_ops->dio_delete(env, dor,
-                                           (struct dt_key *)key, th);
+                                           (struct dt_key *)key, th,
+                                           BYPASS_CAPA);
         RETURN(rc);
 
 }
index 89f169b..384db5a 100644 (file)
@@ -147,7 +147,7 @@ int mdt_capa_keys_init(const struct lu_env *env, struct mdt_device *mdt)
 
         obj = mdt->mdt_ck_obj;
         obj->do_ops->do_read_lock(env, obj);
-        rc = obj->do_ops->do_attr_get(env, mdt->mdt_ck_obj, la);
+        rc = obj->do_ops->do_attr_get(env, mdt->mdt_ck_obj, la, BYPASS_CAPA);
         obj->do_ops->do_read_unlock(env, obj);
         if (rc)
                 RETURN(rc);
@@ -194,7 +194,7 @@ static int mdt_ck_thread_main(void *args)
 {
         struct mdt_device      *mdt = args;
         struct ptlrpc_thread   *thread = &mdt->mdt_ck_thread;
-        struct lustre_capa_key *tmp, *key = red_capa_key(mdt);
+        struct lustre_capa_key *tmp, *key = &mdt->mdt_capa_keys[1];
         struct lu_env           env;
         struct mdt_thread_info *info;
         struct md_device       *next;
@@ -295,3 +295,5 @@ void mdt_ck_thread_stop(struct mdt_device *mdt)
         cfs_waitq_signal(&thread->t_ctl_waitq);
         wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
 }
+
+
index 4c36945..eaa427f 100644 (file)
@@ -157,19 +157,36 @@ void mdt_set_disposition(struct mdt_thread_info *info,
 static int mdt_getstatus(struct mdt_thread_info *info)
 {
         struct md_device *next  = info->mti_mdt->mdt_child;
-        int               rc;
         struct mdt_body  *body;
+        int               rc;
 
         ENTRY;
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
-                rc = -ENOMEM;
-        } else {
-                body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-                rc = next->md_ops->mdo_root_get(info->mti_env, next,
-                                                &body->fid1);
-                if (rc == 0)
-                        body->valid |= OBD_MD_FLID;
+        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+                RETURN(-ENOMEM);
+
+        body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        rc = next->md_ops->mdo_root_get(info->mti_env, next, &body->fid1);
+        if (rc == 0)
+                body->valid |= OBD_MD_FLID;
+
+        if (info->mti_mdt->mdt_opts.mo_mds_capa) {
+                struct mdt_object  *root;
+                struct lustre_capa *capa;
+
+                root = mdt_object_find(info->mti_env, info->mti_mdt, &body->fid1);
+                if (IS_ERR(root))
+                        RETURN(PTR_ERR(root));
+
+                capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+                LASSERT(capa);
+                capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
+
+                rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa);
+                mdt_object_put(info->mti_env, root);
+                if (rc)
+                        RETURN(rc);
+                body->valid |= OBD_MD_FLMDSCAPA;
         }
 
         RETURN(rc);
@@ -390,10 +407,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) {
                 struct lustre_capa *capa;
 
-                spin_lock(&capa_lock);
-                info->mti_capa_key = *red_capa_key(mdt);
-                spin_unlock(&capa_lock);
-
                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
                 LASSERT(capa);
                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
@@ -406,6 +419,32 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
+static int mdt_renew_capa(struct mdt_thread_info *info)
+{
+        struct mdt_object *obj = info->mti_object;
+        struct mdt_body *body;
+        struct lustre_capa *capa, *c;
+        int rc;
+        ENTRY;
+
+        c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1);
+        LASSERT(c);
+
+        capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+        LASSERT(capa);
+
+        *capa = *c;
+        rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa);
+
+        body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        LASSERT(body);
+
+        body->valid |= OBD_MD_FLOSSCAPA;
+        body->flags = (__u32)rc;
+
+        RETURN(0);
+}
+
 static int mdt_getattr(struct mdt_thread_info *info)
 {
         struct mdt_object *obj = info->mti_object;
@@ -420,6 +459,11 @@ static int mdt_getattr(struct mdt_thread_info *info)
         if (reqbody == NULL)
                 GOTO(out, rc = -EFAULT);
 
+        if (reqbody->valid & OBD_MD_FLOSSCAPA) {
+                rc = mdt_renew_capa(info);
+                GOTO(out, rc);
+        }
+
         if (reqbody->valid & OBD_MD_FLRMTPERM) {
                 rc = mdt_init_ucred(info, reqbody);
                 if (rc)
@@ -541,6 +585,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 }
                 if (rc == 0) {
                         /* Finally, we can get attr for child. */
+                        mdt_set_capainfo(info, 0, mdt_object_fid(child),
+                                         BYPASS_CAPA);
                         rc = mdt_getattr_internal(info, child);
                         if (rc != 0)
                                 mdt_object_unlock(info, child, lhc, 1);
@@ -567,8 +613,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
          *step 3: find the child object by fid & lock it.
          *        regardless if it is local or remote.
          */
-        child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
-                                BYPASS_CAPA);
+        child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
         if (IS_ERR(child))
                 GOTO(out_parent, rc = PTR_ERR(child));
         if (is_resent) {
@@ -591,6 +636,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
 
         /* finally, we can get attr for child. */
+        mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         rc = mdt_getattr_internal(info, child);
         if (rc != 0) {
                 mdt_object_unlock(info, child, lhc, 1);
@@ -1150,34 +1196,6 @@ static int mdt_quotactl_handle(struct mdt_thread_info *info)
         return -EOPNOTSUPP;
 }
 
-static int mdt_renew_capa(struct mdt_thread_info *info)
-{
-        struct mdt_device *mdt = info->mti_mdt;
-        struct mdt_object *obj = info->mti_object;
-        struct mdt_body *body;
-        struct lustre_capa *capa;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(body);
-
-        capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
-        LASSERT(capa);
-
-        spin_lock(&capa_lock);
-        info->mti_capa_key = *red_capa_key(mdt);
-        spin_unlock(&capa_lock);
-
-        *capa = obj->mot_header.loh_capa;
-        /* TODO: add capa check */
-        rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa);
-        if (rc)
-                RETURN(rc);
-
-        RETURN(rc);
-}
-
 /*
  * OBD PING and other handlers.
  */
@@ -1274,17 +1292,13 @@ static struct mdt_object *mdt_obj(struct lu_object *o)
 
 struct mdt_object *mdt_object_find(const struct lu_env *env,
                                    struct mdt_device *d,
-                                   const struct lu_fid *f,
-                                   struct lustre_capa *c)
+                                   const struct lu_fid *f)
 {
         struct lu_object *o;
         struct mdt_object *m;
         ENTRY;
 
-        if (!d->mdt_opts.mo_mds_capa)
-                c = BYPASS_CAPA;
-
-        o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f, c);
+        o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
         if (IS_ERR(o))
                 m = (struct mdt_object *)o;
         else
@@ -1352,12 +1366,11 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
                                         const struct lu_fid *f,
                                         struct mdt_lock_handle *lh,
-                                        __u64 ibits,
-                                        struct lustre_capa *capa)
+                                        __u64 ibits)
 {
         struct mdt_object *o;
 
-        o = mdt_object_find(info->mti_env, info->mti_mdt, f, capa);
+        o = mdt_object_find(info->mti_env, info->mti_mdt, f);
         if (!IS_ERR(o)) {
                 int rc;
 
@@ -1436,7 +1449,6 @@ static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
  */
 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
 {
-        struct lustre_capa       *capa = NULL;
         const struct mdt_body    *body;
         struct mdt_object        *obj;
         const struct lu_env      *env;
@@ -1461,13 +1473,11 @@ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
          * instance MDS_IS_SUBDIR.
          */
         if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) && 
-                req_capsule_field_present(pill, &RMF_CAPA1, RCL_CLIENT)) {
-                int len = req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT);
-                if (len == sizeof(struct lustre_capa)) 
-                        capa = req_capsule_client_get(pill, &RMF_CAPA1);
-        }
+            req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
+                mdt_set_capainfo(info, 0, &body->fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
         
-        obj = mdt_object_find(env, info->mti_mdt, &body->fid1, capa);
+        obj = mdt_object_find(env, info->mti_mdt, &body->fid1);
         if (!IS_ERR(obj)) {
                 if ((flags & HABEO_CORPUS) &&
                     !lu_object_exists(&obj->mot_obj.mo_lu)) {
@@ -3089,6 +3099,7 @@ out:
 
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
+        struct md_device *next = m->mdt_child;
         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
         struct lu_site    *ls = d->ld_site;
 
@@ -3124,6 +3135,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                 m->mdt_rootsquash_info = NULL;
         }
 
+        next->md_ops->mdo_init_capa_ctxt(env, next, CAPA_CTX_KEYS, 0, 0, NULL);
+        cleanup_capas(CAPA_SITE_SERVER);
+        del_timer(&m->mdt_ck_timer);
+        mdt_ck_thread_stop(m);
+
         /* finish the stack */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
@@ -3140,6 +3156,21 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
 
+static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m)
+{
+        struct md_device *next = m->mdt_child;
+        __u32 valid = CAPA_CTX_TIMEOUT | CAPA_CTX_ALG | CAPA_CTX_KEYS;
+        int rc;
+
+        if (m->mdt_opts.mo_mds_capa)
+                valid |= CAPA_CTX_ON;
+        rc = next->md_ops->mdo_init_capa_ctxt(env, next, valid,
+                                              m->mdt_capa_timeout,
+                                              m->mdt_capa_alg,
+                                              m->mdt_capa_keys);
+        return rc;
+}
+
 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
 {
@@ -3170,8 +3201,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_opts.mo_compat_resname = 0;
         m->mdt_opts.mo_mds_capa = 0;
         m->mdt_opts.mo_oss_capa = 0;
-        m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
         m->mdt_capa_timeout = CAPA_TIMEOUT;
+        m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
         obd->obd_replayable = 1;
         spin_lock_init(&m->mdt_client_bitmap_lock);
@@ -3254,10 +3285,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_ck_timer.data = (unsigned long)m;
         init_timer(&m->mdt_ck_timer);
 
-        s->ls_capa_keys = m->mdt_capa_keys;
-        s->ls_capa_timeout = m->mdt_capa_timeout;
-        s->ls_capa_alg = m->mdt_capa_alg;
-
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
                 GOTO(err_capa, rc);
@@ -3271,6 +3298,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (obd->obd_recovering == 0)
                 mdt_postrecov(env, m);
 
+        mdt_init_capa_ctxt(env, m);
         RETURN(0);
 
 err_stop_service:
@@ -4062,8 +4090,7 @@ DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
-DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle),
-DEF_MDT_HNDL_0(0           |HABEO_REFERO, RENEW_CAPA,   mdt_renew_capa)
+DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
 };
 
 #define DEF_OBD_HNDL(flags, name, fn)                   \
index 16bcc91..b8db60c 100644 (file)
@@ -170,11 +170,11 @@ struct mdt_device {
         /* root squash */
         struct rootsquash_info     *mdt_rootsquash_info;
 
-        /* capability */
-        __u32                      mdt_capa_alg;
+        /* capability keys */
         unsigned long              mdt_capa_timeout;
-        unsigned long              mdt_ck_timeout;
+        __u32                      mdt_capa_alg;
         struct dt_object          *mdt_ck_obj;
+        unsigned long              mdt_ck_timeout;
         unsigned long              mdt_ck_expiry;
         struct timer_list          mdt_ck_timer;
         struct ptlrpc_thread       mdt_ck_thread;
@@ -220,8 +220,6 @@ struct mdt_reint_record {
         int                  rr_logcookielen;
         const struct llog_cookie  *rr_logcookies;
         __u32                rr_flags;
-        struct lustre_capa  *rr_capa1;
-        struct lustre_capa  *rr_capa2;
 };
 
 enum mdt_reint_flag {
@@ -326,8 +324,8 @@ struct mdt_thread_info {
         struct mdt_client_data     mti_mcd;
         loff_t                     mti_off;
         struct txn_param           mti_txn_param;
-        struct lustre_capa_key     mti_capa_key;
         struct lu_buf              mti_buf;
+        struct lustre_capa_key     mti_capa_key;
 };
 /*
  * Info allocated per-transaction.
@@ -392,13 +390,11 @@ void mdt_object_unlock(struct mdt_thread_info *,
 
 struct mdt_object *mdt_object_find(const struct lu_env *,
                                    struct mdt_device *,
-                                   const struct lu_fid *,
-                                   struct lustre_capa *);
+                                   const struct lu_fid *);
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
                                         const struct lu_fid *,
                                         struct mdt_lock_handle *,
-                                        __u64 ibits,
-                                        struct lustre_capa *);
+                                        __u64 ibits);
 void mdt_object_unlock_put(struct mdt_thread_info *,
                            struct mdt_object *,
                            struct mdt_lock_handle *,
@@ -507,6 +503,12 @@ int mdt_remote_perm_reverse_idmap(struct ptlrpc_request *,
 
 int mdt_fix_attr_ucred(struct mdt_thread_info *, __u32);
 
+static inline struct mdt_device *mdt_dev(struct lu_device *d)
+{
+//        LASSERT(lu_device_is_mdt(d));
+        return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+}
+
 /* mdt/mdt_identity.c */
 #define MDT_IDENTITY_UPCALL_PATH        "/usr/sbin/l_getidentity"
 
@@ -577,29 +579,33 @@ do {                                                                         \
 
 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info);
 
+static inline int is_identity_get_disabled(struct upcall_cache *cache)
+{
+        return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
+}
+
 /*
- * fid Capability
+ * Capability
  */
 int mdt_ck_thread_start(struct mdt_device *mdt);
 void mdt_ck_thread_stop(struct mdt_device *mdt);
 void mdt_ck_timer_callback(unsigned long castmeharder);
 int mdt_capa_keys_init(const struct lu_env *env, struct mdt_device *mdt);
 
-static inline struct mdt_device *mdt_dev(struct lu_device *d)
+static inline void mdt_set_capainfo(struct mdt_thread_info *info, int offset,
+                                    const struct lu_fid *fid,
+                                    struct lustre_capa *capa)
 {
-//        LASSERT(lu_device_is_mdt(d));
-        return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
-}
+        struct mdt_device *dev = info->mti_mdt;
+        struct md_capainfo *ci;
 
-static inline struct lustre_capa_key *red_capa_key(struct mdt_device *mdt)
-{
-        return &mdt->mdt_capa_keys[1];
-}
+        if (!dev->mdt_opts.mo_mds_capa)
+                return;
 
-static inline int is_identity_get_disabled(struct upcall_cache *cache)
-{
-        return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
+        ci = md_capainfo(info->mti_env);
+        LASSERT(ci);
+        ci->mc_fid[offset]  = fid;
+        ci->mc_capa[offset] = capa;
 }
-
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index 0e56a58..f36ca46 100644 (file)
@@ -502,7 +502,7 @@ void mdt_shrink_reply(struct mdt_thread_info *info, int offset,
         lustre_shrink_reply(req, offset, acl_size, 1);
         offset += !!acl_size;
         if (mdscapa && !(body->valid & OBD_MD_FLMDSCAPA))
-                lustre_shrink_reply(req, offset, 0, 0);
+                lustre_shrink_reply(req, offset, 0, 1);
         offset += mdscapa;
         if (osscapa && !(body->valid & OBD_MD_FLOSSCAPA))
                 lustre_shrink_reply(req, offset, 0, 0);
@@ -635,7 +635,8 @@ static int mdt_setattr_unpack_rec(struct mdt_thread_info *info)
         ma->ma_valid = MA_INODE;
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+                mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
 
         RETURN(0);
 }
@@ -732,8 +733,11 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                          LA_CTIME | LA_MTIME | LA_ATIME;
         info->mti_spec.sp_cr_flags = rec->cr_flags;
 
-        if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+        if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) {
+                mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
+                mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
+        }
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (S_ISDIR(attr->la_mode)) {
@@ -797,9 +801,11 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
         attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME;
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+                mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
         if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))
-                rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2);
+                mdt_set_capainfo(info, 1, rr->rr_fid2,
+                                 req_capsule_client_get(pill, &RMF_CAPA2));
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
@@ -837,7 +843,8 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE;
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+                mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
@@ -876,9 +883,11 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
         attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE;
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
+                mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
         if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))
-                rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2);
+                mdt_set_capainfo(info, 1, rr->rr_fid2,
+                                 req_capsule_client_get(pill, &RMF_CAPA2));
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
@@ -895,6 +904,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         struct lu_attr          *attr = &info->mti_attr.ma_attr;
         struct req_capsule      *pill = &info->mti_pill;
         struct mdt_reint_record *rr   = &info->mti_rr;
+        struct ptlrpc_request   *req  = mdt_info_req(info);
         ENTRY;
 
         rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
@@ -922,9 +932,12 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         info->mti_replayepoch = rec->cr_ioepoch;
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
-                rr->rr_capa1 = req_capsule_client_get(pill, &RMF_CAPA1);
-        if (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT))
-                rr->rr_capa2 = req_capsule_client_get(pill, &RMF_CAPA2);
+                mdt_set_capainfo(info, 0, rr->rr_fid1,
+                                 req_capsule_client_get(pill, &RMF_CAPA1));
+        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
+            (req_capsule_get_size(pill, &RMF_CAPA2, RCL_CLIENT)))
+                mdt_set_capainfo(info, 1, rr->rr_fid2,
+                                 req_capsule_client_get(pill, &RMF_CAPA2));
 
         rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
         if (rr->rr_name == NULL)
@@ -932,7 +945,6 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
 
         if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
                 struct md_create_spec *sp = &info->mti_spec;
-                struct ptlrpc_request *req = mdt_info_req(info);
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill,
                                                             &RMF_EADATA);
                 sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
index 0ee67a9..c01170f 100644 (file)
@@ -416,6 +416,66 @@ static int lprocfs_rd_rootsquash_skips(char *page, char **start, off_t off,
         return ret;
 }
 
+/* for debug only */
+static int lprocfs_rd_capa(char *page, char **start, off_t off,
+                           int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+        return snprintf(page, count, "capability on: %s %s\n",
+                        mdt->mdt_opts.mo_oss_capa ? "oss" : "",
+                        mdt->mdt_opts.mo_mds_capa ? "mds" : "");
+}
+
+static int lprocfs_wr_capa(struct file *file, const char *buffer,
+                           unsigned long count, void *data)
+{
+        int val, rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val & ~0x3) {
+                CERROR("invalid value %u: only 0/1/2/3 is accepted.\n", val);
+                CERROR("\t0: disable capability\n"
+                       "\t1: enable mds capability\n"
+                       "\t2: enable oss capability\n"
+                       "\t3: enable both mds and oss capability\n");
+                return -EINVAL;
+        }
+
+//        mds_capa_onoff(obd, val);
+        return count;
+}
+
+static int lprocfs_rd_capa_count(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        return snprintf(page, count, "%d %d\n",
+                        capa_count[CAPA_SITE_CLIENT],
+                        capa_count[CAPA_SITE_SERVER]);
+}
+
+static int lprocfs_rd_capa_timeout(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+        return snprintf(page, count, "%lu\n", mdt->mdt_capa_timeout);
+}
+
+static int lprocfs_rd_ck_timeout(char *page, char **start, off_t off, int count,
+                                 int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+        return snprintf(page, count, "%lu\n", mdt->mdt_ck_timeout);
+}
+
 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
         { "uuid",                       lprocfs_rd_uuid,                 0, 0 },
         { "recovery_status",            lprocfs_obd_rd_recovery_status,  0, 0 },
@@ -438,6 +498,10 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
         { "rootsquash_uid",             lprocfs_rd_rootsquash_uid,       0, 0 },
         { "rootsquash_gid",             lprocfs_rd_rootsquash_gid,       0, 0 },
         { "rootsquash_skips",           lprocfs_rd_rootsquash_skips,     0, 0 },
+        { "capa",                       lprocfs_rd_capa, lprocfs_wr_capa,   0 },
+        { "capa_timeout",               lprocfs_rd_capa_timeout,         0, 0 },
+        { "capa_key_timeout",           lprocfs_rd_ck_timeout,           0, 0 },
+        { "capa_count",                 lprocfs_rd_capa_count,           0, 0 },
         { 0 }
 };
 
index 89dd16e..5bd3caf 100644 (file)
@@ -80,8 +80,8 @@ static int mdt_create_data(struct mdt_thread_info *info,
         int rc;
         ENTRY;
 
-        if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
-                        !(spec->sp_cr_flags & FMODE_WRITE))
+        if ((spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) ||
+            !(spec->sp_cr_flags & FMODE_WRITE))
                 RETURN(0);
 
         ma->ma_need = MA_INODE | MA_LOV;
@@ -346,10 +346,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 }
         }
 
-        spin_lock(&capa_lock);
-        info->mti_capa_key = *red_capa_key(mdt);
-        spin_unlock(&capa_lock);
-
         if (mdt->mdt_opts.mo_mds_capa) {
                 struct lustre_capa *capa;
 
@@ -361,12 +357,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                         RETURN(rc);
                 repbody->valid |= OBD_MD_FLMDSCAPA;
         }
-        if (mdt->mdt_opts.mo_oss_capa) {
+        if (mdt->mdt_opts.mo_oss_capa &&
+            S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA2);
                 LASSERT(capa);
-                capa->lc_opc = CAPA_OPC_OSS_DEFAULT;
+                capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa);
                 if (rc)
                         RETURN(rc);
@@ -540,10 +537,10 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                  * We failed after creation, but we do not know in which step
                  * we failed. So try to check the child object.
                  */
-                parent = mdt_object_find(env, mdt, rr->rr_fid1, rr->rr_capa1);
+                parent = mdt_object_find(env, mdt, rr->rr_fid1);
                 LASSERT(!IS_ERR(parent));
 
-                child = mdt_object_find(env, mdt, rr->rr_fid2, rr->rr_capa2);
+                child = mdt_object_find(env, mdt, rr->rr_fid2);
                 LASSERT(!IS_ERR(child));
 
                 rc = lu_object_exists(&child->mot_obj.mo_lu);
@@ -591,8 +588,7 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         int                     rc;
         ENTRY;
 
-        o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2,
-                            rr->rr_capa2);
+        o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
@@ -637,12 +633,13 @@ static int mdt_cross_open(struct mdt_thread_info* info,
         int                rc;
         ENTRY;
 
-        o = mdt_object_find(info->mti_env, info->mti_mdt, fid, BYPASS_CAPA);
+        o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
         rc = lu_object_exists(&o->mot_obj.mo_lu);
         if (rc > 0) {
+                mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
                 rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
                 if (rc == 0)
                         rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
@@ -747,7 +744,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         else
                 lh->mlh_mode = LCK_EX;
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
-                                      MDS_INODELOCK_UPDATE, rr->rr_capa1);
+                                      MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
                 GOTO(out, result = PTR_ERR(parent));
 
@@ -778,10 +775,11 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
         }
 
-        child = mdt_object_find(info->mti_env, mdt, child_fid, BYPASS_CAPA);
+        child = mdt_object_find(info->mti_env, mdt, child_fid);
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
+        mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
index 02e4b36..dd63f2d 100644 (file)
@@ -70,7 +70,7 @@ int mdt_record_read(const struct lu_env *env,
 
         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
 
-        rc = dt->do_body_ops->dbo_read(env, dt, buf, pos);
+        rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
 
         if (rc == buf->lb_len)
                 rc = 0;
@@ -87,7 +87,7 @@ int mdt_record_write(const struct lu_env *env,
 
         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
         LASSERT(th != NULL);
-        rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th);
+        rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA);
         if (rc == buf->lb_len)
                 rc = 0;
         else if (rc >= 0)
@@ -435,7 +435,7 @@ static int mdt_server_data_init(const struct lu_env *env,
 
         obj = mdt->mdt_last_rcvd;
         obj->do_ops->do_read_lock(env, obj);
-        rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la);
+        rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la, BYPASS_CAPA);
         obj->do_ops->do_read_unlock(env, obj);
         if (rc)
                 RETURN(rc);
@@ -944,7 +944,6 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
 
         o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid);
         if(!IS_ERR(o)) {
-                struct md_device *next = mdt->mdt_child;
                 mdt->mdt_ck_obj = o;
                 rc = mdt_capa_keys_init(env, mdt);
                 if (rc) {
@@ -952,7 +951,6 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
                         mdt->mdt_ck_obj = NULL;
                         RETURN(rc);
                 }
-                rc = next->md_ops->mdo_init_capa_keys(next, mdt->mdt_capa_keys);
         } else {
                 rc = PTR_ERR(o);
                 CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
@@ -1032,8 +1030,7 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti,
                 return;
 
         /* if no error, so child was created with requested fid */
-        child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2,
-                                mti->mti_rr.rr_capa2);
+        child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2);
         LASSERT(!IS_ERR(child));
 
         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
@@ -1062,8 +1059,7 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
                 return;
 
         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
-        obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1,
-                              mti->mti_rr.rr_capa1);
+        obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1);
         LASSERT(!IS_ERR(obj));
         mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(obj));
index 3774e54..eb40fe9 100644 (file)
@@ -53,13 +53,12 @@ static int mdt_md_create(struct mdt_thread_info *info)
         lh = &info->mti_lh[MDT_LH_PARENT];
         lh->mlh_mode = LCK_EX;
 
-        parent = mdt_object_find_lock(info, rr->rr_fid1,
-                                      lh, MDS_INODELOCK_UPDATE,
-                                      rr->rr_capa1);
+        parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
+                                      MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
                 RETURN(PTR_ERR(parent));
 
-        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2, BYPASS_CAPA);
+        child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
         if (!IS_ERR(child)) {
                 struct md_object *next = mdt_object_child(parent);
 
@@ -67,6 +66,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
 
+                mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
                 rc = mdo_create(info->mti_env, next, rr->rr_name,
                                 mdt_object_child(child),
                                 &info->mti_spec, ma);
@@ -96,8 +96,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info)
 
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
-        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
-                            BYPASS_CAPA);
+        o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
         if (!IS_ERR(o)) {
                 struct md_object *next = mdt_object_child(o);
 
@@ -211,8 +210,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                   (unsigned int)ma->ma_attr.la_valid);
 
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
-                             rr->rr_capa1);
+        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
         if (IS_ERR(mo))
                 RETURN(rc = PTR_ERR(mo));
 
@@ -277,7 +275,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
         mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo));
 
-        if (mdt->mdt_opts.mo_oss_capa) {
+        if (mdt->mdt_opts.mo_oss_capa &&
+            S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu))) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
@@ -354,7 +353,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         parent_lh = &info->mti_lh[MDT_LH_PARENT];
         parent_lh->mlh_mode = LCK_EX;
         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
-                                  MDS_INODELOCK_UPDATE, rr->rr_capa1);
+                                  MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
                 GOTO(out, rc = PTR_ERR(mp));
 
@@ -387,8 +386,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                  GOTO(out_unlock_parent, rc);
 
         /* we will lock the child regardless it is local or remote. No harm. */
-        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
-                                  BYPASS_CAPA);
+        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
         if (IS_ERR(mc))
                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
@@ -405,6 +403,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
          * whether need MA_LOV and MA_COOKIE.
          */
         ma->ma_need = MA_INODE;
+        mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                         mdt_object_child(mc), rr->rr_name, ma);
         if (rc)
@@ -448,12 +447,13 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         lhs = &info->mti_lh[MDT_LH_PARENT];
         lhs->mlh_mode = LCK_EX;
         ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
-                                  MDS_INODELOCK_UPDATE, rr->rr_capa1);
+                                  MDS_INODELOCK_UPDATE);
         if (IS_ERR(ms))
                 RETURN(PTR_ERR(ms));
 
         if (strlen(rr->rr_name) == 0) {
                 /* remote partial operation */
+                mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
                 GOTO(out_unlock_source, rc);
         }
@@ -461,7 +461,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         lhp = &info->mti_lh[MDT_LH_CHILD];
         lhp->mlh_mode = LCK_EX;
         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
-                                  MDS_INODELOCK_UPDATE, rr->rr_capa2);
+                                  MDS_INODELOCK_UPDATE);
         if (IS_ERR(mp))
                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
 
@@ -505,7 +505,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
         lh_tgtdir->mlh_mode = LCK_EX;
         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
-                                       MDS_INODELOCK_UPDATE, rr->rr_capa1);
+                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(mtgtdir))
                 GOTO(out, rc = PTR_ERR(mtgtdir));
 
@@ -518,7 +518,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 lh_tgt->mlh_mode = LCK_EX;
 
                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
-                                            MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
+                                            MDS_INODELOCK_LOOKUP);
                 if (IS_ERR(mtgt))
                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
 
@@ -604,8 +604,7 @@ static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
         ENTRY;
 
         do {
-                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
-                                      BYPASS_CAPA);
+                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
                 if (!IS_ERR(dst)) {
                         rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst),
                                            fid, &dst_fid);
@@ -664,7 +663,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
         lh_srcdirp->mlh_mode = LCK_EX;
         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
-                                       MDS_INODELOCK_UPDATE, rr->rr_capa1);
+                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(msrcdir))
                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
 
@@ -676,7 +675,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 mtgtdir = msrcdir;
         } else {
                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
-                                          rr->rr_fid2, rr->rr_capa2);
+                                          rr->rr_fid2);
                 if (IS_ERR(mtgtdir))
                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
@@ -701,7 +700,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         lh_oldp = &info->mti_lh[MDT_LH_OLD];
         lh_oldp->mlh_mode = LCK_EX;
         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
-                                    MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
+                                    MDS_INODELOCK_LOOKUP);
         if (IS_ERR(mold))
                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
@@ -719,8 +718,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                         GOTO(out_unlock_old, rc = -EINVAL);
 
                 lh_newp->mlh_mode = LCK_EX;
-                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
-                                       BYPASS_CAPA);
+                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
                 if (IS_ERR(mnew))
                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
 
@@ -752,6 +750,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
 
+        mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
+        mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
         /* Check if @dst is subdir of @src. */
         rc = mdt_rename_check(info, old_fid);
         if (rc)
index ebc97b7..bdc4a76 100644 (file)
@@ -174,14 +174,14 @@ static inline void free_capa_lru(struct list_head *head)
 }
 
 /* add or update */
-struct obd_capa *capa_add(struct lustre_capa *capa)
+void capa_add(struct lustre_capa *capa)
 {
         struct hlist_head *head = capa_hash + capa_hashfn(&capa->lc_fid);
         struct obd_capa *ocapa, *old = NULL;
 
         ocapa = alloc_capa(CAPA_SITE_SERVER);
         if (!ocapa)
-                return NULL;
+                return;
 
         spin_lock(&capa_lock);
 
@@ -198,7 +198,7 @@ struct obd_capa *capa_add(struct lustre_capa *capa)
                 DEBUG_CAPA(D_SEC, &ocapa->c_capa, "new");
                                         
                 spin_unlock(&capa_lock);
-                return ocapa;
+                return;
         }
 
         spin_lock(&old->c_lock);
@@ -213,7 +213,6 @@ struct obd_capa *capa_add(struct lustre_capa *capa)
         DEBUG_CAPA(D_SEC, &old->c_capa, "update");
 
         free_capa(ocapa);
-        return old;
 }
 
 struct obd_capa *capa_lookup(struct lustre_capa *capa)
index 9171d86..a6111ac 100644 (file)
@@ -149,7 +149,8 @@ static int dt_lookup(const struct lu_env *env, struct dt_object *dir,
         int result;
 
         if (dt_try_as_dir(env, dir))
-                result = dir->do_index_ops->dio_lookup(env, dir, rec, key);
+                result = dir->do_index_ops->dio_lookup(env, dir, rec, key,
+                                                       BYPASS_CAPA);
         else
                 result = -ENOTDIR;
         return result;
@@ -162,7 +163,7 @@ static struct dt_object *dt_locate(const struct lu_env *env,
         struct lu_object *obj;
         struct dt_object *dt;
 
-        obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid, BYPASS_CAPA);
+        obj = lu_object_find(env, dev->dd_lu_dev.ld_site, fid);
         if (!IS_ERR(obj)) {
                 obj = lu_object_locate(obj->lo_header, dev->dd_lu_dev.ld_type);
                 LASSERT(obj != NULL);
@@ -185,7 +186,6 @@ struct dt_object *dt_store_open(const struct lu_env *env,
         if (result == 0) {
                 root = dt_locate(env, dt, fid);
                 if (!IS_ERR(root)) {
-                        lu_object_bypass_capa(&root->do_lu);
                         result = dt_lookup(env, root, name, fid);
                         if (result == 0)
                                 child = dt_locate(env, dt, fid);
index b0a4b77..8f977cb 100644 (file)
@@ -107,8 +107,7 @@ EXPORT_SYMBOL(lu_object_put);
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                          struct lu_site *s,
-                                         const struct lu_fid *f,
-                                         const struct lustre_capa *capa)
+                                         const struct lu_fid *f)
 {
         struct lu_object *scan;
         struct lu_object *top;
@@ -130,11 +129,6 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
          * after this point.
          */
         top->lo_header->loh_fid  = *f;
-        if (capa == BYPASS_CAPA)
-                lu_object_bypass_capa(top);
-        else if (capa)
-                top->lo_header->loh_capa = *capa;
-
         layers = &top->lo_header->loh_layers;
         do {
                 /*
@@ -428,13 +422,11 @@ static __u32 fid_hash(const struct lu_fid *f)
  * any case, additional reference is acquired on the returned object.
  */
 struct lu_object *lu_object_find(const struct lu_env *env,
-                                 struct lu_site *s, const struct lu_fid *f,
-                                 struct lustre_capa *capa)
+                                 struct lu_site *s, const struct lu_fid *f)
 {
         struct lu_object  *o;
         struct lu_object  *shadow;
         struct hlist_head *bucket;
-        int                rc;
 
         /*
          * This uses standard index maintenance protocol:
@@ -455,25 +447,14 @@ struct lu_object *lu_object_find(const struct lu_env *env,
         o = htable_lookup(s, bucket, f);
 
         spin_unlock(&s->ls_guard);
-        if (o != NULL) {
-                if (capa == BYPASS_CAPA) {
-                        o->lo_header->loh_capa_bypass = 1;
-                } else {
-                        rc = lu_object_auth(env, o, capa,
-                                            CAPA_OPC_INDEX_LOOKUP);
-                        if (rc)
-                                return ERR_PTR(rc);
-                        if (capa)
-                                o->lo_header->loh_capa = *capa;
-                }
+        if (o != NULL)
                 return o;
-        }
 
         /*
          * Allocate new object. This may result in rather complicated
          * operations, including fld queries, inode loading, etc.
          */
-        o = lu_object_alloc(env, s, f, capa);
+        o = lu_object_alloc(env, s, f);
         if (IS_ERR(o))
                 return o;
 
@@ -496,24 +477,6 @@ struct lu_object *lu_object_find(const struct lu_env *env,
 }
 EXPORT_SYMBOL(lu_object_find);
 
-int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
-                   struct lustre_capa *capa, __u64 opc)
-{
-        struct lu_object_header *top = o->lo_header;
-        int rc;
-
-        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
-                if (o->lo_ops->loo_object_auth) {
-                        rc = o->lo_ops->loo_object_auth(env, o, capa, opc);
-                        if (rc)
-                                return rc;
-                }
-        }
-
-        return 0;
-}
-EXPORT_SYMBOL(lu_object_auth);
-
 enum {
         LU_SITE_HTABLE_BITS = 8,
         LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
index a0cae5d..55706c0 100644 (file)
@@ -110,6 +110,13 @@ struct osd_device {
          * This means that it's enough to have _one_ lu_context.
          */
         struct lu_env             od_env_for_commit;
+        /*
+         * Capability
+         */
+        unsigned int              od_fl_capa:1;
+        unsigned long             od_capa_timeout;
+        __u32                     od_capa_alg;
+        struct lustre_capa_key   *od_capa_keys;
 };
 
 static int   osd_root_get      (const struct lu_env *env,
@@ -151,15 +158,18 @@ static int   osd_param_is_sane (const struct osd_device *dev,
                                 const struct txn_param *param);
 static int   osd_index_lookup  (const struct lu_env *env,
                                 struct dt_object *dt,
-                                struct dt_rec *rec, const struct dt_key *key);
+                                struct dt_rec *rec, const struct dt_key *key,
+                                struct lustre_capa *capa);
 static int   osd_index_insert  (const struct lu_env *env,
                                 struct dt_object *dt,
                                 const struct dt_rec *rec,
                                 const struct dt_key *key,
-                                struct thandle *handle);
+                                struct thandle *handle,
+                                struct lustre_capa *capa);
 static int   osd_index_delete  (const struct lu_env *env,
                                 struct dt_object *dt, const struct dt_key *key,
-                                struct thandle *handle);
+                                struct thandle *handle,
+                                struct lustre_capa *capa);
 static int   osd_index_probe   (const struct lu_env *env,
                                 struct osd_object *o,
                                 const struct dt_index_features *feat);
@@ -666,6 +676,29 @@ static void osd_ro(const struct lu_env *env, struct dt_device *d)
         EXIT;
 }
 
+static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
+                              __u32 valid, unsigned long timeout, __u32 alg,
+                              struct lustre_capa_key *keys)
+{
+        struct osd_device *dev = osd_dt_dev(d);
+        ENTRY;
+
+        if (valid & CAPA_CTX_ON)
+                dev->od_fl_capa = 1;
+        else
+                dev->od_fl_capa = 0;
+       
+        if (valid & CAPA_CTX_TIMEOUT)
+                dev->od_capa_timeout = timeout;
+
+        if (valid & CAPA_CTX_ALG)
+                dev->od_capa_alg = alg;
+
+        if (valid & CAPA_CTX_KEYS)
+                dev->od_capa_keys = keys;
+        RETURN(0);
+}
+
 /* Note: we did not count into QUOTA here, If we mount with --data_journal
  * we may need more*/
 enum {
@@ -720,14 +753,15 @@ static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
 }
                 
 static struct dt_device_operations osd_dt_ops = {
-        .dt_root_get    = osd_root_get,
-        .dt_statfs      = osd_statfs,
-        .dt_trans_start = osd_trans_start,
-        .dt_trans_stop  = osd_trans_stop,
-        .dt_conf_get    = osd_conf_get,
-        .dt_sync        = osd_sync,
-        .dt_ro          = osd_ro,
-        .dt_credit_get  = osd_credit_get
+        .dt_root_get       = osd_root_get,
+        .dt_statfs         = osd_statfs,
+        .dt_trans_start    = osd_trans_start,
+        .dt_trans_stop     = osd_trans_stop,
+        .dt_conf_get       = osd_conf_get,
+        .dt_sync           = osd_sync,
+        .dt_ro             = osd_ro,
+        .dt_credit_get     = osd_credit_get,
+        .dt_init_capa_ctxt = osd_init_capa_ctxt
 };
 
 static void osd_object_read_lock(const struct lu_env *env,
@@ -785,16 +819,99 @@ static void osd_object_write_unlock(const struct lu_env *env,
         up_write(&obj->oo_sem);
 }
 
-static inline int osd_object_auth(const struct lu_env *env,
-                                  const struct lu_object *o,
-                                  __u64 opc)
+static int capa_is_sane(const struct lu_env *env,
+                        struct lustre_capa *capa,
+                        struct lustre_capa_key *keys)
+{
+        struct obd_capa *c;
+        struct osd_thread_info *oti = lu_context_key_get(&env->le_ctx, &osd_key);
+        int i, rc = 1;
+        ENTRY;
+
+        c = capa_lookup(capa);
+        if (c) {
+                spin_lock(&c->c_lock);
+                if (memcmp(&c->c_capa, capa, sizeof(*capa))) {
+                        DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
+                        rc = -EACCES;
+                } else if (capa_is_expired(c)) {
+                        DEBUG_CAPA(D_ERROR, capa, "expired");
+                        rc = -ESTALE;
+                }
+                spin_unlock(&c->c_lock);
+
+                capa_put(c);
+                RETURN(rc);
+        }
+
+        spin_lock(&capa_lock);
+        for (i = 0; i < 2; i++) {
+                if (keys[i].lk_keyid == capa->lc_keyid) {
+                        oti->oti_capa_key = keys[i];
+                        break;
+                }
+        }
+        spin_unlock(&capa_lock);
+
+        if (i == 2) {
+                DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
+                RETURN(-ESTALE);
+        }
+
+        rc = capa_hmac(oti->oti_capa_hmac, capa, oti->oti_capa_key.lk_key);
+        if (rc)
+                RETURN(rc);
+        if (memcmp(oti->oti_capa_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
+                DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
+                RETURN(-EACCES);
+        }
+
+        capa_add(capa);
+
+        RETURN(1);
+}
+
+static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
+                           struct lustre_capa *capa, __u64 opc)
 {
-        return o->lo_ops->loo_object_auth(env, o, lu_object_capa(o), opc);
+        const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+        struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
+
+        if (!dev->od_fl_capa)
+                return 0;
+
+        if (capa == BYPASS_CAPA)
+                return 0;
+
+        if (!capa) {
+                CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
+                LBUG();
+                return -EACCES;
+        }
+
+        if (!lu_fid_eq(fid, &capa->lc_fid)) {
+                DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
+                           PFID(fid));
+                return -EACCES;
+        }
+
+        if (!capa_opc_supported(capa, opc)) {
+                DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
+                return -EACCES;
+        }
+
+        if (!capa_is_sane(env, capa, dev->od_capa_keys)) {
+                DEBUG_CAPA(D_ERROR, capa, "insane");
+                return -EACCES;
+        }
+
+        return 0;
 }
 
 static int osd_attr_get(const struct lu_env *env,
                         struct dt_object *dt,
-                        struct lu_attr *attr)
+                        struct lu_attr *attr,
+                        struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
 
@@ -802,7 +919,7 @@ static int osd_attr_get(const struct lu_env *env,
         LASSERT(osd_invariant(obj));
         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
                 return -EACCES;
 
         return osd_inode_getattr(env, obj->oo_inode, attr);
@@ -811,7 +928,8 @@ static int osd_attr_get(const struct lu_env *env,
 static int osd_attr_set(const struct lu_env *env,
                         struct dt_object *dt,
                         const struct lu_attr *attr,
-                        struct thandle *handle)
+                        struct thandle *handle,
+                        struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         LASSERT(handle != NULL);
@@ -819,7 +937,7 @@ static int osd_attr_set(const struct lu_env *env,
         LASSERT(osd_invariant(obj));
         LASSERT(osd_write_locked(env, obj));
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
         return osd_inode_setattr(env, obj->oo_inode, attr);
@@ -1074,8 +1192,6 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         /*
          * XXX missing: permission checks.
          */
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_INSERT))
-                RETURN(-EACCES);
 
         /*
          * XXX missing: sanity checks (valid ->la_mode, etc.)
@@ -1111,7 +1227,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 }
 
 static void osd_object_ref_add(const struct lu_env *env,
-                               struct dt_object *dt, struct thandle *th)
+                               struct dt_object *dt,
+                               struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
@@ -1121,12 +1238,6 @@ static void osd_object_ref_add(const struct lu_env *env,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) {
-                LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
-                                "no capability to link!\n");
-                return;
-        }
-
         if (inode->i_nlink < LDISKFS_LINK_MAX) {
                 inode->i_nlink ++;
                 mark_inode_dirty(inode);
@@ -1137,7 +1248,8 @@ static void osd_object_ref_add(const struct lu_env *env,
 }
 
 static void osd_object_ref_del(const struct lu_env *env,
-                               struct dt_object *dt, struct thandle *th)
+                               struct dt_object *dt,
+                               struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
@@ -1147,12 +1259,6 @@ static void osd_object_ref_del(const struct lu_env *env,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE)) {
-                LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
-                                "no capability to unlink!\n");
-                return;
-        }
-
         if (inode->i_nlink > 0) {
                 inode->i_nlink --;
                 mark_inode_dirty(inode);
@@ -1162,8 +1268,11 @@ static void osd_object_ref_del(const struct lu_env *env,
         LASSERT(osd_invariant(obj));
 }
 
-static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
-                         struct lu_buf *buf, const char *name)
+static int osd_xattr_get(const struct lu_env *env,
+                         struct dt_object *dt,
+                         struct lu_buf *buf,
+                         const char *name,
+                         struct lustre_capa *capa)
 {
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
@@ -1174,7 +1283,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
                 return -EACCES;
 
         dentry->d_inode = inode;
@@ -1183,7 +1292,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
 
 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, const char *name, int fl,
-                         struct thandle *handle)
+                         struct thandle *handle, struct lustre_capa *capa)
 {
         int fs_flags;
 
@@ -1197,7 +1306,7 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(handle != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
         dentry->d_inode = inode;
@@ -1213,8 +1322,10 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                                      buf->lb_buf, buf->lb_len, fs_flags);
 }
 
-static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
-                          struct lu_buf *buf)
+static int osd_xattr_list(const struct lu_env *env,
+                          struct dt_object *dt,
+                          struct lu_buf *buf,
+                          struct lustre_capa *capa)
 {
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
@@ -1225,15 +1336,18 @@ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_READ))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
                 return -EACCES;
 
         dentry->d_inode = inode;
         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
 }
 
-static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
-                         const char *name, struct thandle *handle)
+static int osd_xattr_del(const struct lu_env *env,
+                         struct dt_object *dt,
+                         const char *name,
+                         struct thandle *handle,
+                         struct lustre_capa *capa)
 {
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
@@ -1245,7 +1359,7 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(handle != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_META_WRITE))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
         dentry->d_inode = inode;
@@ -1258,7 +1372,8 @@ static int osd_dir_page_build(const struct lu_env *env, int first,
                               __u32 *start, __u32 *end, struct lu_dirent **last)
 {
         int result;
-        struct osd_thread_info *info = lu_context_key_get(&env->le_ctx, &osd_key);
+        struct osd_thread_info *info = lu_context_key_get(&env->le_ctx,
+                                                          &osd_key);
         struct lu_fid          *fid  = &info->oti_fid;
         struct lu_dirent       *ent;
 
@@ -1315,7 +1430,9 @@ static int osd_dir_page_build(const struct lu_env *env, int first,
 }
 
 static int osd_readpage(const struct lu_env *env,
-                        struct dt_object *dt, const struct lu_rdpg *rdpg)
+                        struct dt_object *dt,
+                        const struct lu_rdpg *rdpg,
+                        struct lustre_capa *capa)
 {
         struct dt_it      *it;
         struct osd_object *obj = osd_dt_obj(dt);
@@ -1329,7 +1446,7 @@ static int osd_readpage(const struct lu_env *env,
 
         LASSERT(rdpg->rp_pages != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_BODY_READ))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
                 return -EACCES;
 
         if (rdpg->rp_count <= 0)
@@ -1400,6 +1517,51 @@ static int osd_readpage(const struct lu_env *env,
         return rc ? rc : rc1;
 }
 
+static int osd_capa_get(const struct lu_env *env,
+                        struct dt_object *dt, struct lustre_capa *capa)
+{
+        struct osd_thread_info *info = lu_context_key_get(&env->le_ctx,
+                                                          &osd_key);
+        const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+        struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_device *dev = osd_obj2dev(obj);
+        struct lustre_capa_key *key = &info->oti_capa_key;
+        struct obd_capa *oc;
+        int rc;
+        ENTRY;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(osd_invariant(obj));
+
+        capa->lc_fid = *fid;
+        if (dev->od_capa_timeout < CAPA_TIMEOUT)
+                capa->lc_flags |= CAPA_FL_SHORT_EXPIRY;
+
+        capa->lc_flags = dev->od_capa_alg << 24;
+
+        /* TODO: get right permission here */
+        oc = capa_lookup(capa);
+        if (oc) {
+                LASSERT(!capa_is_expired(oc));
+                capa_cpy(capa, oc);
+                capa_put(oc);
+                RETURN(0);
+        }
+
+        spin_lock(&capa_lock);
+        *key = dev->od_capa_keys[1];
+        capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout;
+        spin_unlock(&capa_lock);
+
+        capa->lc_keyid = key->lk_keyid;
+        rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
+        if (rc)
+                RETURN(rc);
+
+        capa_add(capa);
+        RETURN(0);
+}
+
 static struct dt_object_operations osd_obj_ops = {
         .do_read_lock    = osd_object_read_lock,
         .do_write_lock   = osd_object_write_lock,
@@ -1416,6 +1578,7 @@ static struct dt_object_operations osd_obj_ops = {
         .do_xattr_del    = osd_xattr_del,
         .do_xattr_list   = osd_xattr_list,
         .do_readpage     = osd_readpage,
+        .do_capa_get     = osd_capa_get,
 };
 
 /*
@@ -1423,13 +1586,17 @@ static struct dt_object_operations osd_obj_ops = {
  */
 
 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
-                        struct lu_buf *buf, loff_t *pos)
+                        struct lu_buf *buf, loff_t *pos,
+                        struct lustre_capa *capa)
 {
         struct inode *inode = osd_dt_obj(dt)->oo_inode;
         struct file  *file;
         mm_segment_t  seg;
         ssize_t       result;
 
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
+                RETURN(-EACCES);
+
         file = osd_rw_init(env, inode, &seg);
         /*
          * We'd like to use vfs_read() here, but it messes with
@@ -1448,7 +1615,7 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
 
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, loff_t *pos,
-                         struct thandle *handle)
+                         struct thandle *handle, struct lustre_capa *capa)
 {
         struct inode *inode = osd_dt_obj(dt)->oo_inode;
         struct file  *file;
@@ -1457,6 +1624,9 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
 
         LASSERT(handle != NULL);
 
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
+                RETURN(-EACCES);
+
         file = osd_rw_init(env, inode, &seg);
         if (file->f_op->write)
                 result = file->f_op->write(file, buf->lb_buf, buf->lb_len, pos);
@@ -1514,9 +1684,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_LOOKUP))
-                RETURN(-EACCES);
-
         if (osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode) {
                 dt->do_index_ops = &osd_index_compat_ops;
                 result = 0;
@@ -1554,7 +1721,8 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
-                            const struct dt_key *key, struct thandle *handle)
+                            const struct dt_key *key, struct thandle *handle,
+                            struct lustre_capa *capa)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
         struct osd_thandle    *oh;
@@ -1568,7 +1736,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
         LASSERT(obj->oo_ipd != NULL);
         LASSERT(handle != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_DELETE))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
                 RETURN(-EACCES);
 
         oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -1582,7 +1750,8 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
-                            struct dt_rec *rec, const struct dt_key *key)
+                            struct dt_rec *rec, const struct dt_key *key,
+                            struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         int rc;
@@ -1594,7 +1763,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
         LASSERT(obj->oo_container.ic_object == obj->oo_inode);
         LASSERT(obj->oo_ipd != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_LOOKUP))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
                 return -EACCES;
 
         rc = iam_lookup(&obj->oo_container, (const struct iam_key *)key,
@@ -1607,7 +1776,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
 
 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
                             const struct dt_rec *rec, const struct dt_key *key,
-                            struct thandle *th)
+                            struct thandle *th, struct lustre_capa *capa)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
 
@@ -1622,7 +1791,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
         LASSERT(obj->oo_ipd != NULL);
         LASSERT(th != NULL);
 
-        if (osd_object_auth(env, &dt->do_lu, CAPA_OPC_INDEX_INSERT))
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 return -EACCES;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
@@ -1769,7 +1938,8 @@ static struct dt_index_operations osd_index_ops = {
 static int osd_index_compat_delete(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_key *key,
-                                   struct thandle *handle)
+                                   struct thandle *handle,
+                                   struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
 
@@ -1777,6 +1947,11 @@ static int osd_index_compat_delete(const struct lu_env *env,
         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
         ENTRY;
 
+#if 0
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+                RETURN(-EACCES);
+#endif
+
         RETURN(-EOPNOTSUPP);
 }
 
@@ -1796,7 +1971,8 @@ static int osd_build_fid(struct osd_device *osd,
 
 static int osd_index_compat_lookup(const struct lu_env *env,
                                    struct dt_object *dt,
-                                   struct dt_rec *rec, const struct dt_key *key)
+                                   struct dt_rec *rec, const struct dt_key *key,
+                                   struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
 
@@ -1816,6 +1992,9 @@ static int osd_index_compat_lookup(const struct lu_env *env,
         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
         LASSERT(osd_has_index(obj));
 
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
+                return -EACCES;
+
         info->oti_str.name = (const char *)key;
         info->oti_str.len  = strlen((const char *)key);
 
@@ -1907,7 +2086,8 @@ static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
 static int osd_index_compat_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
-                                   const struct dt_key *key, struct thandle *th)
+                                   const struct dt_key *key, struct thandle *th,
+                                   struct lustre_capa *capa)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
 
@@ -1925,7 +2105,10 @@ static int osd_index_compat_insert(const struct lu_env *env,
         LASSERT(osd_invariant(obj));
         LASSERT(th != NULL);
 
-        luch = lu_object_find(env, ludev->ld_site, fid, BYPASS_CAPA);
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+                return -EACCES;
+
+        luch = lu_object_find(env, ludev->ld_site, fid);
         if (!IS_ERR(luch)) {
                 if (lu_object_exists(luch)) {
                         struct osd_object *child;
@@ -2375,102 +2558,13 @@ static int osd_object_invariant(const struct lu_object *l)
         return osd_invariant(osd_obj(l));
 }
 
-static int capa_is_sane(const struct lu_env *env,
-                        struct lustre_capa *capa,
-                        struct lustre_capa_key *keys)
-{
-        struct obd_capa *c;
-        struct osd_thread_info *oti = lu_context_key_get(&env->le_ctx, &osd_key);
-        int i, rc = 0;
-        ENTRY;
-
-        c = capa_lookup(capa);
-        if (c) {
-                spin_lock(&c->c_lock);
-                if (memcmp(&c->c_capa, capa, sizeof(*capa))) {
-                        DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
-                        rc = -EACCES;
-                } else if (capa_is_expired(c)) {
-                        DEBUG_CAPA(D_ERROR, capa, "expired");
-                        rc = -ESTALE;
-                }
-                spin_unlock(&c->c_lock);
-
-                capa_put(c);
-                RETURN(rc);
-        }
-
-        spin_lock(&capa_lock);
-        for (i = 0; i < 2; i++) {
-                if (keys[i].lk_keyid == capa->lc_keyid) {
-                        oti->oti_capa_key = keys[i];
-                        break;
-                }
-        }
-        spin_unlock(&capa_lock);
-
-        if (i == 2) {
-                DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
-                RETURN(-ESTALE);
-        }
-
-        rc = capa_hmac(oti->oti_capa_hmac, capa, oti->oti_capa_key.lk_key);
-        if (rc)
-                RETURN(rc);
-        if (memcmp(oti->oti_capa_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
-                DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
-                RETURN(-EACCES);
-        }
-
-        capa_add(capa);
-
-        RETURN(0);
-}
-
-static int osd_object_capa_auth(const struct lu_env *env,
-                                const struct lu_object *obj,
-                                struct lustre_capa *capa,
-                                __u64 opc)
-{
-        const struct lu_fid *fid = lu_object_fid(obj);
-
-        return 0;
-
-        if (lu_object_capa_bypass(obj))
-                return 0;
-
-        if (!capa) {
-                CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
-                return -EACCES;
-        }
-
-        if (!lu_fid_eq(fid, &capa->lc_fid)) {
-                DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
-                           PFID(fid));
-                return -EACCES;
-        }
-
-        if (!capa_opc_supported(capa, opc)) {
-                DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
-                return -EACCES;
-        }
-
-        if (!capa_is_sane(env, capa, obj->lo_dev->ld_site->ls_capa_keys)) {
-                DEBUG_CAPA(D_ERROR, capa, "insane");
-                return -EACCES;
-        }
-
-        return 0;
-}
-
 static struct lu_object_operations osd_lu_obj_ops = {
         .loo_object_init      = osd_object_init,
         .loo_object_delete    = osd_object_delete,
         .loo_object_release   = osd_object_release,
         .loo_object_free      = osd_object_free,
         .loo_object_print     = osd_object_print,
-        .loo_object_invariant = osd_object_invariant,
-        .loo_object_auth      = osd_object_capa_auth
+        .loo_object_invariant = osd_object_invariant
 };
 
 static struct lu_device_operations osd_lu_ops = {
index 1b3cb1e..9fda0f8 100644 (file)
@@ -164,7 +164,8 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
         } else {
                 rc = oi->oi_dir->do_index_ops->dio_lookup
                         (info->oti_env, oi->oi_dir,
-                         (struct dt_rec *)id, oi_fid_key(info, fid));
+                         (struct dt_rec *)id, oi_fid_key(info, fid),
+                         BYPASS_CAPA);
                 osd_inode_id_init(id, id->oii_ino, id->oii_gen);
         }
         return rc;
@@ -190,7 +191,8 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
         osd_inode_id_init(id, id0->oii_ino, id0->oii_gen);
         return idx->do_index_ops->dio_insert(info->oti_env, idx,
                                              (const struct dt_rec *)id,
-                                             oi_fid_key(info, fid), th);
+                                             oi_fid_key(info, fid), th,
+                                             BYPASS_CAPA);
 }
 
 /*
@@ -209,6 +211,7 @@ int osd_oi_delete(struct osd_thread_info *info,
         idx = oi->oi_dir;
         dev = lu2dt_dev(idx->do_lu.lo_dev);
         return idx->do_index_ops->dio_delete(info->oti_env, idx,
-                                             oi_fid_key(info, fid), th);
+                                             oi_fid_key(info, fid), th,
+                                             BYPASS_CAPA);
 }
 
index b4db7ba..9261e2c 100644 (file)
@@ -73,11 +73,6 @@ static const struct req_msg_field *mdt_body_only[] = {
         &RMF_MDT_BODY
 };
 
-static const struct req_msg_field *mdt_renew_capa_client[] = {
-        &RMF_PTLRPC_BODY,
-        &RMF_CAPA1
-};
-
 static const struct req_msg_field *mdt_body_capa[] = {
         &RMF_PTLRPC_BODY,
         &RMF_MDT_BODY,
@@ -366,8 +361,7 @@ static const struct req_format *req_formats[] = {
         &RQF_MDS_READPAGE,
         &RQF_MDS_WRITEPAGE,
         &RQF_MDS_IS_SUBDIR,
-        &RQF_MDS_DONE_WRITING,
-        &RQF_MDS_RENEW_CAPA
+        &RQF_MDS_DONE_WRITING
 };
 
 struct req_msg_field {
@@ -725,11 +719,6 @@ const struct req_format RQF_MDS_IS_SUBDIR =
                         mdt_body_only, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_IS_SUBDIR);
 
-const struct req_format RQF_MDS_RENEW_CAPA =
-        DEFINE_REQ_FMT0("MDS_RENEW_CAPA",
-                        mdt_renew_capa_client, mdt_body_capa);
-EXPORT_SYMBOL(RQF_MDS_RENEW_CAPA);
-
 #if !defined(__REQ_LAYOUT_USER__)
 
 int req_layout_init(void)
index 5b44000..a430307 100644 (file)
@@ -77,7 +77,6 @@ struct ll_rpc_opcode {
         { MDS_SETXATTR,     "mds_setxattr" },
         { MDS_WRITEPAGE,    "mds_writepage" },
         { MDS_IS_SUBDIR,    "mds_is_subdir" },
-        { MDS_RENEW_CAPA,   "mds_renew_capa" },
         { LDLM_ENQUEUE,     "ldlm_enqueue" },
         { LDLM_CONVERT,     "ldlm_convert" },
         { LDLM_CANCEL,      "ldlm_cancel" },