Whamcloud - gitweb
LU-3669 xattr: separate ACL and XATTR caches 08/7208/15
authorNathaniel Clark <nathaniel.l.clark@intel.com>
Wed, 4 Sep 2013 18:10:48 +0000 (14:10 -0400)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 10 Sep 2013 21:51:23 +0000 (21:51 +0000)
This patch separates ACL and XATTR caches, so that
when updating an ACL only LOOKUP lock is needed and
when updating another XATTR only XATTR lock is needed.

This patch also reverts XATTR cache support for setxattr
because client performing REINT under even PR lock
can deadlock if an active server operation (like unlink)
attempts to cancel all locks, and setxattr has to wait
for it (MDC max-in-flight is 1).

This patch also drops mot_xattr_sem because there is
no use case for FLXATTRLS with locking.

This patch also takes into account that osd_xattr_list
may be changing lu_buf, so it does not assume that
the buf is unchanged after the call.

This patch disables the r/o cache if the data is
unreasonably large (larger than maximum single EA
size).

Signed-off-by: Andrew Perepechko <andrew_perepechko@xyratex.com>
Change-Id: I53f63597d9ecc85bcca0d2b35cda48762b2187bb
Signed-off-by: Nathaniel Clark <nathaniel.l.clark@intel.com>
Reviewed-on: http://review.whamcloud.com/7208
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
15 files changed:
lustre/include/lustre/lustre_idl.h
lustre/ldlm/ldlm_lock.c
lustre/llite/llite_internal.h
lustre/llite/xattr.c
lustre/llite/xattr_cache.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_xattr.c
lustre/ptlrpc/layout.c
lustre/tests/sanity.sh

index 2d05bea..45fe409 100644 (file)
@@ -1743,7 +1743,6 @@ static inline __u32 lov_mds_md_size(__u16 stripes, __u32 lmm_magic)
                           OBD_MD_FLGID   | OBD_MD_FLFLAGS | OBD_MD_FLNLINK | \
                           OBD_MD_FLGENER | OBD_MD_FLRDEV  | OBD_MD_FLGROUP)
 
