From 4f35c341f22b06b6c15a4af763e49b1e05f0dff9 Mon Sep 17 00:00:00 2001 From: Mikhal Pershin Date: Wed, 9 Dec 2015 16:08:53 +0300 Subject: [PATCH] LU-3285 mds: add IO locking to the MDC and MDT - introduce a new DOM inodebit for Data-on-MDT files. - add IO lock and glimpse handling at the MDT along with the needed LVB updates. - update the MDC to exclude the DOM bit from ELC and to handle LVB changes caused by glimpse on the MDT. - add CLIO locking at the MDC; an IBITS lock protects the data at the MDT, and the MDC converts such locks into proper CLIO locks. Signed-off-by: Mikhal Pershin Change-Id: Id95c7a5db814b399c961a5364af33d2bf5798055 Reviewed-on: https://review.whamcloud.com/28018 Reviewed-by: Jinshan Xiong Reviewed-by: Bobi Jam --- lustre/include/lustre_dlm.h | 10 +- lustre/include/lustre_dlm_flags.h | 2 +- lustre/include/lustre_osc.h | 146 +++- lustre/include/uapi/linux/lustre/lustre_idl.h | 4 + lustre/ldlm/ldlm_internal.h | 1 + lustre/ldlm/ldlm_lockd.c | 18 +- lustre/ldlm/ldlm_request.c | 4 +- lustre/llite/file.c | 2 +- lustre/llite/llite_lib.c | 1 + lustre/lmv/lmv_obd.c | 6 +- lustre/mdc/mdc_dev.c | 987 +++++++++++++++++++++++++- lustre/mdc/mdc_internal.h | 2 + lustre/mdc/mdc_reint.c | 13 +- lustre/mdt/mdt_handler.c | 97 ++- lustre/mdt/mdt_internal.h | 41 +- lustre/mdt/mdt_io.c | 380 +++++++++- lustre/mdt/mdt_lvb.c | 254 ++++++- lustre/mdt/mdt_open.c | 6 +- lustre/mdt/mdt_reint.c | 60 +- lustre/ofd/ofd_dev.c | 8 - lustre/ofd/ofd_dlm.c | 19 - lustre/ofd/ofd_internal.h | 3 +- lustre/osc/osc_cache.c | 13 +- lustre/osc/osc_internal.h | 32 +- lustre/osc/osc_io.c | 4 +- lustre/osc/osc_lock.c | 162 ++--- lustre/osc/osc_object.c | 40 +- lustre/osc/osc_request.c | 29 +- 28 files changed, 2023 insertions(+), 321 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 2d0952e..5f1dc20 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -842,7 +842,9 @@ struct ldlm_lock { /** Private storage for lock user. Opaque to LDLM. */ void *l_ast_data; - + /* separate ost_lvb used mostly by Data-on-MDT for now. * It is introduced to avoid mixing it with layout lock data. */ + struct ost_lvb l_ost_lvb; /* * Server-side-only members.
*/ @@ -1011,6 +1013,12 @@ static inline bool ldlm_has_layout(struct ldlm_lock *lock) lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT; } +static inline bool ldlm_has_dom(struct ldlm_lock *lock) +{ + return lock->l_resource->lr_type == LDLM_IBITS && + lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_DOM; +} + static inline char * ldlm_ns_name(struct ldlm_namespace *ns) { diff --git a/lustre/include/lustre_dlm_flags.h b/lustre/include/lustre_dlm_flags.h index 7912883..7576f16 100644 --- a/lustre/include/lustre_dlm_flags.h +++ b/lustre/include/lustre_dlm_flags.h @@ -392,7 +392,7 @@ /** l_flags bits marked as "ast" bits */ #define LDLM_FL_AST_MASK (LDLM_FL_FLOCK_DEADLOCK |\ - LDLM_FL_AST_DISCARD_DATA) + LDLM_FL_DISCARD_DATA) /** l_flags bits marked as "blocked" bits */ #define LDLM_FL_BLOCKED_MASK (LDLM_FL_BLOCK_GRANTED |\ diff --git a/lustre/include/lustre_osc.h b/lustre/include/lustre_osc.h index d2563bb..f9777bb 100644 --- a/lustre/include/lustre_osc.h +++ b/lustre/include/lustre_osc.h @@ -182,6 +182,73 @@ struct osc_thread_info { struct lu_buf oti_ladvise_buf; }; +static inline __u64 osc_enq2ldlm_flags(__u32 enqflags) +{ + __u64 result = 0; + + CDEBUG(D_DLMTRACE, "flags: %x\n", enqflags); + + LASSERT((enqflags & ~CEF_MASK) == 0); + + if (enqflags & CEF_NONBLOCK) + result |= LDLM_FL_BLOCK_NOWAIT; + if (enqflags & CEF_GLIMPSE) + result |= LDLM_FL_HAS_INTENT; + if (enqflags & CEF_DISCARD_DATA) + result |= LDLM_FL_AST_DISCARD_DATA; + if (enqflags & CEF_PEEK) + result |= LDLM_FL_TEST_LOCK; + if (enqflags & CEF_LOCK_MATCH) + result |= LDLM_FL_MATCH_LOCK; + if (enqflags & CEF_LOCK_NO_EXPAND) + result |= LDLM_FL_NO_EXPANSION; + if (enqflags & CEF_SPECULATIVE) + result |= LDLM_FL_SPECULATIVE; + return result; +} + +typedef int (*osc_enqueue_upcall_f)(void *cookie, struct lustre_handle *lockh, + int rc); + +struct osc_enqueue_args { + struct obd_export *oa_exp; + enum ldlm_type oa_type; + enum ldlm_mode oa_mode; + __u64 *oa_flags; + osc_enqueue_upcall_f oa_upcall; + void *oa_cookie; + struct ost_lvb *oa_lvb; + struct lustre_handle oa_lockh; + bool oa_speculative; +}; + +/** + * Bit flags for osc_dlm_lock_at_pageoff(). + */ +enum osc_dap_flags { + /** + * Just check if the desired lock exists, it won't hold reference + * count on lock. + */ + OSC_DAP_FL_TEST_LOCK = 1 << 0, + /** + * Return the lock even if it is being canceled. 
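+ * For example, mdc_req_attr_set() in mdc_dev.c passes this flag together + * with OSC_DAP_FL_TEST_LOCK when it looks up the DLM lock covering a page + * that is being flushed as part of lock cancellation.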
+ */ + OSC_DAP_FL_CANCELING = 1 << 1 +}; + +/* + * The set of operations which are different for MDC and OSC objects + */ +struct osc_object_operations { + void (*oto_build_res_name)(struct osc_object *osc, + struct ldlm_res_id *resname); + struct ldlm_lock* (*oto_dlmlock_at_pgoff)(const struct lu_env *env, + struct osc_object *obj, + pgoff_t index, + enum osc_dap_flags dap_flags); +}; + struct osc_object { struct cl_object oo_cl; struct lov_oinfo *oo_oinfo; @@ -242,9 +309,24 @@ struct osc_object { atomic_t oo_nr_ios; wait_queue_head_t oo_io_waitq; + const struct osc_object_operations *oo_obj_ops; bool oo_initialized; }; +static inline void osc_build_res_name(struct osc_object *osc, + struct ldlm_res_id *resname) +{ + return osc->oo_obj_ops->oto_build_res_name(osc, resname); +} + +static inline struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env, + struct osc_object *obj, + pgoff_t index, + enum osc_dap_flags flags) +{ + return obj->oo_obj_ops->oto_dlmlock_at_pgoff(env, obj, index, flags); +} + static inline void osc_object_lock(struct osc_object *obj) { spin_lock(&obj->oo_lock); @@ -274,6 +356,18 @@ static inline int osc_object_is_locked(struct osc_object *obj) #endif } +static inline void osc_object_set_contended(struct osc_object *obj) +{ + obj->oo_contention_time = cfs_time_current(); + /* mb(); */ + obj->oo_contended = 1; +} + +static inline void osc_object_clear_contended(struct osc_object *obj) +{ + obj->oo_contended = 0; +} + /* * Lock "micro-states" for osc layer. */ @@ -350,7 +444,8 @@ struct osc_lock { enum osc_lock_state ols_state; /** lock value block */ struct ost_lvb ols_lvb; - + /** Lockless operations to be used by lockless lock */ + const struct cl_lock_operations *ols_lockless_ops; /** * true, if ldlm_lock_addref() was called against * osc_lock::ols_lock. This is used for sanity checking. @@ -402,6 +497,10 @@ struct osc_lock { ols_speculative:1; }; +static inline int osc_lock_is_lockless(const struct osc_lock *ols) +{ + return (ols->ols_cl.cls_ops == ols->ols_lockless_ops); +} /** * Page state private for osc layer. 
@@ -507,10 +606,13 @@ static inline void osc_io_unplug(const struct lu_env *env, (void)osc_io_unplug0(env, cli, osc, 0); } -void osc_object_set_contended(struct osc_object *obj); -void osc_object_clear_contended(struct osc_object *obj); -int osc_object_is_contended(struct osc_object *obj); -int osc_lock_is_lockless(const struct osc_lock *olck); +typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *, + struct osc_page *, void *); +int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, + struct osc_object *osc, pgoff_t start, pgoff_t end, + osc_page_gang_cbt cb, void *cbdata); +int osc_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata); /* osc_dev.c */ int osc_device_init(const struct lu_env *env, struct lu_device *d, @@ -535,6 +637,10 @@ int osc_attr_update(const struct lu_env *env, struct cl_object *obj, int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj, struct ost_lvb *lvb); int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc); +int osc_object_is_contended(struct osc_object *obj); +int osc_object_find_cbdata(const struct lu_env *env, struct cl_object *obj, + ldlm_iterator_t iter, void *data); +int osc_object_prune(const struct lu_env *env, struct cl_object *obj); /* osc_request.c */ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd); @@ -572,11 +678,27 @@ int osc_io_read_start(const struct lu_env *env, int osc_io_write_start(const struct lu_env *env, const struct cl_io_slice *slice); void osc_io_end(const struct lu_env *env, const struct cl_io_slice *slice); - int osc_io_fsync_start(const struct lu_env *env, const struct cl_io_slice *slice); void osc_io_fsync_end(const struct lu_env *env, const struct cl_io_slice *slice); +void osc_read_ahead_release(const struct lu_env *env, void *cbdata); + +/* osc_lock.c */ +void osc_lock_to_lockless(const struct lu_env *env, struct osc_lock *ols, + int force); +void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc, + struct osc_lock *oscl); +int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj, + struct osc_lock *oscl); +void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io, + struct cl_object *obj, struct osc_lock *oscl); +int osc_lock_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct cl_lock_slice *slice); +void osc_lock_cancel(const struct lu_env *env, + const struct cl_lock_slice *slice); +void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice); +int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data); /***************************************************************************** * @@ -825,18 +947,6 @@ struct osc_extent { unsigned int oe_mppr; }; -int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, - int sent, int rc); -int osc_extent_release(const struct lu_env *env, struct osc_extent *ext); - -int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc, - pgoff_t start, pgoff_t end, bool discard_pages); - -typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *, - struct osc_page *, void *); -int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, - struct osc_object *osc, pgoff_t start, pgoff_t end, - osc_page_gang_cbt cb, void *cbdata); /** @} osc */ #endif /* LUSTRE_OSC_H */ diff --git a/lustre/include/uapi/linux/lustre/lustre_idl.h b/lustre/include/uapi/linux/lustre/lustre_idl.h index 8730351..71c77e0 100644 --- 
a/lustre/include/uapi/linux/lustre/lustre_idl.h +++ b/lustre/include/uapi/linux/lustre/lustre_idl.h @@ -1608,6 +1608,8 @@ typedef enum { #define MDS_INODELOCK_MAXSHIFT 6 /* This FULL lock is useful to take on unlink sort of operations */ #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1) +/* DOM lock shouldn't be canceled early, use this macro for ELC */ +#define MDS_INODELOCK_ELC (MDS_INODELOCK_FULL & ~MDS_INODELOCK_DOM) /* NOTE: until Lustre 1.8.7/2.1.1 the fid_ver() was packed into name[2], * but was moved into name[1] along with the OID to avoid consuming the @@ -2360,6 +2362,8 @@ enum ldlm_intent_flags { IT_QUOTA_DQACQ = 0x00000800, IT_QUOTA_CONN = 0x00001000, IT_SETXATTR = 0x00002000, + IT_GLIMPSE = 0x00004000, + IT_BRW = 0x00008000, }; struct ldlm_intent { diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 8ef4709..48400a7 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -40,6 +40,7 @@ extern struct mutex ldlm_cli_namespace_lock; extern struct list_head ldlm_cli_active_namespace_list; extern struct list_head ldlm_cli_inactive_namespace_list; extern unsigned int ldlm_cancel_unused_locks_before_replay; +extern struct kmem_cache *ldlm_glimpse_work_kmem; static inline int ldlm_namespace_nr_read(enum ldlm_side client) { diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 04e3c8e..2396a6c 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1152,6 +1152,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) RETURN(rc); } +EXPORT_SYMBOL(ldlm_server_glimpse_ast); int ldlm_glimpse_locks(struct ldlm_resource *res, struct list_head *gl_work_list) @@ -1363,7 +1364,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, lock->l_req_extent = lock->l_policy_data.l_extent; existing_lock: - if (flags & LDLM_FL_HAS_INTENT) { /* In this case, the reply buffer is allocated deep in * local_lock_enqueue by the policy function. 
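 * The new IT_GLIMPSE and IT_BRW intents added by this patch take the same
 * path: the MDC packs the intent opcode into the LDLM enqueue request (a
 * sketch, copied from mdc_enqueue_send() in mdc_dev.c below):
 *
 *	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 *	lit->opc = glimpse ? IT_GLIMPSE : IT_BRW;
 *
 * and mdt_intent_policy() on the MDT maps them to mdt_glimpse_enqueue()
 * and mdt_brw_enqueue() respectively.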
*/ @@ -3232,11 +3232,22 @@ int ldlm_init(void) if (ldlm_interval_tree_slab == NULL) goto out_interval; +#ifdef HAVE_SERVER_SUPPORT + ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem", + sizeof(struct ldlm_glimpse_work), + 0, 0, NULL); + if (ldlm_glimpse_work_kmem == NULL) + goto out_interval_tree; +#endif + #if LUSTRE_TRACKS_LOCK_EXP_REFS class_export_dump_hook = ldlm_dump_export_locks; #endif return 0; - +#ifdef HAVE_SERVER_SUPPORT +out_interval_tree: + kmem_cache_destroy(ldlm_interval_tree_slab); +#endif out_interval: kmem_cache_destroy(ldlm_interval_slab); out_lock: @@ -3259,4 +3270,7 @@ void ldlm_exit(void) kmem_cache_destroy(ldlm_lock_slab); kmem_cache_destroy(ldlm_interval_slab); kmem_cache_destroy(ldlm_interval_tree_slab); +#ifdef HAVE_SERVER_SUPPORT + kmem_cache_destroy(ldlm_glimpse_work_kmem); +#endif } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index fb67a1e..2d0835f 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1817,8 +1817,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; if ((lru_flags & LDLM_LRU_FLAG_CLEANUP) && - lock->l_resource->lr_type == LDLM_EXTENT && - lock->l_granted_mode == LCK_PR) + (lock->l_resource->lr_type == LDLM_EXTENT || + ldlm_has_dom(lock)) && lock->l_granted_mode == LCK_PR) ldlm_set_discard_data(lock); /* We can't re-add to l_lru as it confuses the diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 36e3a67..f2a7f1d 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1024,7 +1024,7 @@ int ll_merge_attr(const struct lu_env *env, struct inode *inode) cl_object_attr_unlock(obj); if (rc != 0) - GOTO(out_size_unlock, rc); + GOTO(out_size_unlock, rc = (rc == -ENODATA ? 
0 : rc)); if (atime < attr->cat_atime) atime = attr->cat_atime; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 516c4f4..003b44b 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -200,6 +200,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID | OBD_CONNECT_GRANT | OBD_CONNECT_VERSION | OBD_CONNECT_BRW_SIZE | + OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK| OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | OBD_CONNECT_AT | OBD_CONNECT_LOV_V3 | diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 9034ba0..1d7dad7 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1967,7 +1967,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx, - LCK_EX, MDS_INODELOCK_FULL, + LCK_EX, MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3); if (rc != 0) RETURN(rc); @@ -1981,7 +1981,7 @@ retry_rename: struct lmv_tgt_desc *tgt; rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx, - LCK_EX, MDS_INODELOCK_FULL, + LCK_EX, MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID4); if (rc != 0) RETURN(rc); @@ -2524,7 +2524,7 @@ try_next_stripe: } rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX, - MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); + MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3); if (rc != 0) RETURN(rc); diff --git a/lustre/mdc/mdc_dev.c b/lustre/mdc/mdc_dev.c index f3e4a81..0bb7920 100644 --- a/lustre/mdc/mdc_dev.c +++ b/lustre/mdc/mdc_dev.c @@ -37,11 +37,901 @@ #include "mdc_internal.h" -int mdc_lock_init(const struct lu_env *env, - struct cl_object *obj, struct cl_lock *lock, - const struct cl_io *unused) +static void mdc_lock_build_policy(const struct lu_env *env, + union ldlm_policy_data *policy) { - return 0; + memset(policy, 0, sizeof *policy); + policy->l_inodebits.bits = MDS_INODELOCK_DOM; +} + +int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) +{ + return osc_ldlm_glimpse_ast(dlmlock, data); +} + +static void mdc_lock_build_einfo(const struct lu_env *env, + const struct cl_lock *lock, + struct osc_object *osc, + struct ldlm_enqueue_info *einfo) +{ + einfo->ei_type = LDLM_IBITS; + einfo->ei_mode = osc_cl_lock2ldlm(lock->cll_descr.cld_mode); + einfo->ei_cb_bl = mdc_ldlm_blocking_ast; + einfo->ei_cb_cp = ldlm_completion_ast; + einfo->ei_cb_gl = mdc_ldlm_glimpse_ast; + einfo->ei_cbdata = osc; /* value to be put into ->l_ast_data */ +} + +static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data) +{ + int set = 0; + + LASSERT(lock != NULL); + + lock_res_and_lock(lock); + + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; + if (lock->l_ast_data == data) + set = 1; + + unlock_res_and_lock(lock); + + return set; +} + +int mdc_dom_lock_match(struct obd_export *exp, struct ldlm_res_id *res_id, + enum ldlm_type type, union ldlm_policy_data *policy, + enum ldlm_mode mode, __u64 *flags, void *data, + struct lustre_handle *lockh, int unref) +{ + struct obd_device *obd = exp->exp_obd; + __u64 lflags = *flags; + enum ldlm_mode rc; + + ENTRY; + + rc = ldlm_lock_match(obd->obd_namespace, lflags, + res_id, type, policy, mode, lockh, unref); + if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) + RETURN(rc); + + if (data != NULL) { + struct ldlm_lock *lock = ldlm_handle2lock(lockh); + + LASSERT(lock != NULL); + if (!mdc_set_dom_lock_data(lock, data)) { + ldlm_lock_decref(lockh, rc); + rc = 0; + } + 
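/* Drop the reference taken by ldlm_handle2lock() above. If l_ast_data + * already belonged to another osc_object, the match reference was + * released just before and rc was cleared to report a miss. */ +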
LDLM_LOCK_PUT(lock); + } + RETURN(rc); +} + +/** + * Finds an existing lock covering the page at the given index. + * Copy of osc_obj_dlmlock_at_pgoff() but for the DoM IBITS lock. + */ +struct ldlm_lock *mdc_dlmlock_at_pgoff(const struct lu_env *env, + struct osc_object *obj, pgoff_t index, + enum osc_dap_flags dap_flags) +{ + struct osc_thread_info *info = osc_env_info(env); + struct ldlm_res_id *resname = &info->oti_resname; + union ldlm_policy_data *policy = &info->oti_policy; + struct lustre_handle lockh; + struct ldlm_lock *lock = NULL; + enum ldlm_mode mode; + __u64 flags; + + ENTRY; + + fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname); + mdc_lock_build_policy(env, policy); + + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; + if (dap_flags & OSC_DAP_FL_TEST_LOCK) + flags |= LDLM_FL_TEST_LOCK; + +again: + /* Next, search for already existing DOM IBITS locks that will cover us */ + /* If we're trying to read, we also search for an existing PW lock. The + * VFS and page cache already protect us locally, so lots of readers/ + * writers can share a single PW lock. */ + mode = mdc_dom_lock_match(osc_export(obj), resname, LDLM_IBITS, policy, + LCK_PR | LCK_PW, &flags, obj, &lockh, + dap_flags & OSC_DAP_FL_CANCELING); + if (mode != 0) { + lock = ldlm_handle2lock(&lockh); + /* RACE: the lock is cancelled so let's try again */ + if (unlikely(lock == NULL)) + goto again; + } + + RETURN(lock); +} + +/** + * Check if page @page is covered by an extra lock or discard it. + */ +static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) +{ + struct osc_thread_info *info = osc_env_info(env); + struct osc_object *osc = cbdata; + pgoff_t index; + + index = osc_index(ops); + if (index >= info->oti_fn_index) { + struct ldlm_lock *tmp; + struct cl_page *page = ops->ops_cl.cpl_page; + + /* refresh non-overlapped index */ + tmp = mdc_dlmlock_at_pgoff(env, osc, index, + OSC_DAP_FL_TEST_LOCK); + if (tmp != NULL) { + info->oti_fn_index = CL_PAGE_EOF; + LDLM_LOCK_PUT(tmp); + } else if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } + } + + info->oti_next_index = index + 1; + return CLP_GANG_OKAY; +} + +/** + * Discard pages protected by the given lock. This function traverses the + * radix tree to find all covering pages and discards them. If a page is + * being covered by other locks, it should remain in cache. + * + * If an error happens at any step, the process continues anyway (the + * reasoning behind this being that lock cancellation cannot be delayed + * indefinitely). + */ +static int mdc_lock_discard_pages(const struct lu_env *env, + struct osc_object *osc, + pgoff_t start, pgoff_t end, + bool discard) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_io *io = &info->oti_io; + osc_page_gang_cbt cb; + int res; + int result; + + ENTRY; + + io->ci_obj = cl_object_top(osc2cl(osc)); + io->ci_ignore_layout = 1; + result = cl_io_init(env, io, CIT_MISC, io->ci_obj); + if (result != 0) + GOTO(out, result); + + cb = discard ?
osc_discard_cb : mdc_check_and_discard_cb; + info->oti_fn_index = info->oti_next_index = start; + do { + res = osc_page_gang_lookup(env, io, osc, info->oti_next_index, + end, cb, (void *)osc); + if (info->oti_next_index > end) + break; + + if (res == CLP_GANG_RESCHED) + cond_resched(); + } while (res != CLP_GANG_OKAY); +out: + cl_io_fini(env, io); + RETURN(result); +} + +static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj, + pgoff_t start, pgoff_t end, enum cl_lock_mode mode, + bool discard) +{ + int result = 0; + int rc; + + ENTRY; + + if (mode == CLM_WRITE) { + result = osc_cache_writeback_range(env, obj, start, end, 1, + discard); + CDEBUG(D_CACHE, "object %p: [%lu -> %lu] %d pages were %s.\n", + obj, start, end, result, + discard ? "discarded" : "written back"); + if (result > 0) + result = 0; + } + + rc = mdc_lock_discard_pages(env, obj, start, end, discard); + if (result == 0 && rc < 0) + result = rc; + + RETURN(result); +} + +void mdc_lock_lockless_cancel(const struct lu_env *env, + const struct cl_lock_slice *slice) +{ + struct osc_lock *ols = cl2osc_lock(slice); + struct osc_object *osc = cl2osc(slice->cls_obj); + struct cl_lock_descr *descr = &slice->cls_lock->cll_descr; + int rc; + + LASSERT(ols->ols_dlmlock == NULL); + rc = mdc_lock_flush(env, osc, descr->cld_start, descr->cld_end, + descr->cld_mode, 0); + if (rc != 0) + CERROR("Pages for lockless lock %p were not purged(%d)\n", + ols, rc); + + osc_lock_wake_waiters(env, osc, ols); +} + +/** + * Helper for mdc_ldlm_blocking_ast() handling discrepancies between cl_lock + * and ldlm_lock caches. + */ +static int mdc_dlm_blocking_ast0(const struct lu_env *env, + struct ldlm_lock *dlmlock, + void *data, int flag) +{ + struct cl_object *obj = NULL; + int result = 0; + bool discard; + enum cl_lock_mode mode = CLM_READ; + + ENTRY; + + LASSERT(flag == LDLM_CB_CANCELING); + LASSERT(dlmlock != NULL); + + lock_res_and_lock(dlmlock); + if (dlmlock->l_granted_mode != dlmlock->l_req_mode) { + dlmlock->l_ast_data = NULL; + unlock_res_and_lock(dlmlock); + RETURN(0); + } + + discard = ldlm_is_discard_data(dlmlock); + if (dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP)) + mode = CLM_WRITE; + + if (dlmlock->l_ast_data != NULL) { + obj = osc2cl(dlmlock->l_ast_data); + dlmlock->l_ast_data = NULL; + cl_object_get(obj); + } + unlock_res_and_lock(dlmlock); + + /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or + * the object has been destroyed. */ + if (obj != NULL) { + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + + /* Destroy pages covered by the extent of the DLM lock */ + result = mdc_lock_flush(env, cl2osc(obj), cl_index(obj, 0), + CL_PAGE_EOF, mode, discard); + /* Losing a lock, set KMS to 0. + * NB: it is assumed that the DOM lock covers all data on the MDT.
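+ * Unlike OSC extent locks, a single DOM IBITS lock covers the whole + * [0, EOF] range of the on-MDT data, so once it is lost the client has + * no size authority left and KMS is reset to 0 rather than trimmed to + * a lock boundary as OSC does.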
+ */ + lock_res_and_lock(dlmlock); + cl_object_attr_lock(obj); + attr->cat_kms = 0; + cl_object_attr_update(env, obj, attr, CAT_KMS); + cl_object_attr_unlock(obj); + unlock_res_and_lock(dlmlock); + cl_object_put(env, obj); + } + RETURN(result); +} + +int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, + struct ldlm_lock_desc *new, void *data, int flag) +{ + int rc = 0; + + ENTRY; + + switch (flag) { + case LDLM_CB_BLOCKING: { + struct lustre_handle lockh; + + ldlm_lock2handle(dlmlock, &lockh); + rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); + if (rc == -ENODATA) + rc = 0; + break; + } + case LDLM_CB_CANCELING: { + struct lu_env *env; + __u16 refcheck; + + /* + * This can be called in the context of outer IO, e.g., + * + * mdc_enqueue_send()->... + * ->ldlm_prep_elc_req()->... + * ->ldlm_cancel_callback()->... + * ->mdc_ldlm_blocking_ast() + * + * A new environment has to be created to avoid corrupting the + * outer context. + */ + env = cl_env_get(&refcheck); + if (IS_ERR(env)) { + rc = PTR_ERR(env); + break; + } + + rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag); + cl_env_put(env, &refcheck); + break; + } + default: + LBUG(); + } + RETURN(rc); +} + +/** + * Updates object attributes from a lock value block (lvb) received together + * with the DLM lock reply from the server. + * This can be optimized to not update attributes when the lock is a result + * of a local match. + * + * Called under lock and resource spin-locks. + */ +static void mdc_lock_lvb_update(const struct lu_env *env, + struct osc_object *osc, + struct ldlm_lock *dlmlock, + struct ost_lvb *lvb) +{ + struct cl_object *obj = osc2cl(osc); + struct lov_oinfo *oinfo = osc->oo_oinfo; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | + CAT_SIZE; + + ENTRY; + + if (lvb == NULL) { + LASSERT(dlmlock != NULL); + lvb = &dlmlock->l_ost_lvb; + } + cl_lvb2attr(attr, lvb); + + cl_object_attr_lock(obj); + if (dlmlock != NULL) { + __u64 size; + + check_res_locked(dlmlock->l_resource); + size = lvb->lvb_size; + + if (size >= oinfo->loi_kms) { + LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," + " kms=%llu", lvb->lvb_size, size); + valid |= CAT_KMS; + attr->cat_kms = size; + } else { + LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," + " leaving kms=%llu, end=%llu", + lvb->lvb_size, oinfo->loi_kms, + dlmlock->l_policy_data.l_extent.end); + } + } + cl_object_attr_update(env, obj, attr, valid); + cl_object_attr_unlock(obj); + EXIT; +} + +static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, + struct lustre_handle *lockh, bool lvb_update) +{ + struct ldlm_lock *dlmlock; + + ENTRY; + + dlmlock = ldlm_handle2lock_long(lockh, 0); + LASSERT(dlmlock != NULL); + + /* lock reference taken by ldlm_handle2lock_long() is + * owned by osc_lock and released in osc_lock_detach() + */ + lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl); + oscl->ols_has_ref = 1; + + LASSERT(oscl->ols_dlmlock == NULL); + oscl->ols_dlmlock = dlmlock; + + /* This may be a matched lock for a glimpse request, do not hold + * a lock reference in that case. */ + if (!oscl->ols_glimpse) { + /* hold a reference for a non-glimpse lock which will + * be released in osc_lock_cancel() */ + lustre_handle_copy(&oscl->ols_handle, lockh); + ldlm_lock_addref(lockh, oscl->ols_einfo.ei_mode); + oscl->ols_hold = 1; + } + + /* Lock must have been granted.
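+ * Note that for a granted DOM lock the cl_lock descriptor below is + * widened to [0, CL_PAGE_EOF]: the single DOM inodebit protects all + * on-MDT data, so in effect the lock behaves like a whole-file extent + * lock from CLIO's point of view.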
*/ + lock_res_and_lock(dlmlock); + if (dlmlock->l_granted_mode == dlmlock->l_req_mode) { + struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr; + + /* extend the lock extent, otherwise it will have problem when + * we decide whether to grant a lockless lock. */ + descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode); + descr->cld_start = cl_index(descr->cld_obj, 0); + descr->cld_end = CL_PAGE_EOF; + + /* no lvb update for matched lock */ + if (lvb_update) { + LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); + mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj), + dlmlock, NULL); + } + } + unlock_res_and_lock(dlmlock); + + LASSERT(oscl->ols_state != OLS_GRANTED); + oscl->ols_state = OLS_GRANTED; + EXIT; +} + +/** + * Lock upcall function that is executed either when a reply to ENQUEUE rpc is + * received from a server, or after osc_enqueue_base() matched a local DLM + * lock. + */ +static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, + int errcode) +{ + struct osc_lock *oscl = cookie; + struct cl_lock_slice *slice = &oscl->ols_cl; + struct lu_env *env; + int rc; + + ENTRY; + + env = cl_env_percpu_get(); + /* should never happen, similar to osc_ldlm_blocking_ast(). */ + LASSERT(!IS_ERR(env)); + + rc = ldlm_error2errno(errcode); + if (oscl->ols_state == OLS_ENQUEUED) { + oscl->ols_state = OLS_UPCALL_RECEIVED; + } else if (oscl->ols_state == OLS_CANCELLED) { + rc = -EIO; + } else { + CERROR("Impossible state: %d\n", oscl->ols_state); + LBUG(); + } + + CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode); + if (rc == 0) + mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK); + + /* Error handling, some errors are tolerable. */ + if (oscl->ols_locklessable && rc == -EUSERS) { + /* This is a tolerable error, turn this lock into + * lockless lock. + */ + osc_object_set_contended(cl2osc(slice->cls_obj)); + LASSERT(slice->cls_ops != oscl->ols_lockless_ops); + + /* Change this lock to ldlmlock-less lock. */ + osc_lock_to_lockless(env, oscl, 1); + oscl->ols_state = OLS_GRANTED; + rc = 0; + } else if (oscl->ols_glimpse && rc == -ENAVAIL) { + LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); + mdc_lock_lvb_update(env, cl2osc(slice->cls_obj), + NULL, &oscl->ols_lvb); + /* Hide the error. */ + rc = 0; + } + + if (oscl->ols_owner != NULL) + cl_sync_io_note(env, oscl->ols_owner, rc); + cl_env_percpu_put(env); + + RETURN(rc); +} + +int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb) +{ + struct mdt_body *body; + + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (!body) + RETURN(-EPROTO); + + lvb->lvb_mtime = body->mbo_mtime; + lvb->lvb_atime = body->mbo_atime; + lvb->lvb_ctime = body->mbo_ctime; + lvb->lvb_blocks = body->mbo_blocks; + lvb->lvb_size = body->mbo_size; + RETURN(0); +} + +int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, + void *cookie, struct lustre_handle *lockh, + enum ldlm_mode mode, __u64 *flags, int errcode) +{ + struct osc_lock *ols = cookie; + struct ldlm_lock *lock; + int rc = 0; + + ENTRY; + + /* The request was created before ldlm_cli_enqueue call. 
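+ * Its reply is interpreted here: ELDLM_LOCK_ABORTED means the server + * filled the LVB in the reply body without granting the lock (the + * glimpse case, see mdt_glimpse_enqueue()), while ELDLM_OK means the + * lock was granted and its LVB is cached in l_ost_lvb for later + * mdc_lock_lvb_update() calls; both set LDLM_FL_LVB_READY.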
*/ + if (errcode == ELDLM_LOCK_ABORTED) { + struct ldlm_reply *rep; + + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + LASSERT(rep != NULL); + + rep->lock_policy_res2 = + ptlrpc_status_ntoh(rep->lock_policy_res2); + if (rep->lock_policy_res2) + errcode = rep->lock_policy_res2; + + rc = mdc_fill_lvb(req, &ols->ols_lvb); + *flags |= LDLM_FL_LVB_READY; + } else if (errcode == ELDLM_OK) { + /* Callers have references, should be valid always */ + lock = ldlm_handle2lock(lockh); + LASSERT(lock); + + rc = mdc_fill_lvb(req, &lock->l_ost_lvb); + LDLM_LOCK_PUT(lock); + *flags |= LDLM_FL_LVB_READY; + } + + /* Call the update callback. */ + rc = (*upcall)(cookie, lockh, rc < 0 ? rc : errcode); + + /* release the reference taken in ldlm_cli_enqueue() */ + if (errcode == ELDLM_LOCK_MATCHED) + errcode = ELDLM_OK; + if (errcode == ELDLM_OK && lustre_handle_is_used(lockh)) + ldlm_lock_decref(lockh, mode); + + RETURN(rc); +} + +int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, + struct osc_enqueue_args *aa, int rc) +{ + struct ldlm_lock *lock; + struct lustre_handle *lockh = &aa->oa_lockh; + enum ldlm_mode mode = aa->oa_mode; + + ENTRY; + + LASSERT(!aa->oa_speculative); + + /* ldlm_cli_enqueue is holding a reference on the lock, so it must + * be valid. */ + lock = ldlm_handle2lock(lockh); + LASSERTF(lock != NULL, + "lockh %#llx, req %p, aa %p - client evicted?\n", + lockh->cookie, req, aa); + + /* Take an additional reference so that a blocking AST that + * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed + * to arrive after an upcall has been executed by + * mdc_enqueue_fini(). */ + ldlm_lock_addref(lockh, mode); + + /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); + + /* Let the CP AST grant the lock first. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + + /* Complete obtaining the lock procedure. */ + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, + aa->oa_mode, aa->oa_flags, NULL, 0, + lockh, rc); + /* Complete mdc stuff. */ + rc = mdc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, + aa->oa_flags, rc); + + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); + + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(lock); + RETURN(rc); +} + +/* When enqueuing asynchronously, locks are not ordered and we can obtain a + * lock from the 2nd OSC before a lock from the 1st one. This does not + * deadlock with other synchronous requests; however, keeping some locks and + * trying to obtain others may take a considerable amount of time in the case + * of OST failure, and when other sync requests do not get the released lock + * from a client, the client is excluded from the cluster -- such scenarios + * make life difficult, so release locks just after they are obtained.
*/ +int mdc_enqueue_send(struct obd_export *exp, struct ldlm_res_id *res_id, + __u64 *flags, union ldlm_policy_data *policy, + struct ost_lvb *lvb, int kms_valid, + osc_enqueue_upcall_f upcall, void *cookie, + struct ldlm_enqueue_info *einfo, int async) +{ + struct obd_device *obd = exp->exp_obd; + struct lustre_handle lockh = { 0 }; + struct ptlrpc_request *req = NULL; + struct ldlm_intent *lit; + enum ldlm_mode mode; + bool glimpse = *flags & LDLM_FL_HAS_INTENT; + __u64 match_flags = *flags; + int rc; + + ENTRY; + + if (!kms_valid) + goto no_match; + + mode = einfo->ei_mode; + if (einfo->ei_mode == LCK_PR) + mode |= LCK_PW; + + if (!glimpse) + match_flags |= LDLM_FL_BLOCK_GRANTED; + mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, + einfo->ei_type, policy, mode, &lockh, 0); + if (mode) { + struct ldlm_lock *matched; + + if (*flags & LDLM_FL_TEST_LOCK) + RETURN(ELDLM_OK); + + matched = ldlm_handle2lock(&lockh); + if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) { + *flags |= LDLM_FL_LVB_READY; + + /* We already have a lock, and it's referenced. */ + (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED); + + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(ELDLM_OK); + } else { + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + } + } + +no_match: + if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) + RETURN(-ENOLCK); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + /* pack the intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = glimpse ? IT_GLIMPSE : IT_BRW; + + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); + ptlrpc_request_set_replen(req); + + /* users of mdc_enqueue_send() can pass this flag for ldlm_lock_match() */ + *flags &= ~LDLM_FL_BLOCK_GRANTED; + /* All MDC IO locks are intents */ + *flags |= LDLM_FL_HAS_INTENT; + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, NULL, + 0, LVB_T_NONE, &lockh, async); + if (async) { + if (!rc) { + struct osc_enqueue_args *aa; + + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->oa_exp = exp; + aa->oa_mode = einfo->ei_mode; + aa->oa_type = einfo->ei_type; + lustre_handle_copy(&aa->oa_lockh, &lockh); + aa->oa_upcall = upcall; + aa->oa_cookie = cookie; + aa->oa_speculative = false; + aa->oa_flags = flags; + aa->oa_lvb = lvb; + + req->rq_interpret_reply = + (ptlrpc_interpterer_t)mdc_enqueue_interpret; + ptlrpcd_add_req(req); + } else { + ptlrpc_req_finished(req); + } + RETURN(rc); + } + + rc = mdc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, + flags, rc); + ptlrpc_req_finished(req); + RETURN(rc); +} + +/** + * Implementation of cl_lock_operations::clo_enqueue() method for the mdc + * layer. This initiates ldlm enqueue: + * + * - cancels conflicting locks early (osc_lock_enqueue_wait()); + * + * - calls mdc_enqueue_send() to do the actual enqueue. + * + * mdc_enqueue_send() is supplied with an upcall function that is executed + * when the lock is received either after a local cached ldlm lock is matched, + * or when a reply from the server is received. + * + * This function does not wait for the network communication to complete.
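+ *
+ * A rough sketch of the resulting call flow (illustrative only):
+ *
+ *	mdc_lock_enqueue()
+ *	  -> mdc_enqueue_send()       (match a cached lock or send an
+ *	       -> ldlm_lock_match()    LDLM intent enqueue RPC)
+ *	       -> ldlm_cli_enqueue()
+ *	  -> mdc_enqueue_fini() / mdc_enqueue_interpret() (async case)
+ *	       -> mdc_lock_upcall()
+ *	            -> mdc_lock_granted()  (attach dlmlock, update LVB)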
+ */ +static int mdc_lock_enqueue(const struct lu_env *env, + const struct cl_lock_slice *slice, + struct cl_io *unused, struct cl_sync_io *anchor) +{ + struct osc_thread_info *info = osc_env_info(env); + struct osc_io *oio = osc_env_io(env); + struct osc_object *osc = cl2osc(slice->cls_obj); + struct osc_lock *oscl = cl2osc_lock(slice); + struct cl_lock *lock = slice->cls_lock; + struct ldlm_res_id *resname = &info->oti_resname; + union ldlm_policy_data *policy = &info->oti_policy; + osc_enqueue_upcall_f upcall = mdc_lock_upcall; + void *cookie = (void *)oscl; + bool async = false; + int result; + + ENTRY; + + LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ), + "lock = %p, ols = %p\n", lock, oscl); + + if (oscl->ols_state == OLS_GRANTED) + RETURN(0); + + /* Lockahead is not supported on MDT yet */ + if (oscl->ols_flags & LDLM_FL_NO_EXPANSION) { + result = -EOPNOTSUPP; + RETURN(result); + } + + if (oscl->ols_flags & LDLM_FL_TEST_LOCK) + GOTO(enqueue_base, 0); + + if (oscl->ols_glimpse) { + LASSERT(equi(oscl->ols_speculative, anchor == NULL)); + async = true; + GOTO(enqueue_base, 0); + } + + result = osc_lock_enqueue_wait(env, osc, oscl); + if (result < 0) + GOTO(out, result); + + /* we can grant a lockless lock right after all conflicting locks + * are canceled. */ + if (osc_lock_is_lockless(oscl)) { + oscl->ols_state = OLS_GRANTED; + oio->oi_lockless = 1; + RETURN(0); + } + +enqueue_base: + oscl->ols_state = OLS_ENQUEUED; + if (anchor != NULL) { + atomic_inc(&anchor->csi_sync_nr); + oscl->ols_owner = anchor; + } + + /** + * The DLM lock's ast data must be the osc_object; the DLM enqueue + * callback is set to mdc_lock_upcall() with the osc_lock as its + * cookie. + */ + fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname); + mdc_lock_build_policy(env, policy); + LASSERT(!oscl->ols_speculative); + result = mdc_enqueue_send(osc_export(osc), resname, &oscl->ols_flags, + policy, &oscl->ols_lvb, + osc->oo_oinfo->loi_kms_valid, + upcall, cookie, &oscl->ols_einfo, async); + if (result == 0) { + if (osc_lock_is_lockless(oscl)) { + oio->oi_lockless = 1; + } else if (!async) { + LASSERT(oscl->ols_state == OLS_GRANTED); + LASSERT(oscl->ols_hold); + LASSERT(oscl->ols_dlmlock != NULL); + } + } +out: + if (result < 0) { + oscl->ols_state = OLS_CANCELLED; + osc_lock_wake_waiters(env, osc, oscl); + + if (anchor != NULL) + cl_sync_io_note(env, anchor, result); + } + RETURN(result); +} + +static const struct cl_lock_operations mdc_lock_lockless_ops = { + .clo_fini = osc_lock_fini, + .clo_enqueue = mdc_lock_enqueue, + .clo_cancel = mdc_lock_lockless_cancel, + .clo_print = osc_lock_print +}; + +static const struct cl_lock_operations mdc_lock_ops = { + .clo_fini = osc_lock_fini, + .clo_enqueue = mdc_lock_enqueue, + .clo_cancel = osc_lock_cancel, + .clo_print = osc_lock_print, +}; + +int mdc_lock_init(const struct lu_env *env, struct cl_object *obj, + struct cl_lock *lock, const struct cl_io *io) +{ + struct osc_lock *ols; + __u32 enqflags = lock->cll_descr.cld_enq_flags; + __u64 flags = osc_enq2ldlm_flags(enqflags); + + ENTRY; + + /* Ignore AGL for Data-on-MDT, stat returns size data */ + if ((enqflags & CEF_SPECULATIVE) != 0) + RETURN(0); + + OBD_SLAB_ALLOC_PTR_GFP(ols, osc_lock_kmem, GFP_NOFS); + if (unlikely(ols == NULL)) + RETURN(-ENOMEM); + + ols->ols_state = OLS_NEW; + spin_lock_init(&ols->ols_lock); + INIT_LIST_HEAD(&ols->ols_waiting_list); + INIT_LIST_HEAD(&ols->ols_wait_entry); + INIT_LIST_HEAD(&ols->ols_nextlock_oscobj); + ols->ols_lockless_ops = &mdc_lock_lockless_ops; + + ols->ols_flags =
flags; + ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE); + + if (ols->ols_flags & LDLM_FL_HAS_INTENT) { + ols->ols_flags |= LDLM_FL_BLOCK_GRANTED; + ols->ols_glimpse = 1; + } + mdc_lock_build_einfo(env, lock, cl2osc(obj), &ols->ols_einfo); + + cl_lock_slice_add(lock, &ols->ols_cl, obj, &mdc_lock_ops); + + if (!(enqflags & CEF_MUST)) + osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER)); + if (ols->ols_locklessable && !(enqflags & CEF_DISCARD_DATA)) + ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; + + if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io)) + osc_lock_set_writer(env, io, obj, ols); + + LDLM_DEBUG_NOLOCK("lock %p, mdc lock %p, flags %llx\n", + lock, ols, ols->ols_flags); + RETURN(0); } /** @@ -139,6 +1029,35 @@ static int mdc_io_setattr_start(const struct lu_env *env, return rc; } +static int mdc_io_read_ahead(const struct lu_env *env, + const struct cl_io_slice *ios, + pgoff_t start, struct cl_read_ahead *ra) +{ + struct osc_object *osc = cl2osc(ios->cis_obj); + struct ldlm_lock *dlmlock; + + ENTRY; + + dlmlock = mdc_dlmlock_at_pgoff(env, osc, start, 0); + if (dlmlock == NULL) + RETURN(-ENODATA); + + if (dlmlock->l_req_mode != LCK_PR) { + struct lustre_handle lockh; + + ldlm_lock2handle(dlmlock, &lockh); + ldlm_lock_addref(&lockh, LCK_PR); + ldlm_lock_decref(&lockh, dlmlock->l_req_mode); + } + + ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc; + ra->cra_end = CL_PAGE_EOF; + ra->cra_release = osc_read_ahead_release; + ra->cra_cbdata = dlmlock; + + RETURN(0); +} + static struct cl_io_operations mdc_io_ops = { .op = { [CIT_READ] = { @@ -174,6 +1093,7 @@ static struct cl_io_operations mdc_io_ops = { .cio_end = osc_io_fsync_end, }, }, + .cio_read_ahead = mdc_io_read_ahead, .cio_submit = osc_io_submit, .cio_commit_async = osc_io_commit_async, }; @@ -188,6 +1108,12 @@ int mdc_io_init(const struct lu_env *env, struct cl_object *obj, return 0; } +static void mdc_build_res_name(struct osc_object *osc, + struct ldlm_res_id *resname) +{ + fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname); +} + /** * Implementation of struct cl_req_operations::cro_attr_set() for MDC * layer. MDC is responsible for struct obdo::o_id and struct obdo::o_seq @@ -206,16 +1132,66 @@ static void mdc_req_attr_set(const struct lu_env *env, struct cl_object *obj, if (flags & OBD_MD_FLID) attr->cra_oa->o_valid |= OBD_MD_FLID; + + if (flags & OBD_MD_FLHANDLE) { + struct ldlm_lock *lock; /* _some_ lock protecting @apage */ + struct osc_page *opg; + + opg = osc_cl_page_osc(attr->cra_page, cl2osc(obj)); + lock = mdc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg), + OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING); + if (lock == NULL && !opg->ops_srvlock) { + struct ldlm_resource *res; + struct ldlm_res_id *resname; + + CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page, + "uncovered page!\n"); + + resname = &osc_env_info(env)->oti_resname; + mdc_build_res_name(cl2osc(obj), resname); + res = ldlm_resource_get( + osc_export(cl2osc(obj))->exp_obd->obd_namespace, + NULL, resname, LDLM_IBITS, 0); + ldlm_resource_dump(D_ERROR, res); + + libcfs_debug_dumpstack(NULL); + LBUG(); + } + + /* check for lockless io. 
*/ + if (lock != NULL) { + attr->cra_oa->o_handle = lock->l_remote_handle; + attr->cra_oa->o_valid |= OBD_MD_FLHANDLE; + LDLM_LOCK_PUT(lock); + } + } +} + +static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj, + struct cl_attr *attr) +{ + struct lov_oinfo *oinfo = cl2osc(obj)->oo_oinfo; + + if (OST_LVB_IS_ERR(oinfo->loi_lvb.lvb_blocks)) + return OST_LVB_GET_ERR(oinfo->loi_lvb.lvb_blocks); + + return osc_attr_get(env, obj, attr); } static const struct cl_object_operations mdc_ops = { .coo_page_init = osc_page_init, .coo_lock_init = mdc_lock_init, .coo_io_init = mdc_io_init, - .coo_attr_get = osc_attr_get, + .coo_attr_get = mdc_attr_get, .coo_attr_update = osc_attr_update, .coo_glimpse = osc_object_glimpse, .coo_req_attr_set = mdc_req_attr_set, + .coo_prune = osc_object_prune, +}; + +static const struct osc_object_operations mdc_object_ops = { + .oto_build_res_name = mdc_build_res_name, + .oto_dlmlock_at_pgoff = mdc_dlmlock_at_pgoff, }; static int mdc_object_init(const struct lu_env *env, struct lu_object *obj, @@ -258,6 +1234,7 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env, lu_object_init(obj, NULL, dev); osc->oo_cl.co_ops = &mdc_ops; obj->lo_ops = &mdc_lu_obj_ops; + osc->oo_obj_ops = &mdc_object_ops; osc->oo_initialized = false; } else { obj = NULL; diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index 4dc7a5e..69b6ed2 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -167,5 +167,7 @@ static inline unsigned long hash_x_index(__u64 hash, int hash64) /* mdc_dev.c */ extern struct lu_device_type mdc_device_type; +int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, + struct ldlm_lock_desc *new, void *data, int flag); #endif diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index fb59428..7d12863 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -273,9 +273,10 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, MDS_INODELOCK_UPDATE); if ((op_data->op_flags & MF_MDC_CANCEL_FID3) && (fid_is_sane(&op_data->op_fid3))) + /* don't cancel DoM lock which may cause data flush */ count += mdc_resource_get_unused(exp, &op_data->op_fid3, &cancels, LCK_EX, - MDS_INODELOCK_FULL); + MDS_INODELOCK_ELC); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_UNLINK); if (req == NULL) { @@ -376,11 +377,11 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, count += mdc_resource_get_unused(exp, &op_data->op_fid3, &cancels, LCK_EX, MDS_INODELOCK_LOOKUP); - if ((op_data->op_flags & MF_MDC_CANCEL_FID4) && - (fid_is_sane(&op_data->op_fid4))) - count += mdc_resource_get_unused(exp, &op_data->op_fid4, - &cancels, LCK_EX, - MDS_INODELOCK_FULL); + if ((op_data->op_flags & MF_MDC_CANCEL_FID4) && + (fid_is_sane(&op_data->op_fid4))) + count += mdc_resource_get_unused(exp, &op_data->op_fid4, + &cancels, LCK_EX, + MDS_INODELOCK_ELC); req = ptlrpc_request_alloc(class_exp2cliimp(exp), op_data->op_cli_flags & CLI_MIGRATE ? 
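+ * A page may have been written under a server-side (lockless) lock, in + * which case ops_srvlock is set and there is no client DLM lock whose + * handle could be packed; otherwise a missing lock at this point is a + * bug, hence the LBUG above.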
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 4916d40..7bd6531 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -676,9 +676,6 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, else b->mbo_blocks = 1; b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - } else if (lov_pattern(ma->ma_lmm->lmm_pattern) == - LOV_PATTERN_MDT) { - b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } } @@ -3275,13 +3272,14 @@ enum mdt_it_code { MDT_IT_GETXATTR, MDT_IT_LAYOUT, MDT_IT_QUOTA, - MDT_IT_NR + MDT_IT_GLIMPSE, + MDT_IT_BRW, + MDT_IT_NR }; static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); + struct mdt_thread_info *info, + struct ldlm_lock **, __u64); static int mdt_intent_getxattr(enum mdt_it_code opcode, struct mdt_thread_info *info, @@ -3296,6 +3294,20 @@ static int mdt_intent_reint(enum mdt_it_code opcode, struct mdt_thread_info *info, struct ldlm_lock **, __u64); +static int mdt_intent_glimpse(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, __u64 flags) +{ + return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace, + lockp, flags); +} +static int mdt_intent_brw(enum mdt_it_code opcode, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, __u64 flags) +{ + return mdt_brw_enqueue(info, info->mti_mdt->mdt_namespace, + lockp, flags); +} static struct mdt_it_flavor { const struct req_format *it_fmt; @@ -3367,14 +3379,24 @@ static struct mdt_it_flavor { .it_fmt = &RQF_LDLM_INTENT_LAYOUT, .it_flags = 0, .it_act = mdt_intent_layout - } + }, + [MDT_IT_GLIMPSE] = { + .it_fmt = &RQF_LDLM_INTENT, + .it_flags = 0, + .it_act = mdt_intent_glimpse, + }, + [MDT_IT_BRW] = { + .it_fmt = &RQF_LDLM_INTENT, + .it_flags = 0, + .it_act = mdt_intent_brw, + }, + }; -static int -mdt_intent_lock_replace(struct mdt_thread_info *info, - struct ldlm_lock **lockp, - struct mdt_lock_handle *lh, - __u64 flags, int result) +int mdt_intent_lock_replace(struct mdt_thread_info *info, + struct ldlm_lock **lockp, + struct mdt_lock_handle *lh, + __u64 flags, int result) { struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_lock *lock = *lockp; @@ -3450,6 +3472,8 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, new_lock->l_export = class_export_lock_get(req->rq_export, new_lock); new_lock->l_blocking_ast = lock->l_blocking_ast; new_lock->l_completion_ast = lock->l_completion_ast; + if (ldlm_has_dom(new_lock)) + new_lock->l_glimpse_ast = ldlm_server_glimpse_ast; new_lock->l_remote_handle = lock->l_remote_handle; new_lock->l_flags &= ~LDLM_FL_LOCAL; @@ -3465,10 +3489,9 @@ mdt_intent_lock_replace(struct mdt_thread_info *info, RETURN(ELDLM_LOCK_REPLACED); } -static void mdt_intent_fixup_resent(struct mdt_thread_info *info, - struct ldlm_lock *new_lock, - struct mdt_lock_handle *lh, - __u64 flags) +void mdt_intent_fixup_resent(struct mdt_thread_info *info, + struct ldlm_lock *new_lock, + struct mdt_lock_handle *lh, __u64 flags) { struct ptlrpc_request *req = mdt_info_req(info); struct ldlm_request *dlmreq; @@ -3883,6 +3906,12 @@ static int mdt_intent_code(enum ldlm_intent_flags itcode) case IT_QUOTA_CONN: rc = MDT_IT_QUOTA; break; + case IT_GLIMPSE: + rc = MDT_IT_GLIMPSE; + break; + case IT_BRW: + rc = MDT_IT_BRW; + break; default: CERROR("Unknown intent opcode: 0x%08x\n", itcode); rc = -EINVAL; @@ -3963,6 +3992,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, struct ptlrpc_request *req = req_cookie; struct ldlm_intent *it; struct 
req_capsule *pill; + const struct ldlm_lock_desc *ldesc; int rc; ENTRY; @@ -3972,11 +4002,12 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, tsi = tgt_ses_info(req->rq_svc_thread->t_env); info = tsi2mdt_info(tsi); - LASSERT(info != NULL); - pill = info->mti_pill; - LASSERT(pill->rc_req == req); + LASSERT(info != NULL); + pill = info->mti_pill; + LASSERT(pill->rc_req == req); + ldesc = &info->mti_dlm_req->lock_desc; - if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { + if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC); it = req_capsule_client_get(pill, &RMF_LDLM_INTENT); if (it != NULL) { @@ -3989,20 +4020,18 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, * ibits corrupted somewhere in mdt_intent_opc(). * The case for client miss to set ibits has been * processed by others. */ - LASSERT(ergo(info->mti_dlm_req->lock_desc.l_resource.\ - lr_type == LDLM_IBITS, - info->mti_dlm_req->lock_desc.\ - l_policy_data.l_inodebits.bits != 0)); - } else - rc = err_serious(-EFAULT); - } else { - /* No intent was provided */ - LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE); + LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS, + ldesc->l_policy_data.l_inodebits.bits != 0)); + } else { + rc = err_serious(-EFAULT); + } + } else { + /* No intent was provided */ req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); - rc = req_capsule_server_pack(pill); - if (rc) - rc = err_serious(rc); - } + rc = req_capsule_server_pack(pill); + if (rc) + rc = err_serious(rc); + } mdt_thread_info_fini(info); RETURN(rc); } diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index bb8e2df..f81cfa1 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -767,6 +767,13 @@ void mdt_thread_info_init(struct ptlrpc_request *req, struct mdt_thread_info *mti); void mdt_thread_info_fini(struct mdt_thread_info *mti); struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi); +void mdt_intent_fixup_resent(struct mdt_thread_info *info, + struct ldlm_lock *new_lock, + struct mdt_lock_handle *lh, __u64 flags); +int mdt_intent_lock_replace(struct mdt_thread_info *info, + struct ldlm_lock **lockp, + struct mdt_lock_handle *lh, + __u64 flags, int result); int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj, const struct md_hsm *mh); @@ -1010,6 +1017,11 @@ static inline int is_identity_get_disabled(struct upcall_cache *cache) int mdt_blocking_ast(struct ldlm_lock*, struct ldlm_lock_desc*, void*, int); +static int mdt_dom_glimpse_ast(struct ldlm_lock *lock, void *reqp) +{ + return -ELDLM_NO_LOCK_DATA; +} + /* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. */ static inline int mdt_fid_lock(struct ldlm_namespace *ns, struct lustre_handle *lh, enum ldlm_mode mode, @@ -1018,14 +1030,16 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns, __u64 flags, const __u64 *client_cookie) { int rc; + bool glimpse = policy->l_inodebits.bits & MDS_INODELOCK_DOM; LASSERT(ns != NULL); LASSERT(lh != NULL); rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, mode, &flags, mdt_blocking_ast, - ldlm_completion_ast, NULL, NULL, 0, - LVB_T_NONE, client_cookie, lh); + ldlm_completion_ast, + glimpse ? mdt_dom_glimpse_ast : NULL, + NULL, 0, LVB_T_NONE, client_cookie, lh); return rc == ELDLM_OK ? 
0 : -EIO; } @@ -1058,6 +1072,9 @@ static inline enum ldlm_mode mdt_mdl_mode2dlm_mode(mdl_mode_t mode) /* mdt_lvb.c */ extern struct ldlm_valblock_ops mdt_lvbo; +int mdt_dom_lvb_is_valid(struct ldlm_resource *res); +int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock, + struct ptlrpc_request *req, bool increase_only); void mdt_enable_cos(struct mdt_device *, int); int mdt_cos_is_enabled(struct mdt_device *); @@ -1139,9 +1156,29 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, struct niobuf_remote *rnb, int npages, struct niobuf_local *lnb, int old_rc); int mdt_punch_hdl(struct tgt_session_info *tsi); +int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns, + struct ldlm_lock **lockp, __u64 flags); +int mdt_brw_enqueue(struct mdt_thread_info *info, struct ldlm_namespace *ns, + struct ldlm_lock **lockp, __u64 flags); +void mdt_dom_discard_data(struct mdt_thread_info *info, + const struct lu_fid *fid); +int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo, + struct ldlm_resource *res, bool increase_only); +void mdt_dom_obj_lvb_update(const struct lu_env *env, struct mdt_object *mo, + bool increase_only); +int mdt_dom_lvb_alloc(struct ldlm_resource *res); + +static inline void mdt_dom_check_and_discard(struct mdt_thread_info *mti, + struct mdt_object *mo) +{ + if (lu_object_is_dying(&mo->mot_header) && + S_ISREG(lu_object_attr(&mo->mot_obj))) + mdt_dom_discard_data(mti, mdt_object_fid(mo)); +} /* grants */ long mdt_grant_connect(const struct lu_env *env, struct obd_export *exp, u64 want, bool conservative); +extern struct kmem_cache *ldlm_glimpse_work_kmem; #endif /* _MDT_INTERNAL_H */ diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c index 954713e..2548612 100644 --- a/lustre/mdt/mdt_io.c +++ b/lustre/mdt/mdt_io.c @@ -383,6 +383,25 @@ out: RETURN(rc); } +void mdt_dom_obj_lvb_update(const struct lu_env *env, struct mdt_object *mo, + bool increase_only) +{ + struct mdt_device *mdt = mdt_dev(mo->mot_obj.lo_dev); + struct ldlm_res_id resid; + struct ldlm_resource *res; + + fid_build_reg_res_name(mdt_object_fid(mo), &resid); + res = ldlm_resource_get(mdt->mdt_namespace, NULL, &resid, + LDLM_IBITS, 1); + if (IS_ERR(res)) + return; + + /* Update lvbo data if exists. */ + if (mdt_dom_lvb_is_valid(res)) + mdt_dom_disk_lvbo_update(env, mo, res, increase_only); + ldlm_resource_putref(res); +} + int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, struct niobuf_remote *rnb, int npages, @@ -423,6 +442,7 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, else obdo_from_la(oa, la, LA_GID | LA_UID); + mdt_dom_obj_lvb_update(env, mo, false); /* don't report overquota flag if we failed before reaching * commit */ if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) { @@ -445,6 +465,11 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, OBD_MD_FLGRPQUOTA; } } else if (cmd == OBD_BRW_READ) { + /* If oa != NULL then mdt_preprw_read updated the inode + * atime and we should update the lvb so that other glimpses + * will also get the updated value. 
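+ * (Writes and punch update the LVB the same way via + * mdt_dom_obj_lvb_update() above, so the resource LVB stays in sync + * with the on-disk attributes for later glimpse replies.)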
bug 5972 */ + if (oa) + mdt_dom_obj_lvb_update(env, mo, true); rc = mdt_commitrw_read(env, mdt, mo, objcount, npages, lnb); if (old_rc) rc = old_rc; @@ -584,6 +609,7 @@ int mdt_punch_hdl(struct tgt_session_info *tsi) if (rc) GOTO(out_put, rc); + mdt_dom_obj_lvb_update(tsi->tsi_env, mo, false); mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH, tsi->tsi_jobid, 1); EXIT; @@ -594,21 +620,347 @@ out_unlock: mdt_save_lock(info, &lh, LCK_PW, rc); out: mdt_thread_info_fini(info); - if (rc == 0) { - struct ldlm_resource *res; - - /* we do not call this before to avoid lu_object_find() in - * ->lvbo_update() holding another reference on the object. - * otherwise concurrent destroy can make the object unavailable - * for 2nd lu_object_find() waiting for the first reference - * to go... deadlock! */ - res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid, - LDLM_IBITS, 0); - if (!IS_ERR(res)) { - ldlm_res_lvbo_update(res, NULL, 0); - ldlm_resource_putref(res); - } + return rc; +} + +/** + * MDT glimpse for Data-on-MDT + * + * If a client holds a write lock then this function issues a glimpse AST to + * get the actual size from that client. + */ +int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns, + struct ldlm_resource *res) +{ + union ldlm_policy_data policy; + struct lustre_handle lockh; + enum ldlm_mode mode; + struct ldlm_lock *lock; + struct ldlm_glimpse_work *gl_work; + struct list_head gl_list; + int rc; + + ENTRY; + + /* There can be only one write lock covering data, try to match it. */ + policy.l_inodebits.bits = MDS_INODELOCK_DOM; + mode = ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK, + &res->lr_name, LDLM_IBITS, &policy, + LCK_PW, &lockh, 0); + + /* There is no PW lock on this object; finished. */ + if (mode == 0) + RETURN(0); + + lock = ldlm_handle2lock(&lockh); + if (lock == NULL) + RETURN(0); + + /* + * This check is for a lock taken in mdt_reint_unlink() that does + * not have l_glimpse_ast set. So the logic is: if there is a lock + * with no l_glimpse_ast set, this object is being destroyed already. + * Hence, if you are grabbing DLM locks on the server, always set a + * non-NULL glimpse_ast (e.g., ldlm_request.c::ldlm_glimpse_ast()). + */ + if (lock->l_glimpse_ast == NULL) { + LDLM_DEBUG(lock, "no l_glimpse_ast"); + GOTO(out, rc = -ENOENT); } + + OBD_SLAB_ALLOC_PTR_GFP(gl_work, ldlm_glimpse_work_kmem, GFP_ATOMIC); + if (!gl_work) + GOTO(out, rc = -ENOMEM); + + /* Populate the gl_work structure. + * Grab additional reference on the lock which will be released in + * ldlm_work_gl_ast_lock() */ + gl_work->gl_lock = LDLM_LOCK_GET(lock); + /* The glimpse callback is sent to one single IO lock. As a result, + * the gl_work list is just composed of one element */ + INIT_LIST_HEAD(&gl_list); + list_add_tail(&gl_work->gl_list, &gl_list); + /* There is actually no need for a glimpse descriptor when glimpsing + * IO locks */ + gl_work->gl_desc = NULL; + /* the ldlm_glimpse_work structure was allocated from a slab above, + * flag it so the LDLM code frees it accordingly */ + gl_work->gl_flags = LDLM_GL_WORK_SLAB_ALLOCATED; + + ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */ + + /* If the list is not empty, we failed to glimpse a lock and + * must clean it up.
+static void mdt_lvb2body(struct ldlm_resource *res, struct mdt_body *mb)
+{
+	struct ost_lvb *res_lvb;
+
+	lock_res(res);
+	res_lvb = res->lr_lvb_data;
+	mb->mbo_size = res_lvb->lvb_size;
+	mb->mbo_blocks = res_lvb->lvb_blocks;
+	mb->mbo_mtime = res_lvb->lvb_mtime;
+	mb->mbo_ctime = res_lvb->lvb_ctime;
+	mb->mbo_atime = res_lvb->lvb_atime;
+
+	CDEBUG(D_DLMTRACE, "size %llu\n", res_lvb->lvb_size);
+
+	mb->mbo_valid |= OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME |
+			 OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+	unlock_res(res);
+}
+
+/**
+ * MDT object size for Data-on-MDT
+ *
+ * This function is called when the MDT gets attributes for a DoM object.
+ * If there is a write lock on a client then it issues a glimpse AST to get
+ * the actual size from that client.
+ */
+int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt,
+			const struct lu_fid *fid, struct mdt_body *mb,
+			bool dom_lock)
+{
+	struct ldlm_res_id resid;
+	struct ldlm_resource *res;
+	int rc = 0;
+
+	ENTRY;
+
+	fid_build_reg_res_name(fid, &resid);
+	res = ldlm_resource_get(mdt->mdt_namespace, NULL, &resid,
+				LDLM_IBITS, 1);
+	if (IS_ERR(res) || res->lr_lvb_data == NULL)
+		RETURN(-ENOENT);
+
+	/* if there is no DOM bit in the lock then a glimpse is needed
+	 * to return a valid size */
+	if (!dom_lock) {
+		rc = mdt_do_glimpse(env, mdt->mdt_namespace, res);
+		if (rc < 0)
+			GOTO(out, rc);
+	}
+
+	/* Update the LVB data if a DoM lock was returned or if the LVB is
+	 * not yet valid. */
+	if (dom_lock || !mdt_dom_lvb_is_valid(res))
+		mdt_dom_lvbo_update(res, NULL, NULL, false);
+
+	mdt_lvb2body(res, mb);
+out:
+	ldlm_resource_putref(res);
+	RETURN(rc);
+}
+
+/**
+ * MDT DoM lock intent policy (glimpse)
+ *
+ * The intent policy is called when a lock has an intent; for a DoM file
+ * that means a glimpse lock, and the policy fills the Lock Value Block
+ * (LVB).
+ *
+ * If an already granted lock is found it will be placed in \a lockp and
+ * returned back to the caller.
+ *
+ * \param[in] mti	MDT thread info
+ * \param[in] ns	LDLM namespace
+ * \param[in,out] lockp	pointer to the lock
+ * \param[in] flags	LDLM flags
+ *
+ * \retval ELDLM_LOCK_REPLACED	if an already granted lock was found
+ *				and placed in \a lockp
+ * \retval ELDLM_LOCK_ABORTED	in other cases except error
+ * \retval negative value on error
+ */
+int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
+			struct ldlm_lock **lockp, __u64 flags)
+{
+	struct ldlm_lock *lock = *lockp;
+	struct ldlm_resource *res = lock->l_resource;
+	ldlm_processing_policy policy;
+	struct ldlm_reply *rep;
+	struct mdt_body *mbo;
+	int rc;
+
+	ENTRY;
+
+	policy = ldlm_get_processing_policy(res);
+	LASSERT(policy != NULL);
+
+	req_capsule_set_size(mti->mti_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+	req_capsule_set_size(mti->mti_pill, &RMF_ACL, RCL_SERVER, 0);
+	rc = req_capsule_server_pack(mti->mti_pill);
+	if (rc)
+		RETURN(err_serious(rc));
+
+	rep = req_capsule_server_get(mti->mti_pill, &RMF_DLM_REP);
+	if (rep == NULL)
+		RETURN(-EPROTO);
+
+	mbo = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
+	if (mbo == NULL)
+		RETURN(-EPROTO);
+
+	lock_res(res);
+	/* Check if this is a resend case (MSG_RESENT is set on RPC) and a
+	 * lock was found by ldlm_handle_enqueue(); if so no need to grant
+	 * it again. */
+	if (flags & LDLM_FL_RESENT) {
+		rc = LDLM_ITER_CONTINUE;
+	} else {
+		__u64 tmpflags = 0;
+		enum ldlm_error err;
+
+		rc = policy(lock, &tmpflags, 0, &err, NULL);
+		check_res_locked(res);
+	}
+	unlock_res(res);
+
+	/* The lock met with no resistance; we're finished. */
+	if (rc == LDLM_ITER_CONTINUE) {
+		GOTO(fill_mbo, rc = ELDLM_LOCK_REPLACED);
+	} else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+		/* LDLM_FL_BLOCK_NOWAIT means this request is for AGL. Do not
+		 * send a glimpse callback for the size; the real size user
+		 * will trigger the glimpse callback when necessary. */
+		GOTO(fill_mbo, rc = ELDLM_LOCK_ABORTED);
+	}
+
+	rc = mdt_do_glimpse(mti->mti_env, ns, res);
+	if (rc == -ENOENT) {
+		/* We are racing with unlink(); just return -ENOENT */
+		rep->lock_policy_res2 = ptlrpc_status_hton(-ENOENT);
+		rc = 0;
+	} else if (rc == -EINVAL) {
+		/* this is possible if the client lock has been cancelled but
+		 * still exists on the server. If that lock was found on the
+		 * server as the only conflicting lock then the client already
+		 * has size authority and a glimpse is not needed. */
+		CDEBUG(D_DLMTRACE, "Glimpse from the client owning lock\n");
+		rc = 0;
+	} else if (rc < 0) {
+		RETURN(rc);
+	}
+	rc = ELDLM_LOCK_ABORTED;
+fill_mbo:
+	/* The LVB may contain no valid data yet in the DoM case */
+	if (!mdt_dom_lvb_is_valid(res))
+		mdt_dom_lvbo_update(res, lock, NULL, false);
+	mdt_lvb2body(res, mbo);
+	RETURN(rc);
+}
+
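mdt_brw_enqueue() below recovers the FID directly from the DLM resource name. That inversion only works because DoM IBITS resources are named by the file FID itself. A toy round-trip of the fid_build_reg_res_name()/fid_extract_from_res_name() idea in plain C (the field layout is simplified and the toy_* names are illustrative):

#include <stdio.h>

struct toy_fid { unsigned long long seq; unsigned int oid, ver; };
struct toy_res_name { unsigned long long name[4]; };

/* Pack the FID into the resource name, roughly seq then ver|oid. */
static void build_res_name(const struct toy_fid *fid, struct toy_res_name *res)
{
	res->name[0] = fid->seq;
	res->name[1] = ((unsigned long long)fid->ver << 32) | fid->oid;
	res->name[2] = res->name[3] = 0;
}

/* The inverse: recover the FID from the resource name alone. */
static void extract_fid(struct toy_fid *fid, const struct toy_res_name *res)
{
	fid->seq = res->name[0];
	fid->oid = (unsigned int)res->name[1];
	fid->ver = (unsigned int)(res->name[1] >> 32);
}

int main(void)
{
	struct toy_fid in = { .seq = 0x200000401ULL, .oid = 0x1a, .ver = 0 };
	struct toy_fid out;
	struct toy_res_name res;

	build_res_name(&in, &res);
	extract_fid(&out, &res);
	printf("seq=%#llx oid=%#x ver=%u\n", out.seq, out.oid, out.ver);
	return 0;
}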
+int mdt_brw_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
+		    struct ldlm_lock **lockp, __u64 flags)
+{
+	struct tgt_session_info *tsi = tgt_ses_info(mti->mti_env);
+	struct lu_fid *fid = &tsi->tsi_fid;
+	struct ldlm_lock *lock = *lockp;
+	struct ldlm_resource *res = lock->l_resource;
+	struct ldlm_reply *rep;
+	struct mdt_body *mbo;
+	struct mdt_lock_handle *lhc = &mti->mti_lh[MDT_LH_RMT];
+	struct mdt_object *mo;
+	int rc = 0;
+
+	ENTRY;
+
+	/* Get lock from request for possible resent case. */
+	mdt_intent_fixup_resent(mti, *lockp, lhc, flags);
+	req_capsule_set_size(mti->mti_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+	req_capsule_set_size(mti->mti_pill, &RMF_ACL, RCL_SERVER, 0);
+	rc = req_capsule_server_pack(mti->mti_pill);
+	if (rc)
+		RETURN(err_serious(rc));
+
+	rep = req_capsule_server_get(mti->mti_pill, &RMF_DLM_REP);
+	if (rep == NULL)
+		RETURN(-EPROTO);
+
+	mbo = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
+	if (mbo == NULL)
+		RETURN(-EPROTO);
+
+	fid_extract_from_res_name(fid, &res->lr_name);
+	mo = mdt_object_find(mti->mti_env, mti->mti_mdt, fid);
+	if (unlikely(IS_ERR(mo)))
+		RETURN(PTR_ERR(mo));
+
+	if (!mdt_object_exists(mo))
+		GOTO(out, rc = -ENOENT);
+
+	if (mdt_object_remote(mo))
+		GOTO(out, rc = -EPROTO);
+
+	/* resent case */
+	if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+		mdt_lock_handle_init(lhc);
+		mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
+		/* This will block the MDT thread but it should be fine until
+		 * the client caches only a small amount of data for DoM,
+		 * which should be smaller than one BRW RPC and able to be
+		 * piggybacked on the lock cancel RPC.
+		 * If the client could hold the lock too long, this code can
+		 * be revised to call mdt_object_lock_try(), and if that
+		 * fails it will return ELDLM_OK here and fall back into the
+		 * normal lock enqueue process.
+		 */
+		rc = mdt_object_lock(mti, mo, lhc, MDS_INODELOCK_DOM);
+		if (rc)
+			GOTO(out, rc);
+	}
+
+	if (!mdt_dom_lvb_is_valid(res)) {
+		rc = mdt_dom_lvb_alloc(res);
+		if (rc)
+			GOTO(out_fail, rc);
+		mdt_dom_disk_lvbo_update(mti->mti_env, mo, res, false);
+	}
+	mdt_lvb2body(res, mbo);
+out_fail:
+	rep->lock_policy_res2 = clear_serious(rc);
+	if (rep->lock_policy_res2) {
+		lhc->mlh_reg_lh.cookie = 0ull;
+		GOTO(out, rc = ELDLM_LOCK_ABORTED);
+	}
+
+	rc = mdt_intent_lock_replace(mti, lockp, lhc, flags, rc);
+out:
+	mdt_object_put(mti->mti_env, mo);
+	RETURN(rc);
+}
+
+void mdt_dom_discard_data(struct mdt_thread_info *info,
+			  const struct lu_fid *fid)
+{
+	struct mdt_device *mdt = info->mti_mdt;
+	union ldlm_policy_data *policy = &info->mti_policy;
+	struct ldlm_res_id *res_id = &info->mti_res_id;
+	struct lustre_handle dom_lh;
+	__u64 flags = LDLM_FL_AST_DISCARD_DATA;
+	int rc = 0;
+
+	policy->l_inodebits.bits = MDS_INODELOCK_DOM;
+	policy->l_inodebits.try_bits = 0;
+	fid_build_reg_res_name(fid, res_id);
+
+	/* Tell the clients that the object is gone now and that they should
+	 * throw away any cached pages. */
+	rc = ldlm_cli_enqueue_local(mdt->mdt_namespace, res_id, LDLM_IBITS,
+				    policy, LCK_PW, &flags, ldlm_blocking_ast,
+				    ldlm_completion_ast, NULL, NULL, 0,
+				    LVB_T_NONE, NULL, &dom_lh);
+
+	/* We only care about the side-effects, just drop the lock. */
+	if (rc == ELDLM_OK)
+		ldlm_lock_decref(&dom_lh, LCK_PW);
+}
+
diff --git a/lustre/mdt/mdt_lvb.c b/lustre/mdt/mdt_lvb.c
index 31c2438..1896039 100644
--- a/lustre/mdt/mdt_lvb.c
+++ b/lustre/mdt/mdt_lvb.c
@@ -30,7 +30,7 @@
  */
 
 #define DEBUG_SUBSYSTEM S_MDS
-
+#include <lustre_swab.h>
 #include "mdt_internal.h"
 
 /* Called with res->lr_lvb_sem held */
@@ -46,17 +46,216 @@ static int mdt_lvbo_init(struct ldlm_resource *res)
 		/* call lvbo init function of quota master */
 		return qmt_hdls.qmth_lvbo_init(mdt->mdt_qmt_dev, res);
 	}
+	return 0;
+}
+
+int mdt_dom_lvb_alloc(struct ldlm_resource *res)
+{
+	struct ost_lvb *lvb;
+
+	mutex_lock(&res->lr_lvb_mutex);
+	if (res->lr_lvb_data == NULL) {
+		OBD_ALLOC_PTR(lvb);
+		if (lvb == NULL) {
+			mutex_unlock(&res->lr_lvb_mutex);
+			return -ENOMEM;
+		}
+		res->lr_lvb_data = lvb;
+		res->lr_lvb_len = sizeof(*lvb);
+
+		/* Store an error in the LVB to indicate it has no data yet. */
+		OST_LVB_SET_ERR(lvb->lvb_blocks, -ENODATA);
+	}
+	mutex_unlock(&res->lr_lvb_mutex);
 	return 0;
 }
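mdt_dom_lvb_alloc() above parks -ENODATA in lvb_blocks so that a freshly allocated LVB is recognizably invalid until the first real update; mdt_dom_lvb_is_valid() below tests exactly that. The encode/test trick, sketched stand-alone in plain C (the macro values mimic the OST_LVB_* helpers but are local stand-ins, not the Lustre definitions):

#include <stdio.h>

/* Park a negative errno in an otherwise-impossible blocks value. */
#define LVB_ERR_INIT	0xffbadbad80000000ULL
#define LVB_ERR_MASK	0xffbadbad00000000ULL
#define LVB_SET_ERR(blocks, rc)	((blocks) = LVB_ERR_INIT + (rc))
#define LVB_IS_ERR(blocks)	(((blocks) & LVB_ERR_MASK) == LVB_ERR_MASK)

int main(void)
{
	unsigned long long blocks = 0;

	LVB_SET_ERR(blocks, -61);	/* -ENODATA: LVB not valid yet */
	printf("valid? %s\n", LVB_IS_ERR(blocks) ? "no" : "yes");
	blocks = 8;			/* a real update clears the sentinel */
	printf("valid? %s\n", LVB_IS_ERR(blocks) ? "no" : "yes");
	return 0;
}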
-static int mdt_lvbo_update(struct ldlm_resource *res,
-			   struct ldlm_lock *lock,
-			   struct ptlrpc_request *req,
-			   int increase_only)
+int mdt_dom_lvb_is_valid(struct ldlm_resource *res)
+{
+	struct ost_lvb *res_lvb = res->lr_lvb_data;
+
+	return !(res_lvb == NULL || OST_LVB_IS_ERR(res_lvb->lvb_blocks));
+}
+
+int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
+			     struct ldlm_resource *res, bool increase_only)
+{
+	struct mdt_thread_info *info = mdt_th_info(env);
+	const struct lu_fid *fid = mdt_object_fid(mo);
+	struct ost_lvb *lvb;
+	struct md_attr *ma;
+	int rc = 0;
+
+	ENTRY;
+
+	lvb = res->lr_lvb_data;
+	LASSERT(lvb);
+
+	if (!mdt_object_exists(mo) || mdt_object_remote(mo))
+		RETURN(-ENOENT);
+
+	ma = &info->mti_attr;
+	ma->ma_valid = 0;
+	ma->ma_need = MA_INODE;
+	rc = mo_attr_get(env, mdt_object_child(mo), ma);
+	if (rc)
+		RETURN(rc);
+
+	lock_res(res);
+	if (ma->ma_attr.la_size > lvb->lvb_size || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
+		       "%llu -> %llu\n", PFID(fid),
+		       lvb->lvb_size, ma->ma_attr.la_size);
+		lvb->lvb_size = ma->ma_attr.la_size;
+	}
+
+	if (ma->ma_attr.la_mtime > lvb->lvb_mtime || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
+		       "%llu -> %llu\n", PFID(fid),
+		       lvb->lvb_mtime, ma->ma_attr.la_mtime);
+		lvb->lvb_mtime = ma->ma_attr.la_mtime;
+	}
+	if (ma->ma_attr.la_atime > lvb->lvb_atime || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
+		       "%llu -> %llu\n", PFID(fid),
+		       lvb->lvb_atime, ma->ma_attr.la_atime);
+		lvb->lvb_atime = ma->ma_attr.la_atime;
+	}
+	if (ma->ma_attr.la_ctime > lvb->lvb_ctime || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
+		       "%llu -> %llu\n", PFID(fid),
+		       lvb->lvb_ctime, ma->ma_attr.la_ctime);
+		lvb->lvb_ctime = ma->ma_attr.la_ctime;
+	}
+	if (ma->ma_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
+		       "%llu -> %llu\n", PFID(fid), lvb->lvb_blocks,
+		       (unsigned long long)ma->ma_attr.la_blocks);
+		lvb->lvb_blocks = ma->ma_attr.la_blocks;
+	}
+	unlock_res(res);
+
+	RETURN(rc);
+}
+
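Both mdt_dom_disk_lvbo_update() above and the RPC path in mdt_dom_lvbo_update() below apply the same per-field merge rule: accept a new value only if it grows, or unconditionally when increase_only is false. A minimal stand-alone statement of that rule (the lvb_merge_u64 helper name is hypothetical):

#include <stdbool.h>
#include <stdio.h>

/* The per-field LVB update rule: monotonic unless increase_only is false. */
static void lvb_merge_u64(unsigned long long *field,
			  unsigned long long val, bool increase_only)
{
	if (val > *field || !increase_only)
		*field = val;
}

int main(void)
{
	unsigned long long size = 4096;

	lvb_merge_u64(&size, 1024, true);	/* kept: glimpse path */
	printf("size=%llu\n", size);		/* 4096 */
	lvb_merge_u64(&size, 1024, false);	/* truncate/setattr path */
	printf("size=%llu\n", size);		/* 1024 */
	return 0;
}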
+int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
+			struct ptlrpc_request *req, bool increase_only)
 {
+	struct obd_export *exp = lock ? lock->l_export : NULL;
+	struct mdt_device *mdt;
+	struct mdt_object *mo;
+	struct mdt_thread_info *info;
+	struct ost_lvb *lvb;
+	struct lu_env env;
+	struct lu_fid *fid;
+	int rc = 0;
+
+	ENTRY;
+
+	/* Before going further let's check that OBD and export are healthy.
+	 */
+	if (exp != NULL &&
+	    (exp->exp_disconnected || exp->exp_failed ||
+	     exp->exp_obd->obd_stopping)) {
+		CDEBUG(D_INFO, "Skip LVB update, export is %s, obd is %s\n",
+		       exp->exp_failed ? "failed" : "disconnected",
+		       exp->exp_obd->obd_stopping ? "stopping" : "OK");
+		RETURN(0);
+	}
+
+	rc = mdt_dom_lvb_alloc(res);
+	if (rc < 0)
+		RETURN(rc);
+
+	mdt = ldlm_res_to_ns(res)->ns_lvbp;
+	if (mdt == NULL)
+		RETURN(-ENOENT);
+
+	rc = lu_env_init(&env, LCT_MD_THREAD);
+	if (rc)
+		RETURN(rc);
+
+	info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+	if (info == NULL)
+		GOTO(out_env, rc = -ENOMEM);
+
+	memset(info, 0, sizeof *info);
+	info->mti_env = &env;
+	info->mti_exp = req ? req->rq_export : NULL;
+	info->mti_mdt = mdt;
+
+	fid = &info->mti_tmp_fid2;
+	fid_extract_from_res_name(fid, &res->lr_name);
+
+	lvb = res->lr_lvb_data;
+	LASSERT(lvb);
+
+	/* Update the LVB from the network message */
+	if (req != NULL) {
+		struct ost_lvb *rpc_lvb;
+
+		rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
+						      &RMF_DLM_LVB,
+						      lustre_swab_ost_lvb);
+		if (rpc_lvb == NULL)
+			goto disk_update;
+
+		lock_res(res);
+		if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
+			       "%llu -> %llu\n", PFID(fid),
+			       lvb->lvb_size, rpc_lvb->lvb_size);
+			lvb->lvb_size = rpc_lvb->lvb_size;
+		}
+		if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
+			       "%llu -> %llu\n", PFID(fid),
+			       lvb->lvb_mtime, rpc_lvb->lvb_mtime);
+			lvb->lvb_mtime = rpc_lvb->lvb_mtime;
+		}
+		if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
+			       "%llu -> %llu\n", PFID(fid),
+			       lvb->lvb_atime, rpc_lvb->lvb_atime);
+			lvb->lvb_atime = rpc_lvb->lvb_atime;
+		}
+		if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
+			       "%llu -> %llu\n", PFID(fid),
+			       lvb->lvb_ctime, rpc_lvb->lvb_ctime);
+			lvb->lvb_ctime = rpc_lvb->lvb_ctime;
+		}
+		if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
+			       "%llu -> %llu\n", PFID(fid),
+			       lvb->lvb_blocks, rpc_lvb->lvb_blocks);
+			lvb->lvb_blocks = rpc_lvb->lvb_blocks;
+		}
+		unlock_res(res);
+	}
+
+disk_update:
+	/* Update the LVB from the disk inode */
+	mo = mdt_object_find(&env, mdt, fid);
+	if (IS_ERR(mo))
+		GOTO(out_env, rc = PTR_ERR(mo));
+
+	rc = mdt_dom_disk_lvbo_update(&env, mo, res, !!increase_only);
+	mdt_object_put(&env, mo);
+out_env:
+	lu_env_fini(&env);
+	return rc;
+}
+
+static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
+			   struct ptlrpc_request *req, int increase_only)
+{
+	ENTRY;
+
 	if (IS_LQUOTA_RES(res)) {
-		struct mdt_device *mdt;
+		struct mdt_device	*mdt;
 
 		mdt = ldlm_res_to_ns(res)->ns_lvbp;
 		if (mdt->mdt_qmt_dev == NULL)
@@ -67,6 +266,13 @@ static int mdt_lvbo_update(struct ldlm_resource *res,
 					  increase_only);
 	}
 
+	/* Data-on-MDT lvbo update.
+	 * The lock can be NULL here, and that means the resource is a DOM
+	 * one: lvbo_update() without a lock is issued by the MDT itself,
+	 * and only for DOM objects.
+	 */
+	if (lock == NULL || ldlm_has_dom(lock))
+		return mdt_dom_lvbo_update(res, lock, req, !!increase_only);
 	return 0;
 }
 
@@ -87,6 +293,9 @@ static int mdt_lvbo_size(struct ldlm_lock *lock)
 		return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
 	}
 
+	if (ldlm_has_dom(lock))
+		return sizeof(struct ost_lvb);
+
 	if (ldlm_has_layout(lock))
 		return mdt->mdt_max_mdsize;
 
@@ -115,12 +324,29 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
 		RETURN(rc);
 	}
 
+	/* The LVB for a DoM lock is needed only for glimpse;
+	 * don't fill DoM data if there is a layout lock */
+	if (ldlm_has_dom(lock)) {
+		struct ldlm_resource *res = lock->l_resource;
+		int lvb_len = sizeof(struct ost_lvb);
+
+		if (!mdt_dom_lvb_is_valid(res))
+			mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0);
+
+		if (lvb_len > lvblen)
+			lvb_len = lvblen;
+
+		lock_res(res);
+		memcpy(lvb, res->lr_lvb_data, lvb_len);
+		unlock_res(res);
+
+		RETURN(lvb_len);
+	}
+
 	/* Only fill layout if layout lock is granted */
 	if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode)
 		RETURN(0);
 
-	/* layout lock will be granted to client, fill in lvb with layout */
-
 	/* XXX create an env to talk to mdt stack. We should get this env from
 	 * ptlrpc_thread->t_env. */
 	rc = lu_env_init(&env, LCT_MD_THREAD);
@@ -155,20 +381,19 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
 	rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
 	if (rc < 0)
 		GOTO(out, rc);
-
 	if (rc > 0) {
 		struct lu_buf *lmm = NULL;
-
 		if (lvblen < rc) {
 			CERROR("%s: expected %d actual %d.\n",
-			       mdt_obd_name(mdt), rc, lvblen);
+				mdt_obd_name(mdt), rc, lvblen);
+			/* if the layout size is bigger, update max_mdsize */
+			if (rc > info->mti_mdt->mdt_max_mdsize)
+				info->mti_mdt->mdt_max_mdsize = rc;
 			GOTO(out, rc = -ERANGE);
 		}
-
 		lmm = &info->mti_buf;
 		lmm->lb_buf = lvb;
 		lmm->lb_len = rc;
-
 		rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
 		if (rc < 0)
 			GOTO(out, rc);
@@ -194,6 +419,9 @@ static int mdt_lvbo_free(struct ldlm_resource *res)
 		return qmt_hdls.qmth_lvbo_free(mdt->mdt_qmt_dev, res);
 	}
 
+	/* Data-on-MDT lvbo free */
+	if (res->lr_lvb_data != NULL)
+		OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
 	return 0;
 }
diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c
index e04c202..e7c68df 100644
--- a/lustre/mdt/mdt_open.c
+++ b/lustre/mdt/mdt_open.c
@@ -2074,8 +2074,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 		atomic_dec(&o->mot_open_count);
 		mdt_handle_last_unlink(info, o, ma);
 
-	if (!MFD_CLOSED(mode))
-		rc = mo_close(info->mti_env, next, ma, mode);
+	if (!MFD_CLOSED(mode)) {
+		rc = mo_close(info->mti_env, next, ma, mode);
+		mdt_dom_check_and_discard(info, o);
+	}
 
 	/* adjust open and lease count */
 	if (mode & MDS_OPEN_LEASE) {
diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c
index a252b5f..1f079be 100644
--- a/lustre/mdt/mdt_reint.c
+++ b/lustre/mdt/mdt_reint.c
@@ -660,7 +660,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
 	if (rc != 0)
 		GOTO(out_unlock, rc);
-
+	mdt_dom_obj_lvb_update(info->mti_env, mo, false);
 	EXIT;
 out_unlock:
 	mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
@@ -795,11 +795,11 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
 	mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
 
-        EXIT;
+	EXIT;
 out_put:
-        mdt_object_put(info->mti_env, mo);
+	mdt_object_put(info->mti_env, mo);
 out:
-        if (rc == 0)
+	if (rc == 0)
 		mdt_counter_incr(req, LPROC_MDT_SETATTR);
 
 	mdt_client_compatibility(info);
@@ -873,6 +873,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 	bool cos_incompat = false;
 	int no_name = 0;
 	int rc;
+
 	ENTRY;
 
 	DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
@@ -1044,32 +1045,39 @@ relock:
 			mdt_object_child(mc), &rr->rr_name, ma, no_name);
 
 	mutex_unlock(&mc->mot_lov_mutex);
+	if (rc != 0)
+		GOTO(unlock_child, rc);
 
-	if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+	if (!lu_object_is_dying(&mc->mot_header)) {
 		rc = mdt_attr_get_complex(info, mc, ma);
-	if (rc == 0)
-		mdt_handle_last_unlink(info, mc, ma);
+		if (rc)
+			GOTO(out_stat, rc);
+	} else {
+		mdt_dom_check_and_discard(info, mc);
+	}
+	mdt_handle_last_unlink(info, mc, ma);
 
-	if (ma->ma_valid & MA_INODE) {
-		switch (ma->ma_attr.la_mode & S_IFMT) {
-		case S_IFDIR:
+out_stat:
+	if (ma->ma_valid & MA_INODE) {
+		switch (ma->ma_attr.la_mode & S_IFMT) {
+		case S_IFDIR:
 			mdt_counter_incr(req, LPROC_MDT_RMDIR);
-			break;
-		case S_IFREG:
-		case S_IFLNK:
-		case S_IFCHR:
-		case S_IFBLK:
-		case S_IFIFO:
-		case S_IFSOCK:
+			break;
+		case S_IFREG:
+		case S_IFLNK:
+		case S_IFCHR:
+		case S_IFBLK:
+		case S_IFIFO:
+		case S_IFSOCK:
 			mdt_counter_incr(req, LPROC_MDT_UNLINK);
-			break;
-		default:
-			LASSERTF(0, "bad file type %o unlinking\n",
-				 ma->ma_attr.la_mode);
-		}
-	}
+			break;
+		default:
+			LASSERTF(0, "bad file type %o unlinking\n",
+				 ma->ma_attr.la_mode);
+		}
+	}
 
-	EXIT;
+	EXIT;
 
 unlock_child:
 	mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
@@ -2106,8 +2114,10 @@ relock:
 	/* handle last link of tgt object */
 	if (rc == 0) {
 		mdt_counter_incr(req, LPROC_MDT_RENAME);
-		if (mnew)
+		if (mnew) {
 			mdt_handle_last_unlink(info, mnew, ma);
+			mdt_dom_check_and_discard(info, mnew);
+		}
 
 		mdt_rename_counter_tally(info, info->mti_mdt, req,
 					 msrcdir, mtgtdir);
diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c
index 3bb3d59..43e9acc 100644
--- a/lustre/ofd/ofd_dev.c
+++ b/lustre/ofd/ofd_dev.c
@@ -3260,13 +3260,6 @@ static int __init ofd_init(void)
 		return(rc);
 	}
 
-	rc = ofd_dlm_init();
-	if (rc) {
-		lu_kmem_fini(ofd_caches);
-		ofd_fmd_exit();
-		return rc;
-	}
-
 	rc = class_register_type(&ofd_obd_ops, NULL, true, NULL,
 				 LUSTRE_OST_NAME, &ofd_device_type);
 	return rc;
@@ -3281,7 +3274,6 @@ static int __init ofd_init(void)
 static void __exit ofd_exit(void)
 {
 	ofd_fmd_exit();
-	ofd_dlm_exit();
 	lu_kmem_fini(ofd_caches);
 	class_unregister_type(LUSTRE_OST_NAME);
 }
diff --git a/lustre/ofd/ofd_dlm.c b/lustre/ofd/ofd_dlm.c
index c18ade0..a96a91d 100644
--- a/lustre/ofd/ofd_dlm.c
+++ b/lustre/ofd/ofd_dlm.c
@@ -51,25 +51,6 @@ struct ofd_intent_args {
 	int error;
 };
 
-int ofd_dlm_init(void)
-{
-	ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
-				sizeof(struct ldlm_glimpse_work),
-				0, 0, NULL);
-	if (ldlm_glimpse_work_kmem == NULL)
-		return -ENOMEM;
-	else
-		return 0;
-}
-
-void ofd_dlm_exit(void)
-{
-	if (ldlm_glimpse_work_kmem) {
-		kmem_cache_destroy(ldlm_glimpse_work_kmem);
-		ldlm_glimpse_work_kmem = NULL;
-	}
-}
-
 /**
  * OFD interval callback.
  *
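With ofd_dlm_init()/ofd_dlm_exit() removed, ldlm_glimpse_work_kmem has to be created by whichever module now owns it; the LDLM layer is the natural home given that both the MDT and the OFD declare it extern, but that hunk is not shown in this excerpt. The lifecycle being moved is the usual slab pattern, sketched here with an illustrative cache name (assumes the LDLM headers for struct ldlm_glimpse_work):

#include <linux/errno.h>
#include <linux/slab.h>

/* Sketch only: roughly what the owning module would run at init/exit. */
static struct kmem_cache *glimpse_work_cache;

static int glimpse_work_cache_init(void)
{
	glimpse_work_cache = kmem_cache_create("ldlm_glimpse_work_kmem",
					sizeof(struct ldlm_glimpse_work),
					0, 0, NULL);
	return glimpse_work_cache != NULL ? 0 : -ENOMEM;
}

static void glimpse_work_cache_fini(void)
{
	kmem_cache_destroy(glimpse_work_cache);
	glimpse_work_cache = NULL;
}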
diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h
index 611c8db..6cc7952 100644
--- a/lustre/ofd/ofd_internal.h
+++ b/lustre/ofd/ofd_internal.h
@@ -419,8 +419,7 @@ extern struct ldlm_valblock_ops ofd_lvbo;
 
 /* ofd_dlm.c */
 extern struct kmem_cache *ldlm_glimpse_work_kmem;
-int ofd_dlm_init(void);
-void ofd_dlm_exit(void);
+
 int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
 		      void *req_cookie, enum ldlm_mode mode, __u64 flags,
 		      void *data);
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 2752009..814af07 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -226,7 +226,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
 	if (ext->oe_sync && ext->oe_grants > 0)
 		GOTO(out, rc = 90);
 
-	if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
+	if (ext->oe_dlmlock != NULL &&
+	    ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT &&
+	    !ldlm_is_failed(ext->oe_dlmlock)) {
 		struct ldlm_extent *extent;
 
 		extent = &ext->oe_dlmlock->l_policy_data.l_extent;
@@ -1295,7 +1297,6 @@ static int osc_refresh_count(const struct lu_env *env,
 	pgoff_t index = osc_index(oap2osc(oap));
 	struct cl_object *obj;
 	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-
 	int result;
 	loff_t kms;
 
@@ -3206,6 +3207,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 	spin_unlock(&osc->oo_tree_lock);
 	RETURN(res);
 }
+EXPORT_SYMBOL(osc_page_gang_lookup);
 
 /**
  * Check if page @page is covered by an extra lock or discard it.
@@ -3248,8 +3250,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
 	return CLP_GANG_OKAY;
 }
 
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
-		      struct osc_page *ops, void *cbdata)
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+		   struct osc_page *ops, void *cbdata)
 {
 	struct osc_thread_info *info = osc_env_info(env);
 	struct cl_page *page = ops->ops_cl.cpl_page;
@@ -3271,6 +3273,7 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
 
 	return CLP_GANG_OKAY;
 }
+EXPORT_SYMBOL(osc_discard_cb);
 
 /**
  * Discard pages protected by the given lock. This function traverses radix
@@ -3297,7 +3300,7 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
 	if (result != 0)
 		GOTO(out, result);
 
-	cb = discard ? discard_cb : check_and_discard_cb;
+	cb = discard ? osc_discard_cb : check_and_discard_cb;
 	info->oti_fn_index = info->oti_next_index = start;
 	do {
 		res = osc_page_gang_lookup(env, io, osc,
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 7a86744..4fdfb13 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -45,12 +45,14 @@ void osc_wake_cache_waiters(struct client_obd *cli);
 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes);
 void osc_update_next_shrink(struct client_obd *cli);
 int lru_queue_work(const struct lu_env *env, void *data);
+int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
+		      int sent, int rc);
+int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
+			   pgoff_t start, pgoff_t end, bool discard);
 
 extern struct ptlrpc_request_set *PTLRPCD_SET;
 
-typedef int (*osc_enqueue_upcall_f)(void *cookie, struct lustre_handle *lockh,
-				    int rc);
-
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 		     __u64 *flags, union ldlm_policy_data *policy,
 		     struct ost_lvb *lvb, int kms_valid,
@@ -151,24 +153,12 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
 void osc_inc_unstable_pages(struct ptlrpc_request *req);
 void osc_dec_unstable_pages(struct ptlrpc_request *req);
 bool osc_over_unstable_soft_limit(struct client_obd *cli);
-/**
- * Bit flags for osc_dlm_lock_at_pageoff().
- */
-enum osc_dap_flags {
-	/**
-	 * Just check if the desired lock exists, it won't hold reference
-	 * count on lock.
-	 */
-	OSC_DAP_FL_TEST_LOCK = 1 << 0,
-	/**
-	 * Return the lock even if it is being canceled.
-	 */
-	OSC_DAP_FL_CANCELING = 1 << 1
-};
-struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
-				       struct osc_object *obj, pgoff_t index,
-				       enum osc_dap_flags flags);
-void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa);
+
+struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
+					   struct osc_object *obj,
+					   pgoff_t index,
+					   enum osc_dap_flags flags);
+
 int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc);
 
 /** osc shrink list to link all osc client obd */
diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c
index 6e25563..e959ae0 100644
--- a/lustre/osc/osc_io.c
+++ b/lustre/osc/osc_io.c
@@ -56,8 +56,7 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
 {
 }
 
-static void osc_read_ahead_release(const struct lu_env *env,
-				   void *cbdata)
+void osc_read_ahead_release(const struct lu_env *env, void *cbdata)
 {
 	struct ldlm_lock *dlmlock = cbdata;
 	struct lustre_handle lockh;
@@ -66,6 +65,7 @@ static void osc_read_ahead_release(const struct lu_env *env,
 	ldlm_lock_decref(&lockh, LCK_PR);
 	LDLM_LOCK_PUT(dlmlock);
 }
+EXPORT_SYMBOL(osc_read_ahead_release);
 
 static int osc_io_read_ahead(const struct lu_env *env,
 			     const struct cl_io_slice *ios,
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 6fd75da..e9d5ae1 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -48,22 +48,6 @@
  * @{
  */
 
-/*****************************************************************************
- *
- * Type conversions.
- *
- */
-
-static const struct cl_lock_operations osc_lock_ops;
-static const struct cl_lock_operations osc_lock_lockless_ops;
-static void osc_lock_to_lockless(const struct lu_env *env,
-				 struct osc_lock *ols, int force);
-
-int osc_lock_is_lockless(const struct osc_lock *olck)
-{
-	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
-}
-
 /**
  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
  * pointer cannot be dereferenced, as lock is not protected from concurrent
@@ -135,8 +119,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
  *
  */
 
-static void osc_lock_fini(const struct lu_env *env,
-			  struct cl_lock_slice *slice)
+void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
 {
 	struct osc_lock *ols = cl2osc_lock(slice);
 
@@ -145,6 +128,7 @@ static void osc_lock_fini(const struct lu_env *env,
 
 	OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
+EXPORT_SYMBOL(osc_lock_fini);
 
 static void osc_lock_build_policy(const struct lu_env *env,
 				  const struct cl_lock *lock,
@@ -156,31 +140,6 @@ static void osc_lock_build_policy(const struct lu_env *env,
 	policy->l_extent.gid = d->cld_gid;
 }
 
-static __u64 osc_enq2ldlm_flags(__u32 enqflags)
-{
-	__u64 result = 0;
-
-	CDEBUG(D_DLMTRACE, "flags: %x\n", enqflags);
-
-	LASSERT((enqflags & ~CEF_MASK) == 0);
-
-	if (enqflags & CEF_NONBLOCK)
-		result |= LDLM_FL_BLOCK_NOWAIT;
-	if (enqflags & CEF_GLIMPSE)
-		result |= LDLM_FL_HAS_INTENT;
-	if (enqflags & CEF_DISCARD_DATA)
-		result |= LDLM_FL_AST_DISCARD_DATA;
-	if (enqflags & CEF_PEEK)
-		result |= LDLM_FL_TEST_LOCK;
-	if (enqflags & CEF_LOCK_MATCH)
-		result |= LDLM_FL_MATCH_LOCK;
-	if (enqflags & CEF_LOCK_NO_EXPAND)
-		result |= LDLM_FL_NO_EXPANSION;
-	if (enqflags & CEF_SPECULATIVE)
-		result |= LDLM_FL_SPECULATIVE;
-	return result;
-}
-
 /**
  * Updates object attributes from a lock value block (lvb) received together
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
@@ -335,7 +294,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 		 * lockless lock.
 		 */
 		osc_object_set_contended(cl2osc(slice->cls_obj));
-		LASSERT(slice->cls_ops == &osc_lock_ops);
+		LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
 
 		/* Change this lock to ldlmlock-less lock. */
 		osc_lock_to_lockless(env, oscl, 1);
@@ -582,7 +541,7 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
 	RETURN(result);
 }
 
-static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
+int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 {
 	struct ptlrpc_request *req = data;
 	struct lu_env *env;
@@ -644,6 +603,7 @@ out:
 	req->rq_status = result;
 	RETURN(result);
 }
+EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
 static int weigh_cb(const struct lu_env *env, struct cl_io *io,
 		    struct osc_page *ops, void *cbdata)
@@ -777,46 +737,46 @@ static void osc_lock_build_einfo(const struct lu_env *env,
 * Additional policy can be implemented here, e.g., never do lockless-io
 * for large extents.
 */
-static void osc_lock_to_lockless(const struct lu_env *env,
-				 struct osc_lock *ols, int force)
+void osc_lock_to_lockless(const struct lu_env *env,
+			  struct osc_lock *ols, int force)
 {
-	struct cl_lock_slice *slice = &ols->ols_cl;
-
-	LASSERT(ols->ols_state == OLS_NEW ||
-		ols->ols_state == OLS_UPCALL_RECEIVED);
-
-	if (force) {
-		ols->ols_locklessable = 1;
-		slice->cls_ops = &osc_lock_lockless_ops;
-	} else {
-		struct osc_io *oio = osc_env_io(env);
-		struct cl_io *io = oio->oi_cl.cis_io;
-		struct cl_object *obj = slice->cls_obj;
-		struct osc_object *oob = cl2osc(obj);
-		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
-		struct obd_connect_data *ocd;
-
-		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
-			io->ci_lockreq == CILR_MAYBE ||
-			io->ci_lockreq == CILR_NEVER);
-
-		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
-		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
-				(io->ci_lockreq == CILR_MAYBE) &&
-				(ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
-		if (io->ci_lockreq == CILR_NEVER ||
-		    /* lockless IO */
-		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
-		    /* lockless truncate */
-		    (cl_io_is_trunc(io) &&
-		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
-		     osd->od_lockless_truncate)) {
-			ols->ols_locklessable = 1;
-			slice->cls_ops = &osc_lock_lockless_ops;
-		}
-	}
-	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+	struct cl_lock_slice *slice = &ols->ols_cl;
+	struct osc_io *oio = osc_env_io(env);
+	struct cl_io *io = oio->oi_cl.cis_io;
+	struct cl_object *obj = slice->cls_obj;
+	struct osc_object *oob = cl2osc(obj);
+	const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+	struct obd_connect_data *ocd;
+
+	LASSERT(ols->ols_state == OLS_NEW ||
+		ols->ols_state == OLS_UPCALL_RECEIVED);
+
+	if (force) {
+		ols->ols_locklessable = 1;
+		slice->cls_ops = ols->ols_lockless_ops;
+	} else {
+		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+			io->ci_lockreq == CILR_MAYBE ||
+			io->ci_lockreq == CILR_NEVER);
+
+		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
+					(io->ci_lockreq == CILR_MAYBE) &&
+					(ocd->ocd_connect_flags &
+					 OBD_CONNECT_SRVLOCK);
+		if (io->ci_lockreq == CILR_NEVER ||
+		    /* lockless IO */
+		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+		    /* lockless truncate */
+		    (cl_io_is_trunc(io) && osd->od_lockless_truncate &&
+		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) {
+			ols->ols_locklessable = 1;
+			slice->cls_ops = ols->ols_lockless_ops;
+		}
+	}
+	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
 }
+EXPORT_SYMBOL(osc_lock_to_lockless);
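Condensed, the rewritten osc_lock_to_lockless() above decides as follows. This is a plain-C restatement with illustrative field names, not the Lustre types:

#include <stdbool.h>
#include <stdio.h>

struct lockless_in {
	bool force;		/* caller demands lockless */
	bool never_lock;	/* io->ci_lockreq == CILR_NEVER */
	bool maybe_lock;	/* io->ci_lockreq == CILR_MAYBE */
	bool srvlock_ok;	/* server advertised OBD_CONNECT_SRVLOCK */
	bool contended;		/* osc_object_is_contended() */
	bool trunc_ok;		/* truncate + OBD_CONNECT_TRUNCLOCK + tunable */
	bool is_setattr;	/* io->ci_type == CIT_SETATTR */
};

/* True when the lock slice should switch to the lockless ops table. */
static bool go_lockless(const struct lockless_in *in)
{
	bool locklessable = !in->is_setattr && in->maybe_lock &&
			    in->srvlock_ok;

	return in->force || in->never_lock ||
	       (locklessable && in->contended) || in->trunc_ok;
}

int main(void)
{
	struct lockless_in in = { .maybe_lock = true, .srvlock_ok = true,
				  .contended = true };

	printf("lockless? %d\n", go_lockless(&in));
	return 0;
}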
 
 static bool osc_lock_compatible(const struct osc_lock *qing,
 				const struct osc_lock *qed)
@@ -841,9 +801,8 @@ static bool osc_lock_compatible(const struct osc_lock *qing,
 	return false;
 }
 
-static void osc_lock_wake_waiters(const struct lu_env *env,
-				  struct osc_object *osc,
-				  struct osc_lock *oscl)
+void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
+			   struct osc_lock *oscl)
 {
 	spin_lock(&osc->oo_ol_spin);
 	list_del_init(&oscl->ols_nextlock_oscobj);
@@ -861,14 +820,16 @@ static void osc_lock_wake_waiters(const struct lu_env *env,
 	}
 	spin_unlock(&oscl->ols_lock);
 }
+EXPORT_SYMBOL(osc_lock_wake_waiters);
 
-static int osc_lock_enqueue_wait(const struct lu_env *env,
-				 struct osc_object *obj, struct osc_lock *oscl)
+int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
+			  struct osc_lock *oscl)
 {
 	struct osc_lock *tmp_oscl;
 	struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
 	struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor;
 	int rc = 0;
+
 	ENTRY;
 
 	spin_lock(&obj->oo_ol_spin);
@@ -919,6 +880,7 @@ restart:
 
 	RETURN(rc);
 }
+EXPORT_SYMBOL(osc_lock_enqueue_wait);
 
 /**
  * Implementation of cl_lock_operations::clo_enqueue() method for osc
@@ -1096,8 +1058,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
 *
 * - cancels ldlm lock (ldlm_cli_cancel()).
 */
-static void osc_lock_cancel(const struct lu_env *env,
-			    const struct cl_lock_slice *slice)
+void osc_lock_cancel(const struct lu_env *env,
+		     const struct cl_lock_slice *slice)
 {
 	struct osc_object *obj = cl2osc(slice->cls_obj);
 	struct osc_lock *oscl = cl2osc_lock(slice);
@@ -1113,9 +1075,10 @@ static void osc_lock_cancel(const struct lu_env *env,
 	osc_lock_wake_waiters(env, obj, oscl);
 	EXIT;
 }
+EXPORT_SYMBOL(osc_lock_cancel);
 
-static int osc_lock_print(const struct lu_env *env, void *cookie,
-			  lu_printer_t p, const struct cl_lock_slice *slice)
+int osc_lock_print(const struct lu_env *env, void *cookie,
+		   lu_printer_t p, const struct cl_lock_slice *slice)
 {
 	struct osc_lock *lock = cl2osc_lock(slice);
 
@@ -1125,6 +1088,7 @@ static int osc_lock_print(const struct lu_env *env, void *cookie,
 	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
 	return 0;
 }
+EXPORT_SYMBOL(osc_lock_print);
 
 static const struct cl_lock_operations osc_lock_ops = {
 	.clo_fini    = osc_lock_fini,
@@ -1158,9 +1122,8 @@ static const struct cl_lock_operations osc_lock_lockless_ops = {
 	.clo_print   = osc_lock_print
 };
 
-static void osc_lock_set_writer(const struct lu_env *env,
-				const struct cl_io *io,
-				struct cl_object *obj, struct osc_lock *oscl)
+void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
+			 struct cl_object *obj, struct osc_lock *oscl)
 {
 	struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
 	pgoff_t io_start;
@@ -1188,6 +1151,7 @@ static void osc_lock_set_writer(const struct lu_env *env,
 		oio->oi_write_osclock = oscl;
 	}
 }
+EXPORT_SYMBOL(osc_lock_set_writer);
 
 int osc_lock_init(const struct lu_env *env,
 		  struct cl_object *obj, struct cl_lock *lock,
@@ -1205,6 +1169,7 @@ int osc_lock_init(const struct lu_env *env,
 	INIT_LIST_HEAD(&oscl->ols_waiting_list);
 	INIT_LIST_HEAD(&oscl->ols_wait_entry);
 	INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj);
+	oscl->ols_lockless_ops = &osc_lock_lockless_ops;
 
 	/* Speculative lock requests must be either no_expand or glimpse
 	 * request (CEF_GLIMPSE). non-glimpse no_expand speculative extent
@@ -1242,9 +1207,10 @@ int osc_lock_init(const struct lu_env *env,
 * Finds an existing lock covering given index and optionally different from a
 * given \a except lock.
 */
-struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
-				       struct osc_object *obj, pgoff_t index,
-				       enum osc_dap_flags dap_flags)
+struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
+					   struct osc_object *obj,
+					   pgoff_t index,
+					   enum osc_dap_flags dap_flags)
 {
 	struct osc_thread_info *info = osc_env_info(env);
 	struct ldlm_res_id *resname = &info->oti_resname;
diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c
index 4597201..1998275 100644
--- a/lustre/osc/osc_object.c
+++ b/lustre/osc/osc_object.c
@@ -49,6 +49,17 @@
 * Object operations.
 *
 */
+static void osc_obj_build_res_name(struct osc_object *osc,
+				   struct ldlm_res_id *resname)
+{
+	ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
+}
+
+static const struct osc_object_operations osc_object_ops = {
+	.oto_build_res_name = osc_obj_build_res_name,
+	.oto_dlmlock_at_pgoff = osc_obj_dlmlock_at_pgoff,
+};
+
 int osc_object_init(const struct lu_env *env, struct lu_object *obj,
 		    const struct lu_object_conf *conf)
 {
@@ -79,6 +90,8 @@ int osc_object_init(const struct lu_env *env, struct lu_object *obj,
 	atomic_set(&osc->oo_nr_ios, 0);
 	init_waitqueue_head(&osc->oo_io_waitq);
 
+	LASSERT(osc->oo_obj_ops != NULL);
+
 	cl_object_page_init(lu2cl(obj), sizeof(struct osc_page));
 
 	return 0;
@@ -189,23 +202,28 @@ static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
 	ENTRY;
 
+	CDEBUG(D_DLMTRACE, "obj: %p/%p, lock %p\n",
+	       data, lock->l_ast_data, lock);
+
+	LASSERT(lock->l_granted_mode == lock->l_req_mode);
 	if (lock->l_ast_data == data)
 		lock->l_ast_data = NULL;
 	RETURN(LDLM_ITER_CONTINUE);
 }
 
-static int osc_object_prune(const struct lu_env *env, struct cl_object *obj)
+int osc_object_prune(const struct lu_env *env, struct cl_object *obj)
 {
-	struct osc_object  *osc = cl2osc(obj);
-	struct ldlm_res_id *resname = &osc_env_info(env)->oti_resname;
+	struct osc_object *osc = cl2osc(obj);
+	struct ldlm_res_id *resname = &osc_env_info(env)->oti_resname;
 
 	/* DLM locks don't hold a reference of osc_object so we have to
 	 * clear it before the object is being destroyed. */
-	ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
+	osc_build_res_name(osc, resname);
 	ldlm_resource_iterate(osc_export(osc)->exp_obd->obd_namespace, resname,
 			      osc_object_ast_clear, osc);
 	return 0;
 }
+EXPORT_SYMBOL(osc_object_prune);
 
 static int osc_object_fiemap(const struct lu_env *env, struct cl_object *obj,
 			     struct ll_fiemap_info_key *fmkey,
@@ -292,18 +310,6 @@ drop_lock:
 	RETURN(rc);
 }
 
-void osc_object_set_contended(struct osc_object *obj)
-{
-	obj->oo_contention_time = cfs_time_current();
-	/* mb(); */
-	obj->oo_contended = 1;
-}
-
-void osc_object_clear_contended(struct osc_object *obj)
-{
-	obj->oo_contended = 0;
-}
-
 int osc_object_is_contended(struct osc_object *obj)
 {
 	struct osc_device *dev = lu2osc_dev(obj->oo_cl.co_lu.lo_dev);
@@ -329,6 +335,7 @@ int osc_object_is_contended(struct osc_object *obj)
 	}
 	return 1;
 }
+EXPORT_SYMBOL(osc_object_is_contended);
 
 /**
  * Implementation of struct cl_object_operations::coo_req_attr_set() for osc
@@ -441,6 +448,7 @@ struct lu_object *osc_object_alloc(const struct lu_env *env,
 		lu_object_init(obj, NULL, dev);
 		osc->oo_cl.co_ops = &osc_ops;
 		obj->lo_ops = &osc_lu_obj_ops;
+		osc->oo_obj_ops = &osc_object_ops;
 	} else
 		obj = NULL;
 	return obj;
 }
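osc_object_alloc() above wires in the OSC flavour of the new ops table; the MDC side (in mdc_dev.c, not shown in this excerpt) can install its own oto_build_res_name that derives the resource name from the FID instead of the object ID. A toy version of that indirection in plain C, with illustrative names throughout:

#include <stdio.h>

struct obj;

/* Per-object-type overrides, as in struct osc_object_operations. */
struct obj_ops {
	void (*build_res_name)(const struct obj *o, char *buf, int len);
};

struct obj {
	unsigned long long id;
	const struct obj_ops *ops;	/* like osc->oo_obj_ops */
};

static void ost_res_name(const struct obj *o, char *buf, int len)
{
	snprintf(buf, len, "ostid:%llx", o->id);	/* OSC flavour */
}

static void fid_res_name(const struct obj *o, char *buf, int len)
{
	snprintf(buf, len, "fid:%llx", o->id);		/* MDC flavour */
}

static const struct obj_ops osc_like_ops = { .build_res_name = ost_res_name };
static const struct obj_ops mdc_like_ops = { .build_res_name = fid_res_name };

int main(void)
{
	char buf[32];
	struct obj o = { .id = 0x2000013a1ULL, .ops = &osc_like_ops };

	/* Generic code calls through the table, as osc_object_prune()
	 * now does via osc_build_res_name(). */
	o.ops->build_res_name(&o, buf, sizeof(buf));
	printf("%s\n", buf);
	o.ops = &mdc_like_ops;
	o.ops->build_res_name(&o, buf, sizeof(buf));
	printf("%s\n", buf);
	return 0;
}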
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 85df555..87b6c22 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -91,18 +91,6 @@ struct osc_ladvise_args {
 	void			*la_cookie;
 };
 
-struct osc_enqueue_args {
-	struct obd_export	*oa_exp;
-	enum ldlm_type		oa_type;
-	enum ldlm_mode		oa_mode;
-	__u64			*oa_flags;
-	osc_enqueue_upcall_f	oa_upcall;
-	void			*oa_cookie;
-	struct ost_lvb		*oa_lvb;
-	struct lustre_handle	oa_lockh;
-	bool			oa_speculative;
-};
-
 static void osc_release_ppga(struct brw_page **ppga, size_t count);
 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
 			 void *data, int rc);
@@ -2031,10 +2019,10 @@ static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
 	return set;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req,
-			    osc_enqueue_upcall_f upcall, void *cookie,
-			    struct lustre_handle *lockh, enum ldlm_mode mode,
-			    __u64 *flags, bool speculative, int errcode)
+int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
+		     void *cookie, struct lustre_handle *lockh,
+		     enum ldlm_mode mode, __u64 *flags, bool speculative,
+		     int errcode)
 {
 	bool intent = *flags & LDLM_FL_HAS_INTENT;
 	int rc;
@@ -2066,12 +2054,11 @@ static int osc_enqueue_fini(struct ptlrpc_request *req,
 	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
 		ldlm_lock_decref(lockh, mode);
 
-        RETURN(rc);
+	RETURN(rc);
 }
 
-static int osc_enqueue_interpret(const struct lu_env *env,
-				 struct ptlrpc_request *req,
-				 struct osc_enqueue_args *aa, int rc)
+int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+			  struct osc_enqueue_args *aa, int rc)
 {
 	struct ldlm_lock *lock;
 	struct lustre_handle *lockh = &aa->oa_lockh;
@@ -2115,7 +2102,7 @@ static int osc_enqueue_interpret(const struct lu_env *env,
 	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
 			      aa->oa_flags, aa->oa_speculative, rc);
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
 	ldlm_lock_decref(lockh, mode);
 	LDLM_LOCK_PUT(lock);
-- 
1.8.3.1