From: Andrew Perepechko Date: Fri, 26 Jul 2013 18:57:21 +0000 (+0400) Subject: LU-2869 llite: extended attribute cache X-Git-Tag: 2.4.53~5 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=e53d1c18ea9f8c08d55d573e8f0993e582c44c20 LU-2869 llite: extended attribute cache This patch implements an extended attribute cache for a Lustre client. It is organized as a write-through cache: reads are performed from cache, updates are sent synchronously to the MDS. An additional inode bit MDS_INODELOCK_XATTR is added to protect the cache. Signed-off-by: Andrew Perepechko Xyratex-bug-id: MRP-57 Change-Id: I16aae894a3c2f62448722eeade822ee22b20efa0 Reviewed-on: http://review.whamcloud.com/5537 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Jinshan Xiong Reviewed-by: Oleg Drokin --- diff --git a/libcfs/include/libcfs/user-lock.h b/libcfs/include/libcfs/user-lock.h index 71e9792..0bae9c1 100644 --- a/libcfs/include/libcfs/user-lock.h +++ b/libcfs/include/libcfs/user-lock.h @@ -201,6 +201,7 @@ void init_rwsem(struct rw_semaphore *s); void down_read(struct rw_semaphore *s); int down_read_trylock(struct rw_semaphore *s); void down_write(struct rw_semaphore *s); +void downgrade_write(struct rw_semaphore *s); int down_write_trylock(struct rw_semaphore *s); void up_read(struct rw_semaphore *s); void up_write(struct rw_semaphore *s); diff --git a/libcfs/libcfs/user-lock.c b/libcfs/libcfs/user-lock.c index 214ca3a..4337e12 100644 --- a/libcfs/libcfs/user-lock.c +++ b/libcfs/libcfs/user-lock.c @@ -242,6 +242,12 @@ void down_write(struct rw_semaphore *s) (void)s; } +void downgrade_write(struct rw_semaphore *s) +{ + LASSERT(s != NULL); + (void)s; +} + int down_write_trylock(struct rw_semaphore *s) { LASSERT(s != NULL); diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 5a4d91f..37bcd2f 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -91,6 +91,7 @@ 
enum { LPROC_LL_ALLOC_INODE, LPROC_LL_SETXATTR, LPROC_LL_GETXATTR, + LPROC_LL_GETXATTR_HITS, LPROC_LL_LISTXATTR, LPROC_LL_REMOVEXATTR, LPROC_LL_INODE_PERM, diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 860a3de..593aded 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1355,7 +1355,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_EINPROGRESS | \ OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK | \ OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK |\ - OBD_CONNECT_PINGLESS) + OBD_CONNECT_PINGLESS | OBD_CONNECT_MAX_EASIZE) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \ @@ -1737,7 +1737,9 @@ static inline __u32 lov_mds_md_size(__u16 stripes, __u32 lmm_magic) #define OBD_MD_FLCKSPLIT (0x0000080000000000ULL) /* Check split on server */ #define OBD_MD_FLCROSSREF (0x0000100000000000ULL) /* Cross-ref case */ #define OBD_MD_FLGETATTRLOCK (0x0000200000000000ULL) /* Get IOEpoch attributes - * under lock */ + * under lock; for xattr + * requests means the + * client holds the lock */ #define OBD_MD_FLOBJCOUNT (0x0000400000000000ULL) /* for multiple destroy */ #define OBD_MD_FLRMTLSETFACL (0x0001000000000000ULL) /* lfs lsetfacl case */ @@ -1754,6 +1756,9 @@ static inline __u32 lov_mds_md_size(__u16 stripes, __u32 lmm_magic) OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLNLINK | \ OBD_MD_FLGENER | OBD_MD_FLRDEV | OBD_MD_FLGROUP) +#define OBD_MD_FLXATTRLOCKED OBD_MD_FLGETATTRLOCK +#define OBD_MD_FLXATTRALL (OBD_MD_FLXATTR | OBD_MD_FLXATTRLS) + /* don't forget obdo_fid which is way down at the bottom so it can * come after the definition of llog_cookie */ @@ -2126,8 +2131,9 @@ extern void lustre_swab_generic_32s (__u32 *val); #define MDS_INODELOCK_OPEN 0x000004 /* For opened files */ #define MDS_INODELOCK_LAYOUT 0x000008 /* for layout */ #define 
MDS_INODELOCK_PERM 0x000010 /* for permission */ +#define MDS_INODELOCK_XATTR 0x000020 /* extended attributes */ -#define MDS_INODELOCK_MAXSHIFT 4 +#define MDS_INODELOCK_MAXSHIFT 5 /* This FULL lock is useful to take on unlink sort of operations */ #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1) diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 9e66328..3354618 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -230,6 +230,7 @@ extern struct req_format RQF_LDLM_INTENT_GETATTR; extern struct req_format RQF_LDLM_INTENT_OPEN; extern struct req_format RQF_LDLM_INTENT_CREATE; extern struct req_format RQF_LDLM_INTENT_UNLINK; +extern struct req_format RQF_LDLM_INTENT_GETXATTR; extern struct req_format RQF_LDLM_INTENT_QUOTA; extern struct req_format RQF_LDLM_CANCEL; extern struct req_format RQF_LDLM_CALLBACK; @@ -279,6 +280,8 @@ extern struct req_msg_field RMF_LAYOUT_INTENT; extern struct req_msg_field RMF_MDT_MD; extern struct req_msg_field RMF_REC_REINT; extern struct req_msg_field RMF_EADATA; +extern struct req_msg_field RMF_EAVALS; +extern struct req_msg_field RMF_EAVALS_LENS; extern struct req_msg_field RMF_ACL; extern struct req_msg_field RMF_LOGCOOKIES; extern struct req_msg_field RMF_CAPA1; diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 4537c7b..6b83789 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -345,8 +345,8 @@ struct md_device_operations { int (*mdo_root_get)(const struct lu_env *env, struct md_device *m, struct lu_fid *f); - int (*mdo_maxsize_get)(const struct lu_env *env, struct md_device *m, - int *md_size, int *cookie_size); + int (*mdo_maxeasize_get)(const struct lu_env *env, struct md_device *m, + int *easize); int (*mdo_statfs)(const struct lu_env *env, struct md_device *m, struct obd_statfs *sfs); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index d387129..ce55df9 100644 --- 
a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1046,6 +1046,7 @@ struct lu_context; #define IT_LAYOUT (1 << 10) #define IT_QUOTA_DQACQ (1 << 11) #define IT_QUOTA_CONN (1 << 12) +#define IT_SETXATTR (1 << 13) static inline int it_to_lock_mode(struct lookup_intent *it) { @@ -1055,6 +1056,10 @@ static inline int it_to_lock_mode(struct lookup_intent *it) else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP | IT_LAYOUT)) return LCK_CR; + else if (it->it_op & IT_GETXATTR) + return LCK_PR; + else if (it->it_op & IT_SETXATTR) + return LCK_PW; LASSERTF(0, "Invalid it_op: %d\n", it->it_op); return -EINVAL; diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index c0f5aca..7f01d0f 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -474,6 +474,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_LOCK_STATE_WAIT_INTR 0x1402 #define OBD_FAIL_LOV_INIT 0x1403 #define OBD_FAIL_GLIMPSE_DELAY 0x1404 +#define OBD_FAIL_LLITE_XATTR_ENOMEM 0x1405 #define OBD_FAIL_FID_INDIR 0x1501 #define OBD_FAIL_FID_INLMA 0x1502 diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index bde3bd4..9767ecd 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -149,6 +149,8 @@ char *ldlm_it2str(int it) return "getxattr"; case IT_LAYOUT: return "layout"; + case IT_SETXATTR: + return "setxattr"; default: CERROR("Unknown intent %d\n", it); return "UNKNOWN"; diff --git a/lustre/llite/Makefile.in b/lustre/llite/Makefile.in index b37b952..562b9d0 100644 --- a/lustre/llite/Makefile.in +++ b/lustre/llite/Makefile.in @@ -2,7 +2,7 @@ MODULES := lustre @LLITE_LLOOP_TRUE@MODULES += llite_lloop lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o lustre-objs += rw.o lproc_llite.o namei.o symlink.o llite_mmap.o -lustre-objs += xattr.o remote_perm.o llite_rmtacl.o llite_capa.o +lustre-objs += xattr.o xattr_cache.o remote_perm.o llite_rmtacl.o llite_capa.o 
lustre-objs += rw26.o super25.o statahead.o lustre-objs += ../lclient/glimpse.o ../lclient/lcommon_cl.o ../lclient/lcommon_misc.o lustre-objs += vvp_dev.o vvp_page.o vvp_lock.o vvp_io.o vvp_object.o diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 83476bd..361bec1 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -2915,7 +2915,8 @@ int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode) } ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, - struct lustre_handle *lockh, __u64 flags) + struct lustre_handle *lockh, __u64 flags, + ldlm_mode_t mode) { ldlm_policy_data_t policy = { .l_inodebits = {bits}}; struct lu_fid *fid; @@ -2925,10 +2926,10 @@ ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, fid = &ll_i2info(inode)->lli_fid; CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); - rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags, - fid, LDLM_IBITS, &policy, - LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh); - RETURN(rc); + rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags, + fid, LDLM_IBITS, &policy, mode, lockh); + + RETURN(rc); } static int ll_inode_revalidate_fini(struct inode *inode, int rc) @@ -3664,7 +3665,8 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) /* mostly layout lock is caching on the local side, so try to match * it before grabbing layout lock mutex. */ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0); + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0, + LCK_CR | LCK_CW | LCK_PR | LCK_PW); if (mode != 0) { /* hit cached lock */ rc = ll_layout_lock_set(&lockh, mode, inode, gen, false); if (rc == 0) @@ -3679,7 +3681,8 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) again: /* try again. Maybe somebody else has done this. 
*/ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0); + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0, + LCK_CR | LCK_CW | LCK_PR | LCK_PW); if (mode != 0) { /* hit cached lock */ rc = ll_layout_lock_set(&lockh, mode, inode, gen, true); if (rc == -EAGAIN) diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 4fc1e4b..e827f01 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -126,6 +126,8 @@ enum lli_flags { LLIF_DATA_MODIFIED = (1 << 6), /* File is being restored */ LLIF_FILE_RESTORING = (1 << 7), + /* Xattr cache is attached to the file */ + LLIF_XATTR_CACHE = (1 << 8), }; struct ll_inode_info { @@ -278,8 +280,27 @@ struct ll_inode_info { struct mutex lli_layout_mutex; /* valid only inside LAYOUT ibits lock, protected by lli_layout_mutex */ __u32 lli_layout_gen; + + struct rw_semaphore lli_xattrs_list_rwsem; + struct mutex lli_xattrs_enq_lock; + struct list_head lli_xattrs; /* ll_xattr_entry->xe_list */ }; +int ll_xattr_cache_destroy(struct inode *inode); + +int ll_xattr_cache_get(struct inode *inode, + const char *name, + char *buffer, + size_t size, + __u64 valid); + +int ll_xattr_cache_update(struct inode *inode, + const char *name, + const char *newval, + size_t size, + __u64 valid, + int flags); + /* * Locking to guarantee consistency of non-atomic updates to long long i_size, * consistency between file size and KMS. 
@@ -401,6 +422,7 @@ enum stats_track_type { #define LL_SBI_VERBOSE 0x10000 /* verbose mount/umount */ #define LL_SBI_LAYOUT_LOCK 0x20000 /* layout lock support */ #define LL_SBI_USER_FID2PATH 0x40000 /* allow fid2path by unprivileged users */ +#define LL_SBI_XATTR_CACHE 0x80000 /* support for xattr cache */ #define LL_SBI_FLAGS { \ "nolck", \ @@ -408,6 +430,7 @@ enum stats_track_type { "flock", \ "xattr", \ "acl", \ + "???", \ "rmt_client", \ "mds_capa", \ "oss_capa", \ @@ -420,7 +443,9 @@ enum stats_track_type { "agl", \ "verbose", \ "layout", \ - "user_fid2path" } + "user_fid2path",\ + "xattr", \ +} /* default value for ll_sb_info->contention_time */ #define SBI_DEFAULT_CONTENTION_SECONDS 60 @@ -468,7 +493,8 @@ struct ll_sb_info { struct lu_fid ll_root_fid; /* root object fid */ int ll_flags; - int ll_umounting:1; + unsigned int ll_umounting:1, + ll_xattr_cache_enabled:1; cfs_list_t ll_conn_chain; /* per-conn chain of SBs */ struct lustre_client_ocd ll_lco; @@ -744,7 +770,8 @@ extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *, extern int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode); extern ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, - struct lustre_handle *lockh, __u64 flags); + struct lustre_handle *lockh, __u64 flags, + ldlm_mode_t mode); int __ll_inode_revalidate_it(struct dentry *, struct lookup_intent *, __u64 bits); #ifdef HAVE_IOP_ATOMIC_OPEN @@ -1634,4 +1661,7 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf); int ll_layout_refresh(struct inode *inode, __u32 *gen); int ll_layout_restore(struct inode *inode); +int ll_xattr_init(void); +void ll_xattr_fini(void); + #endif /* LLITE_INTERNAL_H */ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index c549b78..1eec9a6 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -218,7 +218,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, 
OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH| OBD_CONNECT_EINPROGRESS | OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE | - OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_PINGLESS; + OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_PINGLESS | + OBD_CONNECT_MAX_EASIZE; if (sbi->ll_flags & LL_SBI_SOM_PREVIEW) data->ocd_connect_flags |= OBD_CONNECT_SOM; @@ -394,6 +395,16 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, sbi->ll_flags |= LL_SBI_LAYOUT_LOCK; } + if (data->ocd_ibits_known & MDS_INODELOCK_XATTR) { + if (!(data->ocd_connect_flags & OBD_CONNECT_MAX_EASIZE)) { + LCONSOLE_INFO("%s: disabling xattr cache due to " + "unknown maximum xattr size.\n", dt); + } else { + sbi->ll_flags |= LL_SBI_XATTR_CACHE; + sbi->ll_xattr_cache_enabled = 1; + } + } + obd = class_name2obd(dt); if (!obd) { CERROR("DT %s: not setup or attached\n", dt); @@ -947,6 +958,9 @@ void ll_lli_init(struct ll_inode_info *lli) lli->lli_layout_gen = LL_LAYOUT_GEN_NONE; lli->lli_clob = NULL; + init_rwsem(&lli->lli_xattrs_list_rwsem); + mutex_init(&lli->lli_xattrs_enq_lock); + LASSERT(lli->lli_vfs_inode.i_mode != 0); if (S_ISDIR(lli->lli_vfs_inode.i_mode)) { mutex_init(&lli->lli_readdir_mutex); @@ -1227,6 +1241,8 @@ void ll_clear_inode(struct inode *inode) lli->lli_symlink_name = NULL; } + ll_xattr_cache_destroy(inode); + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { LASSERT(lli->lli_posix_acl == NULL); if (lli->lli_remote_perms) { @@ -1792,7 +1808,9 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) * lock on the client and set LLIF_MDS_SIZE_LOCK holding * it. 
*/ mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE, - &lockh, LDLM_FL_CBPENDING); + &lockh, LDLM_FL_CBPENDING, + LCK_CR | LCK_CW | + LCK_PR | LCK_PW); if (mode) { if (lli->lli_flags & (LLIF_DONE_WRITING | LLIF_EPOCH_PENDING | diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 90dbfef..7e9ff3e 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -218,6 +218,40 @@ static int ll_rd_sb_uuid(char *page, char **start, off_t off, int count, return snprintf(page, count, "%s\n", ll_s2sbi(sb)->ll_sb_uuid.uuid); } +static int ll_rd_xattr_cache(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = (struct super_block *)data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + int rc; + + rc = snprintf(page, count, "%u\n", sbi->ll_xattr_cache_enabled); + + return rc; +} + +static int ll_wr_xattr_cache(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = (struct super_block *)data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val != 0 && val != 1) + return -ERANGE; + + if (val == 1 && !(sbi->ll_flags & LL_SBI_XATTR_CACHE)) + return -ENOTSUPP; + + sbi->ll_xattr_cache_enabled = val; + + return count; +} + static int ll_rd_site_stats(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -773,6 +807,7 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "lazystatfs", ll_rd_lazystatfs, ll_wr_lazystatfs, 0 }, { "max_easize", ll_rd_maxea_size, 0, 0 }, { "sbi_flags", ll_rd_sbi_flags, 0, 0 }, + { "xattr_cache", ll_rd_xattr_cache, ll_wr_xattr_cache, 0 }, { 0 } }; @@ -824,6 +859,7 @@ struct llite_file_opcode { { LPROC_LL_ALLOC_INODE, LPROCFS_TYPE_REGS, "alloc_inode" }, { LPROC_LL_SETXATTR, LPROCFS_TYPE_REGS, "setxattr" }, { LPROC_LL_GETXATTR, LPROCFS_TYPE_REGS, "getxattr" }, + { LPROC_LL_GETXATTR_HITS, LPROCFS_TYPE_REGS, 
"getxattr_hits" }, { LPROC_LL_LISTXATTR, LPROCFS_TYPE_REGS, "listxattr" }, { LPROC_LL_REMOVEXATTR, LPROCFS_TYPE_REGS, "removexattr" }, { LPROC_LL_INODE_PERM, LPROCFS_TYPE_REGS, "inode_permission" }, diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 0db1ee0..8c15781 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -222,12 +222,16 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, * for mdc - bug 24555 */ LASSERT(lock->l_ast_data == NULL); - /* Invalidate all dentries associated with this inode */ - if (inode == NULL) - break; + /* Invalidate all dentries associated with this inode */ + if (inode == NULL) + break; + + LASSERT(lock->l_flags & LDLM_FL_CANCELING); + + if (bits & MDS_INODELOCK_XATTR) + ll_xattr_cache_destroy(inode); - LASSERT(lock->l_flags & LDLM_FL_CANCELING); - /* For OPEN locks we differentiate between lock modes + /* For OPEN locks we differentiate between lock modes * LCK_CR, LCK_CW, LCK_PR - bug 22891 */ if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | MDS_INODELOCK_LAYOUT | MDS_INODELOCK_PERM)) diff --git a/lustre/llite/super25.c b/lustre/llite/super25.c index a0feba9..9f156e5 100644 --- a/lustre/llite/super25.c +++ b/lustre/llite/super25.c @@ -201,11 +201,15 @@ static int __init init_lustre_lite(void) if (rc == 0) rc = vvp_global_init(); + if (rc == 0) + rc = ll_xattr_init(); + return rc; } static void __exit exit_lustre_lite(void) { + ll_xattr_fini(); vvp_global_fini(); del_timer(&ll_capa_timer); ll_capa_thread_stop(); diff --git a/lustre/llite/xattr.c b/lustre/llite/xattr.c index d0f841e..5a4985c 100644 --- a/lustre/llite/xattr.c +++ b/lustre/llite/xattr.c @@ -105,11 +105,11 @@ int xattr_type_filter(struct ll_sb_info *sbi, int xattr_type) static int ll_setxattr_common(struct inode *inode, const char *name, - const void *value, size_t size, - int flags, __u64 valid) + const void *value, size_t size, + int flags, __u64 valid) { - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct 
ptlrpc_request *req; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ptlrpc_request *req = NULL; int xattr_type, rc; struct obd_capa *oc; posix_acl_xattr_header *new_value = NULL; @@ -182,11 +182,17 @@ int ll_setxattr_common(struct inode *inode, const char *name, valid |= rce_ops2valid(rce->rce_ops); } #endif - oc = ll_mdscapa_get(inode); - rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, - valid, name, pv, size, 0, flags, ll_i2suppgid(inode), - &req); - capa_put(oc); + if (sbi->ll_xattr_cache_enabled && + (rce == NULL || rce->rce_ops == RMT_LSETFACL)) { + rc = ll_xattr_cache_update(inode, name, pv, size, valid, flags); + } else { + oc = ll_mdscapa_get(inode); + rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, + valid, name, pv, size, 0, flags, + ll_i2suppgid(inode), &req); + capa_put(oc); + } + #ifdef CONFIG_FS_POSIX_ACL if (new_value != NULL) lustre_posix_acl_xattr_free(new_value, size); @@ -352,48 +358,54 @@ int ll_getxattr_common(struct inode *inode, const char *name, #endif do_getxattr: - oc = ll_mdscapa_get(inode); - rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, - valid | (rce ? rce_ops2valid(rce->rce_ops) : 0), - name, NULL, 0, size, 0, &req); - capa_put(oc); - if (rc) { - if (rc == -EOPNOTSUPP && xattr_type == XATTR_USER_T) { - LCONSOLE_INFO("Disabling user_xattr feature because " - "it is not supported on the server\n"); - sbi->ll_flags &= ~LL_SBI_USER_XATTR; - } - RETURN(rc); - } - - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body); - - /* only detect the xattr size */ - if (size == 0) - GOTO(out, rc = body->eadatasize); + if (sbi->ll_xattr_cache_enabled && (rce == NULL || + rce->rce_ops == RMT_LGETFACL || + rce->rce_ops == RMT_LSETFACL)) { + rc = ll_xattr_cache_get(inode, name, buffer, size, valid); + if (rc < 0) + GOTO(out_xattr, rc); + } else { + oc = ll_mdscapa_get(inode); + rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, + valid | (rce ? 
rce_ops2valid(rce->rce_ops) : 0), + name, NULL, 0, size, 0, &req); + capa_put(oc); + + if (rc < 0) + GOTO(out_xattr, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + LASSERT(body); + + /* only detect the xattr size */ + if (size == 0) + GOTO(out, rc = body->eadatasize); + + if (size < body->eadatasize) { + CERROR("server bug: replied size %u > %u\n", + body->eadatasize, (int)size); + GOTO(out, rc = -ERANGE); + } - if (size < body->eadatasize) { - CERROR("server bug: replied size %u > %u\n", - body->eadatasize, (int)size); - GOTO(out, rc = -ERANGE); - } + if (body->eadatasize == 0) + GOTO(out, rc = -ENODATA); - if (body->eadatasize == 0) - GOTO(out, rc = -ENODATA); + /* do not need swab xattr data */ + xdata = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, + body->eadatasize); + if (!xdata) + GOTO(out, rc = -EFAULT); - /* do not need swab xattr data */ - xdata = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, - body->eadatasize); - if (!xdata) - GOTO(out, rc = -EFAULT); + memcpy(buffer, xdata, body->eadatasize); + rc = body->eadatasize; + } #ifdef CONFIG_FS_POSIX_ACL - if (body->eadatasize >= 0 && rce && rce->rce_ops == RMT_LSETFACL) { - ext_acl_xattr_header *acl; + if (rce && rce->rce_ops == RMT_LSETFACL) { + ext_acl_xattr_header *acl; - acl = lustre_posix_acl_xattr_2ext((posix_acl_xattr_header *)xdata, - body->eadatasize); + acl = lustre_posix_acl_xattr_2ext((posix_acl_xattr_header *)buffer, + rc); if (IS_ERR(acl)) GOTO(out, rc = PTR_ERR(acl)); @@ -405,15 +417,15 @@ do_getxattr: } } #endif - - if (body->eadatasize == 0) { - rc = -ENODATA; - } else { - LASSERT(buffer); - memcpy(buffer, xdata, body->eadatasize); - rc = body->eadatasize; - } - EXIT; + EXIT; + +out_xattr: + if (rc == -EOPNOTSUPP && xattr_type == XATTR_USER_T) { + LCONSOLE_INFO("%s: disabling user_xattr feature because " + "it is not supported on the server: rc = %d\n", + ll_get_fsname(inode->i_sb, NULL, 0), rc); + sbi->ll_flags &= ~LL_SBI_USER_XATTR; + } out: 
ptlrpc_req_finished(req); return rc; diff --git a/lustre/llite/xattr_cache.c b/lustre/llite/xattr_cache.c new file mode 100644 index 0000000..9df1bfa --- /dev/null +++ b/lustre/llite/xattr_cache.c @@ -0,0 +1,640 @@ +/* GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see http://www.gnu.org/licenses + * + * Please visit http://www.xyratex.com/contact if you need additional + * information or have any questions. + * + * GPL HEADER END + */ + +/* + * Copyright 2012 Xyratex Technology Limited + * + * Author: Andrew Perepechko + * + */ + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include +#include +#include +#include +#include +#include "llite_internal.h" + +/* If we ever have hundreds of extended attributes, we might want to consider + * using a hash or a tree structure instead of list for faster lookups. 
+ */ +struct ll_xattr_entry { + struct list_head xe_list; /* protected with + * lli_xattrs_list_rwsem */ + char *xe_name; /* xattr name, \0-terminated */ + char *xe_value; /* xattr value */ + unsigned xe_namelen; /* strlen(xe_name) + 1 */ + unsigned xe_vallen; /* xattr value length */ +}; + +static struct kmem_cache *xattr_kmem; +static struct lu_kmem_descr xattr_caches[] = { + { + .ckd_cache = &xattr_kmem, + .ckd_name = "xattr_kmem", + .ckd_size = sizeof(struct ll_xattr_entry) + }, + { + .ckd_cache = NULL + } +}; + +int ll_xattr_init(void) +{ + return lu_kmem_init(xattr_caches); +} + +void ll_xattr_fini(void) +{ + lu_kmem_fini(xattr_caches); +} + +/** + * Initializes xattr cache for an inode. + * + * This initializes the xattr list and marks cache presence. + */ +static void ll_xattr_cache_init(struct ll_inode_info *lli) +{ + ENTRY; + + LASSERT(lli != NULL); + + CFS_INIT_LIST_HEAD(&lli->lli_xattrs); + lli->lli_flags |= LLIF_XATTR_CACHE; +} + +/** + * This looks for a specific extended attribute. + * + * Find in @cache and return @xattr_name attribute in @xattr, + * for the NULL @xattr_name return the first cached @xattr. + * + * \retval 0 success + * \retval -ENODATA if not found + */ +static int ll_xattr_cache_find(struct list_head *cache, + const char *xattr_name, + struct ll_xattr_entry **xattr) +{ + struct ll_xattr_entry *entry; + + ENTRY; + + list_for_each_entry(entry, cache, xe_list) { + /* xattr_name == NULL means look for any entry */ + if (xattr_name == NULL || + strcmp(xattr_name, entry->xe_name) == 0) { + *xattr = entry; + CDEBUG(D_CACHE, "find: [%s]=%.*s\n", + entry->xe_name, entry->xe_vallen, + entry->xe_value); + RETURN(0); + } + } + + RETURN(-ENODATA); +} + +/** + * This adds or updates an xattr. + * + * Add @xattr_name attr with @xattr_val value and @xattr_val_len length, + * if the attribute already exists, then update its value. 
+ * + * \retval 0 success + * \retval -ENOMEM if no memory could be allocated for the cached attr + */ +static int ll_xattr_cache_add(struct list_head *cache, + const char *xattr_name, + const char *xattr_val, + unsigned xattr_val_len) +{ + struct ll_xattr_entry *xattr; + + ENTRY; + + if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) { + /* Found a cached EA, update it */ + + if (xattr_val_len != xattr->xe_vallen) { + char *val; + OBD_ALLOC(val, xattr_val_len); + if (val == NULL) { + CDEBUG(D_CACHE, "failed to allocate %u bytes " + "for xattr %s update\n", + xattr_val_len, + xattr_name); + RETURN(-ENOMEM); + } + OBD_FREE(xattr->xe_value, xattr->xe_vallen); + xattr->xe_value = val; + xattr->xe_vallen = xattr_val_len; + } + memcpy(xattr->xe_value, xattr_val, xattr_val_len); + + CDEBUG(D_CACHE, "update: [%s]=%.*s\n", xattr_name, + xattr_val_len, xattr_val); + + RETURN(0); + } + + OBD_SLAB_ALLOC_PTR_GFP(xattr, xattr_kmem, __GFP_IO); + if (xattr == NULL) { + CDEBUG(D_CACHE, "failed to allocate xattr\n"); + RETURN(-ENOMEM); + } + + xattr->xe_namelen = strlen(xattr_name) + 1; + + OBD_ALLOC(xattr->xe_name, xattr->xe_namelen); + if (!xattr->xe_name) { + CDEBUG(D_CACHE, "failed to alloc xattr name %u\n", + xattr->xe_namelen); + goto err_name; + } + OBD_ALLOC(xattr->xe_value, xattr_val_len); + if (!xattr->xe_value) { + CDEBUG(D_CACHE, "failed to alloc xattr value %d\n", + xattr_val_len); + goto err_value; + } + + memcpy(xattr->xe_name, xattr_name, xattr->xe_namelen); + memcpy(xattr->xe_value, xattr_val, xattr_val_len); + xattr->xe_vallen = xattr_val_len; + list_add(&xattr->xe_list, cache); + + CDEBUG(D_CACHE, "set: [%s]=%.*s\n", xattr_name, + xattr_val_len, xattr_val); + + RETURN(0); +err_value: + OBD_FREE(xattr->xe_name, xattr->xe_namelen); +err_name: + OBD_SLAB_FREE_PTR(xattr, xattr_kmem); + + RETURN(-ENOMEM); +} + +/** + * This removes an extended attribute from cache. + * + * Remove @xattr_name attribute from @cache. 
+ * + * \retval 0 success + * \retval -ENODATA if @xattr_name is not cached + */ +static int ll_xattr_cache_del(struct list_head *cache, + const char *xattr_name) +{ + struct ll_xattr_entry *xattr; + + ENTRY; + + CDEBUG(D_CACHE, "del xattr: %s\n", xattr_name); + + if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) { + list_del(&xattr->xe_list); + OBD_FREE(xattr->xe_name, xattr->xe_namelen); + OBD_FREE(xattr->xe_value, xattr->xe_vallen); + OBD_SLAB_FREE_PTR(xattr, xattr_kmem); + + RETURN(0); + } + + RETURN(-ENODATA); +} + +/** + * This iterates cached extended attributes. + * + * Walk over cached attributes in @cache and + * fill in @xld_buffer or only calculate buffer + * size if @xld_buffer is NULL. + * + * \retval >= 0 buffer list size + * \retval -ERANGE if the list cannot fit @xld_size buffer + */ +static int ll_xattr_cache_list(struct list_head *cache, + char *xld_buffer, + int xld_size) +{ + struct ll_xattr_entry *xattr, *tmp; + int xld_tail = 0; + + ENTRY; + + list_for_each_entry_safe(xattr, tmp, cache, xe_list) { + CDEBUG(D_CACHE, "list: buffer=%p[%d] name=%s\n", + xld_buffer, xld_tail, xattr->xe_name); + + if (xld_buffer) { + xld_size -= xattr->xe_namelen; + if (xld_size < 0) + break; + memcpy(&xld_buffer[xld_tail], + xattr->xe_name, xattr->xe_namelen); + } + xld_tail += xattr->xe_namelen; + } + + if (xld_size < 0) + RETURN(-ERANGE); + + RETURN(xld_tail); +} + +/** + * Check if the xattr cache is initialized (filled). + * + * \retval 0 @cache is not initialized + * \retval 1 @cache is initialized + */ +int ll_xattr_cache_valid(struct ll_inode_info *lli) +{ + return !!(lli->lli_flags & LLIF_XATTR_CACHE); +} + +/** + * This finalizes the xattr cache. + * + * Free all xattr memory. @lli is the inode info pointer.
+ * + * \retval 0 no error occurred + */ +static int ll_xattr_cache_destroy_locked(struct ll_inode_info *lli) +{ + ENTRY; + + if (!ll_xattr_cache_valid(lli)) + RETURN(0); + + while (ll_xattr_cache_del(&lli->lli_xattrs, NULL) == 0) + /* empty loop */ ; + lli->lli_flags &= ~LLIF_XATTR_CACHE; + + RETURN(0); +} + +int ll_xattr_cache_destroy(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + int rc; + + ENTRY; + + down_write(&lli->lli_xattrs_list_rwsem); + rc = ll_xattr_cache_destroy_locked(lli); + up_write(&lli->lli_xattrs_list_rwsem); + + RETURN(rc); +} + +/** + * Match or enqueue a PR or PW LDLM lock. + * + * Find or request an LDLM lock with xattr data. + * Since LDLM does not provide API for atomic match_or_enqueue, + * the function handles it with a separate enq lock. + * If successful, the function exits with the list lock held. + * + * \retval 0 no error occurred + * \retval -ENOMEM not enough memory + */ +static int ll_xattr_find_get_lock(struct inode *inode, + struct lookup_intent *oit, + struct ptlrpc_request **req) +{ + ldlm_mode_t mode; + struct lustre_handle lockh = { 0 }; + struct md_op_data *op_data; + struct ll_inode_info *lli = ll_i2info(inode); + struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS, + .ei_mode = it_to_lock_mode(oit), + .ei_cb_bl = ll_md_blocking_ast, + .ei_cb_cp = ldlm_completion_ast }; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct obd_export *exp = sbi->ll_md_exp; + int rc; + + ENTRY; + + mutex_lock(&lli->lli_xattrs_enq_lock); + /* Try matching first. */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_XATTR, &lockh, 0, + oit->it_op == IT_SETXATTR ? LCK_PW : + (LCK_PR | LCK_PW)); + if (mode != 0) { + /* fake oit in mdc_revalidate_lock() manner */ + oit->d.lustre.it_lock_handle = lockh.cookie; + oit->d.lustre.it_lock_mode = mode; + goto out; + } + + /* Enqueue if the lock isn't cached locally.
*/ + op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) { + mutex_unlock(&lli->lli_xattrs_enq_lock); + RETURN(PTR_ERR(op_data)); + } + + op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS | + OBD_MD_FLXATTRLOCKED; +#ifdef CONFIG_FS_POSIX_ACL + /* If working with ACLs, we would like to cache local ACLs */ + if (sbi->ll_flags & LL_SBI_RMT_CLIENT) + op_data->op_valid |= OBD_MD_FLRMTLGETFACL; +#endif + + rc = md_enqueue(exp, &einfo, oit, op_data, &lockh, NULL, 0, NULL, 0); + ll_finish_md_op_data(op_data); + + if (rc < 0) { + CDEBUG(D_CACHE, "md_intent_lock failed with %d for fid "DFID"\n", + rc, PFID(ll_inode2fid(inode))); + mutex_unlock(&lli->lli_xattrs_enq_lock); + RETURN(rc); + } + + *req = (struct ptlrpc_request *)oit->d.lustre.it_data; +out: + down_write(&lli->lli_xattrs_list_rwsem); + mutex_unlock(&lli->lli_xattrs_enq_lock); + + RETURN(0); +} + +/** + * Refill the xattr cache. + * + * Fetch and cache the whole of xattrs for @inode, acquiring + * a read or a write xattr lock depending on operation in @oit. + * Intent is dropped on exit unless the operation is setxattr. + * + * \retval 0 no error occurred + * \retval -EPROTO network protocol error + * \retval -ENOMEM not enough memory for the cache + */ +static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ptlrpc_request *req = NULL; + const char *xdata, *xval, *xtail, *xvtail; + struct ll_inode_info *lli = ll_i2info(inode); + struct mdt_body *body; + __u32 *xsizes; + int rc = 0, i; + + ENTRY; + + rc = ll_xattr_find_get_lock(inode, oit, &req); + if (rc) + GOTO(out_no_unlock, rc); + + /* Do we have the data at this point? */ + if (ll_xattr_cache_valid(lli)) { + ll_stats_ops_tally(sbi, LPROC_LL_GETXATTR_HITS, 1); + GOTO(out_maybe_drop, rc = 0); + } + + /* Matched but no cache? Cancelled on error by a parallel refill.
*/ + if (unlikely(req == NULL)) { + CDEBUG(D_CACHE, "cancelled by a parallel getxattr\n"); + GOTO(out_maybe_drop, rc = -EIO); + } + + if (oit->d.lustre.it_status < 0) { + CDEBUG(D_CACHE, "getxattr intent returned %d for fid "DFID"\n", + oit->d.lustre.it_status, PFID(ll_inode2fid(inode))); + GOTO(out_destroy, rc = oit->d.lustre.it_status); + } + + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) { + CERROR("no MDT BODY in the refill xattr reply\n"); + GOTO(out_destroy, rc = -EPROTO); + } + /* do not need swab xattr data */ + xdata = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, + body->eadatasize); + xval = req_capsule_server_sized_get(&req->rq_pill, &RMF_EAVALS, + body->aclsize); + xsizes = req_capsule_server_sized_get(&req->rq_pill, &RMF_EAVALS_LENS, + body->max_mdsize * sizeof(__u32)); + if (xdata == NULL || xval == NULL || xsizes == NULL) { + CERROR("wrong setxattr reply\n"); + GOTO(out_destroy, rc = -EPROTO); + } + + xtail = xdata + body->eadatasize; + xvtail = xval + body->aclsize; + + CDEBUG(D_CACHE, "caching: xdata=%p xtail=%p\n", xdata, xtail); + + ll_xattr_cache_init(lli); + + for (i = 0; i < body->max_mdsize; i++) { + CDEBUG(D_CACHE, "caching [%s]=%.*s\n", xdata, *xsizes, xval); + /* Perform consistency checks: attr names and vals in pill */ + if (memchr(xdata, 0, xtail - xdata) == NULL) { + CERROR("xattr protocol violation (names are broken)\n"); + rc = -EPROTO; + } else if (xval + *xsizes > xvtail) { + CERROR("xattr protocol violation (vals are broken)\n"); + rc = -EPROTO; + } else if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_XATTR_ENOMEM)) { + rc = -ENOMEM; + } else { + rc = ll_xattr_cache_add(&lli->lli_xattrs, xdata, xval, + *xsizes); + } + if (rc < 0) { + ll_xattr_cache_destroy_locked(lli); + GOTO(out_destroy, rc); + } + xdata += strlen(xdata) + 1; + xval += *xsizes; + xsizes++; + } + + if (xdata != xtail || xval != xvtail) + CERROR("a hole in xattr data\n"); + + ll_set_lock_data(sbi->ll_md_exp, inode, oit, NULL); + + 
GOTO(out_maybe_drop, rc); +out_maybe_drop: + /* drop lock on error or getxattr */ + if (rc != 0 || oit->it_op != IT_SETXATTR) + ll_intent_drop_lock(oit); + + if (rc != 0) /* on success the list lock stays write-held for the caller */ + up_write(&lli->lli_xattrs_list_rwsem); +out_no_unlock: + ptlrpc_req_finished(req); + + return rc; + +out_destroy: + up_write(&lli->lli_xattrs_list_rwsem); + + ldlm_lock_decref_and_cancel((struct lustre_handle *) + &oit->d.lustre.it_lock_handle, + oit->d.lustre.it_lock_mode); /* failure path: cancel the lock recorded in the intent */ + + goto out_no_unlock; +} + +/** + * Get an xattr value or list xattrs using the write-through cache. + * + * Get the xattr value (@valid has OBD_MD_FLXATTR set) of @name or + * list xattr names (@valid has OBD_MD_FLXATTRLS set) for @inode. + * The resulting value/list is stored in @buffer if the former + * is not larger than @size. + * Exactly one of OBD_MD_FLXATTR and OBD_MD_FLXATTRLS must be set in + * @valid (asserted). With @size == 0 the value is not copied and only + * its length is returned; for OBD_MD_FLXATTRLS a NULL buffer is passed + * to ll_xattr_cache_list() in that case. + * + * \retval >= 0 success: the value length for OBD_MD_FLXATTR, or the + * result of ll_xattr_cache_list() for OBD_MD_FLXATTRLS + * \retval -EPROTO network protocol error + * \retval -ENOMEM not enough memory for the cache + * \retval -ERANGE the buffer is not large enough + * \retval -ENODATA no such attr or the list is empty + */ +int ll_xattr_cache_get(struct inode *inode, + const char *name, + char *buffer, + size_t size, + __u64 valid) +{ + struct lookup_intent oit = { .it_op = IT_GETXATTR }; + struct ll_inode_info *lli = ll_i2info(inode); + int rc = 0; + + ENTRY; + + LASSERT(!!(valid & OBD_MD_FLXATTR) ^ !!(valid & OBD_MD_FLXATTRLS)); + + down_read(&lli->lli_xattrs_list_rwsem); + if (!ll_xattr_cache_valid(lli)) { + up_read(&lli->lli_xattrs_list_rwsem); /* miss: release the read lock before refilling */ + rc = ll_xattr_cache_refill(inode, &oit); + if (rc) /* a failed refill returns without the list lock */ + RETURN(rc); + downgrade_write(&lli->lli_xattrs_list_rwsem); /* refill returned with the write lock held */ + } else { + ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETXATTR_HITS, 1); + } + + if (valid & OBD_MD_FLXATTR) { + struct ll_xattr_entry *xattr; + + rc = ll_xattr_cache_find(&lli->lli_xattrs, name, &xattr); + if (rc == 0) { + rc = xattr->xe_vallen; + /* zero size means we are only requested size in rc */ + if (size != 0) { + if (size >= xattr->xe_vallen) + memcpy(buffer, xattr->xe_value, + xattr->xe_vallen); + else + 
rc = -ERANGE; + } + } + } else if (valid & OBD_MD_FLXATTRLS) { + rc = ll_xattr_cache_list(&lli->lli_xattrs, + size ? buffer : NULL, size); + } + + GOTO(out, rc); +out: + up_read(&lli->lli_xattrs_list_rwsem); /* held for reading on both the hit and the refill path */ + + return rc; +} + + +/** + * Set/update an xattr value or remove xattr using the write-through cache. + * + * Set/update the xattr value (if @valid has OBD_MD_FLXATTR) of @name to @newval + * or + * remove the xattr @name (@valid has OBD_MD_FLXATTRRM set) from @inode. + * @flags is either XATTR_CREATE or XATTR_REPLACE as defined by setxattr(2) + * Exactly one of OBD_MD_FLXATTR and OBD_MD_FLXATTRRM must be set in + * @valid (asserted). The update is sent with md_setxattr() first and + * the cache is changed only if that call succeeds. + * + * \retval 0 no error occurred + * \retval -EPROTO network protocol error + * \retval -ENOMEM not enough memory for the cache + * \retval -ERANGE the buffer is not large enough + * \retval -ENODATA no such attr (in the removal case) + */ +int ll_xattr_cache_update(struct inode *inode, + const char *name, + const char *newval, + size_t size, + __u64 valid, + int flags) +{ + struct lookup_intent oit = { .it_op = IT_SETXATTR }; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ptlrpc_request *req = NULL; + struct ll_inode_info *lli = ll_i2info(inode); + struct obd_capa *oc; + int rc; + + ENTRY; + + LASSERT(!!(valid & OBD_MD_FLXATTR) ^ !!(valid & OBD_MD_FLXATTRRM)); + + rc = ll_xattr_cache_refill(inode, &oit); /* IT_SETXATTR: on success the intent lock is kept and the list lock is write-held */ + if (rc) + RETURN(rc); + + oc = ll_mdscapa_get(inode); + rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, + valid | OBD_MD_FLXATTRLOCKED, name, newval, + size, 0, flags, ll_i2suppgid(inode), &req); + capa_put(oc); + + if (rc) { /* the cache is left unchanged when md_setxattr() fails */ + ll_intent_drop_lock(&oit); + GOTO(out, rc); + } + + if (valid & OBD_MD_FLXATTR) + rc = ll_xattr_cache_add(&lli->lli_xattrs, name, newval, size); + else if (valid & OBD_MD_FLXATTRRM) + rc = ll_xattr_cache_del(&lli->lli_xattrs, name); + + ll_intent_drop_lock(&oit); + GOTO(out, rc); +out: + up_write(&lli->lli_xattrs_list_rwsem); /* pairs with the write lock left held by ll_xattr_cache_refill() */ + ptlrpc_req_finished(req); + + return rc; +} diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index 38eecb0..5d98eb2 100644 --- 
a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -72,6 +72,7 @@ void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data, __u32 mode, __u64 rdev, __u64 flags, const void *data, int datalen); void mdc_unlink_pack(struct ptlrpc_request *req, struct md_op_data *op_data); +void mdc_getxattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data); void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data); void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen); diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index d6e155a..12046eb 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -373,6 +373,62 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, return req; } +static struct ptlrpc_request * +mdc_intent_getxattr_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *op_data) +{ + struct ptlrpc_request *req; + struct ldlm_intent *lit; + int rc, count = 0, maxdata; + CFS_LIST_HEAD(cancels); + + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_GETXATTR); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + if (it->it_op == IT_SETXATTR) + /* If we want to upgrade to LCK_PW, let's cancel LCK_PR + * locks now. This avoids unnecessary ASTs. 
*/ + count = mdc_resource_get_unused(exp, &op_data->op_fid1, + &cancels, LCK_PW, + MDS_INODELOCK_XATTR); + + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = IT_GETXATTR; + + maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize; + + /* pack the intended request */ + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + op_data->op_valid, maxdata, -1, 0); + + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, + RCL_SERVER, maxdata); + + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, + RCL_SERVER, maxdata); + + req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, + RCL_SERVER, maxdata); + + ptlrpc_request_set_replen(req); + + RETURN(req); +} + static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, struct lookup_intent *it, struct md_op_data *op_data) @@ -753,6 +809,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, { .l_inodebits = { MDS_INODELOCK_UPDATE } }; static const ldlm_policy_data_t layout_policy = { .l_inodebits = { MDS_INODELOCK_LAYOUT } }; + static const ldlm_policy_data_t getxattr_policy = { + .l_inodebits = { MDS_INODELOCK_XATTR } }; ldlm_policy_data_t const *policy = &lookup_policy; int generation, resends = 0; struct ldlm_reply *lockrep; @@ -770,6 +828,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, policy = &update_policy; else if (it->it_op & IT_LAYOUT) policy = &layout_policy; + else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) + policy = &getxattr_policy; } LASSERT(reqp == NULL); @@ -800,9 +860,10 @@ resend: } else if (it->it_op & IT_LAYOUT) { if (!imp_connect_lvb_type(class_exp2cliimp(exp))) RETURN(-EOPNOTSUPP); - req = mdc_intent_layout_pack(exp, it, op_data); lvb_type = LVB_T_LAYOUT; + } else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) { + req = mdc_intent_getxattr_pack(exp, 
it, op_data); } else { LBUG(); RETURN(-EINVAL); diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 7d7c66f..648fb1b 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -1038,6 +1038,17 @@ static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m, RETURN(rc); } +static int mdd_maxeasize_get(const struct lu_env *env, struct md_device *m, + int *easize) +{ + struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev); + ENTRY; + + *easize = mdd->mdd_dt_conf.ddp_max_ea_size; + + RETURN(0); +} + static int mdd_update_capa_key(const struct lu_env *env, struct md_device *m, struct lustre_capa_key *key) @@ -1442,6 +1453,7 @@ const struct md_device_operations mdd_ops = { .mdo_update_capa_key= mdd_update_capa_key, .mdo_llog_ctxt_get = mdd_llog_ctxt_get, .mdo_iocontrol = mdd_iocontrol, + .mdo_maxeasize_get = mdd_maxeasize_get, }; static struct lu_device_type_operations mdd_device_type_ops = { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 6bafc90..2d09345 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -3458,6 +3458,12 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, __u64); + +static int mdt_intent_getxattr(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags); + static int mdt_intent_layout(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, @@ -3522,9 +3528,9 @@ static struct mdt_it_flavor { .it_act = NULL }, [MDT_IT_GETXATTR] = { - .it_fmt = NULL, + .it_fmt = &RQF_LDLM_INTENT_GETXATTR, .it_flags = 0, - .it_act = NULL + .it_act = mdt_intent_getxattr }, [MDT_IT_LAYOUT] = { .it_fmt = &RQF_LDLM_INTENT_LAYOUT, @@ -3675,6 +3681,44 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, remote_hdl.cookie); } +static int mdt_intent_getxattr(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) +{ + 
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; + struct ldlm_reply *ldlm_rep = NULL; + int rc, grc; + + /* + * Initialize lhc->mlh_reg_lh either from a previously granted lock + * (for the resend case) or a new lock. Below we will use it to + * replace the original lock. + */ + mdt_intent_fixup_resent(info, *lockp, NULL, lhc); + if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) { + mdt_lock_reg_init(lhc, (*lockp)->l_req_mode); + rc = mdt_object_lock(info, info->mti_object, lhc, + MDS_INODELOCK_XATTR, + MDT_LOCAL_LOCK); + if (rc) + return rc; + } + + grc = mdt_getxattr(info); + + rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags); + + if (mdt_info_req(info)->rq_repmsg != NULL) + ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (ldlm_rep == NULL) + RETURN(err_serious(-EFAULT)); + + ldlm_rep->lock_policy_res2 = grc; + + return rc; +} + static int mdt_intent_getattr(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **lockp, @@ -4864,6 +4908,10 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, else m->mdt_opts.mo_user_xattr = 0; + rc = next->md_ops->mdo_maxeasize_get(env, next, &m->mdt_max_ea_size); + if (rc) + GOTO(err_fs_cleanup, rc); + if (mntopts & MNTOPT_ACL) m->mdt_opts.mo_acl = 1; else @@ -5042,6 +5090,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, mutex_init(&mo->mot_ioepoch_mutex); mutex_init(&mo->mot_lov_mutex); init_rwsem(&mo->mot_open_sem); + init_rwsem(&mo->mot_xattr_sem); RETURN(o); } RETURN(NULL); @@ -5293,6 +5342,8 @@ static int mdt_connect_internal(struct obd_export *exp, } } + data->ocd_max_easize = mdt->mdt_max_ea_size; + return 0; } diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index becdb15..e8b5830 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -185,6 +185,8 @@ struct mdt_device { int mdt_max_mdsize; int mdt_max_cookiesize; + int mdt_max_ea_size; + struct upcall_cache *mdt_identity_cache; /* 
sptlrpc rules */ @@ -254,6 +256,8 @@ struct mdt_object { struct rw_semaphore mot_open_sem; atomic_t mot_lease_count; atomic_t mot_open_count; + /* A lock to protect EA data from racing setxattr and getxattrall */ + struct rw_semaphore mot_xattr_sem; }; enum mdt_object_flags { diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index 1ba61e5..03ffc64 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -55,7 +55,7 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) struct req_capsule *pill = info->mti_pill ; struct ptlrpc_request *req = mdt_info_req(info); char *xattr_name; - __u64 valid = info->mti_body->valid; + __u64 valid; static const char user_string[] = "user."; int size, rc; ENTRY; @@ -63,8 +63,10 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) RETURN(-ENOMEM); + valid = info->mti_body->valid & (OBD_MD_FLXATTR | OBD_MD_FLXATTRLS); + /* Determine how many bytes we need */ - if (valid & OBD_MD_FLXATTR) { + if (valid == OBD_MD_FLXATTR) { xattr_name = req_capsule_client_get(pill, &RMF_NAME); if (!xattr_name) RETURN(-EFAULT); @@ -76,10 +78,17 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) size = mo_xattr_get(info->mti_env, mdt_object_child(info->mti_object), &LU_BUF_NULL, xattr_name); - } else if (valid & OBD_MD_FLXATTRLS) { + } else if (valid == OBD_MD_FLXATTRLS) { size = mo_xattr_list(info->mti_env, mdt_object_child(info->mti_object), &LU_BUF_NULL); + } else if (valid == OBD_MD_FLXATTRALL) { + /* N.B. eadatasize = 0 is not valid for FLXATTRALL */ + /* We could calculate accurate sizes, but this would + * introduce a lot of overhead, let's do it later... 
*/ + size = info->mti_body->eadatasize; + req_capsule_set_size(pill, &RMF_EAVALS, RCL_SERVER, size); + req_capsule_set_size(pill, &RMF_EAVALS_LENS, RCL_SERVER, size); } else { CDEBUG(D_INFO, "Valid bits: "LPX64"\n", info->mti_body->valid); RETURN(-EINVAL); @@ -107,6 +116,49 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) RETURN(size); } +static int +mdt_getxattr_one(struct mdt_thread_info *info, + char *xattr_name, struct md_object *next, + struct lu_buf *buf, struct mdt_export_data *med, + struct lu_ucred *uc) +{ + __u32 remote = exp_connect_rmtclient(info->mti_exp); + int flags = CFS_IC_NOTHING, rc; + + ENTRY; + + CDEBUG(D_INODE, "getxattr %s\n", xattr_name); + + rc = mo_xattr_get(info->mti_env, next, buf, xattr_name); + if (rc < 0) { + CERROR("getxattr failed: %d\n", rc); + GOTO(out, rc); + } + + if (info->mti_body->valid & + (OBD_MD_FLRMTLSETFACL | OBD_MD_FLRMTLGETFACL)) + flags = CFS_IC_ALL; + else if (info->mti_body->valid & OBD_MD_FLRMTRGETFACL) + flags = CFS_IC_MAPPED; + + if (rc > 0 && flags != CFS_IC_NOTHING) { + int rc1; + + if (unlikely(!remote)) + GOTO(out, rc = -EINVAL); + + rc1 = lustre_posix_acl_xattr_id2client(uc, + med->med_idmap, + (posix_acl_xattr_header *)(buf->lb_buf), + rc, flags); + if (unlikely(rc1 < 0)) + rc = rc1; + } + +out: + return rc; +} + int mdt_getxattr(struct mdt_thread_info *info) { struct ptlrpc_request *req = mdt_info_req(info); @@ -119,6 +171,7 @@ int mdt_getxattr(struct mdt_thread_info *info) __u32 remote = exp_connect_rmtclient(info->mti_exp); __u32 perm; int easize, rc; + obd_valid valid; ENTRY; LASSERT(info->mti_object != NULL); @@ -134,6 +187,8 @@ int mdt_getxattr(struct mdt_thread_info *info) if (rc) RETURN(err_serious(rc)); + down_read(&info->mti_object->mot_xattr_sem); + next = mdt_object_child(info->mti_object); if (info->mti_body->valid & OBD_MD_FLRMTRGETFACL) { @@ -162,60 +217,81 @@ int mdt_getxattr(struct mdt_thread_info *info) if (easize == 0 || reqbody->eadatasize == 0) GOTO(out, rc = 
easize); - buf = &info->mti_buf; buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_EADATA); buf->lb_len = easize; - if (info->mti_body->valid & OBD_MD_FLXATTR) { - int flags = CFS_IC_NOTHING; - char *xattr_name = req_capsule_client_get(info->mti_pill, - &RMF_NAME); - CDEBUG(D_INODE, "getxattr %s\n", xattr_name); - - rc = mo_xattr_get(info->mti_env, next, buf, xattr_name); - if (rc < 0) { - CERROR("getxattr failed: %d\n", rc); - GOTO(out, rc); - } - - if (info->mti_body->valid & - (OBD_MD_FLRMTLSETFACL | OBD_MD_FLRMTLGETFACL)) - flags = CFS_IC_ALL; - else if (info->mti_body->valid & OBD_MD_FLRMTRGETFACL) - flags = CFS_IC_MAPPED; - - if (rc > 0 && flags != CFS_IC_NOTHING) { - int rc1; - - if (unlikely(!remote)) - GOTO(out, rc = -EINVAL); - - rc1 = lustre_posix_acl_xattr_id2client(uc, - med->med_idmap, - (posix_acl_xattr_header *)(buf->lb_buf), - rc, flags); - if (unlikely(rc1 < 0)) - rc = rc1; - } - } else if (info->mti_body->valid & OBD_MD_FLXATTRLS) { - CDEBUG(D_INODE, "listxattr\n"); - - rc = mo_xattr_list(info->mti_env, next, buf); - if (rc < 0) - CDEBUG(D_INFO, "listxattr failed: %d\n", rc); - } else - LBUG(); - - EXIT; + valid = info->mti_body->valid & (OBD_MD_FLXATTR | OBD_MD_FLXATTRLS); + + if (valid == OBD_MD_FLXATTR) { + char *xattr_name = req_capsule_client_get(info->mti_pill, + &RMF_NAME); + rc = mdt_getxattr_one(info, xattr_name, next, buf, med, uc); + } else if (valid == OBD_MD_FLXATTRLS) { + CDEBUG(D_INODE, "listxattr\n"); + + rc = mo_xattr_list(info->mti_env, next, buf); + if (rc < 0) + CDEBUG(D_INFO, "listxattr failed: %d\n", rc); + } else if (valid == OBD_MD_FLXATTRALL) { + /* + * The format of the pill is the following: + * EADATA: attr1\0attr2\0...attrn\0 + * EAVALS: val1val2...valn + * EAVALS_LENS: 4,4,...4 + */ + char *v, *b; + __u32 *sizes; + int eadatasize, eavallen = 0, eavallens = 0; + struct lu_buf buf2 = { .lb_len = reqbody->eadatasize }; + + /* Fill out EADATA */ + eadatasize = mo_xattr_list(info->mti_env, next, buf); + if 
(eadatasize < 0) + GOTO(out, rc = eadatasize); + + v = req_capsule_server_get(info->mti_pill, &RMF_EAVALS); + sizes = req_capsule_server_get(info->mti_pill, + &RMF_EAVALS_LENS); + + /* Fill out EAVALS and EAVALS_LENS */ + for (b = buf->lb_buf; + b < (char *)buf->lb_buf + eadatasize; + b += strlen(b) + 1, v += rc) { + buf2.lb_buf = v; + rc = mdt_getxattr_one(info, b, next, &buf2, med, uc); + if (rc < 0) + GOTO(out, rc); + sizes[eavallens] = rc; + buf2.lb_len -= rc; + eavallens++; + eavallen += rc; + } + + repbody->aclsize = eavallen; + repbody->max_mdsize = eavallens; + + req_capsule_shrink(info->mti_pill, &RMF_EAVALS, + eavallen, RCL_SERVER); + req_capsule_shrink(info->mti_pill, &RMF_EAVALS_LENS, + eavallens * sizeof(__u32), RCL_SERVER); + req_capsule_shrink(info->mti_pill, &RMF_EADATA, + eadatasize, RCL_SERVER); + rc = eadatasize; + } else + LBUG(); + + EXIT; out: - if (rc >= 0) { + up_read(&info->mti_object->mot_xattr_sem); + + if (rc >= 0) { mdt_counter_incr(req, LPROC_MDT_GETXATTR); - repbody->eadatasize = rc; - rc = 0; - } - mdt_exit_ucred(info); - return rc; + repbody->eadatasize = rc; + rc = 0; + } + mdt_exit_ucred(info); + return rc; } static int mdt_rmtlsetfacl(struct mdt_thread_info *info, @@ -341,6 +417,11 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, if (!strcmp(xattr_name, XATTR_NAME_ACL_ACCESS)) lockpart |= MDS_INODELOCK_PERM | MDS_INODELOCK_LOOKUP; + /* We need to take the lock on behalf of old clients so that newer + * clients flush their xattr caches */ + if (!(valid & OBD_MD_FLXATTRLOCKED)) + lockpart |= MDS_INODELOCK_XATTR; + lh = &info->mti_lh[MDT_LH_PARENT]; /* ACLs were sent to clients under LCK_CR locks, so taking LCK_EX * to cancel them. 
*/ @@ -349,6 +430,8 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); + down_write(&obj->mot_xattr_sem); + info->mti_mos = obj; rc = mdt_version_get_check_save(info, obj, 0); if (rc) @@ -419,6 +502,7 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, EXIT; out_unlock: + up_write(&obj->mot_xattr_sem); mdt_object_unlock_put(info, obj, lh, rc); if (unlikely(new_xattr != NULL)) lustre_posix_acl_xattr_free(new_xattr, xattr_len); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 9358419..d0c4863 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -466,6 +466,25 @@ static const struct req_msg_field *ldlm_intent_unlink_client[] = { &RMF_NAME }; +static const struct req_msg_field *ldlm_intent_getxattr_client[] = { + &RMF_PTLRPC_BODY, + &RMF_DLM_REQ, + &RMF_LDLM_INTENT, + &RMF_MDT_BODY, + &RMF_CAPA1, +}; + +static const struct req_msg_field *ldlm_intent_getxattr_server[] = { + &RMF_PTLRPC_BODY, + &RMF_DLM_REP, + &RMF_MDT_BODY, + &RMF_MDT_MD, + &RMF_ACL, /* for req_capsule_extend/mdt_intent_policy */ + &RMF_EADATA, + &RMF_EAVALS, + &RMF_EAVALS_LENS +}; + static const struct req_msg_field *mds_getxattr_client[] = { &RMF_PTLRPC_BODY, &RMF_MDT_BODY, @@ -743,6 +762,7 @@ static struct req_format *req_formats[] = { &RQF_LDLM_INTENT_OPEN, &RQF_LDLM_INTENT_CREATE, &RQF_LDLM_INTENT_UNLINK, + &RQF_LDLM_INTENT_GETXATTR, &RQF_LDLM_INTENT_QUOTA, &RQF_QUOTA_DQACQ, &RQF_LOG_CANCEL, @@ -1017,6 +1037,9 @@ struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1, NULL, NULL); EXPORT_SYMBOL(RMF_EADATA); +struct req_msg_field RMF_EAVALS = DEFINE_MSGF("eavals", 0, -1, NULL, NULL); +EXPORT_SYMBOL(RMF_EAVALS); + struct req_msg_field RMF_ACL = DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK, LUSTRE_POSIX_ACL_MAX_SIZE, NULL, NULL); @@ -1068,6 +1091,11 @@ struct req_msg_field RMF_RCS = lustre_swab_generic_32s, dump_rcs); EXPORT_SYMBOL(RMF_RCS); +struct req_msg_field RMF_EAVALS_LENS = + DEFINE_MSGF("eavals_lens", 
RMF_F_STRUCT_ARRAY, sizeof(__u32), + lustre_swab_generic_32s, NULL); +EXPORT_SYMBOL(RMF_EAVALS_LENS); + struct req_msg_field RMF_OBD_ID = DEFINE_MSGF("obd_id", 0, sizeof(obd_id), lustre_swab_ost_last_id, NULL); @@ -1425,6 +1453,12 @@ struct req_format RQF_LDLM_INTENT_UNLINK = ldlm_intent_unlink_client, ldlm_intent_server); EXPORT_SYMBOL(RQF_LDLM_INTENT_UNLINK); +struct req_format RQF_LDLM_INTENT_GETXATTR = + DEFINE_REQ_FMT0("LDLM_INTENT_GETXATTR", + ldlm_intent_getxattr_client, + ldlm_intent_getxattr_server); +EXPORT_SYMBOL(RQF_LDLM_INTENT_GETXATTR); + struct req_format RQF_MDS_CLOSE = DEFINE_REQ_FMT0("MDS_CLOSE", mdt_close_client, mds_last_unlink_server); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 29d04d0..2b653bc 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -11424,6 +11424,29 @@ test_233() { } run_test 233 "checking that OBF of the FS root succeeds" +test_234() { + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params client "llite.*.xattr_cache" > $p + lctl set_param llite.*.xattr_cache 1 || + { skip "xattr cache is not supported"; return 0; } + + mkdir -p $DIR/$tdir || error "mkdir failed" + touch $DIR/$tdir/$tfile || error "touch failed" + # OBD_FAIL_LLITE_XATTR_ENOMEM + $LCTL set_param fail_loc=0x1405 + setfattr -n user.attr -v value $DIR/$tdir/$tfile && + error "setfattr should have failed with ENOMEM" + # attr pre-2.4.44-7 had a bug with rc + getfattr -n user.attr $DIR/$tdir/$tfile && + error "getfattr should have failed with ENOMEM" + $LCTL set_param fail_loc=0x0 + rm -rf $DIR/$tdir + + restore_lustre_params < $p + rm -f $p +} +run_test 234 "xattr cache should not crash on ENOMEM" + # # tests that do cleanup/setup should be run at the end # diff --git a/lustre/tests/sanityn.sh b/lustre/tests/sanityn.sh index ba50004..45d5dc9 100644 --- a/lustre/tests/sanityn.sh +++ b/lustre/tests/sanityn.sh @@ -2477,6 +2477,54 @@ test_71() { } run_test 71 "correct file map just after write operation is finished" 
+test_72() { + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params client "llite.*.xattr_cache" > $p + lctl set_param llite.*.xattr_cache 1 || + { skip "xattr cache is not supported"; return 0; } + + touch $DIR1/$tfile + setfattr -n user.attr1 -v value1 $DIR1/$tfile || + error "setfattr1 failed" + getfattr -n user.attr1 $DIR2/$tfile | grep value1 || + error "getfattr1 failed" + setfattr -n user.attr1 -v value2 $DIR2/$tfile || + error "setfattr2 failed" + getfattr -n user.attr1 $DIR1/$tfile | grep value2 || + error "getfattr2 failed" + rm -f $DIR2/$tfile + + restore_lustre_params < $p + rm -f $p +} +run_test 72 "getxattr/setxattr cache should be consistent between nodes" + +test_73() { + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params client "llite.*.xattr_cache" > $p + lctl set_param llite.*.xattr_cache 1 || + { skip "xattr cache is not supported"; return 0; } + + touch $DIR1/$tfile + setfattr -n user.attr1 -v value1 $DIR1/$tfile || + error "setfattr1 failed" + getfattr -n user.attr1 $DIR2/$tfile || error "getfattr1 failed" + getfattr -n user.attr1 $DIR1/$tfile || error "getfattr2 failed" + clear_llite_stats + # PR lock should be cached by now on both clients + getfattr -n user.attr1 $DIR1/$tfile || error "getfattr3 failed" + # 2 hits for getfattr(0)+getfattr(size) + [ $(calc_llite_stats getxattr_hits) -eq 2 ] || error "not cached in $DIR1" + getfattr -n user.attr1 $DIR2/$tfile || error "getfattr4 failed" + # 4 hits for more getfattr(0)+getfattr(size) + [ $(calc_llite_stats getxattr_hits) -eq 4 ] || error "not cached in $DIR2" + rm -f $DIR2/$tfile + + restore_lustre_params < $p + rm -f $p +} +run_test 73 "getxattr should not cause xattr lock cancellation" + log "cleanup: ======================================================" [ "$(mount | grep $MOUNT2)" ] && umount $MOUNT2