-#define OBD_MD_FLXATTRLOCKED OBD_MD_FLGETATTRLOCK
 #define OBD_MD_FLXATTRALL (OBD_MD_FLXATTR | OBD_MD_FLXATTRLS)
 
 /* don't forget obdo_fid which is way down at the bottom so it can
index 348549b..a0238b7 100644 (file)
@@ -149,8 +149,6 @@ char *ldlm_it2str(int it)
                 return "getxattr";
         case IT_LAYOUT:
                 return "layout";
-       case IT_SETXATTR:
-               return "setxattr";
         default:
                 CERROR("Unknown intent %d\n", it);
                 return "UNKNOWN";
index f5b01ae..af73c05 100644 (file)
@@ -294,13 +294,6 @@ int ll_xattr_cache_get(struct inode *inode,
                        size_t size,
                        __u64 valid);
 
-int ll_xattr_cache_update(struct inode *inode,
-                       const char *name,
-                       const char *newval,
-                       size_t size,
-                       __u64 valid,
-                       int flags);
-
 /*
  * Locking to guarantee consistency of non-atomic updates to long long i_size,
  * consistency between file size and KMS.
index c6b9cfb..56decef 100644 (file)
@@ -183,17 +183,11 @@ int ll_setxattr_common(struct inode *inode, const char *name,
                 valid |= rce_ops2valid(rce->rce_ops);
         }
 #endif
-       if (sbi->ll_xattr_cache_enabled &&
-           (rce == NULL || rce->rce_ops == RMT_LSETFACL)) {
-               rc = ll_xattr_cache_update(inode, name, pv, size, valid, flags);
-       } else {
-               oc = ll_mdscapa_get(inode);
-               rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
-                               valid, name, pv, size, 0, flags,
-                               ll_i2suppgid(inode), &req);
-               capa_put(oc);
-       }
-
+       oc = ll_mdscapa_get(inode);
+       rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
+                       valid, name, pv, size, 0, flags,
+                       ll_i2suppgid(inode), &req);
+       capa_put(oc);
 #ifdef CONFIG_FS_POSIX_ACL
         if (new_value != NULL)
                 lustre_posix_acl_xattr_free(new_value, size);
@@ -292,6 +286,7 @@ int ll_getxattr_common(struct inode *inode, const char *name,
         void *xdata;
         struct obd_capa *oc;
         struct rmtacl_ctl_entry *rce = NULL;
+       struct ll_inode_info *lli = ll_i2info(inode);
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n",
@@ -340,7 +335,7 @@ int ll_getxattr_common(struct inode *inode, const char *name,
          */
         if (xattr_type == XATTR_ACL_ACCESS_T &&
             !(sbi->ll_flags & LL_SBI_RMT_CLIENT)) {
-                struct ll_inode_info *lli = ll_i2info(inode);
+
                 struct posix_acl *acl;
 
                spin_lock(&lli->lli_lock);
@@ -359,13 +354,27 @@ int ll_getxattr_common(struct inode *inode, const char *name,
 #endif
 
 do_getxattr:
-       if (sbi->ll_xattr_cache_enabled && (rce == NULL ||
-                                           rce->rce_ops == RMT_LGETFACL ||
-                                           rce->rce_ops == RMT_LSETFACL)) {
+       if (sbi->ll_xattr_cache_enabled && xattr_type != XATTR_ACL_ACCESS_T) {
                rc = ll_xattr_cache_get(inode, name, buffer, size, valid);
+               if (rc == -EAGAIN)
+                       goto getxattr_nocache;
                if (rc < 0)
                        GOTO(out_xattr, rc);
+
+               /* Add "system.posix_acl_access" to the list */
+               if (lli->lli_posix_acl != NULL && valid & OBD_MD_FLXATTRLS) {
+                       if (size == 0) {
+                               rc += sizeof(XATTR_NAME_ACL_ACCESS);
+                       } else if (size - rc >= sizeof(XATTR_NAME_ACL_ACCESS)) {
+                               memcpy(buffer + rc, XATTR_NAME_ACL_ACCESS,
+                                      sizeof(XATTR_NAME_ACL_ACCESS));
+                               rc += sizeof(XATTR_NAME_ACL_ACCESS);
+                       } else {
+                               GOTO(out_xattr, rc = -ERANGE);
+                       }
+               }
        } else {
+getxattr_nocache:
                oc = ll_mdscapa_get(inode);
                rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
                                valid | (rce ? rce_ops2valid(rce->rce_ops) : 0),
index 9df1bfa..de5d8e2 100644 (file)
@@ -121,13 +121,13 @@ static int ll_xattr_cache_find(struct list_head *cache,
 }
 
 /**
- * This adds or updates an xattr.
+ * This adds an xattr.
  *
  * Add @xattr_name attr with @xattr_val value and @xattr_val_len length,
- * if the attribute already exists, then update its value.
  *
  * \retval 0       success
  * \retval -ENOMEM if no memory could be allocated for the cached attr
+ * \retval -EPROTO if duplicate xattr is being added
  */
 static int ll_xattr_cache_add(struct list_head *cache,
                              const char *xattr_name,
@@ -139,28 +139,8 @@ static int ll_xattr_cache_add(struct list_head *cache,
        ENTRY;
 
        if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) {
-               /* Found a cached EA, update it */
-
-               if (xattr_val_len != xattr->xe_vallen) {
-                       char *val;
-                       OBD_ALLOC(val, xattr_val_len);
-                       if (val == NULL) {
-                               CDEBUG(D_CACHE, "failed to allocate %u bytes "
-                                               "for xattr %s update\n",
-                                               xattr_val_len,
-                                               xattr_name);
-                               RETURN(-ENOMEM);
-                       }
-                       OBD_FREE(xattr->xe_value, xattr->xe_vallen);
-                       xattr->xe_value = val;
-                       xattr->xe_vallen = xattr_val_len;
-               }
-               memcpy(xattr->xe_value, xattr_val, xattr_val_len);
-
-               CDEBUG(D_CACHE, "update: [%s]=%.*s\n", xattr_name,
-                       xattr_val_len, xattr_val);
-
-               RETURN(0);
+               CDEBUG(D_CACHE, "duplicate xattr: [%s]\n", xattr_name);
+               RETURN(-EPROTO);
        }
 
        OBD_SLAB_ALLOC_PTR_GFP(xattr, xattr_kmem, __GFP_IO);
@@ -316,7 +296,7 @@ int ll_xattr_cache_destroy(struct inode *inode)
 }
 
 /**
- * Match or enqueue a PR or PW LDLM lock.
+ * Match or enqueue a PR lock.
  *
  * Find or request an LDLM lock with xattr data.
  * Since LDLM does not provide API for atomic match_or_enqueue,
@@ -346,9 +326,7 @@ static int ll_xattr_find_get_lock(struct inode *inode,
 
        mutex_lock(&lli->lli_xattrs_enq_lock);
        /* Try matching first. */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_XATTR, &lockh, 0,
-                              oit->it_op == IT_SETXATTR ? LCK_PW :
-                                                          (LCK_PR | LCK_PW));
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_XATTR, &lockh, 0, LCK_PR);
        if (mode != 0) {
                /* fake oit in mdc_revalidate_lock() manner */
                oit->d.lustre.it_lock_handle = lockh.cookie;
@@ -364,13 +342,7 @@ static int ll_xattr_find_get_lock(struct inode *inode,
                RETURN(PTR_ERR(op_data));
        }
 
-       op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS |
-                           OBD_MD_FLXATTRLOCKED;
-#ifdef CONFIG_FS_POSIX_ACL
-       /* If working with ACLs, we would like to cache local ACLs */
-       if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
-               op_data->op_valid |= OBD_MD_FLRMTLGETFACL;
-#endif
+       op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS;
 
        rc = md_enqueue(exp, &einfo, oit, op_data, &lockh, NULL, 0, NULL, 0);
        ll_finish_md_op_data(op_data);
@@ -432,7 +404,11 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
        if (oit->d.lustre.it_status < 0) {
                CDEBUG(D_CACHE, "getxattr intent returned %d for fid "DFID"\n",
                       oit->d.lustre.it_status, PFID(ll_inode2fid(inode)));
-               GOTO(out_destroy, rc = oit->d.lustre.it_status);
+               rc = oit->d.lustre.it_status;
+               /* xattr data is so large that we don't want to cache it */
+               if (rc == -ERANGE)
+                       rc = -EAGAIN;
+               GOTO(out_destroy, rc);
        }
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
@@ -470,6 +446,11 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
                        rc = -EPROTO;
                } else if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_XATTR_ENOMEM)) {
                        rc = -ENOMEM;
+               } else if (!strcmp(xdata, XATTR_NAME_ACL_ACCESS)) {
+                       /* Filter out ACL ACCESS since it's cached separately */
+                       CDEBUG(D_CACHE, "not caching %s\n",
+                              XATTR_NAME_ACL_ACCESS);
+                       rc = 0;
                } else {
                        rc = ll_xattr_cache_add(&lli->lli_xattrs, xdata, xval,
                                                *xsizes);
@@ -490,9 +471,8 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
 
        GOTO(out_maybe_drop, rc);
 out_maybe_drop:
-       /* drop lock on error or getxattr */
-       if (rc != 0 || oit->it_op != IT_SETXATTR)
-               ll_intent_drop_lock(oit);
+
+       ll_intent_drop_lock(oit);
 
        if (rc != 0)
                up_write(&lli->lli_xattrs_list_rwsem);
@@ -577,64 +557,3 @@ out:
        return rc;
 }
 
