Whamcloud - gitweb
fixes:
authorlsy <lsy>
Thu, 25 Aug 2005 14:52:52 +0000 (14:52 +0000)
committerlsy <lsy>
Thu, 25 Aug 2005 14:52:52 +0000 (14:52 +0000)
* previous implementation doesn't support multi-open/truncate by different
  users on the same inode.
* update DLD.
* cleanup.

16 files changed:
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_sec.h
lustre/llite/file.c
lustre/llite/llite_capa.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/rw.c
lustre/mdc/mdc_request.c
lustre/mds/mds_capa.c
lustre/mds/mds_reint.c
lustre/obdclass/capa.c
lustre/obdfilter/filter_capa.c
lustre/osc/osc_request.c
lustre/tests/replay-single.sh

index f31dcc2..3c9f081 100644 (file)
@@ -113,7 +113,6 @@ struct obd_client_handle {
         struct lustre_handle och_fh;
         struct llog_cookie och_cookie;
         struct mdc_open_data *och_mod;
-        struct obd_capa *och_capa;
         __u32 och_magic;
 };
 #define OBD_CLIENT_HANDLE_MAGIC 0xd15ea5ed
index 3d2c640..8f8c3b0 100644 (file)
@@ -120,7 +120,7 @@ struct ll_inode_info {
 
         struct lustre_key      *lli_key_info;
 
-        struct lustre_capa     *lli_trunc_capa; /* capabiliity for truncate */
+        struct list_head        lli_capas;
 
         wait_queue_head_t       lli_dirty_wait;
 };
