From 5517eab06eb99e4ecb66be251a10e70c37547610 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Thu, 6 Dec 2012 09:34:15 -0800 Subject: [PATCH] LU-1876 hsm: layout lock implementation on server side * enabled OBD_CONNECT_LAYOUTLOCK both on client and mdt * mdt_object_lock_try() is implemented to try layout lock * dlm flag LDLM_FL_BLOCK_NOWAIT is added to implement nonblock mode dlm lock enqueue * try layout lock for IT_GETATTR and IT_OPEN * make layout lock work * add layout intent support. Signed-off-by: Jinshan Xiong Change-Id: I84a6dfca83ded2344240fa5430c9922b7cfc7bcf Reviewed-on: http://review.whamcloud.com/4417 Tested-by: Hudson Tested-by: Maloo Reviewed-by: wangdi Reviewed-by: jacques-Charles Lafoucriere Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 21 +++- lustre/include/lustre/lustre_idl.h | 29 ++++- lustre/include/lustre_dlm.h | 7 ++ lustre/include/lustre_export.h | 5 + lustre/include/lustre_fid.h | 10 ++ lustre/include/lustre_req_layout.h | 2 + lustre/include/obd_support.h | 6 + lustre/lclient/glimpse.c | 2 + lustre/lclient/lcommon_cl.c | 6 +- lustre/ldlm/ldlm_inodebits.c | 10 +- lustre/ldlm/ldlm_lock.c | 17 ++- lustre/ldlm/ldlm_lockd.c | 12 +- lustre/ldlm/ldlm_resource.c | 2 + lustre/liblustre/llite_lib.h | 2 + lustre/llite/dcache.c | 24 +--- lustre/llite/file.c | 229 ++++++++++++++++++++++++++----------- lustre/llite/llite_internal.h | 3 +- lustre/llite/llite_lib.c | 62 +++++----- lustre/llite/llite_nfs.c | 2 +- lustre/llite/namei.c | 20 ++-- lustre/llite/statahead.c | 2 +- lustre/llite/vvp_io.c | 6 + lustre/llite/vvp_object.c | 16 ++- lustre/lov/lov_lock.c | 1 + lustre/lov/lov_object.c | 32 +++++- lustre/mdc/mdc_locks.c | 138 +++++++++++++++------- lustre/mdt/mdt_handler.c | 146 ++++++++++++++++------- lustre/mdt/mdt_internal.h | 6 + lustre/mdt/mdt_lvb.c | 92 +++++++++++++-- lustre/mdt/mdt_open.c | 177 ++++++++++++++++++++-------- lustre/ptlrpc/layout.c | 20 +++- lustre/ptlrpc/pack_generic.c | 9 +- lustre/ptlrpc/wiretest.c | 34 ++++++ lustre/tests/multiop.c | 9 +- lustre/tests/sanity.sh | 35 ++++++ lustre/tests/sanityn.sh | 75 ++++++++++++ lustre/utils/req-layout.c | 1 + lustre/utils/wirecheck.c | 19 +++ lustre/utils/wiretest.c | 34 ++++++ 39 files changed, 1029 insertions(+), 294 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 3fbea9c..e78d05a 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -277,10 +277,25 @@ struct cl_object_conf { */ struct inode *coc_inode; /** - * Invalidate the current stripe configuration due to losing - * layout lock. + * Layout lock handle. */ - bool coc_invalidate; + struct ldlm_lock *coc_lock; + /** + * Operation to handle layout, OBJECT_CONF_XYZ. + */ + int coc_opc; +}; + +enum { + /** configure layout, set up a new stripe, must be called while + * holding layout lock. */ + OBJECT_CONF_SET = 0, + /** invalidate the current stripe configuration due to losing + * layout lock. */ + OBJECT_CONF_INVALIDATE = 1, + /** wait for old layout to go away so that new layout can be + * set up. */ + OBJECT_CONF_WAIT = 2 }; /** diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 797d2c9..3c2e9cb 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1236,7 +1236,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_64BITHASH | OBD_CONNECT_JOBSTATS | \ OBD_CONNECT_EINPROGRESS | \ OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK | \ - OBD_CONNECT_LVB_TYPE) + OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \ @@ -1251,7 +1251,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_MAX_EASIZE | \ OBD_CONNECT_EINPROGRESS | \ OBD_CONNECT_JOBSTATS | \ - OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE) + OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE|\ + OBD_CONNECT_LAYOUTLOCK) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \ OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | \ @@ -1894,10 +1895,7 @@ extern void lustre_swab_generic_32s (__u32 *val); #define MDS_INODELOCK_OPEN 0x000004 /* For opened files */ #define MDS_INODELOCK_LAYOUT 0x000008 /* for layout */ -/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits - * XXX: MDS_INODELOCK_MAXSHIFT should be increased to 3 once the layout lock is - * supported */ -#define MDS_INODELOCK_MAXSHIFT 2 +#define MDS_INODELOCK_MAXSHIFT 3 /* This FULL lock is useful to take on unlink sort of operations */ #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1) @@ -3275,6 +3273,25 @@ struct getinfo_fid2path { void lustre_swab_fid2path (struct getinfo_fid2path *gf); +enum { + LAYOUT_INTENT_ACCESS = 0, + LAYOUT_INTENT_READ = 1, + LAYOUT_INTENT_WRITE = 2, + LAYOUT_INTENT_GLIMPSE = 3, + LAYOUT_INTENT_TRUNC = 4, + LAYOUT_INTENT_RELEASE = 5, + LAYOUT_INTENT_RESTORE = 6 +}; + +/* enqueue layout lock with intent */ +struct layout_intent { + __u32 li_opc; /* intent operation for enqueue, read, write etc */ + __u32 li_flags; + __u64 li_start; + __u64 li_end; +}; + +void lustre_swab_layout_intent(struct layout_intent *li); #endif /** @} lustreidl */ diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 08b6e7d..5ad5f9b 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -90,6 +90,7 @@ typedef enum { ELDLM_LOCK_ABORTED = 301, ELDLM_LOCK_REPLACED = 302, ELDLM_NO_LOCK_DATA = 303, + ELDLM_LOCK_WOULDBLOCK = 304, ELDLM_NAMESPACE_EXISTS = 400, ELDLM_BAD_NAMESPACE = 401 @@ -1153,6 +1154,12 @@ struct ldlm_resource { struct inode *lr_lvb_inode; }; +static inline bool ldlm_has_layout(struct ldlm_lock *lock) +{ + return lock->l_resource->lr_type == LDLM_IBITS && + lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT; +} + static inline char * ldlm_ns_name(struct ldlm_namespace *ns) { diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 6376a9e..60595cd 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -330,6 +330,11 @@ static inline int imp_connect_lru_resize(struct obd_import *imp) return !!(ocd->ocd_connect_flags & OBD_CONNECT_LRU_RESIZE); } +static inline int exp_connect_layout(struct obd_export *exp) +{ + return !!(exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK); +} + static inline bool exp_connect_lvb_type(struct obd_export *exp) { LASSERT(exp != NULL); diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 99b88be..bd045cb 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -498,6 +498,16 @@ static inline int fid_res_name_eq(const struct lu_fid *f, name->name[LUSTRE_RES_ID_VER_OID_OFF] == fid_ver_oid(f); } +/* reverse function of fid_build_reg_res_name() */ +static inline void fid_build_from_res_name(struct lu_fid *f, + const struct ldlm_res_id *name) +{ + fid_zero(f); + f->f_seq = name->name[LUSTRE_RES_ID_SEQ_OFF]; + f->f_oid = name->name[LUSTRE_RES_ID_VER_OID_OFF] & 0xffffffff; + f->f_ver = name->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32; + LASSERT(fid_res_name_eq(f, name)); +} static inline struct ldlm_res_id * fid_build_pdo_res_name(const struct lu_fid *f, diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 2d09357..697f736 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -211,6 +211,7 @@ extern struct req_format RQF_LDLM_ENQUEUE_LVB; extern struct req_format RQF_LDLM_CONVERT; extern struct req_format RQF_LDLM_INTENT; extern struct req_format RQF_LDLM_INTENT_BASIC; +extern struct req_format RQF_LDLM_INTENT_LAYOUT; extern struct req_format RQF_LDLM_INTENT_GETATTR; extern struct req_format RQF_LDLM_INTENT_OPEN; extern struct req_format RQF_LDLM_INTENT_CREATE; @@ -257,6 +258,7 @@ extern struct req_msg_field RMF_DLM_REP; extern struct req_msg_field RMF_DLM_LVB; extern struct req_msg_field RMF_DLM_GL_DESC; extern struct req_msg_field RMF_LDLM_INTENT; +extern struct req_msg_field RMF_LAYOUT_INTENT; extern struct req_msg_field RMF_MDT_MD; extern struct req_msg_field RMF_REC_REINT; extern struct req_msg_field RMF_EADATA; diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 12ea449..c99152d 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -234,6 +234,11 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_MDS_OSC_CREATE_FAIL 0x147 #define OBD_FAIL_MDS_NEGATIVE_POSITIVE 0x148 +/* layout lock */ +#define OBD_FAIL_MDS_NO_LL_GETATTR 0x170 +#define OBD_FAIL_MDS_NO_LL_OPEN 0x171 +#define OBD_FAIL_MDS_LL_BLOCK 0x172 + /* CMD */ #define OBD_FAIL_MDS_IS_SUBDIR_NET 0x180 #define OBD_FAIL_MDS_IS_SUBDIR_PACK 0x181 @@ -446,6 +451,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE 0x1401 #define OBD_FAIL_LOCK_STATE_WAIT_INTR 0x1402 #define OBD_FAIL_LOV_INIT 0x1403 +#define OBD_FAIL_GLIMPSE_DELAY 0x1404 /* Assign references to moved code to reduce code changes */ #define OBD_FAIL_PRECHECK(id) CFS_FAIL_PRECHECK(id) diff --git a/lustre/lclient/glimpse.c b/lustre/lclient/glimpse.c index 5d91c75..c4b44de 100644 --- a/lustre/lclient/glimpse.c +++ b/lustre/lclient/glimpse.c @@ -237,6 +237,8 @@ int cl_glimpse_size0(struct inode *inode, int agl) else if (result == 0) result = cl_glimpse_lock(env, io, inode, io->ci_obj, agl); + + OBD_FAIL_TIMEOUT(OBD_FAIL_GLIMPSE_DELAY, 2); cl_io_fini(env, io); if (unlikely(io->ci_need_restart)) goto again; diff --git a/lustre/lclient/lcommon_cl.c b/lustre/lclient/lcommon_cl.c index 5104717..0ad8f1e 100644 --- a/lustre/lclient/lcommon_cl.c +++ b/lustre/lclient/lcommon_cl.c @@ -1193,15 +1193,13 @@ int cl_file_inode_init(struct inode *inode, struct lustre_md *md) /* * No locking is necessary, as new inode is * locked by I_NEW bit. - * - * XXX not true for call from ll_update_inode(). */ lli->lli_clob = clob; + lli->lli_has_smd = md->lsm != NULL; lu_object_ref_add(&clob->co_lu, "inode", inode); } else result = PTR_ERR(clob); - } else - result = cl_conf_set(env, lli->lli_clob, &conf); + } cl_env_put(env, &refcheck); if (result != 0) diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index e10a654..b68dd59 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -190,8 +190,12 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, LASSERT(cfs_list_empty(&res->lr_converting)); check_res_locked(res); - if (!first_enq) { - LASSERT(work_list != NULL); + /* (*flags & LDLM_FL_BLOCK_NOWAIT) is for layout lock right now. */ + if (!first_enq || (*flags & LDLM_FL_BLOCK_NOWAIT)) { + *err = ELDLM_LOCK_ABORTED; + if (*flags & LDLM_FL_BLOCK_NOWAIT) + *err = ELDLM_LOCK_WOULDBLOCK; + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); @@ -201,6 +205,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, ldlm_resource_unlink_lock(lock); ldlm_grant_lock(lock, work_list); + + *err = ELDLM_OK; RETURN(LDLM_ITER_CONTINUE); } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index debc6cf..747217b 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -881,6 +881,20 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_decref_internal_nolock(lock, mode); + /* release lvb data for layout lock */ + if (ns_is_client(ns) && !lock->l_readers && !lock->l_writers && + ldlm_has_layout(lock) && lock->l_flags & LDLM_FL_LVB_READY) { + /* this is the last user of a layout lock and stripe has + * been set up, lvb is no longer used. + * This may be a large amount of memory, so we should free it + * when possible. */ + if (lock->l_lvb_data != NULL) { + OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len); + lock->l_lvb_data = NULL; + lock->l_lvb_len = 0; + } + } + if (lock->l_flags & LDLM_FL_LOCAL && !lock->l_readers && !lock->l_writers) { /* If this is a local lock on a server namespace and this was @@ -1567,7 +1581,8 @@ int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill, memcpy(data, lvb, size); break; default: - LDLM_ERROR(lock, "Unexpected LVB type"); + LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type); + libcfs_debug_dumpstack(NULL); RETURN(-EINVAL); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 5517ed5..464a19a 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1411,9 +1411,9 @@ existing_lock: "(err=%d, rc=%d)", err, rc); if (rc == 0) { - int lvb_len = ldlm_lvbo_size(lock); - - if (lvb_len > 0) { + if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB, + RCL_SERVER) && + ldlm_lvbo_size(lock) > 0) { void *buf; int buflen; @@ -1735,12 +1735,13 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, lock->l_lvb_len, lvb_len); GOTO(out, rc = -EINVAL); } - } else { /* for layout lock, lvb has variable length */ + } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has + * variable length */ void *lvb_data; OBD_ALLOC(lvb_data, lvb_len); if (lvb_data == NULL) { - LDLM_ERROR(lock, "No memory.\n"); + LDLM_ERROR(lock, "No memory: %d.\n", lvb_len); GOTO(out, rc = -ENOMEM); } @@ -1829,6 +1830,7 @@ out: lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_FAILED; unlock_res_and_lock(lock); + cfs_waitq_signal(&lock->l_waitq); } LDLM_LOCK_RELEASE(lock); } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 3ad511a..494661e 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -808,6 +808,8 @@ static int ldlm_resource_complain(cfs_hash_t *hs, cfs_hash_bd_t *bd, res->lr_name.name[0], res->lr_name.name[1], res->lr_name.name[2], res->lr_name.name[3], cfs_atomic_read(&res->lr_refcount) - 1); + + ldlm_resource_dump(D_ERROR, res); return 0; } diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index f8e6e34..88b69b9 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -117,6 +117,8 @@ struct llu_inode_info { * was opened several times without close, we track an * open_count here */ struct ll_file_data *lli_file_data; + /* checking lli_has_smd is reliable only inside an IO + * i.e, lov stripe has been held. */ bool lli_has_smd; int lli_open_flags; int lli_open_count; diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 892626a..55a1654 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -259,26 +259,14 @@ int ll_dops_init(struct dentry *de, int block, int init_sa) void ll_intent_drop_lock(struct lookup_intent *it) { - struct lustre_handle *handle; - if (it->it_op && it->d.lustre.it_lock_mode) { - struct ldlm_lock *lock; - - handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle; - lock = ldlm_handle2lock(handle); - if (lock != NULL) { - /* it can only be allowed to match after layout is - * applied to inode otherwise false layout would be - * seen. Applying layout shoud happen before dropping - * the intent lock. */ - if (it->d.lustre.it_lock_bits & MDS_INODELOCK_LAYOUT) - ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); - } + struct lustre_handle handle; + + handle.cookie = it->d.lustre.it_lock_handle; CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64 - " from it %p\n", handle->cookie, it); - ldlm_lock_decref(handle, it->d.lustre.it_lock_mode); + " from it %p\n", handle.cookie, it); + ldlm_lock_decref(&handle, it->d.lustre.it_lock_mode); /* bug 494: intent_release may be called multiple times, from * this thread and we don't want to double-decref this lock */ @@ -351,7 +339,7 @@ int ll_revalidate_it_finish(struct ptlrpc_request *request, if (it_disposition(it, DISP_LOOKUP_NEG)) RETURN(-ENOENT); - rc = ll_prep_inode(&de->d_inode, request, NULL); + rc = ll_prep_inode(&de->d_inode, request, NULL, it); RETURN(rc); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 2fbeb61..b540065 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -411,7 +411,7 @@ static int ll_intent_file_open(struct file *file, void *lmm, GOTO(out, rc); } - rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL); + rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp); if (!rc && itp->d.lustre.it_lock_mode) ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode, itp, NULL); @@ -1486,7 +1486,11 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file, rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size); if (rc == 0) { struct lov_stripe_md *lsm; + __u32 gen; + put_user(0, &lumv1p->lmm_stripe_count); + + ll_layout_refresh(inode, &gen); lsm = ccc_inode_lsm_get(inode); rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm, (void *)arg); @@ -2501,7 +2505,7 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, RETURN(rc); } - rc = ll_prep_inode(&inode, req, NULL); + rc = ll_prep_inode(&inode, req, NULL, NULL); } out: ptlrpc_req_finished(req); @@ -2909,10 +2913,126 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) result = cl_conf_set(env, lli->lli_clob, conf); cl_env_nested_put(&nest, env); + + if (conf->coc_opc == OBJECT_CONF_SET) { + struct ldlm_lock *lock = conf->coc_lock; + + LASSERT(lock != NULL); + LASSERT(ldlm_has_layout(lock)); + if (result == 0) { + /* it can only be allowed to match after layout is + * applied to inode otherwise false layout would be + * seen. Applying layout shoud happen before dropping + * the intent lock. */ + ldlm_lock_allow_match(lock); + } + } RETURN(result); } /** + * Apply the layout to the inode. Layout lock is held and will be released + * in this function. + */ +static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode, + struct inode *inode, __u32 *gen, bool reconf) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ldlm_lock *lock; + struct lustre_md md = { NULL }; + struct cl_object_conf conf; + int rc = 0; + bool lvb_ready; + ENTRY; + + LASSERT(lustre_handle_is_used(lockh)); + + lock = ldlm_handle2lock(lockh); + LASSERT(lock != NULL); + LASSERT(ldlm_has_layout(lock)); + + LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n", + inode, PFID(&lli->lli_fid), reconf); + + lock_res_and_lock(lock); + lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY); + unlock_res_and_lock(lock); + /* checking lvb_ready is racy but this is okay. The worst case is + * that multi processes may configure the file on the same time. */ + if (lvb_ready || !reconf) { + LDLM_LOCK_PUT(lock); + + rc = -ENODATA; + if (lvb_ready) { + /* layout_gen must be valid if layout lock is not + * cancelled and stripe has already set */ + *gen = lli->lli_layout_gen; + rc = 0; + } + ldlm_lock_decref(lockh, mode); + RETURN(rc); + } + + /* for layout lock, lmm is returned in lock's lvb. + * lvb_data is immutable if the lock is held so it's safe to access it + * without res lock. See the description in ldlm_lock_decref_internal() + * for the condition to free lvb_data of layout lock */ + if (lock->l_lvb_data != NULL) { + rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm, + lock->l_lvb_data, lock->l_lvb_len); + if (rc >= 0) { + if (md.lsm != NULL) + *gen = md.lsm->lsm_layout_gen; + rc = 0; + } else { + CERROR("%s: file "DFID" unpackmd error: %d\n", + ll_get_fsname(inode->i_sb, NULL, 0), + PFID(&lli->lli_fid), rc); + } + } + if (rc < 0) { + LDLM_LOCK_PUT(lock); + ldlm_lock_decref(lockh, mode); + RETURN(rc); + } + + /* set layout to file. Unlikely this will fail as old layout was + * surely eliminated */ + memset(&conf, 0, sizeof conf); + conf.coc_opc = OBJECT_CONF_SET; + conf.coc_inode = inode; + conf.coc_lock = lock; + conf.u.coc_md = &md; + rc = ll_layout_conf(inode, &conf); + LDLM_LOCK_PUT(lock); + + ldlm_lock_decref(lockh, mode); + + if (md.lsm != NULL) + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); + + /* wait for IO to complete if it's still being used. */ + if (rc == -EBUSY) { + CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n", + ll_get_fsname(inode->i_sb, NULL, 0), + inode, PFID(&lli->lli_fid)); + + memset(&conf, 0, sizeof conf); + conf.coc_opc = OBJECT_CONF_WAIT; + conf.coc_inode = inode; + rc = ll_layout_conf(inode, &conf); + if (rc == 0) + rc = -EAGAIN; + + CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n", + PFID(&lli->lli_fid), rc); + } + + RETURN(rc); +} + +/** * This function checks if there exists a LAYOUT lock on the client side, * or enqueues it if it doesn't have one in cache. * @@ -2929,9 +3049,9 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) { struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); - struct md_op_data *op_data = NULL; - struct lookup_intent it = { .it_op = IT_LAYOUT }; - struct lustre_handle lockh = { 0 }; + struct md_op_data *op_data; + struct lookup_intent it; + struct lustre_handle lockh; ldlm_mode_t mode; struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS, .ei_mode = LCK_CR, @@ -2941,7 +3061,7 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) int rc; ENTRY; - *gen = 0; + *gen = LL_LAYOUT_GEN_ZERO; if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK)) RETURN(0); @@ -2951,92 +3071,67 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) /* mostly layout lock is caching on the local side, so try to match * it before grabbing layout lock mutex. */ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, - LDLM_FL_LVB_READY); + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0); if (mode != 0) { /* hit cached lock */ - /* lsm_layout_gen is started from 0, plus 1 here to distinguish - * the cases of no layout and first layout. */ - *gen = lli->lli_layout_gen + 1; + rc = ll_layout_lock_set(&lockh, mode, inode, gen, false); + if (rc == 0) + RETURN(0); - ldlm_lock_decref(&lockh, mode); - RETURN(0); + /* better hold lli_layout_mutex to try again otherwise + * it will have starvation problem. */ } - op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, - 0, 0, LUSTRE_OPC_ANY, NULL); - if (IS_ERR(op_data)) - RETURN(PTR_ERR(op_data)); - /* take layout lock mutex to enqueue layout lock exclusively. */ mutex_lock(&lli->lli_layout_mutex); - /* try again inside layout mutex */ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, - LDLM_FL_LVB_READY); +again: + /* try again. Maybe somebody else has done this. */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0); if (mode != 0) { /* hit cached lock */ - *gen = lli->lli_layout_gen + 1; + rc = ll_layout_lock_set(&lockh, mode, inode, gen, true); + if (rc == -EAGAIN) + goto again; - ldlm_lock_decref(&lockh, mode); mutex_unlock(&lli->lli_layout_mutex); - ll_finish_md_op_data(op_data); - RETURN(0); + RETURN(rc); + } + + op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, + 0, 0, LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) { + mutex_unlock(&lli->lli_layout_mutex); + RETURN(PTR_ERR(op_data)); } /* have to enqueue one */ + memset(&it, 0, sizeof(it)); + it.it_op = IT_LAYOUT; + lockh.cookie = 0ULL; + + LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n", + ll_get_fsname(inode->i_sb, NULL, 0), inode, + PFID(&lli->lli_fid)); + rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh, NULL, 0, NULL, 0); if (it.d.lustre.it_data != NULL) ptlrpc_req_finished(it.d.lustre.it_data); it.d.lustre.it_data = NULL; - if (rc == 0) { - struct ldlm_lock *lock; - struct cl_object_conf conf; - struct lustre_md md = { NULL }; - void *lmm; - int lmmsize; + ll_finish_md_op_data(op_data); - LASSERT(lustre_handle_is_used(&lockh)); + mode = it.d.lustre.it_lock_mode; + it.d.lustre.it_lock_mode = 0; + ll_intent_drop_lock(&it); + if (rc == 0) { /* set lock data in case this is a new lock */ ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); - - lock = ldlm_handle2lock(&lockh); - LASSERT(lock != NULL); - - /* for IT_LAYOUT lock, lmm is returned in lock's lvb - * data via completion callback */ - lmm = lock->l_lvb_data; - lmmsize = lock->l_lvb_len; - if (lmm != NULL) { - rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm, - lmm, lmmsize); - if (rc >= 0) { - if (md.lsm != NULL) - *gen = md.lsm->lsm_layout_gen + 1; - rc = 0; - } else { - CERROR("file: "DFID" unpackmd error: %d\n", - PFID(&lli->lli_fid), rc); - } - } - LDLM_LOCK_PUT(lock); - - /* set layout to file. This may cause lock expiration as we - * set layout inside layout ibits lock. */ - memset(&conf, 0, sizeof conf); - conf.coc_inode = inode; - conf.u.coc_md = &md; - ll_layout_conf(inode, &conf); - /* is this racy? */ - lli->lli_has_smd = md.lsm != NULL; - if (md.lsm != NULL) - obd_free_memmd(sbi->ll_dt_exp, &md.lsm); + rc = ll_layout_lock_set(&lockh, mode, inode, gen, true); + if (rc == -EAGAIN) + goto again; } - ll_intent_drop_lock(&it); - mutex_unlock(&lli->lli_layout_mutex); - ll_finish_md_op_data(op_data); RETURN(rc); } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index a2d0e22..90c5790 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -832,7 +832,7 @@ int ll_show_options(struct seq_file *seq, struct vfsmount *vfs); #endif void ll_dirty_page_discard_warn(cfs_page_t *page, int ioret); int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, - struct super_block *); + struct super_block *, struct lookup_intent *); void lustre_dump_dentry(struct dentry *, int recur); void lustre_dump_inode(struct inode *); int ll_obd_statfs(struct inode *inode, void *arg); @@ -1573,6 +1573,7 @@ struct if_quotactl_18 { #warning "remove old LL_IOC_QUOTACTL_18 compatibility code" #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 50, 0) */ +#define LL_LAYOUT_GEN_ZERO ((__u32)-1) int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf); int ll_layout_refresh(struct inode *inode, __u32 *gen); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 38bd420..cf7c2c2 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -221,7 +221,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR | OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH| OBD_CONNECT_EINPROGRESS | - OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE; + OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE | + OBD_CONNECT_LAYOUTLOCK; if (sbi->ll_flags & LL_SBI_SOM_PREVIEW) data->ocd_connect_flags |= OBD_CONNECT_SOM; @@ -404,7 +405,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH | OBD_CONNECT_MAXBYTES | OBD_CONNECT_EINPROGRESS | - OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE; + OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE | + OBD_CONNECT_LAYOUTLOCK; if (sbi->ll_flags & LL_SBI_SOM_PREVIEW) data->ocd_connect_flags |= OBD_CONNECT_SOM; @@ -923,6 +925,7 @@ void ll_lli_init(struct ll_inode_info *lli) mutex_init(&lli->lli_och_mutex); spin_lock_init(&lli->lli_agl_lock); lli->lli_has_smd = false; + lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO; lli->lli_clob = NULL; LASSERT(lli->lli_vfs_inode.i_mode != 0); @@ -1701,21 +1704,9 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); if (lsm != NULL) { - LASSERT(S_ISREG(inode->i_mode)); - CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n", - lsm, inode->i_ino, inode->i_generation, inode); - /* cl_file_inode_init must go before lli_has_smd or a race - * is possible where client thinks the file has stripes, - * but lov raid0 is not setup yet and parallel e.g. - * glimpse would try to use uninitialized lov */ - if (cl_file_inode_init(inode, md) == 0) - lli->lli_has_smd = true; - lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > MAX_LFS_FILESIZE) lli->lli_maxbytes = MAX_LFS_FILESIZE; - if (md->lsm != NULL) - obd_free_memmd(ll_i2dtexp(inode), &md->lsm); } if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { @@ -2134,13 +2125,11 @@ int ll_remount_fs(struct super_block *sb, int *flags, char *data) return 0; } -int ll_prep_inode(struct inode **inode, - struct ptlrpc_request *req, - struct super_block *sb) +int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, + struct super_block *sb, struct lookup_intent *it) { struct ll_sb_info *sbi = NULL; struct lustre_md md; - __u64 ibits; int rc; ENTRY; @@ -2164,8 +2153,6 @@ int ll_prep_inode(struct inode **inode, *inode = ll_iget(sb, cl_fid_build_ino(&md.body->fid1, 0), &md); if (*inode == NULL || IS_ERR(*inode)) { - if (md.lsm) - obd_free_memmd(sbi->ll_dt_exp, &md.lsm); #ifdef CONFIG_FS_POSIX_ACL if (md.posix_acl) { posix_acl_release(md.posix_acl); @@ -2179,16 +2166,37 @@ int ll_prep_inode(struct inode **inode, } } - /* sanity check for LAYOUT lock. */ - ibits = MDS_INODELOCK_LAYOUT; - if (S_ISREG(md.body->mode) && sbi->ll_flags & LL_SBI_LAYOUT_LOCK && - md.lsm != NULL && !ll_have_md_lock(*inode, &ibits, LCK_MINMODE)) { - CERROR("%s: inode "DFID" (%p) layout lock not granted.\n", - ll_get_fsname(sb, NULL, 0), - PFID(ll_inode2fid(*inode)), *inode); + /* Handling piggyback layout lock. + * Layout lock can be piggybacked by getattr and open request. + * The lsm can be applied to inode only if it comes with a layout lock + * otherwise correct layout may be overwritten, for example: + * 1. proc1: mdt returns a lsm but not granting layout + * 2. layout was changed by another client + * 3. proc2: refresh layout and layout lock granted + * 4. proc1: to apply a stale layout */ + if (it != NULL && it->d.lustre.it_lock_mode != 0) { + struct lustre_handle lockh; + struct ldlm_lock *lock; + + lockh.cookie = it->d.lustre.it_lock_handle; + lock = ldlm_handle2lock(&lockh); + LASSERT(lock != NULL); + if (ldlm_has_layout(lock)) { + struct cl_object_conf conf; + + memset(&conf, 0, sizeof(conf)); + conf.coc_opc = OBJECT_CONF_SET; + conf.coc_inode = *inode; + conf.coc_lock = lock; + conf.u.coc_md = &md; + (void)ll_layout_conf(*inode, &conf); + } + LDLM_LOCK_PUT(lock); } out: + if (md.lsm != NULL) + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); md_free_lustre_md(sbi->ll_md_exp, &md); RETURN(rc); } diff --git a/lustre/llite/llite_nfs.c b/lustre/llite/llite_nfs.c index 414f935..eeb24a5 100644 --- a/lustre/llite/llite_nfs.c +++ b/lustre/llite/llite_nfs.c @@ -106,7 +106,7 @@ static struct inode *search_inode_for_lustre(struct super_block *sb, PFID(fid), rc); RETURN(ERR_PTR(rc)); } - rc = ll_prep_inode(&inode, req, sb); + rc = ll_prep_inode(&inode, req, sb, NULL); ptlrpc_req_finished(req); if (rc) RETURN(ERR_PTR(rc)); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 1d39dfd..3622909 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -150,10 +150,14 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash, ll_read_inode2(inode, md); if (S_ISREG(inode->i_mode) && - ll_i2info(inode)->lli_clob == NULL) + ll_i2info(inode)->lli_clob == NULL) { + CDEBUG(D_INODE, + "%s: apply lsm %p to inode "DFID".\n", + ll_get_fsname(sb, NULL, 0), md->lsm, + PFID(ll_inode2fid(inode))); rc = cl_file_inode_init(inode, md); + } if (rc != 0) { - md->lsm = NULL; make_bad_inode(inode); unlock_new_inode(inode); iput(inode); @@ -258,8 +262,10 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, lli = ll_i2info(inode); if (bits & MDS_INODELOCK_LAYOUT) { - struct cl_object_conf conf = { .coc_inode = inode, - .coc_invalidate = true }; + struct cl_object_conf conf = { { 0 } }; + + conf.coc_opc = OBJECT_CONF_INVALIDATE; + conf.coc_inode = inode; rc = ll_layout_conf(inode, &conf); if (rc) CDEBUG(D_INODE, "invaliding layout %d.\n", rc); @@ -427,7 +433,7 @@ int ll_lookup_it_finish(struct ptlrpc_request *request, CDEBUG(D_DENTRY, "it %p it_disposition %x\n", it, it->d.lustre.it_disposition); if (!it_disposition(it, DISP_LOOKUP_NEG)) { - rc = ll_prep_inode(&inode, request, (*de)->d_sb); + rc = ll_prep_inode(&inode, request, (*de)->d_sb, it); if (rc) RETURN(rc); @@ -794,7 +800,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, LASSERT(it_disposition(it, DISP_ENQ_CREATE_REF)); request = it->d.lustre.it_data; it_clear_disposition(it, DISP_ENQ_CREATE_REF); - rc = ll_prep_inode(&inode, request, dir->i_sb); + rc = ll_prep_inode(&inode, request, dir->i_sb, it); if (rc) GOTO(out, inode = ERR_PTR(rc)); @@ -898,7 +904,7 @@ static int ll_new_node(struct inode *dir, struct qstr *name, ll_update_times(request, dir); if (dchild) { - err = ll_prep_inode(&inode, request, dchild->d_sb); + err = ll_prep_inode(&inode, request, dchild->d_sb, NULL); if (err) GOTO(err_exit, err); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 98c40e3..b066a23 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -692,7 +692,7 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, if (rc != 1) GOTO(out, rc = -EAGAIN); - rc = ll_prep_inode(&child, req, dir->i_sb); + rc = ll_prep_inode(&child, req, dir->i_sb, it); if (rc) GOTO(out, rc); diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index 3fd1fb8..401c4ad 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -1180,6 +1180,12 @@ int vvp_io_init(const struct lu_env *env, struct cl_object *obj, io->ci_lockreq = CILR_MANDATORY; } + /* ignore layout change for generic CIT_MISC but not for glimpse. + * io context for glimpse must set ci_verify_layout to true, + * see cl_glimpse_size0() for details. */ + if (io->ci_type == CIT_MISC && !io->ci_verify_layout) + io->ci_ignore_layout = 1; + /* Enqueue layout lock and get layout version. We need to do this * even for operations requiring to open file, such as read and write, * because it might not grant layout lock in IT_OPEN. */ diff --git a/lustre/llite/vvp_object.c b/lustre/llite/vvp_object.c index f112744..20dcdf9 100644 --- a/lustre/llite/vvp_object.c +++ b/lustre/llite/vvp_object.c @@ -128,9 +128,23 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj, { struct ll_inode_info *lli = ll_i2info(conf->coc_inode); - if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) + if (conf->coc_opc != OBJECT_CONF_SET) + return 0; + + if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) { + CDEBUG(D_VFSTRACE, "layout lock change: %u -> %u\n", + lli->lli_layout_gen, + conf->u.coc_md->lsm->lsm_layout_gen); + + lli->lli_has_smd = true; lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen; + } else { + CDEBUG(D_VFSTRACE, "layout lock destroyed: %u.\n", + lli->lli_layout_gen); + lli->lli_has_smd = false; + lli->lli_layout_gen = LL_LAYOUT_GEN_ZERO; + } return 0; } diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 85958ec..1241f5f 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -1205,6 +1205,7 @@ static int lov_empty_lock_print(const struct lu_env *env, void *cookie, return 0; } +/* XXX: more methods will be added later. */ static const struct cl_lock_operations lov_empty_lock_ops = { .clo_fini = lov_empty_lock_fini, .clo_print = lov_empty_lock_print diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 88124bf..2908bc0 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -133,6 +133,12 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, int result; if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) { + /* For sanity:test_206. + * Do not leave the object in cache to avoid accessing + * freed memory. This is because osc_object is referring to + * lov_oinfo of lsm_stripe_data which will be freed due to + * this failure. */ + cl_object_kill(env, stripe); cl_object_put(env, stripe); return -EIO; } @@ -541,8 +547,13 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) RETURN(0); LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0); - while (cfs_atomic_read(&lsm->lsm_refc) > 1) { + while (cfs_atomic_read(&lsm->lsm_refc) > 1 && lov->lo_lsm_invalid) { lov_conf_unlock(lov); + + CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n", + PFID(lu_object_fid(lov2lu(lov))), + cfs_atomic_read(&lsm->lsm_refc)); + l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi); lov_conf_lock(lov); @@ -642,23 +653,34 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, ENTRY; lov_conf_lock(lov); - if (conf->coc_invalidate) { + if (conf->coc_opc == OBJECT_CONF_INVALIDATE) { lov->lo_lsm_invalid = 1; GOTO(out, result = 0); } + if (conf->coc_opc == OBJECT_CONF_WAIT) { + result = lov_layout_wait(env, lov); + GOTO(out, result); + } + + LASSERT(conf->coc_opc == OBJECT_CONF_SET); + if (conf->u.coc_md != NULL) lsm = conf->u.coc_md->lsm; - if ((lsm == NULL && lov->lo_lsm == NULL) || (lsm != NULL && lov->lo_lsm != NULL && lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) { + /* same version of layout */ lov->lo_lsm_invalid = 0; GOTO(out, result = 0); } - /* will change layout */ - lov_layout_wait(env, lov); + /* will change layout - check if there still exists active IO. */ + if (lov->lo_lsm != NULL && + cfs_atomic_read(&lov->lo_lsm->lsm_refc) > 1) { + lov->lo_lsm_invalid = 1; + GOTO(out, result = -EBUSY); + } /* * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported. diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 93d9f4b..c17feed 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -437,6 +437,45 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, RETURN(req); } +static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, + struct lookup_intent *it, + struct md_op_data *unused) +{ + struct obd_device *obd = class_exp2obd(exp); + struct ptlrpc_request *req; + struct ldlm_intent *lit; + struct layout_intent *layout; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_LDLM_INTENT_LAYOUT); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0); + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = (__u64)it->it_op; + + /* pack the layout intent request */ + layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); + /* LAYOUT_INTENT_ACCESS is generic, specific operation will be + * set for replication */ + layout->li_opc = LAYOUT_INTENT_ACCESS; + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + obd->u.cli.cl_max_mds_easize); + ptlrpc_request_set_replen(req); + RETURN(req); +} + static struct ptlrpc_request * mdc_enqueue_pack(struct obd_export *exp, int lvb_len) { @@ -470,6 +509,9 @@ static int mdc_finish_enqueue(struct obd_export *exp, struct ldlm_request *lockreq; struct ldlm_reply *lockrep; struct lustre_intent_data *intent = &it->d.lustre; + struct ldlm_lock *lock; + void *lvb_data = NULL; + int lvb_len = 0; ENTRY; LASSERT(rc >= 0); @@ -485,8 +527,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, memset(lockh, 0, sizeof(*lockh)); rc = 0; } else { /* rc = 0 */ - struct ldlm_lock *lock = ldlm_handle2lock(lockh); - LASSERT(lock); + lock = ldlm_handle2lock(lockh); + LASSERT(lock != NULL); /* If the server gave us back a different lock mode, we should * fix up our variables. */ @@ -547,12 +589,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, mdc_set_open_replay_data(NULL, NULL, req); } - /* TODO: make sure LAYOUT lock must be granted along with EA */ - if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) { void *eadata; - mdc_update_max_ea_from_body(exp, body); + mdc_update_max_ea_from_body(exp, body); /* * The eadata is opaque; just check that it is there. @@ -563,6 +603,11 @@ static int mdc_finish_enqueue(struct obd_export *exp, if (eadata == NULL) RETURN(-EPROTO); + /* save lvb data and length in case this is for layout + * lock */ + lvb_data = eadata; + lvb_len = body->eadatasize; + /* * We save the reply LOV EA in case we have to replay a * create for recovery. If we didn't allocate a large @@ -624,44 +669,45 @@ static int mdc_finish_enqueue(struct obd_export *exp, RETURN(-EPROTO); } } else if (it->it_op & IT_LAYOUT) { - struct ldlm_lock *lock = ldlm_handle2lock(lockh); + /* maybe the lock was granted right away and layout + * is packed into RMF_DLM_LVB of req */ + lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); + if (lvb_len > 0) { + lvb_data = req_capsule_server_sized_get(pill, + &RMF_DLM_LVB, lvb_len); + if (lvb_data == NULL) + RETURN(-EPROTO); + } + } - if (lock != NULL && lock->l_lvb_data == NULL) { - int lvb_len; + /* fill in stripe data for layout lock */ + lock = ldlm_handle2lock(lockh); + if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) { + void *lmm; - /* maybe the lock was granted right away and layout - * is packed into RMF_DLM_LVB of req */ - lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, - RCL_SERVER); - if (lvb_len > 0) { - void *lvb; - void *lmm; + LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n", + ldlm_it2str(it->it_op), lvb_len); - lvb = req_capsule_server_sized_get(pill, - &RMF_DLM_LVB, lvb_len); - if (lvb == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-EPROTO); - } - - OBD_ALLOC_LARGE(lmm, lvb_len); - if (lmm == NULL) { - LDLM_LOCK_PUT(lock); - RETURN(-ENOMEM); - } - memcpy(lmm, lvb, lvb_len); - - /* install lvb_data */ - lock_res_and_lock(lock); - LASSERT(lock->l_lvb_data == NULL); - lock->l_lvb_data = lmm; - lock->l_lvb_len = lvb_len; - unlock_res_and_lock(lock); - } - } - if (lock != NULL) + OBD_ALLOC_LARGE(lmm, lvb_len); + if (lmm == NULL) { LDLM_LOCK_PUT(lock); + RETURN(-ENOMEM); + } + memcpy(lmm, lvb_data, lvb_len); + + /* install lvb_data */ + lock_res_and_lock(lock); + if (lock->l_lvb_data == NULL) { + lock->l_lvb_data = lmm; + lock->l_lvb_len = lvb_len; + lmm = NULL; + } + unlock_res_and_lock(lock); + if (lmm != NULL) + OBD_FREE_LARGE(lmm, lvb_len); } + if (lock != NULL) + LDLM_LOCK_PUT(lock); RETURN(rc); } @@ -732,7 +778,7 @@ resend: if (!imp_connect_lvb_type(class_exp2cliimp(exp))) RETURN(-EOPNOTSUPP); - req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize); + req = mdc_intent_layout_pack(exp, it, op_data); lvb_type = LVB_T_LAYOUT; } else { LBUG(); @@ -810,9 +856,15 @@ resend: } } - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); - - RETURN(rc); + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); + if (rc < 0) { + if (lustre_handle_is_used(lockh)) { + ldlm_lock_decref(lockh, einfo->ei_mode); + memset(lockh, 0, sizeof(*lockh)); + } + ptlrpc_req_finished(req); + } + RETURN(rc); } static int mdc_finish_intent_lock(struct obd_export *exp, @@ -1022,7 +1074,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, lockh.cookie = 0; if (fid_is_sane(&op_data->op_fid2) && - (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) { + (it->it_op & (IT_LOOKUP | IT_GETATTR))) { /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index a44945c..4e266be 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1246,13 +1246,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LDLM_LOCK_PUT(lock); rc = 0; } else { + bool try_layout = false; + relock: OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2); mdt_lock_handle_init(lhc); - if (child_bits == MDS_INODELOCK_LAYOUT) - mdt_lock_reg_init(lhc, LCK_CR); - else - mdt_lock_reg_init(lhc, LCK_PR); + mdt_lock_reg_init(lhc, LCK_PR); if (mdt_object_exists(child) == 0) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, @@ -1270,12 +1269,6 @@ relock: if (unlikely(rc != 0)) GOTO(out_child, rc); - /* layout lock is used only on regular files */ - if ((ma->ma_valid & MA_INODE) && - (ma->ma_attr.la_valid & LA_MODE) && - !S_ISREG(ma->ma_attr.la_mode)) - child_bits &= ~MDS_INODELOCK_LAYOUT; - /* If the file has not been changed for some time, we * return not only a LOOKUP lock, but also an UPDATE * lock and this might save us RPC on later STAT. For @@ -1288,9 +1281,35 @@ relock: child_bits |= MDS_INODELOCK_UPDATE; } - rc = mdt_object_lock(info, child, lhc, child_bits, - MDT_CROSS_LOCK); + /* layout lock must be granted in a best-effort way + * for IT operations */ + LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT)); + if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && + exp_connect_layout(info->mti_exp) && + S_ISREG(lu_object_attr(&child->mot_obj.mo_lu)) && + ldlm_rep != NULL) { + /* try to grant layout lock for regular file. */ + try_layout = true; + } + rc = 0; + if (try_layout) { + child_bits |= MDS_INODELOCK_LAYOUT; + /* try layout lock, it may fail to be granted due to + * contention at LOOKUP or UPDATE */ + if (!mdt_object_lock_try(info, child, lhc, child_bits, + MDT_CROSS_LOCK)) { + child_bits &= ~MDS_INODELOCK_LAYOUT; + LASSERT(child_bits != 0); + rc = mdt_object_lock(info, child, lhc, + child_bits, MDT_CROSS_LOCK); + } else { + ma_need |= MA_LOV; + } + } else { + rc = mdt_object_lock(info, child, lhc, child_bits, + MDT_CROSS_LOCK); + } if (unlikely(rc != 0)) GOTO(out_child, rc); } @@ -1300,7 +1319,7 @@ relock: if (lock && lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE && S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu))) - ma_need = MA_SOM; + ma_need |= MA_SOM; /* finally, we can get attr for child. */ mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); @@ -2131,6 +2150,7 @@ int mdt_llog_prev_block(struct mdt_thread_info *info) /* * DLM handlers. */ + static struct ldlm_callback_suite cbs = { .lcs_completion = ldlm_server_completion_ast, .lcs_blocking = ldlm_server_blocking_ast, @@ -2362,12 +2382,14 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(rc); } -int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits, int locality) +static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, + bool nonblock, int locality) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; ldlm_policy_data_t *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; + __u64 dlmflags; int rc; ENTRY; @@ -2404,6 +2426,10 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, memset(policy, 0, sizeof(*policy)); fid_build_reg_res_name(mdt_object_fid(o), res_id); + dlmflags = LDLM_FL_ATOMIC_CB; + if (nonblock) + dlmflags |= LDLM_FL_BLOCK_NOWAIT; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2419,7 +2445,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, */ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, LDLM_FL_ATOMIC_CB, + policy, res_id, dlmflags, &info->mti_exp->exp_handle.h_cookie); if (unlikely(rc)) RETURN(rc); @@ -2440,7 +2466,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, * fix it up and turn FL_LOCAL flag off. */ rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB, + res_id, LDLM_FL_LOCAL_ONLY | dlmflags, &info->mti_exp->exp_handle.h_cookie); if (rc) mdt_object_unlock(info, o, lh, 1); @@ -2453,6 +2479,25 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, RETURN(rc); } +int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, int locality) +{ + return mdt_object_lock0(info, o, lh, ibits, false, locality); +} + +int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 ibits, int locality) +{ + struct mdt_lock_handle tmp = *lh; + int rc; + + rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality); + if (rc == 0) + *lh = tmp; + + return rc == 0; +} + /** * Save a lock within request object. * @@ -3215,6 +3260,10 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, __u64); +static int mdt_intent_layout(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **, + __u64); static int mdt_intent_reint(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, @@ -3279,11 +3328,11 @@ static struct mdt_it_flavor { .it_flags = 0, .it_act = NULL }, - [MDT_IT_LAYOUT] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - } + [MDT_IT_LAYOUT] = { + .it_fmt = &RQF_LDLM_INTENT_LAYOUT, + .it_flags = 0, + .it_act = mdt_intent_layout + } }; int mdt_intent_lock_replace(struct mdt_thread_info *info, @@ -3460,16 +3509,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, case MDT_IT_GETATTR: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; break; - case MDT_IT_LAYOUT: { - static int printed = 0; - - if (!printed) { - CERROR("layout lock not supported by this version\n"); - printed = 1; - } - GOTO(out_shrink, rc = -EINVAL); - break; - } default: CERROR("Unsupported intent (%d)\n", opcode); GOTO(out_shrink, rc = -EINVAL); @@ -3509,6 +3548,39 @@ out_shrink: return rc; } +static int mdt_intent_layout(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) +{ + struct layout_intent *layout; + int rc; + ENTRY; + + if (opcode != MDT_IT_LAYOUT) { + CERROR("%s: Unknown intent (%d)\n", + info->mti_exp->exp_obd->obd_name, opcode); + RETURN(-EINVAL); + } + + (*lockp)->l_lvb_type = LVB_T_LAYOUT; + req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER, + ldlm_lvbo_size(*lockp)); + rc = req_capsule_server_pack(info->mti_pill); + if (rc != 0) + RETURN(-EINVAL); + + layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT); + LASSERT(layout != NULL); + if (layout->li_opc == LAYOUT_INTENT_ACCESS) + /* return to normal ldlm handling */ + RETURN(0); + + CERROR("%s: Unsupported layout intent (%d)\n", + info->mti_exp->exp_obd->obd_name, layout->li_opc); + RETURN(-EINVAL); +} + static int mdt_intent_reint(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **lockp, @@ -3585,7 +3657,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode, */ if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { LASSERTF(rc == 0, "Error occurred but lock handle " - "is still in use\n"); + "is still in use, rc = %d\n", rc); rep->lock_policy_res2 = 0; rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags); RETURN(rc); @@ -3672,14 +3744,6 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, RETURN(rc); } - if (opc == MDT_IT_LAYOUT) { - (*lockp)->l_lvb_type = LVB_T_LAYOUT; - /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT - * lock enabled. */ - } else if (opc == MDT_IT_READDIR) { - req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); - } - flv = &mdt_it_flavor[opc]; if (flv->it_fmt != NULL) req_capsule_extend(pill, flv->it_fmt); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 262ac79..1f6f78d 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -245,6 +245,7 @@ enum { MDT_LH_PARENT, /* parent lockh */ MDT_LH_CHILD, /* child lockh */ MDT_LH_OLD, /* old lockh for rename */ + MDT_LH_LAYOUT = MDT_LH_OLD, /* layout lock */ MDT_LH_NEW, /* new lockh for rename */ MDT_LH_RMT, /* used for return lh to caller */ MDT_LH_NR @@ -569,6 +570,11 @@ int mdt_object_lock(struct mdt_thread_info *, struct mdt_lock_handle *, __u64, int); +int mdt_object_lock_try(struct mdt_thread_info *, + struct mdt_object *, + struct mdt_lock_handle *, + __u64, int); + void mdt_object_unlock(struct mdt_thread_info *, struct mdt_object *, struct mdt_lock_handle *, diff --git a/lustre/mdt/mdt_lvb.c b/lustre/mdt/mdt_lvb.c index 91df890..40a525e4 100644 --- a/lustre/mdt/mdt_lvb.c +++ b/lustre/mdt/mdt_lvb.c @@ -72,10 +72,13 @@ static int mdt_lvbo_update(struct ldlm_resource *res, static int mdt_lvbo_size(struct ldlm_lock *lock) { - if (IS_LQUOTA_RES(lock->l_resource)) { - struct mdt_device *mdt; + struct mdt_device *mdt; + + /* resource on server side never changes. */ + mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp; + LASSERT(mdt != NULL); - mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp; + if (IS_LQUOTA_RES(lock->l_resource)) { if (mdt->mdt_qmt_dev == NULL) return 0; @@ -83,24 +86,93 @@ static int mdt_lvbo_size(struct ldlm_lock *lock) return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock); } + if (ldlm_has_layout(lock)) + return mdt->mdt_max_mdsize; + return 0; } static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen) { + struct lu_env env; + struct mdt_thread_info *info; + struct mdt_device *mdt; + struct lu_fid *fid; + struct mdt_object *obj = NULL; + struct md_object *child = NULL; + int rc; + ENTRY; + + mdt = ldlm_lock_to_ns(lock)->ns_lvbp; if (IS_LQUOTA_RES(lock->l_resource)) { - struct mdt_device *mdt; - - mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp; if (mdt->mdt_qmt_dev == NULL) - return 0; + RETURN(0); /* call lvbo fill function of quota master */ - return qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb, - lvblen); + rc = qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb, + lvblen); + RETURN(rc); } - return 0; + if (lock->l_resource->lr_type != LDLM_IBITS || + !(lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT)) + RETURN(0); + + /* layout lock will be granted to client, fill in lvb with layout */ + + /* XXX create an env to talk to mdt stack. We should get this env from + * ptlrpc_thread->t_env. */ + rc = lu_env_init(&env, LCT_MD_THREAD); + LASSERT(rc == 0); + + info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); + LASSERT(info != NULL); + memset(info, 0, sizeof *info); + info->mti_env = &env; + info->mti_exp = lock->l_export; + info->mti_mdt = mdt; + + /* XXX get fid by resource id. why don't include fid in ldlm_resource */ + fid = &info->mti_tmp_fid2; + fid_build_from_res_name(fid, &lock->l_resource->lr_name); + + obj = mdt_object_find(&env, info->mti_mdt, fid); + if (IS_ERR(obj)) + GOTO(out, rc = PTR_ERR(obj)); + + if (mdt_object_exists(obj) <= 0) + GOTO(out, rc = -ENOENT); + + child = mdt_object_child(obj); + + /* get the length of lsm */ + rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV); + if (rc < 0) + GOTO(out, rc); + + if (rc > 0) { + struct lu_buf *lmm = NULL; + + if (lvblen < rc) { + CERROR("%s: expected %d actual %d.\n", + info->mti_exp->exp_obd->obd_name, rc, lvblen); + GOTO(out, rc = -ERANGE); + } + + lmm = &info->mti_buf; + lmm->lb_buf = lvb; + lmm->lb_len = rc; + + rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV); + if (rc < 0) + GOTO(out, rc); + } + +out: + if (obj != NULL && !IS_ERR(obj)) + mdt_object_put(&env, obj); + lu_env_fini(&env); + RETURN(rc < 0 ? 0 : rc); } static int mdt_lvbo_free(struct ldlm_resource *res) diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 4fc48c7..f370e5e 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -1115,6 +1115,115 @@ int mdt_open_by_fid(struct mdt_thread_info* info, RETURN(rc); } +/* lock object for open */ +static int mdt_object_open_lock(struct mdt_thread_info *info, + struct mdt_object *obj, + struct mdt_lock_handle *lhc, + __u64 *ibits) +{ + struct md_attr *ma = &info->mti_attr; + __u64 open_flags = info->mti_spec.sp_cr_flags; + ldlm_mode_t lm = LCK_CR; + bool try_layout = false; + bool create_layout = false; + int rc = 0; + ENTRY; + + *ibits = 0; + if (open_flags & MDS_OPEN_LOCK) { + if (open_flags & FMODE_WRITE) + lm = LCK_CW; + else if (open_flags & MDS_FMODE_EXEC) + lm = LCK_PR; + else + lm = LCK_CR; + + *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN; + } + + if (S_ISREG(lu_object_attr(&obj->mot_obj.mo_lu))) { + if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) && + md_should_create(open_flags)) + create_layout = true; + if (exp_connect_layout(info->mti_exp) && !create_layout && + ma->ma_need & MA_LOV) + try_layout = true; + } + + mdt_lock_handle_init(lhc); + mdt_lock_reg_init(lhc, lm); + + /* one problem to return layout lock on open is that it may result + * in too many layout locks cached on the client side. */ + if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) { + /* return lookup lock to validate inode at the client side, + * this is pretty important otherwise mdt will return layout + * lock for each open. + * However this is a double-edged sword because changing + * permission will revoke huge # of LOOKUP locks. */ + *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP; + if (!mdt_object_lock_try(info, obj, lhc, *ibits, + MDT_CROSS_LOCK)) { + *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP); + if (*ibits != 0) + rc = mdt_object_lock(info, obj, lhc, *ibits, + MDT_CROSS_LOCK); + } + } else if (*ibits != 0) { + rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK); + } + + CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64 + ", open_flags = "LPO64", try_layout = %d, rc = %d\n", + PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc); + + /* will change layout, revoke layout locks by enqueuing EX lock. */ + if (rc == 0 && create_layout) { + struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT]; + + CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID + ", open_flags = "LPO64"\n", + PFID(mdt_object_fid(obj)), open_flags); + + LASSERT(!try_layout); + mdt_lock_handle_init(ll); + mdt_lock_reg_init(ll, LCK_EX); + rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT, + MDT_LOCAL_LOCK); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2); + } + + RETURN(rc); +} + +static void mdt_object_open_unlock(struct mdt_thread_info *info, + struct mdt_object *obj, + struct mdt_lock_handle *lhc, + __u64 ibits, int rc) +{ + __u64 open_flags = info->mti_spec.sp_cr_flags; + struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT]; + + /* Release local layout lock - the layout lock put in MDT_LH_LAYOUT + * will never return to client side. */ + if (lustre_handle_is_used(&ll->mlh_reg_lh)) { + LASSERT(!(ibits & MDS_INODELOCK_LAYOUT)); + mdt_object_unlock(info, obj, ll, 1); + } + + if (ibits == 0) + return; + + if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) { + /* for the open request, the lock will only return to client + * if open or layout lock is granted. */ + rc = 1; + } + if (rc != 0) + mdt_object_unlock(info, obj, lhc, 1); +} + int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep, struct mdt_lock_handle *lhc) { @@ -1126,7 +1235,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep, struct mdt_object *parent= NULL; struct mdt_object *o; int rc; - ldlm_mode_t lm; + __u64 ibits; ENTRY; if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) { @@ -1160,22 +1269,11 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep, mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD)); - if (flags & FMODE_WRITE) - lm = LCK_CW; - else if (flags & MDS_FMODE_EXEC) - lm = LCK_PR; - else - lm = LCK_CR; - - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, lm); - rc = mdt_object_lock(info, o, lhc, - MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN, - MDT_CROSS_LOCK); + rc = mdt_attr_get_complex(info, o, ma); if (rc) GOTO(out, rc); - rc = mdt_attr_get_complex(info, o, ma); + rc = mdt_object_open_lock(info, o, lhc, &ibits); if (rc) GOTO(out, rc); @@ -1191,18 +1289,15 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep, } rc = mdt_finish_open(info, parent, o, flags, 0, rep); - - if (!(flags & MDS_OPEN_LOCK) || rc) - mdt_object_unlock(info, o, lhc, 1); - if (!rc) { mdt_set_disposition(info, rep, DISP_LOOKUP_POS); if (flags & MDS_OPEN_LOCK) mdt_set_disposition(info, rep, DISP_OPEN_LOCK); } - GOTO(out, rc); + out: + mdt_object_open_unlock(info, o, lhc, ibits, rc); mdt_object_put(env, o); if (parent != NULL) mdt_object_put(env, parent); @@ -1275,6 +1370,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) struct lu_fid *child_fid = &info->mti_tmp_fid1; struct md_attr *ma = &info->mti_attr; __u64 create_flags = info->mti_spec.sp_cr_flags; + __u64 ibits; struct mdt_reint_record *rr = &info->mti_rr; struct lu_name *lname; int result, rc; @@ -1513,41 +1609,24 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh)); - /* get openlock if this is not replay and if a client requested it */ - if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) { - ldlm_mode_t lm; - - if (create_flags & FMODE_WRITE) - lm = LCK_CW; - else if (create_flags & MDS_FMODE_EXEC) - lm = LCK_PR; - else - lm = LCK_CR; - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, lm); - rc = mdt_object_lock(info, child, lhc, - MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN, - MDT_CROSS_LOCK); - if (rc) { - result = rc; - GOTO(out_child, result); - } else { - result = -EREMOTE; - mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK); - } - } + /* get openlock if this is not replay and if a client requested it */ + if (!req_is_replay(req)) { + rc = mdt_object_open_lock(info, child, lhc, &ibits); + if (rc != 0) { + GOTO(out_child, result = rc); + } else if (create_flags & MDS_OPEN_LOCK) { + result = -EREMOTE; + mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK); + } + } /* Try to open it now. */ rc = mdt_finish_open(info, parent, child, create_flags, created, ldlm_rep); if (rc) { result = rc; - if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { - /* openlock was acquired and mdt_finish_open failed - - drop the openlock */ - mdt_object_unlock(info, child, lhc, 1); - mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK); - } + /* openlock will be released if mdt_finish_open failed */ + mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK); if (created) { ma->ma_need = 0; ma->ma_valid = 0; @@ -1564,6 +1643,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) } EXIT; out_child: + mdt_object_open_unlock(info, child, lhc, ibits, + result == -EREMOTE ? 0 : result); mdt_object_put(info->mti_env, child); out_parent: mdt_object_unlock_put(info, parent, lh, result || !created); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index deff55f..f696827 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -383,6 +383,13 @@ static const struct req_msg_field *ldlm_intent_server[] = { &RMF_ACL }; +static const struct req_msg_field *ldlm_intent_layout_client[] = { + &RMF_PTLRPC_BODY, + &RMF_DLM_REQ, + &RMF_LDLM_INTENT, + &RMF_LAYOUT_INTENT, + &RMF_EADATA /* for new layout to be set up */ +}; static const struct req_msg_field *ldlm_intent_open_server[] = { &RMF_PTLRPC_BODY, &RMF_DLM_REP, @@ -642,6 +649,7 @@ static struct req_format *req_formats[] = { &RQF_LDLM_GL_DESC_CALLBACK, &RQF_LDLM_INTENT, &RQF_LDLM_INTENT_BASIC, + &RQF_LDLM_INTENT_LAYOUT, &RQF_LDLM_INTENT_GETATTR, &RQF_LDLM_INTENT_OPEN, &RQF_LDLM_INTENT_CREATE, @@ -935,6 +943,12 @@ struct req_msg_field RMF_CAPA2 = lustre_swab_lustre_capa, NULL); EXPORT_SYMBOL(RMF_CAPA2); +struct req_msg_field RMF_LAYOUT_INTENT = + DEFINE_MSGF("layout_intent", 0, + sizeof(struct layout_intent), lustre_swab_layout_intent, + NULL); +EXPORT_SYMBOL(RMF_LAYOUT_INTENT); + /* * OST request field. */ @@ -1228,6 +1242,11 @@ struct req_format RQF_LDLM_INTENT = ldlm_intent_client, ldlm_intent_server); EXPORT_SYMBOL(RQF_LDLM_INTENT); +struct req_format RQF_LDLM_INTENT_LAYOUT = + DEFINE_REQ_FMT0("LDLM_INTENT_LAYOUT ", + ldlm_intent_layout_client, ldlm_enqueue_lvb_server); +EXPORT_SYMBOL(RQF_LDLM_INTENT_LAYOUT); + struct req_format RQF_LDLM_INTENT_GETATTR = DEFINE_REQ_FMT0("LDLM_INTENT_GETATTR", ldlm_intent_getattr_client, ldlm_intent_getattr_server); @@ -1377,7 +1396,6 @@ struct req_format RQF_OST_GET_INFO_FIEMAP = ost_get_fiemap_server); EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP); - #if !defined(__REQ_LAYOUT_USER__) /* Convenience macro */ diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 3f06381..bb961d3 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2514,4 +2514,11 @@ void lustre_swab_hsm_progress(struct hsm_progress *hp) } EXPORT_SYMBOL(lustre_swab_hsm_progress); - +void lustre_swab_layout_intent(struct layout_intent *li) +{ + __swab32s(&li->li_opc); + __swab32s(&li->li_flags); + __swab64s(&li->li_start); + __swab64s(&li->li_end); +} +EXPORT_SYMBOL(lustre_swab_layout_intent); diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 3269fea..1201e46 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -4161,5 +4161,39 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct hsm_user_state, hus_in_progress_location)); LASSERTF((int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location) == 16, "found %lld\n", (long long)(int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location)); + + /* Checks for struct layout_intent */ + LASSERTF((int)sizeof(struct layout_intent) == 24, "found %lld\n", + (long long)(int)sizeof(struct layout_intent)); + LASSERTF((int)offsetof(struct layout_intent, li_opc) == 0, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_opc)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_opc) == 4, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_opc)); + LASSERTF((int)offsetof(struct layout_intent, li_flags) == 4, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_flags)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_flags) == 4, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_flags)); + LASSERTF((int)offsetof(struct layout_intent, li_start) == 8, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_start)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_start) == 8, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_start)); + LASSERTF((int)offsetof(struct layout_intent, li_end) == 16, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_end)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_end) == 8, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_end)); + LASSERTF(LAYOUT_INTENT_ACCESS == 0, "found %lld\n", + (long long)LAYOUT_INTENT_ACCESS); + LASSERTF(LAYOUT_INTENT_READ == 1, "found %lld\n", + (long long)LAYOUT_INTENT_READ); + LASSERTF(LAYOUT_INTENT_WRITE == 2, "found %lld\n", + (long long)LAYOUT_INTENT_WRITE); + LASSERTF(LAYOUT_INTENT_GLIMPSE == 3, "found %lld\n", + (long long)LAYOUT_INTENT_GLIMPSE); + LASSERTF(LAYOUT_INTENT_TRUNC == 4, "found %lld\n", + (long long)LAYOUT_INTENT_TRUNC); + LASSERTF(LAYOUT_INTENT_RELEASE == 5, "found %lld\n", + (long long)LAYOUT_INTENT_RELEASE); + LASSERTF(LAYOUT_INTENT_RESTORE == 6, "found %lld\n", + (long long)LAYOUT_INTENT_RESTORE); } diff --git a/lustre/tests/multiop.c b/lustre/tests/multiop.c index 4d6fece..ab360c4 100644 --- a/lustre/tests/multiop.c +++ b/lustre/tests/multiop.c @@ -50,6 +50,7 @@ #include #include #include +#include #include @@ -198,6 +199,7 @@ int main(int argc, char **argv) int save_errno; int verbose = 0; int gid = 0; + struct timespec ts; if (argc < 3) { fprintf(stderr, usage, argv[0]); @@ -219,7 +221,12 @@ int main(int argc, char **argv) printf("PAUSING\n"); fflush(stdout); } - while (sem_wait(&sem) == -1 && errno == EINTR); + len = atoi(commands+1); + if (len <= 0) + len = 3600; /* 1 hour */ + ts.tv_sec = time(NULL) + len; + ts.tv_nsec = 0; + while (sem_timedwait(&sem, &ts) < 0 && errno == EINTR); break; case 'c': if (close(fd) == -1) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 739b693..495570f 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -9776,6 +9776,41 @@ test_206() { } run_test 206 "fail lov_init_raid0() doesn't lbug" +test_207a() { + dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((RANDOM%10+1)) + local fsz=`stat -c %s $DIR/$tfile` + cancel_lru_locks mdc + + # do not return layout in getattr intent +#define OBD_FAIL_MDS_NO_LL_GETATTR 0x170 + $LCTL set_param fail_loc=0x170 + local sz=`stat -c %s $DIR/$tfile` + + [ $fsz -eq $sz ] || error "file size expected $fsz, actual $sz" + + rm -rf $DIR/$tfile +} +run_test 207a "can refresh layout at glimpse" + +test_207b() { + dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((RANDOM%10+1)) + local cksum=`md5sum $DIR/$tfile` + local fsz=`stat -c %s $DIR/$tfile` + cancel_lru_locks mdc + cancel_lru_locks osc + + # do not return layout in getattr intent +#define OBD_FAIL_MDS_NO_LL_OPEN 0x171 + $LCTL set_param fail_loc=0x171 + + # it will refresh layout after the file is opened but before read issues + echo checksum is "$cksum" + echo "$cksum" |md5sum -c --quiet || error "file differs" + + rm -rf $DIR/$tfile +} +run_test 207b "can refresh layout at open" + test_212() { size=`date +%s` size=$((size % 8192 + 1)) diff --git a/lustre/tests/sanityn.sh b/lustre/tests/sanityn.sh index d79f038..3a84dd4 100644 --- a/lustre/tests/sanityn.sh +++ b/lustre/tests/sanityn.sh @@ -2258,6 +2258,81 @@ test_50() { } run_test 50 "osc lvb attrs: enqueue vs. CP AST ==============" +test_51a() { + local filesize + local origfile=/etc/hosts + + filesize=`stat -c %s $origfile` + + # create an empty file + $MCREATE $DIR1/$tfile + # cache layout lock on both mount point + stat $DIR1/$tfile > /dev/null + stat $DIR2/$tfile > /dev/null + + # open and sleep 2 seconds then read + $MULTIOP $DIR2/$tfile o_2r${filesize}c & + local pid=$! + sleep 0.1 + + # create the layout of testing file + dd if=$origfile of=$DIR1/$tfile conv=notrunc > /dev/null + + # MULTIOP proc should be able to read enough bytes and exit + sleep 2 + kill -0 $pid && error "multiop is still there" + cmp $origfile $DIR2/$tfile || error "$MCREATE and $DIR2/$tfile differs" + + rm -f $DIR1/$tfile +} +run_test 51a "layout lock: refresh layout should work" + +test_51b() { + local tmpfile=`mktemp` + + # create an empty file + $MCREATE $DIR1/$tfile + + # delay glimpse so that layout has changed when glimpse finish +#define OBD_FAIL_GLIMPSE_DELAY 0x1404 + $LCTL set_param fail_loc=0x1404 + stat -c %s $DIR2/$tfile |tee $tmpfile & + local pid=$! + sleep 0.1 + + # create layout of testing file + dd if=/dev/zero of=$DIR1/$tfile bs=1k count=1 conv=notrunc > /dev/null + + wait $pid + local fsize=`cat $tmpfile` + + [ x$fsize = x1024 ] || error "file size is $fsize, should be 1024" + + rm -f $DIR1/$tfile $tmpfile +} +run_test 51b "layout lock: glimpse should be able to restart if layout changed" + +test_51c() { + # create an empty file + $MCREATE $DIR1/$tfile + +#define OBD_FAIL_MDS_LL_BLOCK 0x172 + $LCTL set_param fail_loc=0x172 + + # change the layout of testing file + echo "Setting layout ..." + $LFS setstripe -c $OSTCOUNT $DIR1/$tfile & + pid=$! + sleep 0.1 + + # get layout of this file should wait until dd is finished + local stripecnt=`$LFS getstripe -c $DIR2/$tfile` + [ $stripecnt -eq $OSTCOUNT ] || error "layout wrong" + + rm -f $DIR1/$tfile +} +run_test 51c "layout lock: IT_LAYOUT blocked and correct layout can be returned" + test_60() { [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.3.0) ]] || { skip "Need MDS version at least 2.3.0"; return; } diff --git a/lustre/utils/req-layout.c b/lustre/utils/req-layout.c index e68fdab..cc09b13 100644 --- a/lustre/utils/req-layout.c +++ b/lustre/utils/req-layout.c @@ -59,6 +59,7 @@ #define lustre_swab_ldlm_request NULL #define lustre_swab_ldlm_reply NULL #define lustre_swab_ldlm_intent NULL +#define lustre_swab_layout_intent NULL /* #define lustre_swab_lov_mds_md NULL */ #define lustre_swab_mdt_rec_reint NULL #define lustre_swab_lustre_capa NULL diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 7ed0777..0970513 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -1880,6 +1880,24 @@ check_hsm_user_state(void) CHECK_MEMBER(hsm_user_state, hus_in_progress_location); } +static void check_layout_intent(void) +{ + BLANK_LINE(); + CHECK_STRUCT(layout_intent); + CHECK_MEMBER(layout_intent, li_opc); + CHECK_MEMBER(layout_intent, li_flags); + CHECK_MEMBER(layout_intent, li_start); + CHECK_MEMBER(layout_intent, li_end); + + CHECK_VALUE(LAYOUT_INTENT_ACCESS); + CHECK_VALUE(LAYOUT_INTENT_READ); + CHECK_VALUE(LAYOUT_INTENT_WRITE); + CHECK_VALUE(LAYOUT_INTENT_GLIMPSE); + CHECK_VALUE(LAYOUT_INTENT_TRUNC); + CHECK_VALUE(LAYOUT_INTENT_RELEASE); + CHECK_VALUE(LAYOUT_INTENT_RESTORE); +} + static void system_string (char *cmdline, char *str, int len) { @@ -2234,6 +2252,7 @@ main(int argc, char **argv) check_hsm_user_item(); check_hsm_user_request(); check_hsm_user_state(); + check_layout_intent(); printf("}\n\n"); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index b2a95cb..da171e3 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -4169,5 +4169,39 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct hsm_user_state, hus_in_progress_location)); LASSERTF((int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location) == 16, "found %lld\n", (long long)(int)sizeof(((struct hsm_user_state *)0)->hus_in_progress_location)); + + /* Checks for struct layout_intent */ + LASSERTF((int)sizeof(struct layout_intent) == 24, "found %lld\n", + (long long)(int)sizeof(struct layout_intent)); + LASSERTF((int)offsetof(struct layout_intent, li_opc) == 0, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_opc)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_opc) == 4, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_opc)); + LASSERTF((int)offsetof(struct layout_intent, li_flags) == 4, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_flags)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_flags) == 4, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_flags)); + LASSERTF((int)offsetof(struct layout_intent, li_start) == 8, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_start)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_start) == 8, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_start)); + LASSERTF((int)offsetof(struct layout_intent, li_end) == 16, "found %lld\n", + (long long)(int)offsetof(struct layout_intent, li_end)); + LASSERTF((int)sizeof(((struct layout_intent *)0)->li_end) == 8, "found %lld\n", + (long long)(int)sizeof(((struct layout_intent *)0)->li_end)); + LASSERTF(LAYOUT_INTENT_ACCESS == 0, "found %lld\n", + (long long)LAYOUT_INTENT_ACCESS); + LASSERTF(LAYOUT_INTENT_READ == 1, "found %lld\n", + (long long)LAYOUT_INTENT_READ); + LASSERTF(LAYOUT_INTENT_WRITE == 2, "found %lld\n", + (long long)LAYOUT_INTENT_WRITE); + LASSERTF(LAYOUT_INTENT_GLIMPSE == 3, "found %lld\n", + (long long)LAYOUT_INTENT_GLIMPSE); + LASSERTF(LAYOUT_INTENT_TRUNC == 4, "found %lld\n", + (long long)LAYOUT_INTENT_TRUNC); + LASSERTF(LAYOUT_INTENT_RELEASE == 5, "found %lld\n", + (long long)LAYOUT_INTENT_RELEASE); + LASSERTF(LAYOUT_INTENT_RESTORE == 6, "found %lld\n", + (long long)LAYOUT_INTENT_RESTORE); } -- 1.8.3.1