-
-/**
- * Set/update an xattr value or remove xattr using the write-through cache.
- *
- * Set/update the xattr value (if @valid has OBD_MD_FLXATTR) of @name to @newval
- * or
- * remove the xattr @name (@valid has OBD_MD_FLXATTRRM set) from @inode.
- * @flags is either XATTR_CREATE or XATTR_REPLACE as defined by setxattr(2)
- *
- * \retval 0        no error occured
- * \retval -EPROTO  network protocol error
- * \retval -ENOMEM  not enough memory for the cache
- * \retval -ERANGE  the buffer is not large enough
- * \retval -ENODATA no such attr (in the removal case)
- */
-int ll_xattr_cache_update(struct inode *inode,
-                       const char *name,
-                       const char *newval,
-                       size_t size,
-                       __u64 valid,
-                       int flags)
-{
-       struct lookup_intent oit = { .it_op = IT_SETXATTR };
-       struct ll_sb_info *sbi = ll_i2sbi(inode);
-       struct ptlrpc_request *req = NULL;
-       struct ll_inode_info *lli = ll_i2info(inode);
-       struct obd_capa *oc;
-       int rc;
-
-       ENTRY;
-
-       LASSERT(!!(valid & OBD_MD_FLXATTR) ^ !!(valid & OBD_MD_FLXATTRRM));
-
-       rc = ll_xattr_cache_refill(inode, &oit);
-       if (rc)
-               RETURN(rc);
-
-       oc = ll_mdscapa_get(inode);
-       rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
-                       valid | OBD_MD_FLXATTRLOCKED, name, newval,
-                       size, 0, flags, ll_i2suppgid(inode), &req);
-       capa_put(oc);
-
-       if (rc) {
-               ll_intent_drop_lock(&oit);
-               GOTO(out, rc);
-       }
-
-       if (valid & OBD_MD_FLXATTR)
-               rc = ll_xattr_cache_add(&lli->lli_xattrs, name, newval, size);
-       else if (valid & OBD_MD_FLXATTRRM)
-               rc = ll_xattr_cache_del(&lli->lli_xattrs, name);
-
-       ll_intent_drop_lock(&oit);
-       GOTO(out, rc);
-out:
-       up_write(&lli->lli_xattrs_list_rwsem);
-       ptlrpc_req_finished(req);
-
-       return rc;
-}
index 5d98eb2..7a807d7 100644 (file)
@@ -101,7 +101,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **req, __u64 extra_lock_flags);
 