index 7a27223..07b957c 100644 (file)
@@ -324,8 +324,6 @@ int mdc_enqueue(struct obd_export *exp,
 int mdc_req2lustre_md(struct obd_export *exp_lmv, struct ptlrpc_request *req, 
                       unsigned int offset, struct obd_export *exp_lov, 
                       struct lustre_md *md);
-int mdc_req2lustre_capa(struct ptlrpc_request *req, unsigned int offset,
-                        struct lustre_capa **capa);
 int mdc_getstatus(struct obd_export *exp, struct lustre_id *rootid);
 int mdc_getattr(struct obd_export *exp, struct lustre_id *id,
                 __u64 valid, const char *xattr_name,
index 723f76a..aa64bea 100644 (file)
@@ -512,8 +512,14 @@ int svcsec_null_exit(void);
 #define CAPA_CACHE_SIZE 1000             /* for MDS & OST */
 #define CAPA_HMAC_ALG "sha1"
 
+/* capa ops */
+#define CAPA_READ       MAY_READ        /* 2 */
+#define CAPA_WRITE      MAY_WRITE       /* 4 */
+#define CAPA_TRUNC      8
+
 struct lustre_capa_data {
-        __u32   lc_uid;       /* uid */
+        __u32   lc_uid;       /* uid, mapped uid */
+        __u32   lc_ruid;      /* remote uid on client */
         __u32   lc_op;        /* operations allowed */
         __u64   lc_ino;       /* inode# */
         __u32   lc_mdsid;     /* mds# */
@@ -526,6 +532,7 @@ struct client_capa {
         struct inode             *inode;      /* this should be always valid
                                                * if c_refc > 0 */
         struct lustre_handle      handle;     /* handle of mds_file_data */
+        struct list_head          lli_list;   /* link to lli_capas */
 #if 0   /* TODO: support multi mount point */
         struct list_head         *list;       /* the capa list belong to this client */
         struct timer_list        *timer;      /* timer belong to this client */
@@ -599,8 +606,8 @@ struct obd_capa *capa_get(uid_t uid, int capa_op, __u64 mdsid,
                           unsigned long ino, int type,
                           struct lustre_capa *capa, struct inode *inode,
                           struct lustre_handle *handle);
-void capa_put(struct obd_capa *ocapa, int type);
-int capa_renew(struct lustre_capa *capa, int type);
+void capa_put(struct obd_capa *ocapa);
+struct obd_capa *capa_renew(struct lustre_capa *capa, int type);
 void capa_hmac(struct crypto_tfm *tfm, __u8 *key, struct lustre_capa *capa);
 void capa_dup(void *dst, struct obd_capa *ocapa);
 void capa_dup2(void *dst, struct lustre_capa *capa);
@@ -609,7 +616,7 @@ int __capa_is_to_expire(struct obd_capa *ocapa);
 int capa_is_to_expire(struct obd_capa *ocapa);
 
 #define CAPA_EXPIRY_SHIFT 10 /* 1024 sec */
-#define CAPA_EXPIRY       (1UL << PAGE_SHIFT)
+#define CAPA_EXPIRY       (1UL << CAPA_EXPIRY_SHIFT)
 #define CAPA_EXPIRY_MASK  (~(CAPA_EXPIRY-1))
 
 #define CAPA_PRE_EXPIRY_NOROUND 3       /* sec */
@@ -618,6 +625,7 @@ int capa_is_to_expire(struct obd_capa *ocapa);
 /* struct lustre_capa.lc_flags */
 #define CAPA_FL_NOROUND   0x001 /* capa expiry not rounded */
 #define CAPA_FL_REMUID    0x002 /* remote uid */
+#define CAPA_FL_TRUNC     0x004 /* truncate capa, this kind of capa won't be renewed */
 
 static inline unsigned long capa_pre_expiry(struct lustre_capa *capa)
 {
index 3ca0446..aa71245 100644 (file)
@@ -142,8 +142,6 @@ int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
         EXIT;
 out:
         mdc_clear_open_replay_data(md_exp, och);
-        if (och->och_capa)
-                capa_put(och->och_capa, CLIENT_CAPA);
         och->och_fh.cookie = DEAD_HANDLE_MAGIC;
         OBD_FREE(och, sizeof *och);
         return rc;
@@ -364,7 +362,7 @@ int ll_och_fill(struct inode *inode, struct lookup_intent *it,
                                 LUSTRE_IT(it)->it_data);
 
         if (S_ISREG(inode->i_mode) && (body->valid & OBD_MD_CAPA))
-                rc = ll_set_och_capa(inode, it, och);
+                rc = ll_set_capa(inode, it);
         RETURN(rc);
 }
 
index ee3585f..4ac1458 100644 (file)
@@ -77,16 +77,12 @@ static int ll_renew_capa(struct obd_capa *ocapa)
         struct inode *inode = ocapa->c_inode;
         struct obd_export *md_exp = ll_i2mdexp(inode);
         struct ll_inode_info *lli = ll_i2info(inode);
-        __u64 valid = 0;
+        __u64 valid = OBD_MD_CAPA;
         int rc;
         ENTRY;
 
-        valid |= OBD_MD_CAPA;
-
         rc = md_getattr(md_exp, &lli->lli_id, valid, NULL, NULL, 0,
                         0, ocapa, &req);
-        if (rc < 0)
-                CDEBUG(D_INFO, "md_getattr failed: rc = %d\n", rc);
         RETURN(rc);
 }
 
@@ -116,22 +112,28 @@ static int ll_capa_thread(void *arg)
 
         while (1) {
                 struct l_wait_info lwi = { 0 };
-                struct obd_capa *ocapa, *next = NULL;
+                struct obd_capa *ocapa, *next = NULL, tcapa;
                 unsigned long expiry, sleep = CAPA_PRE_EXPIRY;
 
                 l_wait_event(capa_thread.t_ctl_waitq,
                              (have_expired_capa() || ll_capa_check_stop()),
                              &lwi);
 
+                if (ll_capa_check_stop())
+                        break;
+
                 spin_lock(&capa_lock);
                 list_for_each_entry(ocapa, ll_capa_list, c_list) {
-                        if (__capa_is_to_expire(ocapa)) {
-                                /* get capa in case it's deleted */
-                                __capa_get(ocapa);
+                        if (ocapa->c_capa.lc_op == CAPA_TRUNC)
+                                continue;
 
+                        if (__capa_is_to_expire(ocapa)) {
+                                /* copy capa in case it's deleted */
+                                tcapa = *ocapa;
                                 spin_unlock(&capa_lock);
-                                ll_renew_capa(ocapa);
-                                capa_put(ocapa, CLIENT_CAPA);
+
+                                ll_renew_capa(&tcapa);
+
                                 spin_lock(&capa_lock);
                         } else {
                                 next = ocapa;
@@ -147,9 +149,6 @@ static int ll_capa_thread(void *arg)
                 }
                 spin_unlock(&capa_lock);
 
-                if (ll_capa_check_stop())
-                        break;
-
                 /* wait ll_renew_capa finish */
                 set_current_state(TASK_INTERRUPTIBLE);
                 schedule_timeout(sleep * HZ);
@@ -205,18 +204,17 @@ void ll_capa_thread_stop(void)
         EXIT;
 }
 
-int ll_set_och_capa(struct inode *inode, struct lookup_intent *it,
-                   struct obd_client_handle *och)
+int ll_set_capa(struct inode *inode, struct lookup_intent *it)
 {
         struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
-        struct ll_inode_info *lli = ll_i2info(inode);
         struct mds_body *body;
         struct lustre_capa *capa;
+        struct obd_capa *ocapa;
+        struct ll_inode_info *lli = ll_i2info(inode);
         __u64 mdsid = lli->lli_id.li_fid.lf_group;
         unsigned long ino = lli->lli_id.li_stc.u.e3s.l3s_ino;
         int capa_op = (it->it_flags & MAY_WRITE) ? MAY_WRITE : MAY_READ;
         unsigned long expiry;
-        int rc = 0;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
@@ -227,10 +225,16 @@ int ll_set_och_capa(struct inode *inode, struct lookup_intent *it,
         LASSERT(capa != NULL);          /* reply already checked out */
         LASSERT_REPSWABBED(req, 7);     /* and swabbed down */
 
-        och->och_capa = capa_get(current->uid, capa_op, mdsid, ino,
-                                 CLIENT_CAPA, capa, inode, &body->handle);
-        if (!och->och_capa)
-                rc = -ENOMEM;
+        ocapa = capa_get(current->uid, capa_op, mdsid, ino, CLIENT_CAPA, capa,
+                         inode, &body->handle);
+        if (!ocapa)
+                RETURN(-ENOMEM);
+
+        spin_lock(&lli->lli_lock);
+        /* in case it was linked to lli_capas already */
+        if (list_empty(&ocapa->u.client.lli_list))
+                list_add(&ocapa->u.client.lli_list, &lli->lli_capas);
+        spin_unlock(&lli->lli_lock);
 
         expiry = expiry_to_jiffies(capa->lc_expiry - capa_pre_expiry(capa));
 
@@ -242,6 +246,38 @@ int ll_set_och_capa(struct inode *inode, struct lookup_intent *it,
         }
         spin_unlock(&capa_lock);
 
-        RETURN(rc);
+        RETURN(0);
 }
 
+int ll_set_trunc_capa(struct ptlrpc_request *req, int offset, struct inode *inode)
+{
+        struct mds_body *body;
+        struct obd_capa *ocapa;
+        struct lustre_capa *capa;
+        struct ll_inode_info *lli = ll_i2info(inode);
+
+        body = lustre_msg_buf(req->rq_repmsg, offset, sizeof(*body));
+        if (!body)
+                return -ENOMEM;
+
+        if (!(body->valid & OBD_MD_CAPA))
+                return 0;
+
+        ENTRY;
+        capa = (struct lustre_capa *)lustre_swab_repbuf(req, offset + 1,
+                                        sizeof(*capa), lustre_swab_lustre_capa);
+        if (!capa)
+                RETURN(-ENOMEM);        
+
+        ocapa = capa_renew(capa, CLIENT_CAPA);
+        if (!ocapa)
+                RETURN(-ENOMEM);
+
+        spin_lock(&lli->lli_lock);
+        /* in case it was linked to lli_capas already */
+        if (list_empty(&ocapa->u.client.lli_list))
+                list_add(&ocapa->u.client.lli_list, &lli->lli_capas);
+        spin_unlock(&lli->lli_lock);
+
+        RETURN(0);
+}
index 9ed882d..bff927c 100644 (file)
@@ -112,9 +112,11 @@ struct ll_sb_info {
         char                      ll_gns_oname[PATH_MAX];
         void                      *ll_crypto_info;
 
+#if 0
         /* TODO: to support multi mount for capability */
         struct list_head          ll_capa_list;
         struct timer_list         ll_capa_timer;
+#endif
 };
 
 struct ll_gns_ctl {
@@ -359,8 +361,9 @@ int ll_capa_thread_start(void);
 void ll_capa_thread_stop(void);
 
 void ll_capa_timer_callback(unsigned long unused);
-int ll_set_och_capa(struct inode *inode, struct lookup_intent *it,
-                    struct obd_client_handle *och);
+int ll_set_capa(struct inode *inode, struct lookup_intent *it);
+int ll_set_trunc_capa(struct ptlrpc_request *req, int offset,
+                      struct inode *inode);
 
 /* llite/dcache.c */
 void ll_intent_drop_lock(struct lookup_intent *);
index 99903ff..fc68447 100644 (file)
@@ -584,6 +584,7 @@ void ll_lli_init(struct ll_inode_info *lli)
         lli->lli_key_info = NULL;
         init_waitqueue_head(&lli->lli_dirty_wait);
         lli->lli_io_epoch = 0;
+        INIT_LIST_HEAD(&lli->lli_capas);
 }
 
 int ll_fill_super(struct super_block *sb, void *data, int silent)
@@ -1145,6 +1146,7 @@ void ll_clear_inode(struct inode *inode)
         struct lustre_id id;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct obd_capa *ocapa, *tmp;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
@@ -1198,10 +1200,8 @@ void ll_clear_inode(struct inode *inode)
                 lli->lli_remote_acl = NULL;
         }
 
-        if (lli->lli_trunc_capa) {
-                OBD_FREE(lli->lli_trunc_capa, sizeof(struct lustre_capa));
-                lli->lli_trunc_capa = NULL;
-        }
+        list_for_each_entry_safe(ocapa, tmp, &lli->lli_capas, u.client.lli_list)
+                capa_put(ocapa);
 
         lli->lli_inode_magic = LLI_INODE_DEAD;
         EXIT;
@@ -1227,7 +1227,6 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ptlrpc_request *request = NULL;
         struct mdc_op_data *op_data;
-        struct lustre_capa *trunc_capa = NULL;
         int ia_valid = attr->ia_valid;
         int err, rc = 0;
         ENTRY;
@@ -1314,22 +1313,11 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 }
 
                 if (attr->ia_valid & ATTR_SIZE) {
-                        /* XXX: hack for truncate capa */
-                        rc = mdc_req2lustre_capa(request, 0, &trunc_capa);
+                        rc = ll_set_trunc_capa(request, 0, inode);
                         if (rc) {
                                 ptlrpc_req_finished(request);
                                 RETURN(rc);
                         }
-
-                        spin_lock(&lli->lli_lock);
-                        if (trunc_capa) {
-                                if (lli->lli_trunc_capa)
-                                        OBD_FREE(lli->lli_trunc_capa,
-                                                 sizeof(*trunc_capa));
-                                lli->lli_trunc_capa = trunc_capa;
-                                DEBUG_CAPA(D_INFO, trunc_capa, "truncate");
-                        }
-                        spin_unlock(&lli->lli_lock);
                 }
 
                 /* We call inode_setattr to adjust timestamps, but we first
index 7233a33..be3a2ab 100644 (file)
@@ -117,6 +117,8 @@ void ll_truncate(struct inode *inode)
 {
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_capa *ocapa;
+        struct lustre_capa *capa = NULL;
         struct obdo *oa = NULL;
         int rc;
         ENTRY;
@@ -165,9 +167,14 @@ void ll_truncate(struct inode *inode)
 
         lli->lli_size_pid = 0;
         up(&lli->lli_size_sem);
+
+        ocapa = capa_get(current->fsuid, CAPA_TRUNC, id_group(&lli->lli_id),
+                         id_ino(&lli->lli_id), CLIENT_CAPA, NULL, NULL, NULL);
+        if (ocapa)
+                capa = &ocapa->c_capa;
         
         rc = obd_punch(ll_i2dtexp(inode), oa, lsm, inode->i_size,
-                       OBD_OBJECT_EOF, NULL, lli->lli_trunc_capa);
+                       OBD_OBJECT_EOF, NULL, capa);
         if (rc)
                 CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
         else
index 55a447b..24a9920 100644 (file)
@@ -106,6 +106,7 @@ int
 mdc_interpret_getattr(struct ptlrpc_request *req, void *unused, int rc)
 {
         struct mds_body *body = NULL;
+        struct obd_capa *ocapa;
         struct lustre_capa *capa = NULL;
         unsigned long expiry;
         ENTRY;
@@ -134,9 +135,9 @@ mdc_interpret_getattr(struct ptlrpc_request *req, void *unused, int rc)
                 RETURN(-EPROTO);
         }
 
-        rc = capa_renew(capa, CLIENT_CAPA);
-        if (rc)
-                RETURN(rc);
+        ocapa = capa_renew(capa, CLIENT_CAPA);
+        if (!ocapa)
+                RETURN(-ENOENT);
 
         spin_lock(&capa_lock);
         expiry = expiry_to_jiffies(capa->lc_expiry - capa_pre_expiry(capa));
@@ -411,34 +412,6 @@ int mdc_store_inode_generation(struct obd_export *exp,
         return 0;
 }
 
-int mdc_req2lustre_capa(struct ptlrpc_request *req, unsigned int offset,
-                        struct lustre_capa **capa)
-{
-        struct mds_body *body;
-        struct lustre_capa *lcapa;
-
-        body = lustre_msg_buf(req->rq_repmsg, offset, sizeof(*body));
-        if (!body)
-                RETURN(-ENOMEM);
-
-        if (!(body->valid & OBD_MD_CAPA)) {
-                *capa = NULL;
-                RETURN(0);
-        }
-
-        lcapa = (struct lustre_capa *)lustre_swab_repbuf(req, offset + 1,
-                                        sizeof(*capa), lustre_swab_lustre_capa);
-        if (!lcapa)
-                RETURN(-ENOMEM);
-                
-        OBD_ALLOC(*capa, sizeof(**capa));
-        if (!*capa)
-                RETURN(-ENOMEM);
-        memcpy(*capa, lcapa, sizeof(**capa));
-
-        RETURN(0);
-}
-
 static
 int mdc_unpack_acl(struct obd_export *exp_lmv, struct ptlrpc_request *req, 
                    unsigned int offset, struct lustre_md *md)
@@ -1793,7 +1766,6 @@ MODULE_DESCRIPTION("Lustre Metadata Client");
 MODULE_LICENSE("GPL");
 
 EXPORT_SYMBOL(mdc_req2lustre_md);
-EXPORT_SYMBOL(mdc_req2lustre_capa);
 EXPORT_SYMBOL(mdc_change_cbdata);
 EXPORT_SYMBOL(mdc_getstatus);
 EXPORT_SYMBOL(mdc_getattr);
index 23e349f..6f53219 100644 (file)
@@ -520,10 +520,10 @@ int mds_pack_capa(struct obd_device *obd, struct mds_export_data *med,
                 expired = capa_is_to_expire(ocapa);
                 if (!expired) {
                         capa_dup(capa, ocapa);
-                        capa_put(ocapa, MDS_CAPA);
+                        capa_put(ocapa);
                         GOTO(out, rc);
                 }
-                capa_put(ocapa, MDS_CAPA);
+                capa_put(ocapa);
         }
 
         memcpy(capa, req_capa, sizeof(*capa));
@@ -539,7 +539,9 @@ int mds_pack_capa(struct obd_device *obd, struct mds_export_data *med,
 
         capa_hmac(mds->mds_capa_hmac, key, capa);
 
-        rc = capa_renew(capa, MDS_CAPA);
+        ocapa = capa_renew(capa, MDS_CAPA);
+        if (!ocapa)
+                rc = -ENOMEM;
 out:
         if (rc == 0)
                 body->valid |= OBD_MD_CAPA;
index 8fafdfa..5bf399e 100644 (file)
@@ -684,7 +684,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         if (do_trunc) {
                 struct lustre_capa capa = {
                         .lc_uid   = rec->ur_uc.luc_uid,
-                        .lc_op    = MAY_WRITE,
+                        .lc_op    = CAPA_TRUNC,
                         .lc_ino   = inode->i_ino,
                         .lc_mdsid = mds->mds_num,
                 };
index 08dccb6..be92e79 100644 (file)
@@ -120,7 +120,8 @@ find_capa(struct hlist_head *head, uid_t uid, int capa_op, __u64 mdsid,
 
 inline void __capa_get(struct obd_capa *ocapa)
 {
-        atomic_inc(&ocapa->c_refc);
+        if (ocapa->c_type != CLIENT_CAPA)
+                atomic_inc(&ocapa->c_refc);
 }
 
 static struct obd_capa *
@@ -151,6 +152,13 @@ static struct obd_capa *alloc_capa(void)
         return ocapa;
 }
 
+static void __capa_put(struct obd_capa *ocapa)
+{
+        hlist_del_init(&ocapa->c_hash);
+        list_del_init(&ocapa->c_list);
+        capa_count[ocapa->c_type]--;
+}
+
 static void destroy_capa(struct obd_capa *ocapa)
 {
         OBD_SLAB_FREE(ocapa, capa_cachep, sizeof(*ocapa));
@@ -183,9 +191,9 @@ void capa_cache_cleanup(void)
 
         hlist_for_each_entry_safe(ocapa, pos, n, capa_hash, c_hash) {
                 LASSERT(ocapa->c_type != CLIENT_CAPA);
-                hlist_del(&ocapa->c_hash);
-                list_del(&ocapa->c_list);
-                OBD_FREE(ocapa, sizeof(*ocapa));
+//                        list_del_init(&ocapa->u.client.lli_list);
+                __capa_put(ocapa);
+                destroy_capa(ocapa);
         }
 
         OBD_FREE(capa_hash, PAGE_SIZE);
@@ -234,14 +242,11 @@ get_new_capa_locked(struct hlist_head *head, int type, struct lustre_capa *capa,
                 do_update_capa(ocapa, capa);
                 ocapa->c_type = type;
 
-                if (type == CLIENT_CAPA) {
-                        LASSERT(inode);
+                if (type == CLIENT_CAPA && inode) {
                         LASSERT(handle);
-#ifdef __KERNEL__
-                        igrab(inode);
-#endif
                         ocapa->c_inode = inode;
                         memcpy(&ocapa->c_handle, handle, sizeof(*handle));
+                        INIT_LIST_HEAD(&ocapa->u.client.lli_list);
                 }
 
                 DEBUG_CAPA(D_CACHE, &ocapa->c_capa, "new");
@@ -250,8 +255,6 @@ get_new_capa_locked(struct hlist_head *head, int type, struct lustre_capa *capa,
                 hlist_add_head(&ocapa->c_hash, capa_hash);
                 capa_count[type]++;
 
-                __capa_get(ocapa);
-
                 if (type != CLIENT_CAPA && capa_count[type] > CAPA_CACHE_SIZE) {
                         struct list_head *node = capa_list[type].next;
                         struct obd_capa *tcapa;
@@ -263,10 +266,8 @@ get_new_capa_locked(struct hlist_head *head, int type, struct lustre_capa *capa,
                                 node = node->next;
                                 if (atomic_read(&tcapa->c_refc) > 0)
                                         continue;
-                                list_del(&tcapa->c_list);
-                                hlist_del(&tcapa->c_hash);
+                                __capa_put(tcapa);
                                 destroy_capa(tcapa);
-                                capa_count[type]--;
                                 count++;
                         }
                 }
@@ -274,18 +275,16 @@ get_new_capa_locked(struct hlist_head *head, int type, struct lustre_capa *capa,
                 spin_unlock(&capa_lock);
                 return ocapa;
         }
-
-        __capa_get(old);
         spin_unlock(&capa_lock);
 
         destroy_capa(ocapa);
         return old;
 }
 
-static struct obd_capa *
-capa_get_locked(uid_t uid, int capa_op,__u64 mdsid, unsigned long ino,
-                int type, struct lustre_capa *capa, struct inode *inode,
-                struct lustre_handle *handle)
+struct obd_capa *
+capa_get(uid_t uid, int capa_op,__u64 mdsid, unsigned long ino,
+         int type, struct lustre_capa *capa, struct inode *inode,
+         struct lustre_handle *handle)
 {
         struct hlist_head *head = capa_hash +
                                   capa_hashfn(uid, capa_op, mdsid, ino);
@@ -295,48 +294,32 @@ capa_get_locked(uid_t uid, int capa_op,__u64 mdsid, unsigned long ino,
         if (ocapa)
                 return ocapa;
         
-        if (capa)
+        if (capa) {
                 ocapa = get_new_capa_locked(head, type, capa, inode, handle);
+                if (ocapa)
+                        __capa_get(ocapa);
+        }
         return ocapa;
 }
 
-struct obd_capa *
-capa_get(uid_t uid, int capa_op, __u64 mdsid, unsigned long ino, int type,
-         struct lustre_capa *capa, struct inode *inode,
-         struct lustre_handle *handle)
-{
-        return capa_get_locked(uid, capa_op, mdsid, ino, type, capa, inode,
-                               handle);
-}
-
-static void __capa_put(struct obd_capa *ocapa, int type)
+void capa_put(struct obd_capa *ocapa)
 {
-        hlist_del_init(&ocapa->c_hash);
-        list_del_init(&ocapa->c_list);
-        capa_count[type]--;
-}
-
-void capa_put(struct obd_capa *ocapa, int type)
-{
-        ENTRY;
+        if (!ocapa)
+                return;
 
-        if (ocapa) {
-                if (atomic_dec_and_lock(&ocapa->c_refc, &capa_lock)) {
-                        if (type == CLIENT_CAPA) {
-#ifdef __KERNEL__
-                                iput(ocapa->c_inode);
-#endif
-                                __capa_put(ocapa, type);
-                                destroy_capa(ocapa);
-                        }
-                        spin_unlock(&capa_lock);
-                }
+        DEBUG_CAPA(D_CACHE, &ocapa->c_capa, "put");
+        spin_lock(&capa_lock);
+        if (ocapa->c_type == CLIENT_CAPA) {
+                list_del_init(&ocapa->u.client.lli_list);
+                __capa_put(ocapa);
+                destroy_capa(ocapa);
+        } else {
+                atomic_dec(&ocapa->c_refc);
         }
-
-        EXIT;
+        spin_unlock(&capa_lock);
 }
 
-static int update_capa_locked(struct lustre_capa *capa, int type)
+static struct obd_capa *update_capa_locked(struct lustre_capa *capa, int type)
 {
         uid_t uid = capa->lc_uid;
         int capa_op = capa->lc_op;
@@ -345,7 +328,6 @@ static int update_capa_locked(struct lustre_capa *capa, int type)
         struct hlist_head *head = capa_hash +
                                   capa_hashfn(uid, capa_op, mdsid, ino);
         struct obd_capa *ocapa;
-        ENTRY;
 
         spin_lock(&capa_lock);
         ocapa = find_capa(head, uid, capa_op, mdsid, ino, type);
@@ -353,15 +335,17 @@ static int update_capa_locked(struct lustre_capa *capa, int type)
                 do_update_capa(ocapa, capa);
         spin_unlock(&capa_lock);
 
-        if (ocapa == NULL && type == MDS_CAPA) {
+        if (ocapa)
+                return ocapa;
+
+        if (type == MDS_CAPA ||
+            (type == CLIENT_CAPA && capa->lc_op == CAPA_TRUNC))
                 ocapa = get_new_capa_locked(head, type, capa, NULL, NULL);
-                capa_put(ocapa, type);
-        }
 
-        RETURN(ocapa ? 0 : -ENOENT);
+        return ocapa;
 }
 
-int capa_renew(struct lustre_capa *capa, int type)
+struct obd_capa *capa_renew(struct lustre_capa *capa, int type)
 {
         DEBUG_CAPA(D_INFO, capa, "renew");
 
index 1b3c718..0998bcc 100644 (file)
@@ -230,9 +230,9 @@ filter_verify_capa(int cmd, struct obd_export *exp, struct lustre_capa *capa)
         if (blacklist_check(capa->lc_uid))
                 RETURN(-EACCES);
 
-        if (cmd == OBD_BRW_WRITE && capa->lc_op != MAY_WRITE)
+        if (cmd == OBD_BRW_WRITE && !(capa->lc_op & (CAPA_WRITE | CAPA_TRUNC)))
                 RETURN(-EACCES);
-        if (cmd == OBD_BRW_READ && !(capa->lc_op & (MAY_WRITE | MAY_READ)))
+        if (cmd == OBD_BRW_READ && !(capa->lc_op & (CAPA_WRITE | CAPA_READ)))
                 RETURN(-EACCES);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_FILTER_VERIFY_CAPA))
@@ -255,13 +255,13 @@ verify:
                                     sizeof(capa->lc_hmac));
                 } else {
                         /* ocapa is obsolete */
-                        capa_put(ocapa, FILTER_CAPA);
+                        capa_put(ocapa);
                         spin_unlock(&filter->fo_capa_lock);
                         goto new_capa;
                 }
                 spin_unlock(&filter->fo_capa_lock);
 
-                capa_put(ocapa, FILTER_CAPA);
+                capa_put(ocapa);
                 RETURN(rc ? -EACCES : 0);
         }
 
index 180e7f4..f0fdeca 100644 (file)
@@ -808,6 +808,8 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                 if (!can_merge_pages(&pga[i - 1], &pga[i]))
                         niocount++;
 
+        /* TODO: this could be optimized: thie capability can be
+         * found from ll_inode_info->lli_capas. */
         capa_op = (opc == OST_WRITE) ? MAY_WRITE : MAY_READ;
 get_capa:
         ocapa = capa_get(oa->o_fsuid, capa_op, raw_id->li_fid.lf_group,
@@ -844,23 +846,19 @@ get_capa:
 
         bufcnt = 0;
         body = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*body));
+        memcpy(&body->oa, oa, sizeof(*oa));
         ioobj = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*ioobj));
-        if (ocapa)
+        if (ocapa) {
                 capa = lustre_msg_buf(req->rq_reqmsg, bufcnt++, sizeof(*capa));
+                capa_dup(capa, ocapa);
+                body->oa.o_valid |= OBD_MD_CAPA;
+        }
         niobuf = lustre_msg_buf(req->rq_reqmsg, bufcnt++,
                                 niocount * sizeof(*niobuf));
 
-        memcpy(&body->oa, oa, sizeof(*oa));
-
         obdo_to_ioobj(oa, ioobj);
         ioobj->ioo_bufcnt = niocount;
 
-        if (ocapa) {
-                capa_dup(capa, ocapa);
-                body->oa.o_valid |= OBD_MD_CAPA;
-                capa_put(ocapa, CLIENT_CAPA);
-        }
-
         LASSERT (page_count > 0);
 
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
index 742c114..a6a3514 100755 (executable)
@@ -1314,5 +1314,5 @@ test_57() {
 run_test 57 "open orphan in reconstruct_open()"
 
 equals_msg test complete, cleaning up
-$CLEANUP
+#$CLEANUP