From 29c49b7a931b05235d3564b51f92d2b03f89d40a Mon Sep 17 00:00:00 2001 From: vitaly Date: Sat, 16 Sep 2006 00:05:48 +0000 Subject: [PATCH] Branch b_new_cmd b=22564 cache file size on mds --- lustre/include/liblustre.h | 4 +- lustre/include/linux/lustre_compat25.h | 6 +- lustre/include/linux/obd_class.h | 2 - lustre/include/lustre/lustre_idl.h | 52 ++++- lustre/include/lustre_handles.h | 1 + lustre/include/lustre_req_layout.h | 1 + lustre/include/obd.h | 8 +- lustre/include/obd_class.h | 18 +- lustre/liblustre/file.c | 83 +++++-- lustre/liblustre/llite_lib.h | 12 +- lustre/liblustre/namei.c | 20 +- lustre/liblustre/rw.c | 57 ++++- lustre/liblustre/super.c | 164 +++++++++----- lustre/llite/dir.c | 5 +- lustre/llite/file.c | 227 +++++++++++++------ lustre/llite/llite_close.c | 230 +++++++++++--------- lustre/llite/llite_internal.h | 44 +++- lustre/llite/llite_lib.c | 170 ++++++++++----- lustre/llite/namei.c | 23 +- lustre/llite/rw.c | 17 +- lustre/llite/rw26.c | 20 +- lustre/lmv/lmv_obd.c | 16 +- lustre/lov/lov_request.c | 1 + lustre/mdc/mdc_internal.h | 14 +- lustre/mdc/mdc_lib.c | 102 ++++----- lustre/mdc/mdc_reint.c | 22 +- lustre/mdc/mdc_request.c | 38 ++-- lustre/mdd/mdd_handler.c | 13 ++ lustre/mds/mds_open.c | 3 - lustre/mdt/mdt_handler.c | 59 +++-- lustre/mdt/mdt_internal.h | 32 ++- lustre/mdt/mdt_lib.c | 52 ++++- lustre/mdt/mdt_open.c | 386 +++++++++++++++++++++++++-------- lustre/mdt/mdt_recovery.c | 10 +- lustre/mdt/mdt_reint.c | 168 ++++++++++---- lustre/obdclass/class_obd.c | 1 + lustre/obdclass/linux/linux-obdo.c | 84 ------- lustre/obdclass/llog_swab.c | 2 +- lustre/obdclass/lustre_handles.c | 14 ++ lustre/obdclass/obdo.c | 114 ++++++++++ lustre/obdfilter/filter_internal.h | 2 +- lustre/obdfilter/filter_log.c | 16 +- lustre/ptlrpc/layout.c | 16 +- lustre/ptlrpc/pack_generic.c | 10 +- lustre/ptlrpc/ptlrpc_module.c | 1 + lustre/tests/oos.sh | 2 + lustre/utils/req-layout.c | 5 +- lustre/utils/wirecheck.c | 2 +- 48 files changed, 1591 insertions(+), 758 
deletions(-) diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index e23fec2..c7dd09e 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -468,6 +468,7 @@ static inline cfs_page_t* __grab_cache_page(unsigned long index) #define ATTR_RAW 0x0800 /* file system, not vfs will massage attrs */ #define ATTR_FROM_OPEN 0x1000 /* called from open path, ie O_TRUNC */ #define ATTR_CTIME_SET 0x2000 +#define ATTR_BLOCKS 0x4000 struct iattr { unsigned int ia_valid; @@ -480,7 +481,8 @@ struct iattr { time_t ia_ctime; unsigned int ia_attr_flags; }; -#define ll_iattr_struct iattr + +#define ll_iattr iattr #define IT_OPEN 0x0001 #define IT_CREAT 0x0002 diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h index 1214b17..fb4a62b 100644 --- a/lustre/include/linux/lustre_compat25.h +++ b/lustre/include/linux/lustre_compat25.h @@ -34,12 +34,12 @@ #include #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14) -struct ll_iattr_struct { +struct ll_iattr { struct iattr iattr; unsigned int ia_attr_flags; }; #else -#define ll_iattr_struct iattr +#define ll_iattr iattr #endif #ifndef HAVE_SET_FS_PWD @@ -65,6 +65,8 @@ static inline void ll_set_fs_pwd(struct fs_struct *fs, struct vfsmount *mnt, #define ll_set_fs_pwd set_fs_pwd #endif +#define ATTR_BLOCKS 0x4000 + #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,15) #define UNLOCK_INODE_MUTEX(inode) do {mutex_unlock(&(inode)->i_mutex); } while(0) #define LOCK_INODE_MUTEX(inode) do {mutex_lock(&(inode)->i_mutex); } while(0) diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 2fee9fa..72ebd2b 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -41,8 +41,6 @@ /* obdo.c */ #ifdef __KERNEL__ -void obdo_from_iattr(struct obdo *oa, struct iattr *attr, unsigned ia_valid); -void iattr_from_obdo(struct iattr *attr, struct obdo *oa, obd_flag valid); void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag 
valid); void obdo_from_la(struct obdo *dst, struct lu_attr *la, obd_flag valid); void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 5d916ad..694ec2e 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -595,6 +595,7 @@ extern void lustre_swab_obdo (struct obdo *o); struct md_op_data { struct lu_fid fid1; struct lu_fid fid2; + struct lustre_handle handle; __u64 mod_time; const char *name; int namelen; @@ -604,14 +605,19 @@ struct md_op_data { __u32 suppgids[2]; __u32 fsuid; __u32 fsgid; - /* part of obdo fields for md stack */ - obd_valid valid; - obd_size size; - obd_blocks blocks; - obd_flag flags; - obd_time mtime; - obd_time atime; - obd_time ctime; + + /* iattr fields and blocks. */ + struct iattr attr; +#ifdef __KERNEL__ +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14) + unsigned int attr_flags; +#endif +#endif + loff_t attr_blocks; + + /* Size-on-MDS epoch and flags. */ + __u64 ioepoch; + __u32 flags; }; #define MDS_MODE_DONT_LOCK (1 << 30) @@ -677,6 +683,7 @@ struct lov_mds_md_v1 { /* LOV EA mds/wire data (little-endian) */ #define OBD_MD_FLGROUP (0x01000000ULL) /* group */ #define OBD_MD_FLFID (0x02000000ULL) /* ->ost write inline fid */ #define OBD_MD_FLEPOCH (0x04000000ULL) /* ->ost write easize is epoch */ + /* ->mds if epoch opens or closes */ #define OBD_MD_FLGRANT (0x08000000ULL) /* ost preallocation space grant */ #define OBD_MD_FLDIREA (0x10000000ULL) /* dir's extended attribute data */ #define OBD_MD_FLUSRQUOTA (0x20000000ULL) /* over quota flags sent from ost */ @@ -887,7 +894,15 @@ struct mds_status_req { extern void lustre_swab_mds_status_req (struct mds_status_req *r); -#define MDS_BFLAG_UNCOMMITTED_WRITES 0x1 +/* mdt_thread_info.mti_flags. */ +enum mdt_ioepoch_flags { + /* The flag indicates Size-on-MDS attributes are changed. 
*/ + MF_SOM_CHANGE = (1 << 0), + /* Flags indicates an epoch opens or closes. */ + MF_EPOCH_OPEN = (1 << 1), + MF_EPOCH_CLOSE = (1 << 2), +}; + #define MDS_BFLAG_EXT_FLAGS 0x80000000 /* == EXT3_RESERVED_FL */ /* these should be identical to their EXT3_*_FL counterparts, and are @@ -941,7 +956,7 @@ struct mdt_body { __u64 atime; __u64 ctime; __u64 blocks; /* XID, in the case of MDS_READPAGE */ - __u64 io_epoch; + __u64 ioepoch; __u32 fsuid; __u32 fsgid; __u32 capability; @@ -991,6 +1006,14 @@ struct mds_body { extern void lustre_swab_mds_body (struct mds_body *b); extern void lustre_swab_mdt_body (struct mdt_body *b); +struct mdt_epoch { + struct lustre_handle handle; + __u64 ioepoch; + __u32 flags; +}; + +extern void lustre_swab_mdt_epoch (struct mdt_body *b); + struct lustre_md { struct mdt_body *body; struct lov_stripe_md *lsm; @@ -1053,6 +1076,7 @@ struct mdt_rec_setattr { struct lu_fid sa_fid; __u64 sa_valid; __u64 sa_size; + __u64 sa_blocks; __u64 sa_mtime; __u64 sa_atime; __u64 sa_ctime; @@ -1073,6 +1097,12 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa); #define FMODE_READ 00000001 #define FMODE_WRITE 00000002 #endif + +#define FMODE_EPOCH 01000000 +#define FMODE_EPOCHLCK 02000000 +#define FMODE_SOM 04000000 +#define FMODE_CLOSED 0 + #define MDS_FMODE_EXEC 00000004 #define MDS_OPEN_CREAT 00000100 #define MDS_OPEN_EXCL 00000200 @@ -1640,7 +1670,7 @@ struct llog_setattr_rec { struct llog_size_change_rec { struct llog_rec_hdr lsc_hdr; struct ll_fid lsc_fid; - __u32 lsc_io_epoch; + __u32 lsc_ioepoch; __u32 padding; struct llog_rec_tail lsc_tail; } __attribute__((packed)); diff --git a/lustre/include/lustre_handles.h b/lustre/include/lustre_handles.h index bbd2fcd..e2a3c9a 100644 --- a/lustre/include/lustre_handles.h +++ b/lustre/include/lustre_handles.h @@ -36,6 +36,7 @@ struct portals_handle { /* Add a handle to the hash table */ void class_handle_hash(struct portals_handle *, portals_handle_addref_cb); void class_handle_unhash(struct 
portals_handle *); +void class_handle_hash_back(struct portals_handle *); void *class_handle2object(__u64 cookie); int class_handle_init(void); void class_handle_cleanup(void); diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index c61f22e..64cb02b 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -134,6 +134,7 @@ extern const struct req_format RQF_LDLM_INTENT_UNLINK; extern const struct req_msg_field RMF_PTLRPC_BODY; extern const struct req_msg_field RMF_MDT_BODY; +extern const struct req_msg_field RMF_MDT_EPOCH; extern const struct req_msg_field RMF_OBD_STATFS; extern const struct req_msg_field RMF_NAME; extern const struct req_msg_field RMF_SYMTGT; diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 38ece9f..6dce92d 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1108,7 +1108,8 @@ struct md_ops { int (*m_create)(struct obd_export *, struct md_op_data *, const void *, int, int, __u32, __u32, __u32, __u64, struct ptlrpc_request **); - int (*m_done_writing)(struct obd_export *, struct md_op_data *); + int (*m_done_writing)(struct obd_export *, struct md_op_data *, + struct obd_client_handle *); int (*m_enqueue)(struct obd_export *, int, struct lookup_intent *, int, struct md_op_data *, struct lustre_handle *, void *, int, ldlm_completion_callback, @@ -1127,9 +1128,8 @@ struct md_ops { int (*m_rename)(struct obd_export *, struct md_op_data *, const char *, int, const char *, int, struct ptlrpc_request **); - int (*m_setattr)(struct obd_export *, struct md_op_data *, - struct iattr *, void *, int , void *, int, - struct ptlrpc_request **); + int (*m_setattr)(struct obd_export *, struct md_op_data *, void *, + int , void *, int, struct ptlrpc_request **); int (*m_sync)(struct obd_export *, const struct lu_fid *, struct ptlrpc_request **); int (*m_readpage)(struct obd_export *, const struct lu_fid *, diff --git a/lustre/include/obd_class.h 
b/lustre/include/obd_class.h index fbe385b..cb0060d 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -182,7 +182,12 @@ int class_manual_cleanup(struct obd_device *obd); void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid); int obdo_cmp_md(struct obdo *dst, struct obdo *src, obd_flag compare); void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj); - +void obdo_from_iattr(struct obdo *oa, struct iattr *attr, + unsigned int ia_valid); +void iattr_from_obdo(struct iattr *attr, struct obdo *oa, obd_flag valid); +void md_from_obdo(struct md_op_data *op_data, struct obdo *oa, obd_flag valid); +void obdo_from_md(struct obdo *oa, struct md_op_data *op_data, + unsigned int valid); #define OBT(dev) (dev)->obd_type #define OBP(dev, op) (dev)->obd_type->typ_dt_ops->o_ ## op @@ -1608,13 +1613,14 @@ static inline int md_create(struct obd_export *exp, struct md_op_data *op_data, } static inline int md_done_writing(struct obd_export *exp, - struct md_op_data *op_data) + struct md_op_data *op_data, + struct obd_client_handle *och) { int rc; ENTRY; EXP_CHECK_MD_OP(exp, done_writing); MD_COUNTER_INCREMENT(exp->exp_obd, done_writing); - rc = MDP(exp->exp_obd, done_writing)(exp, op_data); + rc = MDP(exp->exp_obd, done_writing)(exp, op_data, och); RETURN(rc); } @@ -1699,14 +1705,14 @@ static inline int md_rename(struct obd_export *exp, } static inline int md_setattr(struct obd_export *exp, struct md_op_data *op_data, - struct iattr *iattr, void *ea, int ealen, - void *ea2, int ea2len, struct ptlrpc_request **request) + void *ea, int ealen, void *ea2, int ea2len, + struct ptlrpc_request **request) { int rc; ENTRY; EXP_CHECK_MD_OP(exp, setattr); MD_COUNTER_INCREMENT(exp->exp_obd, setattr); - rc = MDP(exp->exp_obd, setattr)(exp, op_data, iattr, ea, ealen, + rc = MDP(exp->exp_obd, setattr)(exp, op_data, ea, ealen, ea2, ea2len, request); RETURN(rc); } diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c index 34dbf74..c0c26e3 100644 
--- a/lustre/liblustre/file.c +++ b/lustre/liblustre/file.c @@ -310,6 +310,33 @@ int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir) return rc; } +int llu_sizeonmds_update(struct inode *inode, struct lustre_handle *fh) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct md_op_data op_data = { { 0 } }; + struct obdo oa = { 0 }; + int rc; + ENTRY; + + LASSERT(!(lli->lli_flags & LLIF_MDS_SIZE_LOCK)); + + rc = llu_inode_getattr(inode, &oa); + if (rc) { + CERROR("inode_getattr failed (%d): unable to send a " + "Size-on-MDS attribute update for inode %llu/%lu\n", + rc, (long long)llu_i2stat(inode)->st_ino, + lli->lli_st_generation); + RETURN(rc); + } + + md_from_obdo(&op_data, &oa, oa.o_valid); + memcpy(&op_data.handle, fh, sizeof(*fh)); + op_data.flags |= MF_SOM_CHANGE; + + rc = llu_md_setattr(inode, &op_data); + RETURN(rc); +} + int llu_md_close(struct obd_export *md_exp, struct inode *inode) { struct llu_inode_info *lli = llu_i2info(inode); @@ -329,33 +356,46 @@ int llu_md_close(struct obd_export *md_exp, struct inode *inode) &fd->fd_cwlockh); } memset(&op_data, 0, sizeof(op_data)); - op_data.fid1 = lli->lli_fid; - op_data.valid = OBD_MD_FLTYPE | OBD_MD_FLMODE | - OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME | - OBD_MD_FLCTIME; - - op_data.atime = LTIME_S(st->st_atime); - op_data.mtime = LTIME_S(st->st_mtime); - op_data.ctime = LTIME_S(st->st_ctime); - op_data.size = st->st_size; - op_data.blocks = st->st_blocks; - op_data.flags = lli->lli_st_flags; - - if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) - op_data.valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - - if (0 /* ll_is_inode_dirty(inode) */) { - op_data.flags = MDS_BFLAG_UNCOMMITTED_WRITES; - op_data.valid |= OBD_MD_FLFLAGS; + op_data.attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET | + ATTR_MTIME_SET | ATTR_CTIME_SET; + + if (!S_ISREG(llu_i2stat(inode)->st_mode)) { + op_data.attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS; + } else { + /* Inode cannot be dirty. Close the epoch. 
*/ + op_data.flags |= MF_EPOCH_CLOSE; + /* XXX: Send CHANGE flag only if Size-on-MDS inode attributes + * are really changed. */ + op_data.flags |= MF_SOM_CHANGE; + + /* Pack Size-on-MDS inode attributes if valid. */ + if ((lli->lli_flags & LLIF_MDS_SIZE_LOCK) || + !llu_local_size(inode)) + op_data.attr.ia_valid |= + ATTR_SIZE | ATTR_BLOCKS; } + + op_data.fid1 = lli->lli_fid; + op_data.attr.ia_atime = st->st_atime; + op_data.attr.ia_mtime = st->st_mtime; + op_data.attr.ia_ctime = st->st_ctime; + op_data.attr.ia_size = st->st_size; + op_data.attr_blocks = st->st_blocks; + op_data.attr.ia_attr_flags = lli->lli_st_flags; + op_data.ioepoch = lli->lli_ioepoch; + memcpy(&op_data.handle, &och->och_fh, sizeof(op_data.handle)); + rc = md_close(md_exp, &op_data, och, &req); if (rc == EAGAIN) { /* We are the last writer, so the MDS has instructed us to get * the file size and any write cookies, then close again. */ - //ll_queue_done_writing(inode); - rc = 0; + rc = llu_sizeonmds_update(inode, &och->och_fh); + if (rc) { + CERROR("inode %llu mdc Size-on-MDS update failed: " + "rc = %d\n", (long long)st->st_ino, rc); + rc = 0; + } } else if (rc) { CERROR("inode %llu close failed: rc %d\n", (long long)st->st_ino, rc); diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index 1bd631b..ddd07d0 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -40,8 +40,10 @@ struct llu_sb_info { #define LL_SBI_NOLCK 0x1 -#define LLI_F_HAVE_OST_SIZE_LOCK 0 -#define LLI_F_HAVE_MDS_SIZE_LOCK 1 +enum lli_flags { + /* MDS has an authority for the Size-on-MDS attributes. 
*/ + LLIF_MDS_SIZE_LOCK = (1 << 0), +}; struct llu_inode_info { struct llu_sb_info *lli_sbi; @@ -52,6 +54,7 @@ struct llu_inode_info { struct semaphore lli_open_sem; __u64 lli_maxbytes; unsigned long lli_flags; + __u64 lli_ioepoch; /* for libsysio */ struct file_identifier lli_sysio_fid; @@ -188,7 +191,8 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid); void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid); int ll_it_open_error(int phase, struct lookup_intent *it); struct inode *llu_iget(struct filesys *fs, struct lustre_md *md); -int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm); +int llu_inode_getattr(struct inode *inode, struct obdo *obdo); +int llu_md_setattr(struct inode *inode, struct md_op_data *op_data); int llu_setattr_raw(struct inode *inode, struct iattr *attr); extern struct fssw_ops llu_fssw_ops; @@ -205,6 +209,7 @@ int llu_local_open(struct llu_inode_info *lli, struct lookup_intent *it); int llu_iop_open(struct pnode *pnode, int flags, mode_t mode); int llu_md_close(struct obd_export *md_exp, struct inode *inode); int llu_file_release(struct inode *inode); +int llu_sizeonmds_update(struct inode *inode, struct lustre_handle *fh); int llu_iop_close(struct inode *inode); _SYSIO_OFF_T llu_iop_pos(struct inode *ino, _SYSIO_OFF_T off); int llu_vmtruncate(struct inode * inode, loff_t offset, obd_flag obd_flags); @@ -215,6 +220,7 @@ int llu_objects_destroy(struct ptlrpc_request *request, struct inode *dir); int llu_iop_read(struct inode *ino, struct ioctx *ioctxp); int llu_iop_write(struct inode *ino, struct ioctx *ioctxp); int llu_iop_iodone(struct ioctx *ioctxp); +int llu_local_size(struct inode *inode); int llu_glimpse_size(struct inode *inode); int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, struct lov_stripe_md *lsm, int mode, diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index 9da93a1..258d1a9 100644 --- a/lustre/liblustre/namei.c +++ 
b/lustre/liblustre/namei.c @@ -153,6 +153,7 @@ int llu_md_blocking_ast(struct ldlm_lock *lock, struct llu_inode_info *lli; struct intnl_stat *st; __u64 bits = lock->l_policy_data.l_inodebits.bits; + struct lu_fid *fid; /* Invalidate all dentries associated with this inode */ if (inode == NULL) @@ -162,15 +163,16 @@ int llu_md_blocking_ast(struct ldlm_lock *lock, st = llu_i2stat(inode); if (bits & MDS_INODELOCK_UPDATE) - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); - - if (lock->l_resource->lr_name.name[0] != fid_seq(&lli->lli_fid) || - lock->l_resource->lr_name.name[1] != fid_oid(&lli->lli_fid) || - lock->l_resource->lr_name.name[2] != fid_ver(&lli->lli_fid)) { - LDLM_ERROR(lock, "data mismatch with ino %llu/%llu/%llu", - (long long)fid_seq(&lli->lli_fid), - (long long)fid_oid(&lli->lli_fid), - (long long)fid_ver(&lli->lli_fid)); + lli->lli_flags &= ~LLIF_MDS_SIZE_LOCK; + + fid = &lli->lli_fid; + if (lock->l_resource->lr_name.name[0] != fid_seq(fid) || + lock->l_resource->lr_name.name[1] != fid_oid(fid) || + lock->l_resource->lr_name.name[2] != fid_ver(fid)) { + LDLM_ERROR(lock,"data mismatch with ino %llu/%llu/%llu", + (long long)fid_seq(fid), + (long long)fid_oid(fid), + (long long)fid_ver(fid)); } if (S_ISDIR(st->st_mode) && (bits & MDS_INODELOCK_UPDATE)) { diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index 7ff2778..fd37dcf 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -218,6 +218,49 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp) return rc; } +static void llu_merge_lvb(struct inode *inode) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct intnl_stat *st = llu_i2stat(inode); + struct ost_lvb lvb; + ENTRY; + + inode_init_lvb(inode, &lvb); + obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0); + st->st_size = lvb.lvb_size; + st->st_blocks = lvb.lvb_blocks; + st->st_mtime = lvb.lvb_mtime; + st->st_atime = lvb.lvb_atime; + st->st_ctime = 
lvb.lvb_ctime; + EXIT; +} + +int llu_local_size(struct inode *inode) +{ + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; + struct llu_inode_info *lli = llu_i2info(inode); + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct lustre_handle lockh = { 0 }; + int flags = 0; + int rc; + ENTRY; + + if (lli->lli_smd->lsm_stripe_count == 0) + RETURN(0); + + rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, + &policy, LCK_PR | LCK_PW, &flags, inode, &lockh); + if (rc < 0) + RETURN(rc); + else if (rc == 0) + RETURN(-ENODATA); + + llu_merge_lvb(inode); + obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh); + RETURN(0); +} + /* NB: lov_merge_size will prefer locally cached writes if they extend the * file (because it prefers KMS over RSS when larger) */ int llu_glimpse_size(struct inode *inode) @@ -228,10 +271,13 @@ int llu_glimpse_size(struct inode *inode) struct lustre_handle lockh = { 0 }; struct obd_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; - struct ost_lvb lvb; int rc; ENTRY; + /* If size is cached on the mds, skip glimpse. */ + if (lli->lli_flags & LLIF_MDS_SIZE_LOCK) + RETURN(0); + CDEBUG(D_DLMTRACE, "Glimpsing inode %llu\n", (long long)st->st_ino); if (!lli->lli_smd) { @@ -258,14 +304,7 @@ int llu_glimpse_size(struct inode *inode) RETURN(rc > 0 ? 
-EIO : rc); } - inode_init_lvb(inode, &lvb); - obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0); - st->st_size = lvb.lvb_size; - st->st_blocks = lvb.lvb_blocks; - st->st_mtime = lvb.lvb_mtime; - st->st_atime = lvb.lvb_atime; - st->st_ctime = lvb.lvb_ctime; - + llu_merge_lvb(inode); CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n", (long long)st->st_size, (long long)st->st_blocks); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 6dad0bc..9c40ef1 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -297,34 +297,33 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) /* * really does the getattr on the inode and updates its fields */ -int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) +int llu_inode_getattr(struct inode *inode, struct obdo *obdo) { struct llu_inode_info *lli = llu_i2info(inode); - struct obd_export *exp = llu_i2obdexp(inode); struct ptlrpc_request_set *set; + struct lov_stripe_md *lsm = lli->lli_smd; struct obd_info oinfo = { { { 0 } } }; - struct obdo oa = { 0 }; - obd_flag refresh_valid; int rc; ENTRY; LASSERT(lsm); - LASSERT(lli); oinfo.oi_md = lsm; - oinfo.oi_oa = &oa; - oa.o_id = lsm->lsm_object_id; - oa.o_mode = S_IFREG; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | - OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME; + oinfo.oi_oa = obdo; + oinfo.oi_oa->o_id = lsm->lsm_object_id; + oinfo.oi_oa->o_gr = lsm->lsm_object_gr; + oinfo.oi_oa->o_mode = S_IFREG; + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | + OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | + OBD_MD_FLCTIME; set = ptlrpc_prep_set(); if (set == NULL) { CERROR ("ENOMEM allocing request set\n"); rc = -ENOMEM; } else { - rc = obd_getattr_async(exp, &oinfo, set); + rc = obd_getattr_async(llu_i2obdexp(inode), &oinfo, set); if (rc == 0) rc = ptlrpc_set_wait(set); ptlrpc_set_destroy(set); @@ -332,11 +331,16 @@ int 
llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) if (rc) RETURN(rc); - refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME | OBD_MD_FLSIZE; - - obdo_refresh_inode(inode, &oa, refresh_valid); + oinfo.oi_oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | + OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLSIZE; + obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid); + CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %Lu, " + "blksize %Lu\n", lli->lli_smd->lsm_object_id, + (long long unsigned)llu_i2stat(inode)->st_size, + (long long unsigned)llu_i2stat(inode)->st_blocks, + (long long unsigned)llu_i2stat(inode)->st_blksize); RETURN(0); } @@ -474,10 +478,8 @@ static int llu_inode_revalidate(struct inode *inode) llu_update_inode(inode, md.body, md.lsm); if (md.lsm != NULL && llu_i2info(inode)->lli_smd != md.lsm) obd_free_memmd(sbi->ll_dt_exp, &md.lsm); - if (md.body->valid & OBD_MD_FLSIZE) - set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, - &llu_i2info(inode)->lli_flags); + llu_i2info(inode)->lli_flags |= LLIF_MDS_SIZE_LOCK; ptlrpc_req_finished(req); } @@ -547,7 +549,7 @@ void llu_clear_inode(struct inode *inode) (long long)llu_i2stat(inode)->st_ino, lli->lli_st_generation, inode); - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(lli->lli_flags)); + lli->lli_flags &= ~LLIF_MDS_SIZE_LOCK; md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode), null_if_equal, inode); @@ -614,6 +616,74 @@ static int inode_setattr(struct inode * inode, struct iattr * attr) return error; } +int llu_md_setattr(struct inode *inode, struct md_op_data *op_data) +{ + struct lustre_md md; + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct ptlrpc_request *request = NULL; + int rc; + ENTRY; + + llu_prepare_md_op_data(op_data, inode, NULL, NULL, 0, 0); + rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, &request); + /* NB: op_data is caller-owned and is read again below; do not free it. */ + + if (rc) { + ptlrpc_req_finished(request); + if (rc != -EPERM && rc != -EACCES) + CERROR("md_setattr fails: rc = %d\n", rc); 
+ RETURN(rc); + } + + rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, + sbi->ll_dt_exp, sbi->ll_md_exp, &md); + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } + + /* We call inode_setattr to adjust timestamps. + * If there is at least some data in file, we cleared ATTR_SIZE + * above to avoid invoking vmtruncate, otherwise it is important + * to call vmtruncate in inode_setattr to update inode->i_size + * (bug 6196) */ + inode_setattr(inode, &op_data->attr); + llu_update_inode(inode, md.body, md.lsm); + ptlrpc_req_finished(request); + + RETURN(rc); +} + +/* Close IO epoch and send Size-on-MDS attribute update. */ +static int llu_setattr_done_writing(struct inode *inode, + struct md_op_data *op_data) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct intnl_stat *st = llu_i2stat(inode); + int rc = 0; + ENTRY; + + LASSERT(op_data != NULL); + if (!S_ISREG(st->st_mode)) + RETURN(0); + + /* XXX: pass och here for the recovery purpose. */ + CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n", + op_data->ioepoch, PFID(&lli->lli_fid)); + + op_data->flags = MF_EPOCH_CLOSE | MF_SOM_CHANGE; + rc = md_done_writing(llu_i2sbi(inode)->ll_md_exp, op_data, NULL); + if (rc == EAGAIN) { + /* MDS has instructed us to obtain Size-on-MDS attribute + * from OSTs and send setattr back to MDS. */ + rc = llu_sizeonmds_update(inode, &op_data->handle); + } else if (rc) { + CERROR("inode %llu mdc truncate failed: rc = %d\n", + (long long)st->st_ino, rc); + } + RETURN(rc); +} + /* If this inode has objects allocated to it (lsm != NULL), then the OST * object(s) determine the file size and mtime. Otherwise, the MDS will * keep these values until such a time that objects are allocated for it. 
@@ -632,9 +702,8 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd; struct llu_sb_info *sbi = llu_i2sbi(inode); struct intnl_stat *st = llu_i2stat(inode); - struct ptlrpc_request *request = NULL; int ia_valid = attr->ia_valid; - struct md_op_data op_data; + struct md_op_data op_data = { { 0 } }; int rc = 0; ENTRY; @@ -679,46 +748,30 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) CDEBUG(D_INODE, "setting mtime %lu, ctime %lu, now = %lu\n", LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime), LTIME_S(CURRENT_TIME)); + + /* NB: ATTR_SIZE will only be set after this point if the size + * resides on the MDS, ie, this file has no objects. */ if (lsm) attr->ia_valid &= ~ATTR_SIZE; /* If only OST attributes being set on objects, don't do MDS RPC. * In that case, we need to check permissions and update the local * inode ourselves so we can call obdo_from_inode() always. */ - if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { - struct lustre_md md; - - llu_prepare_md_op_data(&op_data, inode, NULL, NULL, 0, 0); - - rc = md_setattr(sbi->ll_md_exp, &op_data, - attr, NULL, 0, NULL, 0, &request); + if (ia_valid & (lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { + memcpy(&op_data.attr, attr, sizeof(*attr)); - if (rc) { - ptlrpc_req_finished(request); - if (rc != -EPERM && rc != -EACCES) - CERROR("md_setattr fails: rc = %d\n", rc); - RETURN(rc); - } - - rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, - sbi->ll_dt_exp, sbi->ll_md_exp, &md); - if (rc) { - ptlrpc_req_finished(request); + /* Open epoch for truncate. */ + if (ia_valid & ATTR_SIZE) + op_data.flags = MF_EPOCH_OPEN; + rc = llu_md_setattr(inode, &op_data); + if (rc) RETURN(rc); - } - - /* We call inode_setattr to adjust timestamps. 
- * If there is at least some data in file, we cleared ATTR_SIZE - * above to avoid invoking vmtruncate, otherwise it is important - * to call vmtruncate in inode_setattr to update inode->i_size - * (bug 6196) */ - inode_setattr(inode, attr); - llu_update_inode(inode, md.body, md.lsm); - ptlrpc_req_finished(request); if (!lsm || !S_ISREG(st->st_mode)) { CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); - RETURN(0); + if (op_data.ioepoch) + rc = llu_setattr_done_writing(inode, &op_data); + RETURN(rc); } } else { /* The OST doesn't check permissions, but the alternative is @@ -739,6 +792,7 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) } } + /* Won't invoke llu_vmtruncate(), as we already cleared * ATTR_SIZE */ inode_setattr(inode, attr); @@ -792,6 +846,9 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr) if (!rc) rc = err; } + + if (op_data.ioepoch) + rc = llu_setattr_done_writing(inode, &op_data); } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { struct obd_info oinfo = { { { 0 } } }; struct obdo oa; @@ -1641,7 +1698,6 @@ static int llu_lov_dir_setstripe(struct inode *ino, unsigned long arg) struct llu_sb_info *sbi = llu_i2sbi(ino); struct ptlrpc_request *request = NULL; struct md_op_data op_data; - struct iattr attr = { 0 }; struct lov_user_md lum, *lump = (struct lov_user_md *)arg; int rc = 0; @@ -1661,8 +1717,8 @@ static int llu_lov_dir_setstripe(struct inode *ino, unsigned long arg) lustre_swab_lov_user_md(&lum); /* swabbing is done in lov_setstripe() on server side */ - rc = md_setattr(sbi->ll_md_exp, &op_data, - &attr, &lum, sizeof(lum), NULL, 0, &request); + rc = md_setattr(sbi->ll_md_exp, &op_data, &lum, + sizeof(lum), NULL, 0, &request); if (rc) { ptlrpc_req_finished(request); if (rc != -EPERM && rc != -EACCES) diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 42849ee..bf68c18 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -605,7 +605,6 @@ static int ll_dir_ioctl(struct inode *inode, struct file 
*file, struct lov_user_md lum, *lump = (struct lov_user_md *)arg; struct ptlrpc_request *request = NULL; struct md_op_data *op_data; - struct iattr attr = { 0 }; int rc = 0; OBD_ALLOC_PTR(op_data); @@ -634,8 +633,8 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file, lustre_swab_lov_user_md(&lum); /* swabbing is done in lov_setstripe() on server side */ - rc = md_setattr(sbi->ll_md_exp, op_data, - &attr, &lum, sizeof(lum), NULL, 0, &request); + rc = md_setattr(sbi->ll_md_exp, op_data, &lum, + sizeof(lum), NULL, 0, &request); if (rc) { if (rc != -EPERM && rc != -EACCES) CERROR("md_setattr fails: rc = %d\n", rc); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index e9e07c0..5b82144 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -48,6 +48,52 @@ static void ll_file_data_put(struct ll_file_data *fd) OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd); } +void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data, + struct lustre_handle *fh) +{ + op_data->fid1 = ll_i2info(inode)->lli_fid; + op_data->attr.ia_atime = inode->i_atime; + op_data->attr.ia_mtime = inode->i_mtime; + op_data->attr.ia_ctime = inode->i_ctime; + op_data->attr.ia_size = inode->i_size; + op_data->attr_blocks = inode->i_blocks; + ((struct ll_iattr *)&op_data->attr)->ia_attr_flags = inode->i_flags; + op_data->ioepoch = ll_i2info(inode)->lli_ioepoch; + memcpy(&op_data->handle, fh, sizeof(op_data->handle)); +} + +static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data, + struct obd_client_handle *och) +{ + struct ll_inode_info *lli = ll_i2info(inode); + ENTRY; + + op_data->attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET | + ATTR_MTIME_SET | ATTR_CTIME_SET; + + if (!S_ISREG(inode->i_mode)) { + op_data->attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS; + goto out; + } + + spin_lock(&lli->lli_lock); + if (!(list_empty(&lli->lli_pending_write_llaps)) && + !(lli->lli_flags & LLIF_EPOCH_PENDING)) { + LASSERT(lli->lli_pending_och == NULL); + /* Inode is 
dirty and there is no pending write done request + * yet, DONE_WRITE is to be sent later. */ + lli->lli_flags |= LLIF_EPOCH_PENDING; + lli->lli_pending_och = och; + } else { + ll_epoch_close(inode, op_data); + } + spin_unlock(&lli->lli_lock); + +out: + ll_pack_inode2opdata(inode, op_data, &och->och_fh); + EXIT; +} + static int ll_close_inode_openhandle(struct obd_export *md_exp, struct inode *inode, struct obd_client_handle *och) @@ -55,6 +101,7 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, struct md_op_data *op_data; struct ptlrpc_request *req = NULL; struct obd_device *obd; + int epoch_close = 1; int rc; ENTRY; @@ -81,35 +128,27 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, if (op_data == NULL) RETURN(-ENOMEM); - op_data->fid1 = ll_i2info(inode)->lli_fid; - op_data->valid = OBD_MD_FLTYPE | OBD_MD_FLMODE | - OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME | - OBD_MD_FLCTIME; - - op_data->atime = LTIME_S(inode->i_atime); - op_data->mtime = LTIME_S(inode->i_mtime); - op_data->ctime = LTIME_S(inode->i_ctime); - op_data->size = inode->i_size; - op_data->blocks = inode->i_blocks; - op_data->flags = inode->i_flags; - - if (0 /* ll_is_inode_dirty(inode) */) { - op_data->flags = MDS_BFLAG_UNCOMMITTED_WRITES; - op_data->valid |= OBD_MD_FLFLAGS; - } - + ll_prepare_close(inode, op_data, och); + epoch_close = (op_data->flags & MF_EPOCH_CLOSE); rc = md_close(md_exp, op_data, och, &req); - OBD_FREE_PTR(op_data); if (rc == EAGAIN) { - /* We are the last writer, so the MDS has instructed us to get - * the file size and any write cookies, then close again. */ - //ll_queue_done_writing(inode); - rc = 0; + /* This close must have closed the epoch. */ + LASSERT(epoch_close); + /* MDS has instructed us to obtain Size-on-MDS attribute from + * OSTs and send setattr to back to MDS. 
*/ + rc = ll_sizeonmds_update(inode, &och->och_fh); + if (rc) { + CERROR("inode %lu mdc Size-on-MDS update failed: " + "rc = %d\n", inode->i_ino, rc); + rc = 0; + } } else if (rc) { CERROR("inode %lu mdc close failed: rc = %d\n", inode->i_ino, rc); + } else if (!epoch_close) { + ll_queue_done_writing(inode); } + OBD_FREE_PTR(op_data); if (rc == 0) { rc = ll_objects_destroy(req, inode); @@ -122,6 +161,8 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp, EXIT; out: md_clear_open_replay_data(md_exp, och); + if (epoch_close) + och->och_fh.cookie = DEAD_HANDLE_MAGIC; return rc; } @@ -161,8 +202,9 @@ int ll_md_real_close(struct inode *inode, int flags) already */ rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och); - och->och_fh.cookie = DEAD_HANDLE_MAGIC; - OBD_FREE(och, sizeof *och); + /* Do not free @och is it is waiting for DONE_WRITING. */ + if (och->och_fh.cookie == DEAD_HANDLE_MAGIC) + OBD_FREE(och, sizeof *och); } RETURN(rc); @@ -341,7 +383,7 @@ static void ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli, memcpy(&och->och_fh, &body->handle, sizeof(body->handle)); och->och_magic = OBD_CLIENT_HANDLE_MAGIC; och->och_fid = &lli->lli_fid; - lli->lli_io_epoch = body->io_epoch; + lli->lli_ioepoch = body->ioepoch; md_set_open_replay_data(md_exp, och, req); } @@ -350,16 +392,30 @@ int ll_local_open(struct file *file, struct lookup_intent *it, struct ll_file_data *fd, struct obd_client_handle *och) { struct inode *inode = file->f_dentry->d_inode; + struct ll_inode_info *lli = ll_i2info(inode); ENTRY; LASSERT(!LUSTRE_FPRIVATE(file)); LASSERT(fd != NULL); - if (och) - ll_och_fill(ll_i2sbi(inode)->ll_md_exp, - ll_i2info(inode), it, och); + if (och) { + struct ptlrpc_request *req = it->d.lustre.it_data; + struct mdt_body *body; + ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och); + + body = lustre_msg_buf(req->rq_repmsg, + DLM_REPLY_REC_OFF, sizeof(*body)); + + if ((it->it_flags & FMODE_WRITE) && + (body->valid & 
OBD_MD_FLSIZE)) + { + CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n", + lli->lli_ioepoch, PFID(&lli->lli_fid)); + } + } + LUSTRE_FPRIVATE(file) = fd; ll_readahead_init(inode, &fd->fd_ras); fd->fd_omode = it->it_flags; @@ -525,14 +581,12 @@ int ll_file_open(struct inode *inode, struct file *file) } file->f_flags &= ~O_LOV_DELAY_CREATE; GOTO(out, rc); - out: +out: ptlrpc_req_finished(req); if (req) it_clear_disposition(it, DISP_ENQ_OPEN_REF); - if (rc == 0) { - ll_open_complete(inode); - } else { out_och_free: + if (rc) { if (*och_p) { OBD_FREE(*och_p, sizeof (struct obd_client_handle)); *och_p = NULL; /* OBD_FREE writes some magic there */ @@ -545,30 +599,34 @@ out_och_free: } /* Fills the obdo with the attributes for the inode defined by lsm */ -int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm, - struct obdo *oa) +int ll_inode_getattr(struct inode *inode, struct obdo *obdo) { struct ptlrpc_request_set *set; + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct obd_info oinfo = { { { 0 } } }; int rc; ENTRY; LASSERT(lsm != NULL); - memset(oa, 0, sizeof *oa); oinfo.oi_md = lsm; - oinfo.oi_oa = oa; - oa->o_id = lsm->lsm_object_id; - oa->o_mode = S_IFREG; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE | - OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME | OBD_MD_FLGROUP; + oinfo.oi_oa = obdo; + oinfo.oi_oa->o_id = lsm->lsm_object_id; + oinfo.oi_oa->o_gr = lsm->lsm_object_gr; + oinfo.oi_oa->o_mode = S_IFREG; + oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | + OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | + OBD_MD_FLCTIME | OBD_MD_FLGROUP; set = ptlrpc_prep_set(); if (set == NULL) { + CERROR("can't allocate ptlrpc set\n"); rc = -ENOMEM; } else { - rc = obd_getattr_async(exp, &oinfo, set); + rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set); if (rc == 0) rc = ptlrpc_set_wait(set); ptlrpc_set_destroy(set); @@ -576,8 +634,14 @@ int 
ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm, if (rc) RETURN(rc); - oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | - OBD_MD_FLCTIME | OBD_MD_FLSIZE); + oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | + OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLSIZE); + + obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid); + CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n", + lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks, + inode->i_blksize); RETURN(0); } @@ -830,7 +894,6 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, lsm->lsm_oinfo[stripe].loi_kms = kms; unlock_res_and_lock(lock); lov_stripe_unlock(lsm); - //ll_try_done_writing(inode); iput: iput(inode); break; @@ -951,6 +1014,50 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp) return rc; } +static void ll_merge_lvb(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ost_lvb lvb; + ENTRY; + + ll_inode_size_lock(inode, 1); + inode_init_lvb(inode, &lvb); + obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0); + inode->i_size = lvb.lvb_size; + inode->i_blocks = lvb.lvb_blocks; + LTIME_S(inode->i_mtime) = lvb.lvb_mtime; + LTIME_S(inode->i_atime) = lvb.lvb_atime; + LTIME_S(inode->i_ctime) = lvb.lvb_ctime; + ll_inode_size_unlock(inode, 1); + EXIT; +} + +int ll_local_size(struct inode *inode) +{ + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct lustre_handle lockh = { 0 }; + int flags = 0; + int rc; + ENTRY; + + if (lli->lli_smd->lsm_stripe_count == 0) + RETURN(0); + + rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, + &policy, LCK_PR | LCK_PW, &flags, inode, &lockh); + if (rc < 0) + RETURN(rc); + else if (rc == 0) + RETURN(-ENODATA); + + ll_merge_lvb(inode); + obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR 
| LCK_PW, &lockh); + RETURN(0); +} + int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, lstat_t *st) { @@ -1005,10 +1112,12 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) struct lustre_handle lockh = { 0 }; struct obd_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; - struct ost_lvb lvb; int rc; ENTRY; + if (lli->lli_flags & LLIF_MDS_SIZE_LOCK) + RETURN(0); + CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino); if (!lli->lli_smd) { @@ -1043,16 +1152,8 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) RETURN(rc > 0 ? -EIO : rc); } - ll_inode_size_lock(inode, 1); - inode_init_lvb(inode, &lvb); - obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0); - inode->i_size = lvb.lvb_size; - inode->i_blocks = lvb.lvb_blocks; - LTIME_S(inode->i_mtime) = lvb.lvb_mtime; - LTIME_S(inode->i_atime) = lvb.lvb_atime; - LTIME_S(inode->i_ctime) = lvb.lvb_ctime; - ll_inode_size_unlock(inode, 1); - + ll_merge_lvb(inode); + CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n", inode->i_size, inode->i_blocks); @@ -1927,7 +2028,9 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it) rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och); - OBD_FREE(och, sizeof(*och)); + /* Do not free @och is it is waiting for DONE_WRITING. 
*/ + if (och->och_fh.cookie == DEAD_HANDLE_MAGIC) + OBD_FREE(och, sizeof(*och)); out: /* this one is in place of ll_file_open */ ptlrpc_req_finished(it->d.lustre.it_data); @@ -2260,7 +2363,6 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) { struct inode *inode = dentry->d_inode; struct ptlrpc_request *req = NULL; - struct ll_inode_info *lli; struct ll_sb_info *sbi; struct obd_export *exp; int rc; @@ -2271,7 +2373,6 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) RETURN(0); } sbi = ll_i2sbi(inode); - lli = ll_i2info(inode); CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n", inode->i_ino, inode->i_generation, inode, dentry->d_name.name); @@ -2351,12 +2452,12 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) GOTO(out, rc = 0); /* ll_glimpse_size will prefer locally cached writes if they extend - * the file */ + the file */ rc = ll_glimpse_size(inode, 0); - + EXIT; out: ptlrpc_req_finished(req); - RETURN(rc); + return rc; } #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c index 82c6bb1..eb0e714 100644 --- a/lustre/llite/llite_close.c +++ b/lustre/llite/llite_close.c @@ -33,153 +33,179 @@ void llap_write_pending(struct inode *inode, struct ll_async_page *llap) { struct ll_inode_info *lli = ll_i2info(inode); + + ENTRY; spin_lock(&lli->lli_lock); - list_add(&llap->llap_pending_write, &lli->lli_pending_write_llaps); + lli->lli_flags |= LLIF_SOM_DIRTY; + if (llap && list_empty(&llap->llap_pending_write)) + list_add(&llap->llap_pending_write, + &lli->lli_pending_write_llaps); spin_unlock(&lli->lli_lock); + EXIT; } /* record that a write has completed */ -void llap_write_complete(struct inode *inode, struct ll_async_page *llap) -{ - struct ll_inode_info *lli = ll_i2info(inode); - spin_lock(&lli->lli_lock); - list_del_init(&llap->llap_pending_write); - spin_unlock(&lli->lli_lock); -} - -void ll_open_complete(struct 
inode *inode) -{ - struct ll_inode_info *lli = ll_i2info(inode); - spin_lock(&lli->lli_lock); - lli->lli_send_done_writing = 0; - spin_unlock(&lli->lli_lock); -} - -/* if we close with writes in flight then we want the completion or cancelation - * of those writes to send a DONE_WRITING rpc to the MDS */ -int ll_is_inode_dirty(struct inode *inode) +int llap_write_complete(struct inode *inode, struct ll_async_page *llap) { struct ll_inode_info *lli = ll_i2info(inode); int rc = 0; + ENTRY; - spin_lock(&lli->lli_lock); - if (!list_empty(&lli->lli_pending_write_llaps)) + if (llap && !list_empty(&llap->llap_pending_write)) { + list_del_init(&llap->llap_pending_write); rc = 1; + } spin_unlock(&lli->lli_lock); RETURN(rc); } -void ll_try_done_writing(struct inode *inode) +/* DONE_WRITING should be queued only if: + * - CLOSE has been called already and that CLOSE has not closed epoch; + * - inode has no no dirty page; */ +void ll_queue_done_writing(struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq; spin_lock(&lli->lli_lock); + + /* Close happened. If it has not closed epoch, let DONE_WRITING to + * happen. */ + if ((lli->lli_flags & LLIF_EPOCH_PENDING)) + lli->lli_flags |= LLIF_DONE_WRITING; - if (lli->lli_send_done_writing && + if ((lli->lli_flags & LLIF_DONE_WRITING) && list_empty(&lli->lli_pending_write_llaps)) { - + /* DONE_WRITING is allowed and inode has no dirty page. 
*/ spin_lock(&lcq->lcq_lock); - if (list_empty(&lli->lli_close_item)) { - CDEBUG(D_INODE, "adding inode %lu/%u to close list\n", - inode->i_ino, inode->i_generation); - igrab(inode); - list_add_tail(&lli->lli_close_item, &lcq->lcq_list); - wake_up(&lcq->lcq_waitq); - } + + LASSERT(list_empty(&lli->lli_close_list)); + CDEBUG(D_INODE, "adding inode %lu/%u to close list\n", + inode->i_ino, inode->i_generation); + + igrab(inode); + list_add_tail(&lli->lli_close_list, &lcq->lcq_head); + wake_up(&lcq->lcq_waitq); spin_unlock(&lcq->lcq_lock); } - spin_unlock(&lli->lli_lock); } -/* The MDS needs us to get the real file attributes, then send a DONE_WRITING */ -void ll_queue_done_writing(struct inode *inode) +/* Close epoch and send Size-on-MDS attribute update if possible. + * Call this under @lli->lli_lock spinlock. */ +void ll_epoch_close(struct inode *inode, struct md_op_data *op_data) { struct ll_inode_info *lli = ll_i2info(inode); ENTRY; - spin_lock(&lli->lli_lock); - lli->lli_send_done_writing = 1; - spin_unlock(&lli->lli_lock); + CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n", + op_data->ioepoch, PFID(&lli->lli_fid)); + op_data->flags |= MF_EPOCH_CLOSE; + + /* Pack Size-on-MDS inode attributes only if they has changed */ + if (!(lli->lli_flags & LLIF_SOM_DIRTY)) + goto out; + + /* There is already 1 pending DONE_WRITE, do not create another one -- + * close epoch with no attribute change. */ + if (lli->lli_flags & LLIF_EPOCH_PENDING) + goto out; + + op_data->flags |= MF_SOM_CHANGE; - ll_try_done_writing(inode); + /* Check if Size-on-MDS attributes are valid. */ + if ((lli->lli_flags & LLIF_MDS_SIZE_LOCK) || !ll_local_size(inode)) { + /* Send Size-on-MDS Attributes if valid. 
*/ + op_data->attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET | + ATTR_SIZE | ATTR_BLOCKS; + } +out: EXIT; } -#if 0 -/* If we know the file size and have the cookies: - * - send a DONE_WRITING rpc - * - * Otherwise: - * - get a whole-file lock - * - get the authoritative size and all cookies with GETATTRs - * - send a DONE_WRITING rpc - */ -static void ll_close_done_writing(struct inode *inode) +int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh) { struct ll_inode_info *lli = ll_i2info(inode); - ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF } }; - struct lustre_handle lockh = { 0 }; struct md_op_data *op_data; - struct obdo obdo; - obd_flag valid; - int rc, ast_flags = 0; + struct obdo *oa; + int rc; ENTRY; - - memset(&obdo, 0, sizeof(obdo)); - if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags)) - goto rpc; - - rc = ll_extent_lock(NULL, inode, lli->lli_smd, LCK_PW, &policy, &lockh, - ast_flags); - if (rc != 0) { - CERROR("lock acquisition failed (%d): unable to send " - "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino, - inode->i_generation); - GOTO(out, rc); + + LASSERT(!(lli->lli_flags & LLIF_MDS_SIZE_LOCK)); + + oa = obdo_alloc(); + OBD_ALLOC_PTR(op_data); + if (!oa || !op_data) { + CERROR("can't allocate memory for Size-on-MDS update.\n"); + RETURN(-ENOMEM); } - - rc = ll_lsm_getattr(ll_i2dtexp(inode), lli->lli_smd, &obdo); + rc = ll_inode_getattr(inode, oa); if (rc) { - CERROR("inode_getattr failed (%d): unable to send DONE_WRITING " - "for inode %lu/%u\n", rc, inode->i_ino, - inode->i_generation); - ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh); + CERROR("inode_getattr failed (%d): unable to send a " + "Size-on-MDS attribute update for inode %lu/%u\n", + rc, inode->i_ino, inode->i_generation); GOTO(out, rc); } + CDEBUG(D_INODE, "Size-on-MDS update on "DFID"\n", PFID(&lli->lli_fid)); + + md_from_obdo(op_data, oa, oa->o_valid); + memcpy(&op_data->handle, fh, sizeof(*fh)); + + op_data->ioepoch = 
lli->lli_ioepoch; + op_data->flags |= MF_SOM_CHANGE; + + rc = ll_md_setattr(inode, op_data); + EXIT; +out: + if (oa) + obdo_free(oa); + if (op_data) + OBD_FREE_PTR(op_data); + return rc; +} - obdo_refresh_inode(inode, &obdo, valid); - - CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n", - lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks, - inode->i_blksize); - - set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags); - - rc = ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh); - if (rc != ELDLM_OK) - CERROR("unlock failed (%d)? proceeding anyways...\n", rc); +/* Send a DONE_WRITING rpc, pack Size-on-MDS attributes into it, if possible */ +static void ll_done_writing(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct md_op_data *op_data; + struct obd_client_handle *och; + int rc; + ENTRY; - rpc: OBD_ALLOC_PTR(op_data); if (op_data == NULL) { CERROR("can't allocate op_data\n"); EXIT; return; } + + spin_lock(&lli->lli_lock); + LASSERT(lli->lli_flags & LLIF_SOM_DIRTY); - op_data->fid1 = lli->lli_fid; - op_data->size = inode->i_size; - op_data->blocks = inode->i_blocks; - op_data->valid = OBD_MD_FLID | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + och = lli->lli_pending_och; + lli->lli_pending_och = NULL; + lli->lli_flags &= ~(LLIF_DONE_WRITING | LLIF_EPOCH_PENDING); + ll_epoch_close(inode, op_data); + lli->lli_flags &= ~LLIF_SOM_DIRTY; + spin_unlock(&lli->lli_lock); + + ll_pack_inode2opdata(inode, op_data, &och->och_fh); - rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data); + rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, och); OBD_FREE_PTR(op_data); - out: + if (rc == EAGAIN) { + /* MDS has instructed us to obtain Size-on-MDS attribute from + * OSTs and send setattr to back to MDS. 
*/ + rc = ll_sizeonmds_update(inode, &och->och_fh); + } else if (rc) { + CERROR("inode %lu mdc done_writing failed: rc = %d\n", + inode->i_ino, rc); + } + OBD_FREE_PTR(och); + EXIT; } -#endif static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq) { @@ -187,12 +213,12 @@ static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq) spin_lock(&lcq->lcq_lock); - if (lcq->lcq_list.next == NULL) + if (lcq->lcq_head.next == NULL) lli = ERR_PTR(-1); - else if (!list_empty(&lcq->lcq_list)) { - lli = list_entry(lcq->lcq_list.next, struct ll_inode_info, - lli_close_item); - list_del(&lli->lli_close_item); + else if (!list_empty(&lcq->lcq_head)) { + lli = list_entry(lcq->lcq_head.next, struct ll_inode_info, + lli_close_list); + list_del_init(&lli->lli_close_list); } spin_unlock(&lcq->lcq_lock); @@ -215,7 +241,7 @@ static int ll_close_thread(void *arg) while (1) { struct l_wait_info lwi = { 0 }; struct ll_inode_info *lli; - //struct inode *inode; + struct inode *inode; l_wait_event_exclusive(lcq->lcq_waitq, (lli = ll_close_next_lli(lcq)) != NULL, @@ -223,9 +249,9 @@ static int ll_close_thread(void *arg) if (IS_ERR(lli)) break; - //inode = ll_info2i(lli); - //ll_close_done_writing(inode); - //iput(inode); + inode = ll_info2i(lli); + ll_done_writing(inode); + iput(inode); } complete(&lcq->lcq_comp); @@ -242,7 +268,7 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret) return -ENOMEM; spin_lock_init(&lcq->lcq_lock); - INIT_LIST_HEAD(&lcq->lcq_list); + INIT_LIST_HEAD(&lcq->lcq_head); init_waitqueue_head(&lcq->lcq_waitq); init_completion(&lcq->lcq_comp); @@ -260,7 +286,7 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret) void ll_close_thread_shutdown(struct ll_close_queue *lcq) { init_completion(&lcq->lcq_comp); - lcq->lcq_list.next = NULL; + lcq->lcq_head.next = NULL; wake_up(&lcq->lcq_waitq); wait_for_completion(&lcq->lcq_comp); OBD_FREE(lcq, sizeof(*lcq)); diff --git a/lustre/llite/llite_internal.h 
b/lustre/llite/llite_internal.h index db2094b..83f976f 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -56,8 +56,18 @@ extern struct file_operations ll_pgcache_seq_fops; #define LLI_INODE_MAGIC 0x111d0de5 #define LLI_INODE_DEAD 0xdeadd00d -#define LLI_F_HAVE_OST_SIZE_LOCK 0 -#define LLI_F_HAVE_MDS_SIZE_LOCK 1 + +enum lli_flags { + /* MDS has an authority for the Size-on-MDS attributes. */ + LLIF_MDS_SIZE_LOCK = (1 << 0), + /* Epoch close is postponed. */ + LLIF_EPOCH_PENDING = (1 << 1), + /* DONE WRITING is allowed. */ + LLIF_DONE_WRITING = (1 << 2), + /* Sizeon-on-MDS attributes are changed. An attribute update needs to + * be sent to MDS. */ + LLIF_SOM_DIRTY = (1 << 3), +}; struct ll_inode_info { int lli_inode_magic; @@ -67,17 +77,20 @@ struct ll_inode_info { struct semaphore lli_write_sem; char *lli_symlink_name; __u64 lli_maxbytes; - __u64 lli_io_epoch; + __u64 lli_ioepoch; unsigned long lli_flags; - /* this lock protects s_d_w and p_w_ll and mmap_cnt */ + /* this lock protects posix_acl, pending_write_llaps, mmap_cnt */ spinlock_t lli_lock; struct list_head lli_pending_write_llaps; - int lli_send_done_writing; + struct list_head lli_close_list; + /* handle is to be sent to MDS later on done_writing and setattr. + * Open handle data are needed for the recovery to reconstruct + * the inode state on the MDS. XXX: recovery is not ready yet. 
*/ + struct obd_client_handle *lli_pending_och; + atomic_t lli_mmap_cnt; - struct list_head lli_close_item; - /* for writepage() only to communicate to fsync */ int lli_async_rc; @@ -435,6 +448,7 @@ int ll_writepage(struct page *page); void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa); int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc); int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction); +struct ll_async_page *llap_from_page(struct page *page, unsigned origin); extern struct cache_definition ll_cache_definition; void ll_removepage(struct page *page); int ll_readpage(struct file *file, struct page *page); @@ -458,6 +472,7 @@ int ll_extent_unlock(struct ll_file_data *, struct inode *, int ll_file_open(struct inode *inode, struct file *file); int ll_file_release(struct inode *inode, struct file *file); int ll_lsm_getattr(struct obd_export *, struct lov_stripe_md *, struct obdo *); +int ll_local_size(struct inode *inode); int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, lstat_t *st); int ll_glimpse_size(struct inode *inode, int ast_flags); @@ -468,6 +483,12 @@ int ll_release_openhandle(struct dentry *, struct lookup_intent *); int ll_md_close(struct obd_export *md_exp, struct inode *inode, struct file *file); int ll_md_real_close(struct inode *inode, int flags); +void ll_epoch_close(struct inode *inode, struct md_op_data *op_data); +int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh); +int ll_inode_getattr(struct inode *inode, struct obdo *obdo); +int ll_md_setattr(struct inode *inode, struct md_op_data *op_data); +void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data, + struct lustre_handle *fh); extern void ll_rw_stats_tally(struct ll_sb_info *sbi, pid_t pid, struct file *file, size_t count, int rw); #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) @@ -549,17 +570,16 @@ extern struct inode_operations ll_fast_symlink_inode_operations; /* 
llite/llite_close.c */ struct ll_close_queue { spinlock_t lcq_lock; - struct list_head lcq_list; + struct list_head lcq_head; wait_queue_head_t lcq_waitq; struct completion lcq_comp; }; void llap_write_pending(struct inode *inode, struct ll_async_page *llap); -void llap_write_complete(struct inode *inode, struct ll_async_page *llap); -void ll_open_complete(struct inode *inode); -int ll_is_inode_dirty(struct inode *inode); -void ll_try_done_writing(struct inode *inode); +int llap_write_complete(struct inode *inode, struct ll_async_page *llap); +int ll_inode_dirty(struct inode *inode); void ll_queue_done_writing(struct inode *inode); +void ll_init_done_writing(struct inode *inode); void ll_close_thread_shutdown(struct ll_close_queue *lcq); int ll_close_thread_start(struct ll_close_queue **lcq_ret); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index f4faaab..6fdfe13 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -710,6 +710,7 @@ void ll_lli_init(struct ll_inode_info *lli) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; spin_lock_init(&lli->lli_lock); INIT_LIST_HEAD(&lli->lli_pending_write_llaps); + INIT_LIST_HEAD(&lli->lli_close_list); lli->lli_inode_magic = LLI_INODE_MAGIC; sema_init(&lli->lli_och_sem, 1); lli->lli_mds_read_och = lli->lli_mds_write_och = NULL; @@ -1130,7 +1131,7 @@ void ll_clear_inode(struct inode *inode) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); + ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK; md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode), null_if_equal, inode); @@ -1180,6 +1181,84 @@ void ll_clear_inode(struct inode *inode) EXIT; } +int ll_md_setattr(struct inode *inode, struct md_op_data *op_data) +{ + struct lustre_md md; + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ptlrpc_request *request = NULL; + int rc; + ENTRY; + + ll_prepare_md_op_data(op_data, inode, NULL, NULL, 0, 
0); + rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, &request); + if (rc) { + ptlrpc_req_finished(request); + if (rc == -ENOENT) { + inode->i_nlink = 0; + /* Unlinked special device node? Or just a race? + * Pretend we done everything. */ + if (!S_ISREG(inode->i_mode) && + !S_ISDIR(inode->i_mode)) + rc = inode_setattr(inode, &op_data->attr); + } else if (rc != -EPERM && rc != -EACCES) { + CERROR("md_setattr fails: rc = %d\n", rc); + } + RETURN(rc); + } + + rc = md_get_lustre_md(sbi->ll_md_exp, request, REPLY_REC_OFF, + sbi->ll_dt_exp, sbi->ll_md_exp, &md); + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } + + /* We call inode_setattr to adjust timestamps. + * If there is at least some data in file, we cleared ATTR_SIZE + * above to avoid invoking vmtruncate, otherwise it is important + * to call vmtruncate in inode_setattr to update inode->i_size + * (bug 6196) */ + rc = inode_setattr(inode, &op_data->attr); + + /* Extract epoch data if obtained. */ + memcpy(&op_data->handle, &md.body->handle, sizeof(op_data->handle)); + op_data->ioepoch = md.body->ioepoch; + + ll_update_inode(inode, &md); + ptlrpc_req_finished(request); + + RETURN(rc); +} + +/* Close IO epoch and send Size-on-MDS attribute update. */ +static int ll_setattr_done_writing(struct inode *inode, + struct md_op_data *op_data) +{ + struct ll_inode_info *lli = ll_i2info(inode); + int rc = 0; + ENTRY; + + LASSERT(op_data != NULL); + if (!S_ISREG(inode->i_mode)) + RETURN(0); + + /* XXX: pass och here for the recovery purpose. */ + CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n", + op_data->ioepoch, PFID(&lli->lli_fid)); + + op_data->flags = MF_EPOCH_CLOSE | MF_SOM_CHANGE; + rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, NULL); + if (rc == EAGAIN) { + /* MDS has instructed us to obtain Size-on-MDS attribute + * from OSTs and send setattr to back to MDS. 
*/ + rc = ll_sizeonmds_update(inode, &op_data->handle); + } else if (rc) { + CERROR("inode %lu mdc truncate failed: rc = %d\n", + inode->i_ino, rc); + } + RETURN(rc); +} + /* If this inode has objects allocated to it (lsm != NULL), then the OST * object(s) determine the file size and mtime. Otherwise, the MDS will * keep these values until such a time that objects are allocated for it. @@ -1198,15 +1277,14 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) struct ll_inode_info *lli = ll_i2info(inode); struct lov_stripe_md *lsm = lli->lli_smd; struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ptlrpc_request *request = NULL; + struct md_op_data *op_data = NULL; int ia_valid = attr->ia_valid; - struct md_op_data *op_data; int rc = 0; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu valid %x\n", inode->i_ino, attr->ia_valid); - lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_SETATTR); + lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_SETATTR); if (ia_valid & ATTR_SIZE) { if (attr->ia_size > ll_file_maxbytes(inode)) { @@ -1261,56 +1339,26 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* If only OST attributes being set on objects, don't do MDS RPC. * In that case, we need to check permissions and update the local * inode ourselves so we can call obdo_from_inode() always. */ - if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { - struct lustre_md md; - + if (ia_valid & (lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { OBD_ALLOC_PTR(op_data); if (op_data == NULL) RETURN(-ENOMEM); - - ll_prepare_md_op_data(op_data, inode, NULL, NULL, 0, 0); - - rc = md_setattr(sbi->ll_md_exp, op_data, - attr, NULL, 0, NULL, 0, &request); - OBD_FREE_PTR(op_data); - if (rc) { - ptlrpc_req_finished(request); - if (rc == -ENOENT) { - inode->i_nlink = 0; - /* Unlinked special device node? Or just a race? - * Pretend we done everything. 
*/ - if (!S_ISREG(inode->i_mode) && - !S_ISDIR(inode->i_mode) && - !S_ISDIR(inode->i_mode)) - rc = inode_setattr(inode, attr); - } else if (rc != -EPERM && rc != -EACCES) { - CERROR("mdcsetattr fails: rc = %d\n", rc); - } - RETURN(rc); - } - - rc = md_get_lustre_md(sbi->ll_md_exp, request, - REPLY_REC_OFF, sbi->ll_dt_exp, - sbi->ll_md_exp, &md); - if (rc) { - ptlrpc_req_finished(request); - RETURN(rc); - } + memcpy(&op_data->attr, attr, sizeof(*attr)); - /* We call inode_setattr to adjust timestamps. - * If there is at least some data in file, we cleared ATTR_SIZE - * above to avoid invoking vmtruncate, otherwise it is important - * to call vmtruncate in inode_setattr to update inode->i_size - * (bug 6196) */ - rc = inode_setattr(inode, attr); + /* Open epoch for truncate. */ + if (ia_valid & ATTR_SIZE) + op_data->flags = MF_EPOCH_OPEN; + rc = ll_md_setattr(inode, op_data); + if (rc) + GOTO(out, rc); - ll_update_inode(inode, &md); - ptlrpc_req_finished(request); + CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID" for truncate\n", + op_data->ioepoch, PFID(&lli->lli_fid)); if (!lsm || !S_ISREG(inode->i_mode)) { CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); - RETURN(rc); + GOTO(out, rc = 0); } } else { /* The OST doesn't check permissions, but the alternative is @@ -1364,7 +1412,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) DOWN_WRITE_I_ALLOC_SEM(inode); #endif if (rc != 0) - RETURN(rc); + GOTO(out, rc); /* Only ll_inode_size_lock is taken at this level. * lov_stripe_lock() is grabbed by ll_truncate() only over @@ -1406,6 +1454,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) oinfo.oi_oa = oa; oinfo.oi_md = lsm; + /* XXX: this looks unnecessary now. 
*/ rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL); if (rc) CERROR("obd_setattr_async fails: rc=%d\n", rc); @@ -1414,7 +1463,13 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) rc = -ENOMEM; } } - RETURN(rc); + EXIT; +out: + if (op_data && op_data->ioepoch) { + rc = ll_setattr_done_writing(inode, op_data); + OBD_FREE_PTR(op_data); + } + return rc; } int ll_setattr(struct dentry *de, struct iattr *attr) @@ -1655,13 +1710,14 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) #else inode->i_rdev = old_decode_dev(body->rdev); #endif - if (body->valid & OBD_MD_FLSIZE) + if (body->valid & OBD_MD_FLSIZE) { inode->i_size = body->size; - if (body->valid & OBD_MD_FLBLOCKS) - inode->i_blocks = body->blocks; - if (body->valid & OBD_MD_FLSIZE) - set_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); + if (body->valid & OBD_MD_FLBLOCKS) + inode->i_blocks = body->blocks; + + lli->lli_flags |= LLIF_MDS_SIZE_LOCK; + } if (body->valid & OBD_MD_FLID) { /* FID shouldn't be changed! 
*/ @@ -1791,7 +1847,6 @@ int ll_iocontrol(struct inode *inode, struct file *file, } case EXT3_IOC_SETFLAGS: { struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - struct ll_iattr_struct attr = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct md_op_data *op_data; @@ -1808,12 +1863,11 @@ int ll_iocontrol(struct inode *inode, struct file *file, RETURN(-ENOMEM); ll_prepare_md_op_data(op_data, inode, NULL, NULL, 0, 0); - - attr.ia_attr_flags = flags; - ((struct iattr *)&attr)->ia_valid |= ATTR_ATTR_FLAG; - + + ((struct ll_iattr *)&op_data->attr)->ia_attr_flags = flags; + op_data->attr.ia_valid |= ATTR_ATTR_FLAG; rc = md_setattr(sbi->ll_md_exp, op_data, - (struct iattr *)&attr, NULL, 0, NULL, 0, &req); + NULL, 0, NULL, 0, &req); OBD_FREE_PTR(op_data); ptlrpc_req_finished(req); if (rc || lsm == NULL) { diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 7ecf181..12f06f5 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -155,16 +155,18 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, case LDLM_CB_CANCELING: { struct inode *inode = ll_inode_from_lock(lock); __u64 bits = lock->l_policy_data.l_inodebits.bits; + struct lu_fid *fid; /* Invalidate all dentries associated with this inode */ if (inode == NULL) break; - if (lock->l_resource->lr_name.name[0] != fid_seq(ll_inode2fid(inode)) || - lock->l_resource->lr_name.name[1] != fid_oid(ll_inode2fid(inode)) || - lock->l_resource->lr_name.name[2] != fid_ver(ll_inode2fid(inode))) { - LDLM_ERROR(lock, "data mismatch with object "DFID" (%p)", - PFID(ll_inode2fid(inode)), inode); + fid = ll_inode2fid(inode); + if (lock->l_resource->lr_name.name[0] != fid_seq(fid) || + lock->l_resource->lr_name.name[1] != fid_oid(fid) || + lock->l_resource->lr_name.name[2] != fid_ver(fid)) { + LDLM_ERROR(lock, "data mismatch with object " + DFID" (%p)", PFID(fid), inode); } if (bits & MDS_INODELOCK_OPEN) { @@ -190,8 +192,7 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct 
ldlm_lock_desc *desc, } if (bits & MDS_INODELOCK_UPDATE) - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, - &(ll_i2info(inode)->lli_flags)); + ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK; if (S_ISDIR(inode->i_mode) && (bits & MDS_INODELOCK_UPDATE)) { @@ -294,12 +295,7 @@ void ll_i2gids(__u32 *suppgids, struct inode *i1, struct inode *i2) } } -/* - * this function prepares md_op_data hint for passing ot down to MD stack. - * - * Note: it zeroes @op_data out before doing anything else, so all additional - * initializations of @op_data should be done after it. - */ +/* this function prepares md_op_data hint for passing it down to MD stack. */ void ll_prepare_md_op_data(struct md_op_data *op_data, struct inode *i1, struct inode *i2, const char *name, int namelen, int mode) @@ -307,7 +303,6 @@ void ll_prepare_md_op_data(struct md_op_data *op_data, struct inode *i1, LASSERT(i1 != NULL); LASSERT(op_data != NULL); - memset(op_data, 0, sizeof(*op_data)); ll_i2gids(op_data->suppgids, i1, i2); op_data->fid1 = ll_i2info(i1)->lli_fid; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 8ad18e5..e74eeab 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -370,7 +370,7 @@ void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa) valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME; if (cmd & OBD_BRW_WRITE) { oa->o_valid |= OBD_MD_FLEPOCH; - oa->o_easize = ll_i2info(inode)->lli_io_epoch; + oa->o_easize = ll_i2info(inode)->lli_ioepoch; valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID | @@ -536,7 +536,7 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction) return count; } -static struct ll_async_page *llap_from_page(struct page *page, unsigned origin) +struct ll_async_page *llap_from_page(struct page *page, unsigned origin) { struct ll_async_page *llap; struct obd_export *exp; @@ -603,11 +603,11 @@ static struct ll_async_page *llap_from_page(struct page *page, unsigned origin) /* also zeroing the PRIVBITS low order bitflags */ 
__set_page_ll_data(page, llap); llap->llap_page = page; - spin_lock(&sbi->ll_lock); sbi->ll_pglist_gen++; sbi->ll_async_page_count++; list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist); + INIT_LIST_HEAD(&llap->llap_pending_write); spin_unlock(&sbi->ll_lock); out: @@ -654,7 +654,6 @@ static int queue_or_sync_write(struct obd_export *exp, struct inode *inode, 0, 0, 0, async_flags); if (rc == 0) { LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n"); - //llap_write_pending(inode, llap); GOTO(out, 0); } @@ -872,9 +871,9 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc) unlock_page(page); - if (0 && cmd & OBD_BRW_WRITE) { - llap_write_complete(page->mapping->host, llap); - ll_try_done_writing(page->mapping->host); + if (cmd & OBD_BRW_WRITE) { + if (llap_write_complete(page->mapping->host, llap)) + ll_queue_done_writing(page->mapping->host); } if (PageWriteback(page)) { @@ -924,7 +923,9 @@ void ll_removepage(struct page *page) return; } - //llap_write_complete(inode, llap); + if (llap_write_complete(inode, llap)) + ll_queue_done_writing(inode); + rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL, llap->llap_cookie); if (rc != 0) diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 349eab7..bb7287e 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -75,6 +75,18 @@ static int ll_releasepage(struct page *page, gfp_t gfp_mask) return 1; } +static int ll_set_page_dirty(struct page *page) +{ + struct ll_async_page *llap; + + llap = llap_from_page(page, LLAP_ORIGIN_UNKNOWN); + if (IS_ERR(llap)) + RETURN(PTR_ERR(llap)); + + llap_write_pending(page->mapping->host, llap); + return(__set_page_dirty_nobuffers(page)); +} + #define MAX_DIRECTIO_SIZE 2*1024*1024*1024UL static inline int ll_get_user_pages(int rw, unsigned long user_addr, @@ -151,12 +163,14 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct file *file, ll_inode_fill_obdo(inode, rw, &oa); - if (rw == WRITE) + if (rw == WRITE) { 
lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_DIRECT_WRITE, size); - else + llap_write_pending(inode, NULL); + } else { lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_DIRECT_READ, size); + } rc = obd_brw_rqset(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, ll_i2dtexp(inode), &oa, lsm, page_count, pga, NULL); if (rc == 0) { @@ -231,7 +245,7 @@ struct address_space_operations ll_aops = { .direct_IO = ll_direct_IO_26, .writepage = ll_writepage_26, .writepages = generic_writepages, - .set_page_dirty = __set_page_dirty_nobuffers, + .set_page_dirty = ll_set_page_dirty, .sync_page = NULL, .prepare_write = ll_prepare_write, .commit_write = ll_commit_write, diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index bed94d5..918c6a1 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1344,7 +1344,8 @@ repeat: } static int lmv_done_writing(struct obd_export *exp, - struct md_op_data *op_data) + struct md_op_data *op_data, + struct obd_client_handle *och) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1360,7 +1361,7 @@ static int lmv_done_writing(struct obd_export *exp, if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); - rc = md_done_writing(tgt_exp, op_data); + rc = md_done_writing(tgt_exp, op_data, och); RETURN(rc); } @@ -1777,8 +1778,8 @@ request: } static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, - struct iattr *iattr, void *ea, int ealen, void *ea2, - int ea2len, struct ptlrpc_request **request) + void *ea, int ealen, void *ea2, int ea2len, + struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1795,7 +1796,8 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, obj = lmv_obj_grab(obd, &op_data->fid1); CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n", - PFID(&op_data->fid1), iattr->ia_valid, obj ? ", split" : ""); + PFID(&op_data->fid1), op_data->attr.ia_valid, + obj ? 
", split" : ""); if (obj) { for (i = 0; i < obj->lo_objcount; i++) { @@ -1807,7 +1809,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, break; } - rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen, + rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2, ea2len, &req); if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) { @@ -1829,7 +1831,7 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); - rc = md_setattr(tgt_exp, op_data, iattr, ea, ealen, ea2, + rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2, ea2len, request); } RETURN(rc); diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 29f3746..6f72dc2 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1001,6 +1001,7 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; req->rq_oi.oi_cb_up = cb_getattr_update; + req->rq_rqset = set; lov_set_add_req(req, set); } diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index f85c873..a403ed6 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -37,8 +37,7 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid, int flags, struct md_op_data *data); void mdc_setattr_pack(struct ptlrpc_request *req, int offset, struct md_op_data *op_data, - struct iattr *iattr, void *ea, int ealen, - void *ea2, int ea2len); + void *ea, int ealen, void *ea2, int ea2len); void mdc_create_pack(struct ptlrpc_request *req, int offset, struct md_op_data *op_data, const void *data, int datalen, __u32 mode, __u32 uid, __u32 gid, __u32 cap_effective, @@ -55,8 +54,8 @@ void mdc_link_pack(struct ptlrpc_request *req, int offset, void mdc_rename_pack(struct ptlrpc_request *req, int offset, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen); -void mdc_close_pack(struct ptlrpc_request *req, 
int offset, struct md_op_data *op_data, - __u64 valid, struct obd_client_handle *och); +void mdc_close_pack(struct ptlrpc_request *req, int offset, + struct md_op_data *op_data); void mdc_exit_request(struct client_obd *cli); void mdc_enter_request(struct client_obd *cli); @@ -139,7 +138,7 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, const char *filename, int namelen, obd_valid valid, int ea_size, struct ptlrpc_request **request); int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, - struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len, + void *ea, int ealen, void *ea2, int ea2len, struct ptlrpc_request **request); int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, obd_valid valid, const char *xattr_name, @@ -171,7 +170,7 @@ int mdc_clear_open_replay_data(struct obd_export *exp, struct obd_client_handle *och); int mdc_close(struct obd_export *, struct md_op_data *, - struct obd_client_handle *, struct ptlrpc_request **); + struct obd_client_handle *och, struct ptlrpc_request **); int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, __u64 offset, struct page *, struct ptlrpc_request **); @@ -202,6 +201,7 @@ int mdc_lock_match(struct obd_export *exp, int flags, int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, int flags, void *opaque); -int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data); +int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, + struct obd_client_handle *och); #endif diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index c14e09f..1e957f2 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -178,45 +178,62 @@ void mdc_open_pack(struct ptlrpc_request *req, int offset, } } -void mdc_setattr_pack(struct ptlrpc_request *req, int offset, - struct md_op_data *op_data, struct iattr *iattr, - void *ea, int ealen, void *ea2, int ea2len) +static void mdc_setattr_pack_rec(struct mdt_rec_setattr 
*rec, + struct md_op_data *op_data) { - struct mdt_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset, - sizeof (*rec)); rec->sa_opcode = REINT_SETATTR; rec->sa_fsuid = current->fsuid; rec->sa_fsgid = current->fsgid; rec->sa_cap = current->cap_effective; - rec->sa_fid = op_data->fid1; rec->sa_suppgid = -1; - if (iattr) { - rec->sa_valid = iattr->ia_valid; - rec->sa_mode = iattr->ia_mode; - rec->sa_uid = iattr->ia_uid; - rec->sa_gid = iattr->ia_gid; - rec->sa_size = iattr->ia_size; - rec->sa_atime = LTIME_S(iattr->ia_atime); - rec->sa_mtime = LTIME_S(iattr->ia_mtime); - rec->sa_ctime = LTIME_S(iattr->ia_ctime); - rec->sa_attr_flags = - ((struct ll_iattr_struct *)iattr)->ia_attr_flags; - if ((iattr->ia_valid & ATTR_GID) && in_group_p(iattr->ia_gid)) - rec->sa_suppgid = iattr->ia_gid; - else - rec->sa_suppgid = op_data->suppgids[0]; - } + rec->sa_fid = op_data->fid1; + rec->sa_valid = op_data->attr.ia_valid; + rec->sa_mode = op_data->attr.ia_mode; + rec->sa_uid = op_data->attr.ia_uid; + rec->sa_gid = op_data->attr.ia_gid; + rec->sa_size = op_data->attr.ia_size; + rec->sa_blocks = op_data->attr_blocks; + rec->sa_atime = LTIME_S(op_data->attr.ia_atime); + rec->sa_mtime = LTIME_S(op_data->attr.ia_mtime); + rec->sa_ctime = LTIME_S(op_data->attr.ia_ctime); + rec->sa_attr_flags = ((struct ll_iattr *)&op_data->attr)->ia_attr_flags; + if ((op_data->attr.ia_valid & ATTR_GID) && + in_group_p(op_data->attr.ia_gid)) + rec->sa_suppgid = op_data->attr.ia_gid; + else + rec->sa_suppgid = op_data->suppgids[0]; +} + +static void mdc_epoch_pack(struct mdt_epoch *epoch, struct md_op_data *op_data) +{ + memcpy(&epoch->handle, &op_data->handle, sizeof(epoch->handle)); + epoch->ioepoch = op_data->ioepoch; + epoch->flags = op_data->flags; +} + +void mdc_setattr_pack(struct ptlrpc_request *req, int offset, + struct md_op_data *op_data, void *ea, + int ealen, void *ea2, int ea2len) +{ + struct mdt_rec_setattr *rec; + struct mdt_epoch *epoch; + + rec = lustre_msg_buf(req->rq_reqmsg, 
offset, sizeof (*rec)); + epoch = lustre_msg_buf(req->rq_reqmsg, offset + 1, sizeof(*epoch)); + mdc_setattr_pack_rec(rec, op_data); + if (epoch) + mdc_epoch_pack(epoch, op_data); if (ealen == 0) return; - memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 1, ealen), ea, ealen); + memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 2, ealen), ea, ealen); if (ea2len == 0) return; - memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 2, ea2len), ea2, ea2len); + memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 3, ea2len), ea2, ea2len); } void mdc_unlink_pack(struct ptlrpc_request *req, int offset, @@ -318,39 +335,16 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid, } void mdc_close_pack(struct ptlrpc_request *req, int offset, - struct md_op_data *op_data, __u64 valid, - struct obd_client_handle *och) + struct md_op_data *op_data) { - struct mdt_body *body; + struct mdt_epoch *epoch; + struct mdt_rec_setattr *rec; - body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body)); + epoch = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*epoch)); + rec = lustre_msg_buf(req->rq_reqmsg, offset + 1, sizeof(*rec)); - body->fid1 = op_data->fid1; - memcpy(&body->handle, &och->och_fh, sizeof(body->handle)); - if (op_data->valid & OBD_MD_FLATIME) { - body->atime = op_data->atime; - body->valid |= OBD_MD_FLATIME; - } - if (op_data->valid & OBD_MD_FLMTIME) { - body->mtime = op_data->mtime; - body->valid |= OBD_MD_FLMTIME; - } - if (op_data->valid & OBD_MD_FLCTIME) { - body->ctime = op_data->ctime; - body->valid |= OBD_MD_FLCTIME; - } - if (op_data->valid & OBD_MD_FLSIZE) { - body->size = op_data->size; - body->valid |= OBD_MD_FLSIZE; - } - if (op_data->valid & OBD_MD_FLBLOCKS) { - body->blocks = op_data->blocks; - body->valid |= OBD_MD_FLBLOCKS; - } - if (op_data->valid & OBD_MD_FLFLAGS) { - body->flags = op_data->flags; - body->valid |= OBD_MD_FLFLAGS; - } + mdc_setattr_pack_rec(rec, op_data); + mdc_epoch_pack(epoch, op_data); } struct mdc_cache_waiter { diff --git 
a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 45cd1eb..4982a34 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -67,19 +67,22 @@ static int mdc_reint(struct ptlrpc_request *request, * magic open-path setattr that should take the setattr semaphore and * go to the setattr portal. */ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, - struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len, + void *ea, int ealen, void *ea2, int ea2len, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mdt_rec_setattr *rec; struct mdc_rpc_lock *rpc_lock; struct obd_device *obd = exp->exp_obd; - int size[4] = { sizeof(struct ptlrpc_body), - sizeof(*rec), ealen, ea2len }; - int bufcount = 2, rc; + int size[5] = { sizeof(struct ptlrpc_body), + sizeof(*rec), 0, ealen, ea2len }; + int bufcount = 3, rc; ENTRY; - LASSERT(iattr != NULL); + LASSERT(op_data != NULL); + + if (op_data->flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) + size[2] = sizeof(struct mdt_epoch); if (ealen > 0) { bufcount++; @@ -92,17 +95,18 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, if (req == NULL) RETURN(-ENOMEM); - if (iattr->ia_valid & ATTR_FROM_OPEN) { + if (op_data->attr.ia_valid & ATTR_FROM_OPEN) { req->rq_request_portal = MDS_SETATTR_PORTAL; //XXX FIXME bug 249 rpc_lock = obd->u.cli.cl_setattr_lock; } else { rpc_lock = obd->u.cli.cl_rpc_lock; } - if (iattr->ia_valid & (ATTR_MTIME | ATTR_CTIME)) + if (op_data->attr.ia_valid & (ATTR_MTIME | ATTR_CTIME)) CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n", - LTIME_S(iattr->ia_mtime), LTIME_S(iattr->ia_ctime)); - mdc_setattr_pack(req, REQ_REC_OFF, op_data, iattr, ea, ealen, ea2, ea2len); + LTIME_S(op_data->attr.ia_mtime), + LTIME_S(op_data->attr.ia_ctime)); + mdc_setattr_pack(req, REQ_REC_OFF, op_data, ea, ealen, ea2, ea2len); size[REPLY_REC_OFF] = sizeof(struct mdt_body); ptlrpc_req_set_repsize(req, 2, size); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c 
index 171bd18..2872903 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -638,8 +638,9 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, struct obd_client_handle *och, struct ptlrpc_request **request) { struct obd_device *obd = class_exp2obd(exp); - int reqsize[2] = { sizeof(struct ptlrpc_body), - sizeof(struct mdt_body) }; + int reqsize[3] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_epoch), + sizeof(struct mdt_rec_setattr)}; int rc, repsize[4] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body), obd->u.cli.cl_max_mds_easize, @@ -649,7 +650,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ENTRY; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_CLOSE, 2, reqsize, NULL); + MDS_CLOSE, 3, reqsize, NULL); if (req == NULL) GOTO(out, rc = -ENOMEM); @@ -679,8 +680,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); } - mdc_close_pack(req, REQ_REC_OFF, op_data, op_data->valid, och); - + mdc_close_pack(req, REQ_REC_OFF, op_data); ptlrpc_req_set_repsize(req, 4, repsize); req->rq_commit_cb = mdc_commit_close; LASSERT(req->rq_cb_data == NULL); @@ -723,27 +723,27 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, return rc; } -int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data) +int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, + struct obd_client_handle *och) { struct ptlrpc_request *req; - struct mdt_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int rc, size[3] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_epoch), + sizeof(struct mdt_rec_setattr)}; + int repsize[2] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_body)}; + ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_DONE_WRITING, 2, size, NULL); + MDS_DONE_WRITING, 3, size, NULL); if (req == NULL) 
RETURN(-ENOMEM); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - body->fid1 = op_data->fid1; - body->size = op_data->size; - body->blocks = op_data->blocks; - body->flags = op_data->flags; - body->valid = op_data->valid; - - ptlrpc_req_set_repsize(req, 2, size); - + /* XXX: add DONE_WRITING request to och -- when Size-on-MDS + * recovery will be ready. */ + mdc_close_pack(req, REQ_REC_OFF, op_data); + + ptlrpc_req_set_repsize(req, 2, repsize); rc = ptlrpc_queue_wait(req); ptlrpc_req_finished(req); RETURN(rc); diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index 60e3429..2a1d0cc 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -843,6 +843,19 @@ int mdd_fix_attr(const struct lu_context *ctxt, struct mdd_object *obj, la->la_mode = (mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO); } + + /* For the "Size-on-MDS" setattr update, merge coming attributes with + * the set in the inode. */ + if (la->la_valid & LA_SIZE) { + if ((la->la_valid & LA_ATIME) && + (la->la_atime < tmp_la->la_atime)) + la->la_valid &= ~LA_ATIME; + + if ((la->la_valid & LA_CTIME) && + (la->la_ctime < tmp_la->la_ctime)) + la->la_valid &= ~(LA_MTIME | LA_CTIME); + } + RETURN(rc); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index ec405c1..5f1ab2c 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -1448,9 +1448,6 @@ int mds_close(struct ptlrpc_request *req, int offset) RETURN(-EFAULT); } - if (body->flags & MDS_BFLAG_UNCOMMITTED_WRITES) - /* do some stuff */ ; - spin_lock(&med->med_open_lock); mfd = mds_handle2mfd(&body->handle); if (mfd == NULL) { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index f7d4541..4688d55 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -192,6 +192,17 @@ static int mdt_statfs(struct mdt_thread_info *info) RETURN(rc); } +void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr, + struct mdt_object *o) +{ + /* Check if 
Size-on-MDS is enabled. */ + if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) { + b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); + b->size = attr->la_size; + b->blocks = attr->la_blocks; + } +} + void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr, const struct lu_fid *fid) { @@ -203,7 +214,7 @@ void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr, if (!S_ISREG(attr->la_mode)) b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV; - + b->atime = attr->la_atime; b->mtime = attr->la_mtime; b->ctime = attr->la_ctime; @@ -272,7 +283,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->fid1 = *mdt_object_fid(o); repbody->valid = OBD_MD_FLID | OBD_MD_MDS; RETURN(0); - } else if (rc){ + } else if (rc) { CERROR("getattr error for "DFID": %d\n", PFID(mdt_object_fid(o)), rc); RETURN(rc); @@ -443,11 +454,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (rc != 0) mdt_object_unlock(info, child, lhc, 1); else { - /* This is pure debugging code. */ - struct ldlm_lock *lock; - struct ldlm_res_id *res_id; - lock = ldlm_handle2lock(&lhc->mlh_lh); + struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh); if (lock) { + struct ldlm_res_id *res_id; + struct mdt_body *repbody; + struct lu_attr *ma; + + /* Debugging code. */ res_id = &lock->l_resource->lr_name; LDLM_DEBUG(lock, "we will return this lock client\n"); LASSERTF(fid_res_name_eq(mdt_object_fid(child), @@ -457,8 +470,19 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, (unsigned long)res_id->name[1], (unsigned long)res_id->name[2], PFID(mdt_object_fid(child))); + + /* Pack Size-on-MDS inode attributes to the body if + * update lock is given. 
*/ + repbody = req_capsule_server_get(&info->mti_pill, + &RMF_MDT_BODY); + ma = &info->mti_attr.ma_attr; + if (lock->l_policy_data.l_inodebits.bits & + MDS_INODELOCK_UPDATE) + mdt_pack_size2body(repbody, ma, child); LDLM_LOCK_PUT(lock); } + + } mdt_object_put(info->mti_ctxt, child); @@ -882,19 +906,18 @@ static int mdt_sync(struct mdt_thread_info *info) if (rc == 0) { rc = mdt_object_sync(info); if (rc == 0) { - struct md_object *next; + struct md_object *next; const struct lu_fid *fid; - struct lu_attr *la; - + struct lu_attr *la = &info->mti_attr.ma_attr; + next = mdt_object_child(info->mti_object); - fid = mdt_object_fid(info->mti_object); info->mti_attr.ma_need = MA_INODE; rc = mo_attr_get(info->mti_ctxt, next, &info->mti_attr); - la = &info->mti_attr.ma_attr; if (rc == 0) { body = req_capsule_server_get(pill, &RMF_MDT_BODY); + fid = mdt_object_fid(info->mti_object); mdt_pack_attr2body(body, la, fid); } } @@ -2716,7 +2739,7 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m, m->mdt_max_mdsize = MAX_MD_SIZE; m->mdt_max_cookiesize = sizeof(struct llog_cookie); - spin_lock_init(&m->mdt_epoch_lock); + spin_lock_init(&m->mdt_ioepoch_lock); /* Temporary. should parse mount option. */ m->mdt_opts.mo_user_xattr = 0; m->mdt_opts.mo_acl = 0; @@ -3078,6 +3101,8 @@ static int mdt_destroy_export(struct obd_export *export) info = lu_context_key_get(&ctxt, &mdt_thread_key); LASSERT(info != NULL); memset(info, 0, sizeof *info); + info->mti_ctxt = &ctxt; + info->mti_mdt = mdt; ma = &info->mti_attr; ma->ma_lmm_size = mdt->mdt_max_mdsize; @@ -3095,20 +3120,21 @@ static int mdt_destroy_export(struct obd_export *export) struct list_head *tmp = med->med_open_head.next; struct mdt_file_data *mfd = list_entry(tmp, struct mdt_file_data, mfd_list); - struct mdt_object *o = mfd->mfd_object; + struct md_attr *ma = &info->mti_attr; /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. 
*/ class_handle_unhash(&mfd->mfd_handle); list_del_init(&mfd->mfd_list); spin_unlock(&med->med_open_lock); - mdt_mfd_close(&ctxt, mdt, mfd, ma); + mdt_mfd_close(info, mfd); /* TODO: if we close the unlinked file, * we need to remove it's objects from OST */ - mdt_object_put(&ctxt, o); + memset(&ma->ma_attr, 0, sizeof(ma->ma_attr)); spin_lock(&med->med_open_lock); } spin_unlock(&med->med_open_lock); + info->mti_mdt = NULL; mdt_client_del(&ctxt, mdt, med); out: @@ -3418,7 +3444,7 @@ DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs), DEF_MDT_HNDL_F(0 |MUTABOR, REINT, mdt_reint), DEF_MDT_HNDL_F(HABEO_CORPUS , CLOSE, mdt_close), -DEF_MDT_HNDL_0(0, DONE_WRITING, mdt_done_writing), +DEF_MDT_HNDL_F(HABEO_CORPUS , DONE_WRITING, mdt_done_writing), DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin), DEF_MDT_HNDL_0(0, SYNC, mdt_sync), DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle), @@ -3501,6 +3527,7 @@ static struct mdt_handler mdt_readpage_ops[] = { * detailed comments. --umka */ DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close), + DEF_MDT_HNDL_F(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), }; static struct mdt_opc_slice mdt_readpage_handlers[] = { diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 1bfc51f..da528e1 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -136,10 +136,9 @@ struct mdt_device { signed int mo_compat_resname:1; } mdt_opts; - /* lock to pretect epoch and write count - */ - spinlock_t mdt_epoch_lock; - __u64 mdt_io_epoch; + /* lock to protect epoch and write count */ + spinlock_t mdt_ioepoch_lock; + __u64 mdt_ioepoch; /* Transaction related stuff here */ spinlock_t mdt_transno_lock; @@ -170,7 +169,9 @@ struct mdt_device { struct mdt_object { struct lu_object_header mot_header; struct md_object mot_obj; - __u64 mot_io_epoch; + __u64 mot_ioepoch; + __u64 mot_flags; + int mot_epochcount; int mot_writecount; }; @@ -292,6 +293,9 @@ struct mdt_thread_info { } rdpg; } mti_u; + /* IO epoch related stuff. 
*/ + struct mdt_epoch *mti_epoch; + /* server and client data buffers */ struct mdt_server_data mti_msd; struct mdt_client_data mti_mcd; @@ -365,8 +369,11 @@ void mdt_object_unlock_put(struct mdt_thread_info *, struct mdt_lock_handle *, int decref); +int mdt_close_unpack(struct mdt_thread_info *info); int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op); int mdt_reint_rec(struct mdt_thread_info *); +void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr, + struct mdt_object *o); void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr, const struct lu_fid *fid); @@ -400,11 +407,18 @@ int mdt_lock_new_child(struct mdt_thread_info *info, int mdt_open(struct mdt_thread_info *info); -void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt, - struct mdt_file_data *mfd, struct md_attr *ma); - +struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle); +int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o, + __u64 epoch); +void mdt_sizeonmds_enable(struct mdt_thread_info *info, struct mdt_object *mo); +int mdt_sizeonmds_enabled(struct mdt_object *mo); +int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o); +struct mdt_file_data *mdt_mfd_new(void); +int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd); +void mdt_mfd_free(struct mdt_file_data *mfd); int mdt_close(struct mdt_thread_info *info); - +int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, + int flags); int mdt_done_writing(struct mdt_thread_info *info); void mdt_shrink_reply(struct mdt_thread_info *info, int offset); int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *, diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index a9e8c23..a0cb3a7 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -137,6 +137,8 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr, out |= LA_GID; if (in & ATTR_SIZE) out |= LA_SIZE; + if (in & 
ATTR_BLOCKS) + out |= LA_BLOCKS; if (in & ATTR_FROM_OPEN) rr->rr_flags |= MRF_SETATTR_LOCKED; @@ -154,7 +156,7 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr, out |= LA_FLAGS; /*XXX need ATTR_RAW?*/ - in &= ~(ATTR_MODE|ATTR_UID|ATTR_GID|ATTR_SIZE| + in &= ~(ATTR_MODE|ATTR_UID|ATTR_GID|ATTR_SIZE|ATTR_BLOCKS| ATTR_ATIME|ATTR_MTIME|ATTR_CTIME|ATTR_FROM_OPEN| ATTR_ATIME_SET|ATTR_CTIME_SET|ATTR_MTIME_SET| ATTR_ATTR_FLAG|ATTR_RAW); @@ -163,17 +165,17 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr, return out; } /* unpacking */ -static int mdt_setattr_unpack(struct mdt_thread_info *info) + +static int mdt_setattr_unpack_rec(struct mdt_thread_info *info) { - struct mdt_rec_setattr *rec; struct md_attr *ma = &info->mti_attr; struct lu_attr *la = &ma->ma_attr; - struct mdt_reint_record *rr = &info->mti_rr; struct req_capsule *pill = &info->mti_pill; + struct mdt_reint_record *rr = &info->mti_rr; + struct mdt_rec_setattr *rec; ENTRY; - + rec = req_capsule_client_get(pill, &RMF_REC_SETATTR); - if (rec == NULL) RETURN(-EFAULT); @@ -184,10 +186,36 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info) la->la_uid = rec->sa_uid; la->la_gid = rec->sa_gid; la->la_size = rec->sa_size; + la->la_blocks = rec->sa_blocks; la->la_ctime = rec->sa_ctime; la->la_atime = rec->sa_atime; la->la_mtime = rec->sa_mtime; ma->ma_valid = MA_INODE; + RETURN(0); +} + +static int mdt_epoch_unpack(struct mdt_thread_info *info) +{ + struct req_capsule *pill = &info->mti_pill; + ENTRY; + + info->mti_epoch = req_capsule_client_get(pill, &RMF_MDT_EPOCH); + RETURN(info->mti_epoch == NULL ? -EFAULT : 0); +} + +static int mdt_setattr_unpack(struct mdt_thread_info *info) +{ + struct md_attr *ma = &info->mti_attr; + struct req_capsule *pill = &info->mti_pill; + int rc; + ENTRY; + + rc = mdt_setattr_unpack_rec(info); + if (rc) + RETURN(rc); + + /* Epoch may be absent, skip errors. 
*/ + mdt_epoch_unpack(info); if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { ma->ma_lmm = req_capsule_client_get(pill, &RMF_EADATA); @@ -207,6 +235,18 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info) RETURN(0); } +int mdt_close_unpack(struct mdt_thread_info *info) +{ + int rc; + ENTRY; + + rc = mdt_epoch_unpack(info); + if (rc) + RETURN(rc); + + RETURN(mdt_setattr_unpack_rec(info)); +} + static int mdt_create_unpack(struct mdt_thread_info *info) { struct mdt_rec_create *rec; diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 4d90132..f45d1a4 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -42,7 +42,7 @@ static void mdt_mfd_get(void *mfdp) /* Create a new mdt_file_data struct, initialize it, * and insert it to global hash table */ -static struct mdt_file_data *mdt_mfd_new(void) +struct mdt_file_data *mdt_mfd_new(void) { struct mdt_file_data *mfd; ENTRY; @@ -57,7 +57,7 @@ static struct mdt_file_data *mdt_mfd_new(void) } /* Find the mfd pointed to by handle in global hash table. */ -static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle) +struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle) { ENTRY; LASSERT(handle != NULL); @@ -65,7 +65,7 @@ static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle) } /* free mfd */ -static void mdt_mfd_free(struct mdt_file_data *mfd) +void mdt_mfd_free(struct mdt_file_data *mfd) { LASSERT(list_empty(&mfd->mfd_handle.h_link)); LASSERT(list_empty(&mfd->mfd_list)); @@ -90,92 +90,194 @@ static int mdt_create_data(struct mdt_thread_info *info, RETURN(rc); } +static int mdt_epoch_opened(struct mdt_object *mo) +{ + return mo->mot_epochcount; +} -/*The following four functions are copied from MDS */ +int mdt_sizeonmds_enabled(struct mdt_object *mo) +{ + return !mo->mot_ioepoch; +} -/* Write access to a file: executors cause a negative count, - * writers a positive count. 
The semaphore is needed to perform - * a check for the sign and then increment or decrement atomically. - * - * This code is closely tied to the allocation of the d_fsdata and the - * MDS epoch, so we use the same semaphore for the whole lot. +/* Re-enable Size-on-MDS. */ +void mdt_sizeonmds_enable(struct mdt_thread_info *info, + struct mdt_object *mo) +{ + spin_lock(&info->mti_mdt->mdt_ioepoch_lock); + if (info->mti_epoch->ioepoch == mo->mot_ioepoch) { + mo->mot_ioepoch = 0; + mo->mot_flags = 0; + } + spin_unlock(&info->mti_mdt->mdt_ioepoch_lock); +} + +/* Open the epoch. Epoch open is allowed if @writecount is not negative. + * The epoch and writecount handling is performed under the mdt_ioepoch_lock. * - * FIXME and TODO : handle the epoch! - * epoch argument is nonzero during recovery */ -static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o, - __u64 epoch) + * @epoch is nonzero during recovery XXX not ready. */ +int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o, + __u64 epoch) { - int rc = 0; + struct mdt_device *mdt = info->mti_mdt; + int cancel = 0; + int rc; ENTRY; - spin_lock(&mdt->mdt_epoch_lock); - - if (o->mot_writecount < 0) { - rc = -ETXTBSY; + if (!S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) + RETURN(0); + + spin_lock(&mdt->mdt_ioepoch_lock); + if (mdt_epoch_opened(o)) { + /* Epoch continues even if there is no writers yet. 
*/ + CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n", + o->mot_ioepoch, PFID(mdt_object_fid(o))); } else { - if (o->mot_io_epoch != 0) { - CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n", - o->mot_io_epoch, PFID(mdt_object_fid(o))); - } else { - if (epoch > mdt->mdt_io_epoch) - mdt->mdt_io_epoch = epoch; - else - mdt->mdt_io_epoch++; - o->mot_io_epoch = mdt->mdt_io_epoch; - CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n", - mdt->mdt_io_epoch, PFID(mdt_object_fid(o))); - } - o->mot_writecount ++; + if (epoch > mdt->mdt_ioepoch) + mdt->mdt_ioepoch = epoch; + else + mdt->mdt_ioepoch++; + o->mot_ioepoch = epoch ? epoch : mdt->mdt_ioepoch; + CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n", + mdt->mdt_ioepoch, PFID(mdt_object_fid(o))); + cancel = 1; } - spin_unlock(&mdt->mdt_epoch_lock); - RETURN(rc); + o->mot_epochcount++; + spin_unlock(&mdt->mdt_ioepoch_lock); + + /* Cancel Size-on-MDS attributes on clients if not truncate. + * In the later case, mdt_reint_setattr will do it. */ + if (cancel && (info->mti_rr.rr_fid1 != NULL)) { + struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD]; + lh->mlh_mode = LCK_EX; + rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE); + mdt_object_unlock(info, o, lh, 1); + RETURN(rc); + } + RETURN(0); } -static void mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o) +/* Update the on-disk attributes if needed and re-enable Size-on-MDS caching. */ +static int mdt_sizeonmds_update(struct mdt_thread_info *info, + struct mdt_object *o) { ENTRY; - spin_lock(&mdt->mdt_epoch_lock); - o->mot_writecount --; - if (o->mot_writecount == 0) - o->mot_io_epoch = 0; - spin_unlock(&mdt->mdt_epoch_lock); - EXIT; + CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n", + o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_epochcount); + + if (info->mti_attr.ma_attr.la_valid & LA_SIZE) + /* Do Size-on-MDS attribute update. + * Size-on-MDS is re-enabled inside. 
*/ + RETURN(mdt_attr_set(info, o, 0)); + else + mdt_sizeonmds_enable(info, o); + RETURN(0); } -static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o) +/* Epoch closes. + * Returns 1 if epoch does not close. + * Returns 0 if epoch closes. + * Returns EAGAIN if epoch closes but an Size-on-MDS Update is still needed + * from the client. */ +static int mdt_epoch_close(struct mdt_thread_info *info, struct mdt_object *o) +{ + int eviction = (mdt_info_req(info) == NULL ? 1 : 0); + struct lu_attr *la = &info->mti_attr.ma_attr; + int achange = 0; + int opened; + int rc = 1; + ENTRY; + + if (!S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) + RETURN(0); + + spin_lock(&info->mti_mdt->mdt_ioepoch_lock); + + /* Epoch closes only if client tells about it or eviction occures. */ + if (eviction || (info->mti_epoch->flags & MF_EPOCH_CLOSE)) { + LASSERT(o->mot_epochcount); + o->mot_epochcount--; + + CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n", + o->mot_ioepoch, PFID(mdt_object_fid(o)), + o->mot_epochcount); + + if (!eviction) + achange = (info->mti_epoch->flags & MF_SOM_CHANGE); + + rc = 0; + if (!eviction && !mdt_epoch_opened(o)) { + /* Epoch ends. Is an Size-on-MDS update needed? */ + if (o->mot_flags & MF_SOM_CHANGE) { + /* Some previous writer changed the attribute. + * Do not beleive to the current Size-on-MDS + * update, re-ask client. */ + rc = EAGAIN; + } else if (!(la->la_valid & LA_SIZE) && achange) { + /* Attributes were changed by the last writer + * only but no Size-on-MDS update is received.*/ + rc = EAGAIN; + } + } + + if (achange || eviction) + o->mot_flags |= MF_SOM_CHANGE; + } + + opened = mdt_epoch_opened(o); + spin_unlock(&info->mti_mdt->mdt_ioepoch_lock); + + /* XXX: if eviction occured, do nothing yet. */ + if ((rc == 0) && !opened && !eviction) { + /* Epoch ends and wanted Size-on-MDS update is obtained. 
*/ + rc = mdt_sizeonmds_update(info, o); + } + RETURN(rc); +} + +int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o) { int rc = 0; ENTRY; - spin_lock(&mdt->mdt_epoch_lock); - if (o->mot_writecount > 0) { + spin_lock(&mdt->mdt_ioepoch_lock); + if (o->mot_writecount < 0) rc = -ETXTBSY; - } else - o->mot_writecount --; - spin_unlock(&mdt->mdt_epoch_lock); + else + o->mot_writecount++; + spin_unlock(&mdt->mdt_ioepoch_lock); RETURN(rc); } -static void mdt_allow_write_access(struct mdt_device *mdt, - struct mdt_object *o) +static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o) { ENTRY; - spin_lock(&mdt->mdt_epoch_lock); - o->mot_writecount ++; - spin_unlock(&mdt->mdt_epoch_lock); + spin_lock(&mdt->mdt_ioepoch_lock); + o->mot_writecount--; + spin_unlock(&mdt->mdt_ioepoch_lock); EXIT; } -int mdt_query_write_access(struct mdt_device *mdt, struct mdt_object *o) +static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o) { - int wc; + int rc = 0; ENTRY; + spin_lock(&mdt->mdt_ioepoch_lock); + if (o->mot_writecount > 0) + rc = -ETXTBSY; + else + o->mot_writecount--; + spin_unlock(&mdt->mdt_ioepoch_lock); + RETURN(rc); +} - spin_lock(&mdt->mdt_epoch_lock); - wc = o->mot_writecount; - spin_unlock(&mdt->mdt_epoch_lock); - - RETURN(wc); +static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o) +{ + ENTRY; + spin_lock(&mdt->mdt_ioepoch_lock); + o->mot_writecount++; + spin_unlock(&mdt->mdt_ioepoch_lock); + EXIT; } /* there can be no real transaction so prepare the fake one */ @@ -213,9 +315,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, int created, struct ldlm_reply *rep) { - struct mdt_export_data *med; struct mdt_file_data *mfd; - struct mdt_device *mdt = info->mti_mdt; struct mdt_body *repbody; struct md_attr *ma = &info->mti_attr; struct lu_attr *la = &ma->ma_attr; @@ -291,12 +391,14 @@ static int mdt_mfd_open(struct mdt_thread_info *info, if (flags & FMODE_WRITE) { - /* FIXME: in recovery, need to pass old 
epoch here */ - rc = mdt_get_write_access(mdt, o, 0); - if (rc == 0) - repbody->io_epoch = o->mot_io_epoch; + rc = mdt_write_get(info->mti_mdt, o); + if (rc == 0) { + /* FIXME: in recovery, need to pass old epoch here */ + mdt_epoch_open(info, o, 0); + repbody->ioepoch = o->mot_ioepoch; + } } else if (flags & MDS_FMODE_EXEC) { - rc = mdt_deny_write_access(mdt, o); + rc = mdt_write_deny(info->mti_mdt, o); } if (rc) RETURN(rc); @@ -307,6 +409,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, mfd = mdt_mfd_new(); if (mfd != NULL) { + struct mdt_export_data *med = &req->rq_export->exp_mdt_data; + /* keep a reference on this object for this open, * and is released by mdt_mfd_close() */ mdt_object_get(info->mti_ctxt, o); @@ -314,9 +418,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, mfd->mfd_mode = flags; mfd->mfd_object = o; - mfd->mfd_xid = mdt_info_req(info)->rq_xid; + mfd->mfd_xid = req->rq_xid; - med = &req->rq_export->exp_mdt_data; spin_lock(&med->med_open_lock); list_add(&mfd->mfd_list, &med->med_open_head); spin_unlock(&med->med_open_lock); @@ -350,7 +453,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info) const struct lu_context *ctxt = info->mti_ctxt; struct mdt_object *parent; struct mdt_object *child; - struct ldlm_reply *ldlm_rep; + struct ldlm_reply *ldlm_rep = NULL; struct mdt_body *repbody; struct mdt_file_data *mfd; struct list_head *h; @@ -455,11 +558,14 @@ void mdt_reconstruct_open(struct mdt_thread_info *info) if (flags & FMODE_WRITE) { /* FIXME: in recovery, need to pass old epoch here */ - result = mdt_get_write_access(mdt, child, 0); - if (result == 0) - repbody->io_epoch = child->mot_io_epoch; + result = mdt_write_get(info->mti_mdt, child); + if (result == 0) { + /* FIXME: in recovery, need to pass old epoch here */ + mdt_epoch_open(info, child, 0); + repbody->ioepoch = child->mot_ioepoch; + } } else if (flags & MDS_FMODE_EXEC) - result = mdt_deny_write_access(mdt, child); + result = mdt_write_deny(mdt, child); if 
(result) GOTO(out_child, result); @@ -758,23 +864,73 @@ out: return result; } -void mdt_mfd_close(const struct lu_context *ctxt, - struct mdt_device *mdt, struct mdt_file_data *mfd, - struct md_attr *ma) +#define MFD_CLOSED(mode) (((mode) & ~(FMODE_EPOCH | FMODE_SOM | \ + FMODE_EPOCHLCK)) == FMODE_CLOSED) + +static int mdt_mfd_closed(struct mdt_file_data *mfd) +{ + return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode)); +} + +int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd) { struct mdt_object *o = mfd->mfd_object; + struct md_object *next = mdt_object_child(o); + struct md_attr *ma = &info->mti_attr; + int rc = 0, ret = 0; + int mode; ENTRY; - if (mfd->mfd_mode & FMODE_WRITE) { - mdt_put_write_access(mdt, o); - } else if (mfd->mfd_mode & MDS_FMODE_EXEC) { - mdt_allow_write_access(mdt, o); + mode = mfd->mfd_mode; + mfd->mfd_mode = FMODE_CLOSED; + + if ((mode & FMODE_WRITE) || (mode & FMODE_EPOCHLCK)) { + mdt_write_put(info->mti_mdt, o); + ret = mdt_epoch_close(info, o); + } else if (mode & MDS_FMODE_EXEC) { + mdt_write_allow(info->mti_mdt, o); + } else if (mode & FMODE_EPOCH) { + ret = mdt_epoch_close(info, o); } - mdt_mfd_free(mfd); + ma->ma_need |= MA_INODE; + + if (!MFD_CLOSED(mode)) + rc = mo_close(info->mti_ctxt, next, ma); + else if (ret == EAGAIN) + rc = mo_attr_get(info->mti_ctxt, next, ma); + + /* If the object is unlinked, do not try to re-enable SIZEONMDS */ + if ((ret == EAGAIN) && (ma->ma_valid & MA_INODE) && + (ma->ma_attr.la_nlink == 0)) + { + ret = 0; + } - mo_close(ctxt, mdt_object_child(o), ma); - EXIT; + if ((ret == EAGAIN) || (ret == 1)) { + struct mdt_export_data *med; + /* The epoch has not closed or Size-on-MDS update is needed. + * Put mfd back into the list. */ + mfd->mfd_mode = (ret == 1 ? 
FMODE_EPOCH : FMODE_SOM); + + LASSERT(mdt_info_req(info)); + med = &mdt_info_req(info)->rq_export->exp_mdt_data; + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + class_handle_hash_back(&mfd->mfd_handle); + spin_unlock(&med->med_open_lock); + if (ret == 1) + ret = 0; + else { + CDEBUG(D_INODE, "Size-on-MDS attribute update is " + "needed on "DFID"\n", PFID(mdt_object_fid(o))); + } + } else { + mdt_mfd_free(mfd); + mdt_object_put(info->mti_ctxt, o); + } + + RETURN(rc ? rc : ret); } int mdt_close(struct mdt_thread_info *info) @@ -784,9 +940,16 @@ int mdt_close(struct mdt_thread_info *info) struct mdt_object *o; struct md_attr *ma = &info->mti_attr; struct mdt_body *repbody = NULL; - int rc; + int rc, ret = 0; ENTRY; + /* Close may come with the Size-on-MDS update. Unpack it. */ + rc = mdt_close_unpack(info); + if (rc) + RETURN(rc); + + LASSERT(info->mti_epoch); + req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER, info->mti_mdt->mdt_max_mdsize); req_capsule_set_size(&info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER, @@ -803,12 +966,12 @@ int mdt_close(struct mdt_thread_info *info) med = &mdt_info_req(info)->rq_export->exp_mdt_data; spin_lock(&med->med_open_lock); - mfd = mdt_handle2mfd(&(info->mti_body->handle)); - if (mfd == NULL) { + mfd = mdt_handle2mfd(&(info->mti_epoch->handle)); + if (mdt_mfd_closed(mfd)) { spin_unlock(&med->med_open_lock); CDEBUG(D_INODE, "no handle for file close: fid = "DFID - ": cookie = "LPX64"\n", PFID(&info->mti_body->fid1), - info->mti_body->handle.cookie); + ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1), + info->mti_epoch->handle.cookie); rc = -ESTALE; } else { class_handle_unhash(&mfd->mfd_handle); @@ -832,12 +995,13 @@ int mdt_close(struct mdt_thread_info *info) RCL_SERVER); ma->ma_need = MA_INODE; } + + /* Do not lose object before last unlink. 
*/ o = mfd->mfd_object; - mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd, ma); + mdt_object_get(info->mti_ctxt, o); + ret = mdt_mfd_close(info, mfd); if (repbody != NULL) rc = mdt_handle_last_unlink(info, o, ma); - - /* release reference on this object. */ mdt_object_put(info->mti_ctxt, o); } if (repbody != NULL) @@ -846,16 +1010,50 @@ int mdt_close(struct mdt_thread_info *info) if (MDT_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) RETURN(-ENOMEM); - RETURN(rc); + RETURN(rc ? rc : ret); } int mdt_done_writing(struct mdt_thread_info *info) { + struct mdt_body *repbody = NULL; + struct mdt_export_data *med; + struct mdt_file_data *mfd; int rc; ENTRY; - req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING); rc = req_capsule_pack(&info->mti_pill); + if (rc) + RETURN(rc); + + repbody = req_capsule_server_get(&info->mti_pill, + &RMF_MDT_BODY); + repbody->eadatasize = 0; + repbody->aclsize = 0; - RETURN(0); + /* Done Writing may come with the Size-on-MDS update. Unpack it. */ + rc = mdt_close_unpack(info); + if (rc) + RETURN(rc); + + med = &mdt_info_req(info)->rq_export->exp_mdt_data; + spin_lock(&med->med_open_lock); + mfd = mdt_handle2mfd(&(info->mti_epoch->handle)); + if (mfd == NULL) { + spin_unlock(&med->med_open_lock); + CDEBUG(D_INODE, "no handle for file close: fid = "DFID + ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1), + info->mti_epoch->handle.cookie); + rc = -ESTALE; + } else { + LASSERT((mfd->mfd_mode == FMODE_EPOCH) || + (mfd->mfd_mode == FMODE_EPOCHLCK)); + class_handle_unhash(&mfd->mfd_handle); + list_del_init(&mfd->mfd_list); + spin_unlock(&med->med_open_lock); + + /* Set EPOCH CLOSE flag if not set by client. 
*/ + info->mti_epoch->flags |= MF_EPOCH_CLOSE; + rc = mdt_mfd_close(info, mfd); + } + RETURN(rc); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index f291c9f..5554043 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -805,15 +805,15 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx, /* transno in two contexts - for commit_cb and for thread */ txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key); mti = lu_context_key_get(ctx, &mdt_thread_key); + req = mdt_info_req(mti); /* FIXME: don't handle requests from SEQ/FLD, * should be fixed */ - if (mti->mti_mdt == NULL) { + if (mti->mti_mdt == NULL || req == NULL) { txi->txi_transno = 0; return 0; } - req = mdt_info_req(mti); LASSERT(req != NULL); /*TODO: checks for recovery cases, see mds_finish_transno */ spin_lock(&mdt->mdt_transno_lock); @@ -969,8 +969,7 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti) req->rq_status = rc; body->valid |= OBD_MD_MDS; } - mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, - mti->mti_rr.rr_fid2); + mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(child)); mdt_object_put(mti->mti_ctxt, child); } @@ -990,8 +989,7 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti) obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1); LASSERT(!IS_ERR(obj)); mo_attr_get(mti->mti_ctxt, mdt_object_child(obj), &mti->mti_attr); - mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, - mti->mti_rr.rr_fid1); + mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(obj)); /* Don't return OST-specific attributes if we didn't just set them */ /* diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index a6a224e..8b27d94 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -73,7 +73,7 @@ static int mdt_md_create(struct mdt_thread_info *info) if (rc == 0) { /* return fid & attr to client. 
*/ if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(repbody, &ma->ma_attr, + mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(child)); } mdt_object_put(info->mti_ctxt, child); @@ -115,79 +115,165 @@ static int mdt_md_mkobj(struct mdt_thread_info *info) RETURN(rc); } - /* In the raw-setattr case, we lock the child inode. * In the write-back case or if being called from open, * the client holds a lock already. * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by * mdt_setattr_unpack()) flag to tell these cases apart. */ -static int mdt_reint_setattr(struct mdt_thread_info *info) +int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) { - struct lu_attr *attr = &info->mti_attr.ma_attr; - struct mdt_reint_record *rr = &info->mti_rr; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_object *mo; + struct md_attr *ma = &info->mti_attr; struct md_object *next; struct mdt_lock_handle *lh; - struct mdt_body *repbody; - int rc; - + int som_update = 0; + int rc; ENTRY; - DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1), - (unsigned int)attr->la_valid); + if (info->mti_epoch) + som_update = (info->mti_epoch->flags & MF_SOM_CHANGE); + /* Try to avoid object_lock if another epoch has been started + * already. 
*/ + if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch)) + RETURN(0); + lh = &info->mti_lh[MDT_LH_PARENT]; lh->mlh_mode = LCK_EX; - if (rr->rr_flags & MRF_SETATTR_LOCKED) { - mo = mdt_object_find(info->mti_ctxt, info->mti_mdt, - rr->rr_fid1); - } else { + if (!(flags & MRF_SETATTR_LOCKED)) { __u64 lockpart = MDS_INODELOCK_UPDATE; - if (attr->la_valid & (LA_MODE|LA_UID|LA_GID)) + if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) lockpart |= MDS_INODELOCK_LOOKUP; - mo = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart); + rc = mdt_object_lock(info, mo, lh, lockpart); + if (rc != 0) + GOTO(out, rc); } - if (IS_ERR(mo)) - RETURN(rc = PTR_ERR(mo)); + /* Setattrs are syncronized through dlm lock taken above. If another + * epoch started, its attributes may be already flushed on disk, + * skip setattr. */ + next = mdt_object_child(mo); + if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch)) + GOTO(out, rc = 0); + next = mdt_object_child(mo); if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu)) - GOTO(out_unlock, rc = -ENOENT); + GOTO(out, rc = -ENOENT); /* all attrs are packed into mti_attr in unpack_setattr */ mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_SETATTR_WRITE); - rc = mo_attr_set(info->mti_ctxt, next, &info->mti_attr); + /* all attrs are packed into mti_attr in unpack_setattr */ + rc = mo_attr_set(info->mti_ctxt, next, ma); if (rc != 0) - GOTO(out_unlock, rc); + GOTO(out, rc); - info->mti_attr.ma_need = MA_INODE; - rc = mo_attr_get(info->mti_ctxt, next, &info->mti_attr); - if (rc != 0) - GOTO(out_unlock, rc); + /* Re-enable SIZEONMDS. */ + if (som_update) { + CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". 
Count %d\n", + mo->mot_ioepoch, PFID(mdt_object_fid(mo)), + mo->mot_epochcount); + + mdt_sizeonmds_enable(info, mo); + } + + EXIT; +out: + mdt_object_unlock(info, mo, lh, rc); + return(rc); +} + +static int mdt_reint_setattr(struct mdt_thread_info *info) +{ + struct md_attr *ma = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_export_data *med = &req->rq_export->exp_mdt_data; + struct mdt_file_data *mfd; + struct mdt_object *mo; + struct md_object *next; + struct mdt_body *repbody; + int rc; + + ENTRY; + + DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1), + (unsigned int)ma->ma_attr.la_valid); repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - mdt_pack_attr2body(repbody, attr, mdt_object_fid(mo)); - - /* don't return OST-specific attributes if we didn't just set them. - if (valid & ATTR_SIZE) - repbody->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - if (valid & (ATTR_MTIME | ATTR_MTIME_SET)) - repbody->valid |= OBD_MD_FLMTIME; - if (valid & (ATTR_ATIME | ATTR_ATIME_SET)) - repbody->valid |= OBD_MD_FLATIME; - */ - GOTO(out_unlock, rc); -out_unlock: - mdt_object_unlock_put(info, mo, lh, rc); + mo = mdt_object_find(info->mti_ctxt, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(mo)) + RETURN(rc = PTR_ERR(mo)); + + if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) { + /* Truncate case. 
*/ + rc = mdt_write_get(info->mti_mdt, mo); + if (rc) + GOTO(out, rc); + + mfd = mdt_mfd_new(); + if (mfd == NULL) + GOTO(out, rc = -ENOMEM); + + /* FIXME: in recovery, need to pass old epoch here */ + mdt_epoch_open(info, mo, 0); + repbody->ioepoch = mo->mot_ioepoch; + + mdt_object_get(info->mti_ctxt, mo); + mfd->mfd_mode = FMODE_EPOCHLCK; + mfd->mfd_object = mo; + mfd->mfd_xid = req->rq_xid; + + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); + repbody->handle.cookie = mfd->mfd_handle.h_cookie; + } + + rc = mdt_attr_set(info, mo, rr->rr_flags); + if (rc) + GOTO(out, rc); + + if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) { + LASSERT(info->mti_epoch); + + /* Size-on-MDS Update. Find and free mfd. */ + spin_lock(&med->med_open_lock); + mfd = mdt_handle2mfd(&(info->mti_epoch->handle)); + if (mfd == NULL) { + spin_unlock(&med->med_open_lock); + CDEBUG(D_INODE, "no handle for file close: " + "fid = "DFID": cookie = "LPX64"\n", + PFID(info->mti_rr.rr_fid1), + info->mti_epoch->handle.cookie); + GOTO(out, rc = -ESTALE); + } + + LASSERT(mfd->mfd_mode == FMODE_SOM); + LASSERT(ma->ma_attr.la_valid & LA_SIZE); + LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE)); + + class_handle_unhash(&mfd->mfd_handle); + list_del_init(&mfd->mfd_list); + spin_unlock(&med->med_open_lock); + mdt_mfd_close(info, mfd); + } + + ma->ma_need = MA_INODE; + next = mdt_object_child(mo); + rc = mo_attr_get(info->mti_ctxt, next, ma); + if (rc != 0) + GOTO(out, rc); + + mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo)); + EXIT; +out: + mdt_object_put(info->mti_ctxt, mo); return rc; } - static int mdt_reint_create(struct mdt_thread_info *info) { int rc; diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 53b9644..47b7d5c 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -417,6 +417,7 @@ EXPORT_SYMBOL(lustre_uuid_to_peer); 
EXPORT_SYMBOL(class_handle_hash); EXPORT_SYMBOL(class_handle_unhash); +EXPORT_SYMBOL(class_handle_hash_back); EXPORT_SYMBOL(class_handle2object); /* obd_config.c */ diff --git a/lustre/obdclass/linux/linux-obdo.c b/lustre/obdclass/linux/linux-obdo.c index d0ec149..df09f8d 100644 --- a/lustre/obdclass/linux/linux-obdo.c +++ b/lustre/obdclass/linux/linux-obdo.c @@ -44,89 +44,6 @@ #include #include /* for PAGE_CACHE_SIZE */ -void obdo_from_iattr(struct obdo *oa, struct iattr *attr, unsigned int ia_valid) -{ - if (ia_valid & ATTR_ATIME) { - oa->o_atime = LTIME_S(attr->ia_atime); - oa->o_valid |= OBD_MD_FLATIME; - } - if (ia_valid & ATTR_MTIME) { - oa->o_mtime = LTIME_S(attr->ia_mtime); - oa->o_valid |= OBD_MD_FLMTIME; - } - if (ia_valid & ATTR_CTIME) { - oa->o_ctime = LTIME_S(attr->ia_ctime); - oa->o_valid |= OBD_MD_FLCTIME; - } - if (ia_valid & ATTR_SIZE) { - oa->o_size = attr->ia_size; - oa->o_valid |= OBD_MD_FLSIZE; - } - if (ia_valid & ATTR_MODE) { - oa->o_mode = attr->ia_mode; - oa->o_valid |= OBD_MD_FLTYPE | OBD_MD_FLMODE; - if (!in_group_p(oa->o_gid) && !capable(CAP_FSETID)) - oa->o_mode &= ~S_ISGID; - } - if (ia_valid & ATTR_UID) { - oa->o_uid = attr->ia_uid; - oa->o_valid |= OBD_MD_FLUID; - } - if (ia_valid & ATTR_GID) { - oa->o_gid = attr->ia_gid; - oa->o_valid |= OBD_MD_FLGID; - } -} -EXPORT_SYMBOL(obdo_from_iattr); - -void iattr_from_obdo(struct iattr *attr, struct obdo *oa, obd_flag valid) -{ - valid &= oa->o_valid; - - if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) - CDEBUG(D_INODE, "valid "LPX64", new time "LPU64"/"LPU64"\n", - oa->o_valid, oa->o_mtime, oa->o_ctime); - - attr->ia_valid = 0; - if (valid & OBD_MD_FLATIME) { - LTIME_S(attr->ia_atime) = oa->o_atime; - attr->ia_valid |= ATTR_ATIME; - } - if (valid & OBD_MD_FLMTIME) { - LTIME_S(attr->ia_mtime) = oa->o_mtime; - attr->ia_valid |= ATTR_MTIME; - } - if (valid & OBD_MD_FLCTIME) { - LTIME_S(attr->ia_ctime) = oa->o_ctime; - attr->ia_valid |= ATTR_CTIME; - } - if (valid & OBD_MD_FLSIZE) { - 
attr->ia_size = oa->o_size; - attr->ia_valid |= ATTR_SIZE; - } -#if 0 /* you shouldn't be able to change a file's type with setattr */ - if (valid & OBD_MD_FLTYPE) { - attr->ia_mode = (attr->ia_mode & ~S_IFMT)|(oa->o_mode & S_IFMT); - attr->ia_valid |= ATTR_MODE; - } -#endif - if (valid & OBD_MD_FLMODE) { - attr->ia_mode = (attr->ia_mode & S_IFMT)|(oa->o_mode & ~S_IFMT); - attr->ia_valid |= ATTR_MODE; - if (!in_group_p(oa->o_gid) && !capable(CAP_FSETID)) - attr->ia_mode &= ~S_ISGID; - } - if (valid & OBD_MD_FLUID) { - attr->ia_uid = oa->o_uid; - attr->ia_valid |= ATTR_UID; - } - if (valid & OBD_MD_FLGID) { - attr->ia_gid = oa->o_gid; - attr->ia_valid |= ATTR_GID; - } -} -EXPORT_SYMBOL(iattr_from_obdo); - /* WARNING: the file systems must take care not to tinker with attributes they don't manage (such as blocks). */ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) @@ -316,4 +233,3 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) } EXPORT_SYMBOL(obdo_to_inode); #endif - diff --git a/lustre/obdclass/llog_swab.c b/lustre/obdclass/llog_swab.c index 91a331d..47d1ddd 100644 --- a/lustre/obdclass/llog_swab.c +++ b/lustre/obdclass/llog_swab.c @@ -112,7 +112,7 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail) (struct llog_size_change_rec *)rec; lustre_swab_ll_fid(&lsc->lsc_fid); - __swab32s(&lsc->lsc_io_epoch); + __swab32s(&lsc->lsc_ioepoch); break; } diff --git a/lustre/obdclass/lustre_handles.c b/lustre/obdclass/lustre_handles.c index 21d6f50..d4840ed 100644 --- a/lustre/obdclass/lustre_handles.c +++ b/lustre/obdclass/lustre_handles.c @@ -105,6 +105,20 @@ void class_handle_unhash(struct portals_handle *h) spin_unlock(&handle_lock); } +void class_handle_hash_back(struct portals_handle *h) +{ + struct list_head *bucket; + ENTRY; + + spin_lock(&handle_lock); + bucket = handle_hash + (h->h_cookie & HANDLE_HASH_MASK); + list_add(&h->h_link, bucket); + handle_count++; + spin_unlock(&handle_lock); 
+ + EXIT; +} + void *class_handle2object(__u64 cookie) { struct list_head *bucket, *tmp; diff --git a/lustre/obdclass/obdo.c b/lustre/obdclass/obdo.c index 94e70bb..9476e4d 100644 --- a/lustre/obdclass/obdo.c +++ b/lustre/obdclass/obdo.c @@ -128,3 +128,117 @@ void obdo_to_ioobj(struct obdo *oa, struct obd_ioobj *ioobj) ioobj->ioo_type = oa->o_mode; } EXPORT_SYMBOL(obdo_to_ioobj); + +void obdo_from_iattr(struct obdo *oa, struct iattr *attr, unsigned int ia_valid) +{ + if (ia_valid & ATTR_ATIME) { + oa->o_atime = LTIME_S(attr->ia_atime); + oa->o_valid |= OBD_MD_FLATIME; + } + if (ia_valid & ATTR_MTIME) { + oa->o_mtime = LTIME_S(attr->ia_mtime); + oa->o_valid |= OBD_MD_FLMTIME; + } + if (ia_valid & ATTR_CTIME) { + oa->o_ctime = LTIME_S(attr->ia_ctime); + oa->o_valid |= OBD_MD_FLCTIME; + } + if (ia_valid & ATTR_SIZE) { + oa->o_size = attr->ia_size; + oa->o_valid |= OBD_MD_FLSIZE; + } + if (ia_valid & ATTR_MODE) { + oa->o_mode = attr->ia_mode; + oa->o_valid |= OBD_MD_FLTYPE | OBD_MD_FLMODE; + if (!in_group_p(oa->o_gid) && !capable(CAP_FSETID)) + oa->o_mode &= ~S_ISGID; + } + if (ia_valid & ATTR_UID) { + oa->o_uid = attr->ia_uid; + oa->o_valid |= OBD_MD_FLUID; + } + if (ia_valid & ATTR_GID) { + oa->o_gid = attr->ia_gid; + oa->o_valid |= OBD_MD_FLGID; + } +} +EXPORT_SYMBOL(obdo_from_iattr); + +void iattr_from_obdo(struct iattr *attr, struct obdo *oa, obd_flag valid) +{ + valid &= oa->o_valid; + + if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) + CDEBUG(D_INODE, "valid "LPX64", new time "LPU64"/"LPU64"\n", + oa->o_valid, oa->o_mtime, oa->o_ctime); + + attr->ia_valid = 0; + if (valid & OBD_MD_FLATIME) { + LTIME_S(attr->ia_atime) = oa->o_atime; + attr->ia_valid |= ATTR_ATIME; + } + if (valid & OBD_MD_FLMTIME) { + LTIME_S(attr->ia_mtime) = oa->o_mtime; + attr->ia_valid |= ATTR_MTIME; + } + if (valid & OBD_MD_FLCTIME) { + LTIME_S(attr->ia_ctime) = oa->o_ctime; + attr->ia_valid |= ATTR_CTIME; + } + if (valid & OBD_MD_FLSIZE) { + attr->ia_size = oa->o_size; + attr->ia_valid |= 
ATTR_SIZE; + } +#if 0 /* you shouldn't be able to change a file's type with setattr */ + if (valid & OBD_MD_FLTYPE) { + attr->ia_mode = (attr->ia_mode & ~S_IFMT)|(oa->o_mode & S_IFMT); + attr->ia_valid |= ATTR_MODE; + } +#endif + if (valid & OBD_MD_FLMODE) { + attr->ia_mode = (attr->ia_mode & S_IFMT)|(oa->o_mode & ~S_IFMT); + attr->ia_valid |= ATTR_MODE; + if (!in_group_p(oa->o_gid) && !capable(CAP_FSETID)) + attr->ia_mode &= ~S_ISGID; + } + if (valid & OBD_MD_FLUID) { + attr->ia_uid = oa->o_uid; + attr->ia_valid |= ATTR_UID; + } + if (valid & OBD_MD_FLGID) { + attr->ia_gid = oa->o_gid; + attr->ia_valid |= ATTR_GID; + } +} +EXPORT_SYMBOL(iattr_from_obdo); + +void md_from_obdo(struct md_op_data *op_data, struct obdo *oa, obd_flag valid) +{ + iattr_from_obdo(&op_data->attr, oa, valid); + if (valid & OBD_MD_FLBLOCKS) { + op_data->attr_blocks = oa->o_blocks; + op_data->attr.ia_valid |= ATTR_BLOCKS; + } + if (valid & OBD_MD_FLFLAGS) { + ((struct ll_iattr *)&op_data->attr)->ia_attr_flags = + oa->o_flags; + op_data->attr.ia_valid |= ATTR_ATTR_FLAG; + } +} +EXPORT_SYMBOL(md_from_obdo); + +void obdo_from_md(struct obdo *oa, struct md_op_data *op_data, + unsigned int valid) +{ + obdo_from_iattr(oa, &op_data->attr, valid); + if (valid & ATTR_BLOCKS) { + oa->o_blocks = op_data->attr_blocks; + oa->o_valid |= OBD_MD_FLBLOCKS; + } + if (valid & ATTR_ATTR_FLAG) { + oa->o_flags = + ((struct ll_iattr *)&op_data->attr)->ia_attr_flags; + oa->o_valid |= OBD_MD_FLFLAGS; + } +} +EXPORT_SYMBOL(obdo_from_md); diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 4a3516f..4021e6c 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -160,7 +160,7 @@ struct ost_filterdata { }; int filter_log_sz_change(struct llog_handle *cathandle, struct ll_fid *mds_fid, - __u32 io_epoch, + __u32 ioepoch, struct llog_cookie *logcookie, struct inode *inode); //int filter_get_catalog(struct obd_device *); diff --git 
a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index d8cebef..4ee0b79 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -42,7 +42,7 @@ int filter_log_sz_change(struct llog_handle *cathandle, struct ll_fid *mds_fid, - __u32 io_epoch, + __u32 ioepoch, struct llog_cookie *logcookie, struct inode *inode) { @@ -54,23 +54,23 @@ int filter_log_sz_change(struct llog_handle *cathandle, LOCK_INODE_MUTEX(inode); ofd = inode->i_filterdata; - if (ofd && ofd->ofd_epoch >= io_epoch) { - if (ofd->ofd_epoch > io_epoch) + if (ofd && ofd->ofd_epoch >= ioepoch) { + if (ofd->ofd_epoch > ioepoch) CERROR("client sent old epoch %d for obj ino %ld\n", - io_epoch, inode->i_ino); + ioepoch, inode->i_ino); UNLOCK_INODE_MUTEX(inode); RETURN(0); } - if (ofd && ofd->ofd_epoch < io_epoch) { - ofd->ofd_epoch = io_epoch; + if (ofd && ofd->ofd_epoch < ioepoch) { + ofd->ofd_epoch = ioepoch; } else if (!ofd) { OBD_ALLOC(ofd, sizeof(*ofd)); if (!ofd) GOTO(out, rc = -ENOMEM); igrab(inode); inode->i_filterdata = ofd; - ofd->ofd_epoch = io_epoch; + ofd->ofd_epoch = ioepoch; } /* the decision to write a record is now made, unlock */ UNLOCK_INODE_MUTEX(inode); @@ -81,7 +81,7 @@ int filter_log_sz_change(struct llog_handle *cathandle, lsc->lsc_hdr.lrh_len = lsc->lsc_tail.lrt_len = sizeof(*lsc); lsc->lsc_hdr.lrh_type = OST_SZ_REC; lsc->lsc_fid = *mds_fid; - lsc->lsc_io_epoch = io_epoch; + lsc->lsc_ioepoch = ioepoch; rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie, NULL); OBD_FREE(lsc, sizeof(*lsc)); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index e4a1e02..431f1d5 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -73,6 +73,12 @@ static const struct req_msg_field *mdt_body_only[] = { &RMF_MDT_BODY }; +static const struct req_msg_field *mdt_close_msg[] = { + &RMF_PTLRPC_BODY, + &RMF_MDT_EPOCH, + &RMF_REC_SETATTR +}; + static const struct req_msg_field *mds_statfs_server[] = { &RMF_PTLRPC_BODY, &RMF_OBD_STATFS @@ -174,6 
+180,7 @@ static const struct req_msg_field *mds_last_unlink_server[] = { static const struct req_msg_field *mds_reint_setattr_client[] = { &RMF_PTLRPC_BODY, &RMF_REC_SETATTR, + &RMF_MDT_EPOCH, &RMF_EADATA, &RMF_LOGCOOKIES }; @@ -362,6 +369,11 @@ const struct req_msg_field RMF_MDT_BODY = sizeof(struct mdt_body), lustre_swab_mdt_body); EXPORT_SYMBOL(RMF_MDT_BODY); +const struct req_msg_field RMF_MDT_EPOCH = + DEFINE_MSGF("mdt_epoch", 0, + sizeof(struct mdt_epoch), lustre_swab_mdt_epoch); +EXPORT_SYMBOL(RMF_MDT_EPOCH); + const struct req_msg_field RMF_PTLRPC_BODY = DEFINE_MSGF("ptlrpc_body", 0, sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body); @@ -620,7 +632,7 @@ EXPORT_SYMBOL(RQF_LDLM_INTENT_UNLINK); const struct req_format RQF_MDS_CLOSE = DEFINE_REQ_FMT0("MDS_CLOSE", - mdt_body_only, mds_last_unlink_server); + mdt_close_msg, mds_last_unlink_server); EXPORT_SYMBOL(RQF_MDS_CLOSE); const struct req_format RQF_MDS_PIN = @@ -630,7 +642,7 @@ EXPORT_SYMBOL(RQF_MDS_PIN); const struct req_format RQF_MDS_DONE_WRITING = DEFINE_REQ_FMT0("MDS_DONE_WRITING", - mdt_body_only, mdt_body_only); + mdt_close_msg, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_DONE_WRITING); const struct req_format RQF_MDS_READPAGE = diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index f88fa3f..764ead0 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1644,7 +1644,7 @@ void lustre_swab_mdt_body (struct mdt_body *b) __swab64s (&b->atime); __swab64s (&b->ctime); __swab64s (&b->blocks); - __swab64s (&b->io_epoch); + __swab64s (&b->ioepoch); __swab32s (&b->fsuid); __swab32s (&b->fsgid); __swab32s (&b->capability); @@ -1661,6 +1661,13 @@ void lustre_swab_mdt_body (struct mdt_body *b) __swab32s (&b->max_cookiesize); } +void lustre_swab_mdt_epoch (struct mdt_epoch *b) +{ + /* swabber for the RMF_MDT_EPOCH field: handle is opaque, so only ioepoch and flags are byte-swapped */ + __swab64s (&b->ioepoch); + __swab32s (&b->flags); +} + void lustre_swab_mgs_target_info(struct mgs_target_info *mti) { int i; @@ -1737,6 +1744,7 @@ void
lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa) lustre_swab_lu_fid (&sa->sa_fid); __swab64s (&sa->sa_valid); __swab64s (&sa->sa_size); + __swab64s (&sa->sa_blocks); __swab64s (&sa->sa_mtime); __swab64s (&sa->sa_atime); __swab64s (&sa->sa_ctime); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index b1c852b..882bdae 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -217,6 +217,7 @@ EXPORT_SYMBOL(lustre_swab_ost_lvb); EXPORT_SYMBOL(lustre_swab_mds_status_req); EXPORT_SYMBOL(lustre_swab_mds_body); EXPORT_SYMBOL(lustre_swab_mdt_body); +EXPORT_SYMBOL(lustre_swab_mdt_epoch); EXPORT_SYMBOL(lustre_swab_obd_quotactl); EXPORT_SYMBOL(lustre_swab_mds_rec_setattr); EXPORT_SYMBOL(lustre_swab_mdt_rec_setattr); diff --git a/lustre/tests/oos.sh b/lustre/tests/oos.sh index 3da2ceb..739db98 100755 --- a/lustre/tests/oos.sh +++ b/lustre/tests/oos.sh @@ -79,6 +79,8 @@ fi rm -f $OOS +sync; sleep 3; sync + if [ $SUCCESS -eq 1 ]; then echo "Success!" rm -f $LOG diff --git a/lustre/utils/req-layout.c b/lustre/utils/req-layout.c index 8c64eb9..fed8092 100644 --- a/lustre/utils/req-layout.c +++ b/lustre/utils/req-layout.c @@ -30,15 +30,16 @@ #include #include +#include #include #define __REQ_LAYOUT_USER__ (1) -#define ARRAY_SIZE(a) ((sizeof (a))/(sizeof (a)[0])) #define lustre_swab_generic_32s NULL #define lustre_swab_lu_range NULL #define lustre_swab_md_fld NULL #define lustre_swab_mdt_body NULL +#define lustre_swab_mdt_epoch NULL #define lustre_swab_ptlrpc_body NULL #define lustre_swab_obd_statfs NULL #define lustre_swab_connect NULL @@ -52,8 +53,6 @@ #define lustre_swab_mdt_rec_create NULL #define lustre_swab_mdt_rec_setattr NULL -#define EXPORT_SYMBOL(name) - /* * Yes, include .c file. 
*/ diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 189d133..b97130f 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -807,7 +807,7 @@ check_llog_size_change_rec(void) CHECK_STRUCT(llog_size_change_rec); CHECK_MEMBER(llog_size_change_rec, lsc_hdr); CHECK_MEMBER(llog_size_change_rec, lsc_fid); - CHECK_MEMBER(llog_size_change_rec, lsc_io_epoch); + CHECK_MEMBER(llog_size_change_rec, lsc_ioepoch); CHECK_MEMBER(llog_size_change_rec, padding); CHECK_MEMBER(llog_size_change_rec, lsc_tail); } -- 1.8.3.1