-int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
+int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
                             cfs_list_t *cancels, ldlm_mode_t mode,
                             __u64 bits);
 /* mdc/mdc_request.c */
index abc2b25..65224af 100644 (file)
@@ -392,13 +392,6 @@ mdc_intent_getxattr_pack(struct obd_export *exp,
 
        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
 
-       if (it->it_op == IT_SETXATTR)
-               /* If we want to upgrade to LCK_PW, let's cancel LCK_PR
-                * locks now. This avoids unnecessary ASTs. */
-               count = mdc_resource_get_unused(exp, &op_data->op_fid1,
-                                               &cancels, LCK_PW,
-                                               MDS_INODELOCK_XATTR);
-
        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
@@ -862,7 +855,7 @@ resend:
                        RETURN(-EOPNOTSUPP);
                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
-       } else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) {
+       } else if (it->it_op & IT_GETXATTR) {
                req = mdc_intent_getxattr_pack(exp, it, op_data);
        } else {
                 LBUG();
index 6358ff7..b28b332 100644 (file)
@@ -70,7 +70,7 @@ static int mdc_reint(struct ptlrpc_request *request,
 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
  * found by @fid. Found locks are added into @cancel list. Returns the amount of
  * locks added to @cancels list. */
-int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
+int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
                             cfs_list_t *cancels, ldlm_mode_t mode,
                             __u64 bits)
 {
index 26b75ad..8a0073c 100644 (file)
@@ -368,11 +368,33 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
                                      input_size);
         }
 
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       /* Flush local XATTR locks to get rid of a possible cancel RPC */
+       if (opcode == MDS_REINT && fid_is_sane(fid) &&
+           exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
+               CFS_LIST_HEAD(cancels);
+               int count;
+
+               /* Without that packing would fail */
+               if (input_size == 0)
+                       req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
+                                            RCL_CLIENT, 0);
+
+               count = mdc_resource_get_unused(exp, fid,
+                                               &cancels, LCK_EX,
+                                               MDS_INODELOCK_XATTR);
+
+               rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+               if (rc) {
+                       ptlrpc_request_free(req);
+                       RETURN(rc);
+               }
+       } else {
+               rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
+               if (rc) {
+                       ptlrpc_request_free(req);
+                       RETURN(rc);
+               }
+       }
 
         if (opcode == MDS_REINT) {
                 struct mdt_rec_setxattr *rec;
index 0fc4a34..dd37805 100644 (file)
@@ -1395,11 +1395,11 @@ relock:
                         if (unlikely(rc != 0))
                                 GOTO(out_child, rc);
 
-                        /* If the file has not been changed for some time, we
-                         * return not only a LOOKUP lock, but also an UPDATE
-                         * lock and this might save us RPC on later STAT. For
-                         * directories, it also let negative dentry starts
-                         * working for this dir. */
+                       /* If the file has not been changed for some time, we
+                        * return not only a LOOKUP lock, but also an UPDATE
+                        * lock and this might save us RPC on later STAT. For
+                        * directories, it also let negative dentry cache start
+                        * working for this dir. */
                         if (ma->ma_valid & MA_INODE &&
                             ma->ma_attr.la_valid & LA_CTIME &&
                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
@@ -4092,10 +4092,10 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
                        rep->lock_policy_res2 =
                                ptlrpc_status_hton(rep->lock_policy_res2);
                }
-        } else {
-                rc = -EOPNOTSUPP;
-        }
-        RETURN(rc);
+       } else {
+               rc = -EPROTO;
+       }
+       RETURN(rc);
 }
 
 static int mdt_intent_policy(struct ldlm_namespace *ns,
@@ -5199,7 +5199,6 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                mutex_init(&mo->mot_ioepoch_mutex);
                mutex_init(&mo->mot_lov_mutex);
                init_rwsem(&mo->mot_open_sem);
-               init_rwsem(&mo->mot_xattr_sem);
                RETURN(o);
        }
        RETURN(NULL);
index 4e7db2a..afce519 100644 (file)
@@ -255,8 +255,6 @@ struct mdt_object {
        struct rw_semaphore     mot_open_sem;
        atomic_t                mot_lease_count;
        atomic_t                mot_open_count;
-       /* A lock to protect EA data from racing setxattr and getxattrall */
-       struct rw_semaphore     mot_xattr_sem;
 };
 
 enum mdt_object_flags {
index 8a4042f..5dfcbea 100644 (file)
@@ -1367,6 +1367,9 @@ static int mdt_setxattr_unpack(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         }
 
+       if (mdt_dlmreq_unpack(info) < 0)
+               RETURN(-EPROTO);
+
         RETURN(0);
 }
 
index 03ffc64..f7c5f60 100644 (file)
@@ -159,6 +159,65 @@ out:
        return rc;
 }
 
+static int mdt_getxattr_all(struct mdt_thread_info *info,
+                           struct mdt_body *reqbody, struct mdt_body *repbody,
+                           struct lu_buf *buf, struct md_object *next)
+{
+       const struct lu_env *env = info->mti_env;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct mdt_export_data *med = mdt_req2med(req);
+       struct lu_ucred        *uc  = mdt_ucred(info);
+       char *v, *b, *eadatahead, *eadatatail;
+       __u32 *sizes;
+       int eadatasize, eavallen = 0, eavallens = 0, rc;
+
+       ENTRY;
+
+       /*
+        * The format of the pill is the following:
+        * EADATA:      attr1\0attr2\0...attrn\0
+        * EAVALS:      val1val2...valn
+        * EAVALS_LENS: 4,4,...4
+        */
+
+       eadatahead = buf->lb_buf;
+
+       /* Fill out EADATA first */
+       eadatasize = mo_xattr_list(env, next, buf);
+       if (eadatasize < 0)
+               GOTO(out, rc = eadatasize);
+
+       eadatatail = eadatahead + eadatasize;
+
+       v = req_capsule_server_get(info->mti_pill, &RMF_EAVALS);
+       sizes = req_capsule_server_get(info->mti_pill, &RMF_EAVALS_LENS);
+
+       /* Fill out EAVALS and EAVALS_LENS */
+       for (b = eadatahead; b < eadatatail; b += strlen(b) + 1, v += rc) {
+               buf->lb_buf = v;
+               buf->lb_len = reqbody->eadatasize - eavallen;
+               rc = mdt_getxattr_one(info, b, next, buf, med, uc);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               sizes[eavallens] = rc;
+               eavallens++;
+               eavallen += rc;
+       }
+
+       repbody->aclsize = eavallen;
+       repbody->max_mdsize = eavallens;
+
+       req_capsule_shrink(info->mti_pill, &RMF_EAVALS, eavallen, RCL_SERVER);
+       req_capsule_shrink(info->mti_pill, &RMF_EAVALS_LENS,
+                          eavallens * sizeof(__u32), RCL_SERVER);
+       req_capsule_shrink(info->mti_pill, &RMF_EADATA, eadatasize, RCL_SERVER);
+
+       GOTO(out, rc = eadatasize);
+out:
+       return rc;
+}
+
 int mdt_getxattr(struct mdt_thread_info *info)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
@@ -187,8 +246,6 @@ int mdt_getxattr(struct mdt_thread_info *info)
         if (rc)
                 RETURN(err_serious(rc));
 
-       down_read(&info->mti_object->mot_xattr_sem);
-
         next = mdt_object_child(info->mti_object);
 
         if (info->mti_body->valid & OBD_MD_FLRMTRGETFACL) {
@@ -234,57 +291,13 @@ int mdt_getxattr(struct mdt_thread_info *info)
                if (rc < 0)
                        CDEBUG(D_INFO, "listxattr failed: %d\n", rc);
        } else if (valid == OBD_MD_FLXATTRALL) {
-               /*
-                * The format of the pill is the following:
-                * EADATA:      attr1\0attr2\0...attrn\0
-                * EAVALS:      val1val2...valn
-                * EAVALS_LENS: 4,4,...4
-                */
-               char *v, *b;
-               __u32 *sizes;
-               int eadatasize, eavallen = 0, eavallens = 0;
-               struct lu_buf buf2 = { .lb_len = reqbody->eadatasize };
-
-               /* Fill out EADATA */
-               eadatasize = mo_xattr_list(info->mti_env, next, buf);
-               if (eadatasize < 0)
-                       GOTO(out, rc = eadatasize);
-
-               v = req_capsule_server_get(info->mti_pill, &RMF_EAVALS);
-               sizes = req_capsule_server_get(info->mti_pill,
-                                               &RMF_EAVALS_LENS);
-
-               /* Fill out EAVALS and EAVALS_LENS */
-               for (b = buf->lb_buf;
-                    b < (char *)buf->lb_buf + eadatasize;
-                    b += strlen(b) + 1, v += rc) {
-                       buf2.lb_buf = v;
-                       rc = mdt_getxattr_one(info, b, next, &buf2, med, uc);
-                       if (rc < 0)
-                               GOTO(out, rc);
-                       sizes[eavallens] = rc;
-                       buf2.lb_len -= rc;
-                       eavallens++;
-                       eavallen += rc;
-               }
-
-               repbody->aclsize = eavallen;
-               repbody->max_mdsize = eavallens;
-
-               req_capsule_shrink(info->mti_pill, &RMF_EAVALS,
-                                       eavallen, RCL_SERVER);
-               req_capsule_shrink(info->mti_pill, &RMF_EAVALS_LENS,
-                                       eavallens * sizeof(__u32), RCL_SERVER);
-               req_capsule_shrink(info->mti_pill, &RMF_EADATA,
-                                       eadatasize, RCL_SERVER);
-               rc = eadatasize;
+               rc = mdt_getxattr_all(info, reqbody, repbody,
+                                     buf, next);
        } else
                LBUG();
 
        EXIT;
 out:
-       up_read(&info->mti_object->mot_xattr_sem);
-
        if (rc >= 0) {
                mdt_counter_incr(req, LPROC_MDT_GETXATTR);
                repbody->eadatasize = rc;
@@ -366,6 +379,9 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
 
         CDEBUG(D_INODE, "setxattr for "DFID"\n", PFID(rr->rr_fid1));
 
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR))
                 RETURN(err_serious(-ENOMEM));
 
@@ -416,10 +432,9 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
        /* We need revoke both LOOKUP|PERM lock here, see mdt_attr_set. */
         if (!strcmp(xattr_name, XATTR_NAME_ACL_ACCESS))
                lockpart |= MDS_INODELOCK_PERM | MDS_INODELOCK_LOOKUP;
-
        /* We need to take the lock on behalf of old clients so that newer
         * clients flush their xattr caches */
-       if (!(valid & OBD_MD_FLXATTRLOCKED))
+       else
                lockpart |= MDS_INODELOCK_XATTR;
 
         lh = &info->mti_lh[MDT_LH_PARENT];
@@ -430,8 +445,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
         if (IS_ERR(obj))
                 GOTO(out, rc =  PTR_ERR(obj));
 
-       down_write(&obj->mot_xattr_sem);
-
         info->mti_mos = obj;
         rc = mdt_version_get_check_save(info, obj, 0);
         if (rc)
@@ -502,7 +515,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
 
         EXIT;
 out_unlock:
-       up_write(&obj->mot_xattr_sem);
         mdt_object_unlock_put(info, obj, lh, rc);
         if (unlikely(new_xattr != NULL))
                 lustre_posix_acl_xattr_free(new_xattr, xattr_len);
index 0c5f148..c793bb4 100644 (file)
@@ -299,7 +299,8 @@ static const struct req_msg_field *mds_reint_setxattr_client[] = {
         &RMF_REC_REINT,
         &RMF_CAPA1,
         &RMF_NAME,
-        &RMF_EADATA
+        &RMF_EADATA,
+       &RMF_DLM_REQ
 };
 
 static const struct req_msg_field *mdt_swap_layouts[] = {
index 5c1a0d7..311beef 100644 (file)
@@ -11604,8 +11604,6 @@ test_234() {
        touch $DIR/$tdir/$tfile || error "touch failed"
        # OBD_FAIL_LLITE_XATTR_ENOMEM
        $LCTL set_param fail_loc=0x1405
-       setfattr -n user.attr -v value $DIR/$tdir/$tfile &&
-               error "setfattr should have failed with ENOMEM"
        if [ ! -f /etc/SuSE-release ]; then
                # attr pre-2.4.44-7 had a bug with rc
                # LU-3703 - SLES clients have older attr