Whamcloud - gitweb
LU-3285 mds: add IO locking to the MDC and MDT 18/28018/25
authorMikhal Pershin <mike.pershin@intel.com>
Wed, 9 Dec 2015 13:08:53 +0000 (16:08 +0300)
committerMike Pershin <mike.pershin@intel.com>
Tue, 17 Oct 2017 20:02:07 +0000 (20:02 +0000)
- introduce new DOM inodebit for Data-on-MDT files.
- add IO lock and glimpse handling at MDT along with
needed LVB updates for it.
- a MDC is updated to exclude DOM bit from ELC and to
handle LVB changes due to glimpse on MDT.
- add CLIO locking at MDC, it uses IBITS lock to protect
data at MDT and a MDC handles such locks to convert them
into proper CLIO locks.

Signed-off-by: Mikhal Pershin <mike.pershin@intel.com>
Change-Id: Id95c7a5db814b399c961a5364af33d2bf5798055
Reviewed-on: https://review.whamcloud.com/28018
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
28 files changed:
lustre/include/lustre_dlm.h
lustre/include/lustre_dlm_flags.h
lustre/include/lustre_osc.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_dev.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_reint.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c
lustre/mdt/mdt_lvb.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_dlm.c
lustre/ofd/ofd_internal.h
lustre/osc/osc_cache.c
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/osc/osc_object.c
lustre/osc/osc_request.c

index 2d0952e..5f1dc20 100644 (file)
@@ -842,7 +842,9 @@ struct ldlm_lock {
 
        /** Private storage for lock user. Opaque to LDLM. */
        void                    *l_ast_data;
-
+       /* separate ost_lvb used mostly by Data-on-MDT for now.
+        * It is introduced to don't mix with layout lock data. */
+       struct ost_lvb           l_ost_lvb;
        /*
         * Server-side-only members.
         */
@@ -1011,6 +1013,12 @@ static inline bool ldlm_has_layout(struct ldlm_lock *lock)
                lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_LAYOUT;
 }
 
+static inline bool ldlm_has_dom(struct ldlm_lock *lock)
+{
+       return lock->l_resource->lr_type == LDLM_IBITS &&
+               lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_DOM;
+}
+
 static inline char *
 ldlm_ns_name(struct ldlm_namespace *ns)
 {
index 7912883..7576f16 100644 (file)
 
 /** l_flags bits marked as "ast" bits */
 #define LDLM_FL_AST_MASK                (LDLM_FL_FLOCK_DEADLOCK                |\
-                                        LDLM_FL_AST_DISCARD_DATA)
+                                        LDLM_FL_DISCARD_DATA)
 
 /** l_flags bits marked as "blocked" bits */
 #define LDLM_FL_BLOCKED_MASK            (LDLM_FL_BLOCK_GRANTED         |\
index d2563bb..f9777bb 100644 (file)
@@ -182,6 +182,73 @@ struct osc_thread_info {
        struct lu_buf           oti_ladvise_buf;
 };
 
+static inline __u64 osc_enq2ldlm_flags(__u32 enqflags)
+{
+       __u64 result = 0;
+
+       CDEBUG(D_DLMTRACE, "flags: %x\n", enqflags);
+
+       LASSERT((enqflags & ~CEF_MASK) == 0);
+
+       if (enqflags & CEF_NONBLOCK)
+               result |= LDLM_FL_BLOCK_NOWAIT;
+       if (enqflags & CEF_GLIMPSE)
+               result |= LDLM_FL_HAS_INTENT;
+       if (enqflags & CEF_DISCARD_DATA)
+               result |= LDLM_FL_AST_DISCARD_DATA;
+       if (enqflags & CEF_PEEK)
+               result |= LDLM_FL_TEST_LOCK;
+       if (enqflags & CEF_LOCK_MATCH)
+               result |= LDLM_FL_MATCH_LOCK;
+       if (enqflags & CEF_LOCK_NO_EXPAND)
+               result |= LDLM_FL_NO_EXPANSION;
+       if (enqflags & CEF_SPECULATIVE)
+               result |= LDLM_FL_SPECULATIVE;
+       return result;
+}
+
+typedef int (*osc_enqueue_upcall_f)(void *cookie, struct lustre_handle *lockh,
+                                   int rc);
+
+struct osc_enqueue_args {
+       struct obd_export       *oa_exp;
+       enum ldlm_type          oa_type;
+       enum ldlm_mode          oa_mode;
+       __u64                   *oa_flags;
+       osc_enqueue_upcall_f    oa_upcall;
+       void                    *oa_cookie;
+       struct ost_lvb          *oa_lvb;
+       struct lustre_handle    oa_lockh;
+       bool                    oa_speculative;
+};
+
+/**
+ * Bit flags for osc_dlm_lock_at_pageoff().
+ */
+enum osc_dap_flags {
+       /**
+        * Just check if the desired lock exists, it won't hold reference
+        * count on lock.
+        */
+       OSC_DAP_FL_TEST_LOCK = 1 << 0,
+       /**
+        * Return the lock even if it is being canceled.
+        */
+       OSC_DAP_FL_CANCELING = 1 << 1
+};
+
+/*
+ * The set of operations which are different for MDC and OSC objects
+ */
+struct osc_object_operations {
+       void (*oto_build_res_name)(struct osc_object *osc,
+                                  struct ldlm_res_id *resname);
+       struct ldlm_lock* (*oto_dlmlock_at_pgoff)(const struct lu_env *env,
+                                               struct osc_object *obj,
+                                               pgoff_t index,
+                                               enum osc_dap_flags dap_flags);
+};
+
 struct osc_object {
        struct cl_object        oo_cl;
        struct lov_oinfo        *oo_oinfo;
@@ -242,9 +309,24 @@ struct osc_object {
        atomic_t                oo_nr_ios;
        wait_queue_head_t       oo_io_waitq;
 
+       const struct osc_object_operations *oo_obj_ops;
        bool                    oo_initialized;
 };
 
+static inline void osc_build_res_name(struct osc_object *osc,
+                                     struct ldlm_res_id *resname)
+{
+       return osc->oo_obj_ops->oto_build_res_name(osc, resname);
+}
+
+static inline struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
+                                                   struct osc_object *obj,
+                                                   pgoff_t index,
+                                                   enum osc_dap_flags flags)
+{
+       return obj->oo_obj_ops->oto_dlmlock_at_pgoff(env, obj, index, flags);
+}
+
 static inline void osc_object_lock(struct osc_object *obj)
 {
        spin_lock(&obj->oo_lock);
@@ -274,6 +356,18 @@ static inline int osc_object_is_locked(struct osc_object *obj)
 #endif
 }
 
+static inline void osc_object_set_contended(struct osc_object *obj)
+{
+       obj->oo_contention_time = cfs_time_current();
+       /* mb(); */
+       obj->oo_contended = 1;
+}
+
+static inline void osc_object_clear_contended(struct osc_object *obj)
+{
+       obj->oo_contended = 0;
+}
+
 /*
  * Lock "micro-states" for osc layer.
  */
@@ -350,7 +444,8 @@ struct osc_lock {
        enum osc_lock_state     ols_state;
        /** lock value block */
        struct ost_lvb          ols_lvb;
-
+       /** Lockless operations to be used by lockless lock */
+       const struct cl_lock_operations *ols_lockless_ops;
        /**
         * true, if ldlm_lock_addref() was called against
         * osc_lock::ols_lock. This is used for sanity checking.
@@ -402,6 +497,10 @@ struct osc_lock {
                                ols_speculative:1;
 };
 
+static inline int osc_lock_is_lockless(const struct osc_lock *ols)
+{
+       return (ols->ols_cl.cls_ops == ols->ols_lockless_ops);
+}
 
 /**
  * Page state private for osc layer.
@@ -507,10 +606,13 @@ static inline void osc_io_unplug(const struct lu_env *env,
        (void)osc_io_unplug0(env, cli, osc, 0);
 }
 
-void osc_object_set_contended(struct osc_object *obj);
-void osc_object_clear_contended(struct osc_object *obj);
-int osc_object_is_contended(struct osc_object *obj);
-int osc_lock_is_lockless(const struct osc_lock *olck);
+typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
+                                struct osc_page *, void *);
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                       struct osc_object *osc, pgoff_t start, pgoff_t end,
+                       osc_page_gang_cbt cb, void *cbdata);
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+                  struct osc_page *ops, void *cbdata);
 
 /* osc_dev.c */
 int osc_device_init(const struct lu_env *env, struct lu_device *d,
@@ -535,6 +637,10 @@ int osc_attr_update(const struct lu_env *env, struct cl_object *obj,
 int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
                       struct ost_lvb *lvb);
 int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc);
+int osc_object_is_contended(struct osc_object *obj);
+int osc_object_find_cbdata(const struct lu_env *env, struct cl_object *obj,
+                          ldlm_iterator_t iter, void *data);
+int osc_object_prune(const struct lu_env *env, struct cl_object *obj);
 
 /* osc_request.c */
 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd);
@@ -572,11 +678,27 @@ int osc_io_read_start(const struct lu_env *env,
 int osc_io_write_start(const struct lu_env *env,
                       const struct cl_io_slice *slice);
 void osc_io_end(const struct lu_env *env, const struct cl_io_slice *slice);
-
 int osc_io_fsync_start(const struct lu_env *env,
                       const struct cl_io_slice *slice);
 void osc_io_fsync_end(const struct lu_env *env,
                      const struct cl_io_slice *slice);
+void osc_read_ahead_release(const struct lu_env *env, void *cbdata);
+
+/* osc_lock.c */
+void osc_lock_to_lockless(const struct lu_env *env, struct osc_lock *ols,
+                         int force);
+void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
+                          struct osc_lock *oscl);
+int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
+                         struct osc_lock *oscl);
+void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
+                        struct cl_object *obj, struct osc_lock *oscl);
+int osc_lock_print(const struct lu_env *env, void *cookie,
+                  lu_printer_t p, const struct cl_lock_slice *slice);
+void osc_lock_cancel(const struct lu_env *env,
+                    const struct cl_lock_slice *slice);
+void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice);
+int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data);
 
 /*****************************************************************************
  *
@@ -825,18 +947,6 @@ struct osc_extent {
        unsigned int            oe_mppr;
 };
 
-int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
-                     int sent, int rc);
-int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
-
-int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
-                          pgoff_t start, pgoff_t end, bool discard_pages);
-
-typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
-                                struct osc_page *, void *);
-int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
-                        struct osc_object *osc, pgoff_t start, pgoff_t end,
-                        osc_page_gang_cbt cb, void *cbdata);
 /** @} osc */
 
 #endif /* LUSTRE_OSC_H */
index 8730351..71c77e0 100644 (file)
@@ -1608,6 +1608,8 @@ typedef enum {
 #define MDS_INODELOCK_MAXSHIFT 6
 /* This FULL lock is useful to take on unlink sort of operations */
 #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
+/* DOM lock shouldn't be canceled early, use this macro for ELC */
+#define MDS_INODELOCK_ELC (MDS_INODELOCK_FULL & ~MDS_INODELOCK_DOM)
 
 /* NOTE: until Lustre 1.8.7/2.1.1 the fid_ver() was packed into name[2],
  * but was moved into name[1] along with the OID to avoid consuming the
@@ -2360,6 +2362,8 @@ enum ldlm_intent_flags {
        IT_QUOTA_DQACQ = 0x00000800,
        IT_QUOTA_CONN  = 0x00001000,
        IT_SETXATTR    = 0x00002000,
+       IT_GLIMPSE     = 0x00004000,
+       IT_BRW         = 0x00008000,
 };
 
 struct ldlm_intent {
index 8ef4709..48400a7 100644 (file)
@@ -40,6 +40,7 @@ extern struct mutex ldlm_cli_namespace_lock;
 extern struct list_head ldlm_cli_active_namespace_list;
 extern struct list_head ldlm_cli_inactive_namespace_list;
 extern unsigned int ldlm_cancel_unused_locks_before_replay;
+extern struct kmem_cache *ldlm_glimpse_work_kmem;
 
 static inline int ldlm_namespace_nr_read(enum ldlm_side client)
 {
index 04e3c8e..2396a6c 100644 (file)
@@ -1152,6 +1152,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 
        RETURN(rc);
 }
+EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 
 int ldlm_glimpse_locks(struct ldlm_resource *res,
                       struct list_head *gl_work_list)
@@ -1363,7 +1364,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                lock->l_req_extent = lock->l_policy_data.l_extent;
 
 existing_lock:
-
         if (flags & LDLM_FL_HAS_INTENT) {
                 /* In this case, the reply buffer is allocated deep in
                  * local_lock_enqueue by the policy function. */
@@ -3232,11 +3232,22 @@ int ldlm_init(void)
        if (ldlm_interval_tree_slab == NULL)
                goto out_interval;
 
+#ifdef HAVE_SERVER_SUPPORT
+       ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
+                                       sizeof(struct ldlm_glimpse_work),
+                                       0, 0, NULL);
+       if (ldlm_glimpse_work_kmem == NULL)
+               goto out_interval_tree;
+#endif
+
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        class_export_dump_hook = ldlm_dump_export_locks;
 #endif
        return 0;
-
+#ifdef HAVE_SERVER_SUPPORT
+out_interval_tree:
+       kmem_cache_destroy(ldlm_interval_tree_slab);
+#endif
 out_interval:
        kmem_cache_destroy(ldlm_interval_slab);
 out_lock:
@@ -3259,4 +3270,7 @@ void ldlm_exit(void)
        kmem_cache_destroy(ldlm_lock_slab);
        kmem_cache_destroy(ldlm_interval_slab);
        kmem_cache_destroy(ldlm_interval_tree_slab);
+#ifdef HAVE_SERVER_SUPPORT
+       kmem_cache_destroy(ldlm_glimpse_work_kmem);
+#endif
 }
index fb67a1e..2d0835f 100644 (file)
@@ -1817,8 +1817,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
 
                if ((lru_flags & LDLM_LRU_FLAG_CLEANUP) &&
-                   lock->l_resource->lr_type == LDLM_EXTENT &&
-                   lock->l_granted_mode == LCK_PR)
+                   (lock->l_resource->lr_type == LDLM_EXTENT ||
+                    ldlm_has_dom(lock)) && lock->l_granted_mode == LCK_PR)
                        ldlm_set_discard_data(lock);
 
                /* We can't re-add to l_lru as it confuses the
index 36e3a67..f2a7f1d 100644 (file)
@@ -1024,7 +1024,7 @@ int ll_merge_attr(const struct lu_env *env, struct inode *inode)
        cl_object_attr_unlock(obj);
 
        if (rc != 0)
-               GOTO(out_size_unlock, rc);
+               GOTO(out_size_unlock, rc = (rc == -ENODATA ? 0 : rc));
 
        if (atime < attr->cat_atime)
                atime = attr->cat_atime;
index 516c4f4..003b44b 100644 (file)
@@ -200,6 +200,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
        data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
                                  OBD_CONNECT_ATTRFID  | OBD_CONNECT_GRANT |
                                  OBD_CONNECT_VERSION  | OBD_CONNECT_BRW_SIZE |
+                                 OBD_CONNECT_SRVLOCK  | OBD_CONNECT_TRUNCLOCK|
                                  OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
                                  OBD_CONNECT_CANCELSET | OBD_CONNECT_FID     |
                                  OBD_CONNECT_AT       | OBD_CONNECT_LOV_V3   |
index 9034ba0..1d7dad7 100644 (file)
@@ -1967,7 +1967,7 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                        RETURN(rc);
 
                rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_FULL,
+                                     LCK_EX, MDS_INODELOCK_ELC,
                                      MF_MDC_CANCEL_FID3);
                if (rc != 0)
                        RETURN(rc);
@@ -1981,7 +1981,7 @@ retry_rename:
                struct lmv_tgt_desc *tgt;
 
                rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_FULL,
+                                     LCK_EX, MDS_INODELOCK_ELC,
                                      MF_MDC_CANCEL_FID4);
                if (rc != 0)
                        RETURN(rc);
@@ -2524,7 +2524,7 @@ try_next_stripe:
        }
 
        rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
-                             MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+                             MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
        if (rc != 0)
                RETURN(rc);
 
index f3e4a81..0bb7920 100644 (file)
 
 #include "mdc_internal.h"
 
-int mdc_lock_init(const struct lu_env *env,
-                 struct cl_object *obj, struct cl_lock *lock,
-                 const struct cl_io *unused)
+static void mdc_lock_build_policy(const struct lu_env *env,
+                                 union ldlm_policy_data *policy)
 {
-       return 0;
+       memset(policy, 0, sizeof *policy);
+       policy->l_inodebits.bits = MDS_INODELOCK_DOM;
+}
+
+int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
+{
+       return osc_ldlm_glimpse_ast(dlmlock, data);
+}
+
+static void mdc_lock_build_einfo(const struct lu_env *env,
+                                const struct cl_lock *lock,
+                                struct osc_object *osc,
+                                struct ldlm_enqueue_info *einfo)
+{
+       einfo->ei_type = LDLM_IBITS;
+       einfo->ei_mode = osc_cl_lock2ldlm(lock->cll_descr.cld_mode);
+       einfo->ei_cb_bl = mdc_ldlm_blocking_ast;
+       einfo->ei_cb_cp = ldlm_completion_ast;
+       einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
+       einfo->ei_cbdata = osc; /* value to be put into ->l_ast_data */
+}
+
+static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data)
+{
+       int set = 0;
+
+       LASSERT(lock != NULL);
+
+       lock_res_and_lock(lock);
+
+       if (lock->l_ast_data == NULL)
+               lock->l_ast_data = data;
+       if (lock->l_ast_data == data)
+               set = 1;
+
+       unlock_res_and_lock(lock);
+
+       return set;
+}
+
+int mdc_dom_lock_match(struct obd_export *exp, struct ldlm_res_id *res_id,
+                      enum ldlm_type type, union ldlm_policy_data *policy,
+                      enum ldlm_mode mode, __u64 *flags, void *data,
+                      struct lustre_handle *lockh, int unref)
+{
+       struct obd_device *obd = exp->exp_obd;
+       __u64 lflags = *flags;
+       enum ldlm_mode rc;
+
+       ENTRY;
+
+       rc = ldlm_lock_match(obd->obd_namespace, lflags,
+                            res_id, type, policy, mode, lockh, unref);
+       if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
+               RETURN(rc);
+
+       if (data != NULL) {
+               struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+               LASSERT(lock != NULL);
+               if (!mdc_set_dom_lock_data(lock, data)) {
+                       ldlm_lock_decref(lockh, rc);
+                       rc = 0;
+               }
+               LDLM_LOCK_PUT(lock);
+       }
+       RETURN(rc);
+}
+
+/**
+ * Finds an existing lock covering a page with given index.
+ * Copy of osc_obj_dlmlock_at_pgoff() but for DoM IBITS lock.
+ */
+struct ldlm_lock *mdc_dlmlock_at_pgoff(const struct lu_env *env,
+                                      struct osc_object *obj, pgoff_t index,
+                                      enum osc_dap_flags dap_flags)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct ldlm_res_id *resname = &info->oti_resname;
+       union ldlm_policy_data *policy = &info->oti_policy;
+       struct lustre_handle lockh;
+       struct ldlm_lock *lock = NULL;
+       enum ldlm_mode mode;
+       __u64 flags;
+
+       ENTRY;
+
+       fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname);
+       mdc_lock_build_policy(env, policy);
+
+       flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+       if (dap_flags & OSC_DAP_FL_TEST_LOCK)
+               flags |= LDLM_FL_TEST_LOCK;
+
+again:
+       /* Next, search for already existing extent locks that will cover us */
+       /* If we're trying to read, we also search for an existing PW lock.  The
+        * VFS and page cache already protect us locally, so lots of readers/
+        * writers can share a single PW lock. */
+       mode = mdc_dom_lock_match(osc_export(obj), resname, LDLM_IBITS, policy,
+                                 LCK_PR | LCK_PW, &flags, obj, &lockh,
+                                 dap_flags & OSC_DAP_FL_CANCELING);
+       if (mode != 0) {
+               lock = ldlm_handle2lock(&lockh);
+               /* RACE: the lock is cancelled so let's try again */
+               if (unlikely(lock == NULL))
+                       goto again;
+       }
+
+       RETURN(lock);
+}
+
+/**
+ * Check if page @page is covered by an extra lock or discard it.
+ */
+static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                                   struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct osc_object *osc = cbdata;
+       pgoff_t index;
+
+       index = osc_index(ops);
+       if (index >= info->oti_fn_index) {
+               struct ldlm_lock *tmp;
+               struct cl_page *page = ops->ops_cl.cpl_page;
+
+               /* refresh non-overlapped index */
+               tmp = mdc_dlmlock_at_pgoff(env, osc, index,
+                                          OSC_DAP_FL_TEST_LOCK);
+               if (tmp != NULL) {
+                       info->oti_fn_index = CL_PAGE_EOF;
+                       LDLM_LOCK_PUT(tmp);
+               } else if (cl_page_own(env, io, page) == 0) {
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
+       }
+
+       info->oti_next_index = index + 1;
+       return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard pages protected by the given lock. This function traverses radix
+ * tree to find all covering pages and discard them. If a page is being covered
+ * by other locks, it should remain in cache.
+ *
+ * If error happens on any step, the process continues anyway (the reasoning
+ * behind this being that lock cancellation cannot be delayed indefinitely).
+ */
+static int mdc_lock_discard_pages(const struct lu_env *env,
+                                 struct osc_object *osc,
+                                 pgoff_t start, pgoff_t end,
+                                 bool discard)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_io *io = &info->oti_io;
+       osc_page_gang_cbt cb;
+       int res;
+       int result;
+
+       ENTRY;
+
+       io->ci_obj = cl_object_top(osc2cl(osc));
+       io->ci_ignore_layout = 1;
+       result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (result != 0)
+               GOTO(out, result);
+
+       cb = discard ? osc_discard_cb : mdc_check_and_discard_cb;
+       info->oti_fn_index = info->oti_next_index = start;
+       do {
+               res = osc_page_gang_lookup(env, io, osc, info->oti_next_index,
+                                          end, cb, (void *)osc);
+               if (info->oti_next_index > end)
+                       break;
+
+               if (res == CLP_GANG_RESCHED)
+                       cond_resched();
+       } while (res != CLP_GANG_OKAY);
+out:
+       cl_io_fini(env, io);
+       RETURN(result);
+}
+
+static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj,
+                         pgoff_t start, pgoff_t end, enum cl_lock_mode mode,
+                         bool discard)
+{
+       int result = 0;
+       int rc;
+
+       ENTRY;
+
+       if (mode == CLM_WRITE) {
+               result = osc_cache_writeback_range(env, obj, start, end, 1,
+                                                  discard);
+               CDEBUG(D_CACHE, "object %p: [%lu -> %lu] %d pages were %s.\n",
+                      obj, start, end, result,
+                      discard ? "discarded" : "written back");
+               if (result > 0)
+                       result = 0;
+       }
+
+       rc = mdc_lock_discard_pages(env, obj, start, end, discard);
+       if (result == 0 && rc < 0)
+               result = rc;
+
+       RETURN(result);
+}
+
+void mdc_lock_lockless_cancel(const struct lu_env *env,
+                             const struct cl_lock_slice *slice)
+{
+       struct osc_lock *ols = cl2osc_lock(slice);
+       struct osc_object *osc = cl2osc(slice->cls_obj);
+       struct cl_lock_descr *descr = &slice->cls_lock->cll_descr;
+       int rc;
+
+       LASSERT(ols->ols_dlmlock == NULL);
+       rc = mdc_lock_flush(env, osc, descr->cld_start, descr->cld_end,
+                           descr->cld_mode, 0);
+       if (rc != 0)
+               CERROR("Pages for lockless lock %p were not purged(%d)\n",
+                      ols, rc);
+
+       osc_lock_wake_waiters(env, osc, ols);
+}
+
+/**
+ * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
+ * and ldlm_lock caches.
+ */
+static int mdc_dlm_blocking_ast0(const struct lu_env *env,
+                                struct ldlm_lock *dlmlock,
+                                void *data, int flag)
+{
+       struct cl_object *obj = NULL;
+       int result = 0;
+       bool discard;
+       enum cl_lock_mode mode = CLM_READ;
+
+       ENTRY;
+
+       LASSERT(flag == LDLM_CB_CANCELING);
+       LASSERT(dlmlock != NULL);
+
+       lock_res_and_lock(dlmlock);
+       if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+               dlmlock->l_ast_data = NULL;
+               unlock_res_and_lock(dlmlock);
+               RETURN(0);
+       }
+
+       discard = ldlm_is_discard_data(dlmlock);
+       if (dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP))
+               mode = CLM_WRITE;
+
+       if (dlmlock->l_ast_data != NULL) {
+               obj = osc2cl(dlmlock->l_ast_data);
+               dlmlock->l_ast_data = NULL;
+               cl_object_get(obj);
+       }
+       unlock_res_and_lock(dlmlock);
+
+       /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
+        * the object has been destroyed. */
+       if (obj != NULL) {
+               struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+
+               /* Destroy pages covered by the extent of the DLM lock */
+               result = mdc_lock_flush(env, cl2osc(obj), cl_index(obj, 0),
+                                       CL_PAGE_EOF, mode, discard);
+               /* Losing a lock, set KMS to 0.
+                * NB: assumed that DOM lock covers whole data on MDT.
+                */
+               /* losing a lock, update kms */
+               lock_res_and_lock(dlmlock);
+               cl_object_attr_lock(obj);
+               attr->cat_kms = 0;
+               cl_object_attr_update(env, obj, attr, CAT_KMS);
+               cl_object_attr_unlock(obj);
+               unlock_res_and_lock(dlmlock);
+               cl_object_put(env, obj);
+       }
+       RETURN(result);
+}
+
+int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
+                         struct ldlm_lock_desc *new, void *data, int flag)
+{
+       int rc = 0;
+
+       ENTRY;
+
+       switch (flag) {
+       case LDLM_CB_BLOCKING: {
+               struct lustre_handle lockh;
+
+               ldlm_lock2handle(dlmlock, &lockh);
+               rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
+               if (rc == -ENODATA)
+                       rc = 0;
+               break;
+       }
+       case LDLM_CB_CANCELING: {
+               struct lu_env *env;
+               __u16 refcheck;
+
+               /*
+                * This can be called in the context of outer IO, e.g.,
+                *
+                *    osc_enqueue_base()->...
+                *      ->ldlm_prep_elc_req()->...
+                *        ->ldlm_cancel_callback()->...
+                *          ->osc_ldlm_blocking_ast()
+                *
+                * new environment has to be created to not corrupt outer
+                * context.
+                */
+               env = cl_env_get(&refcheck);
+               if (IS_ERR(env)) {
+                       rc = PTR_ERR(env);
+                       break;
+               }
+
+               rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+               cl_env_put(env, &refcheck);
+               break;
+       }
+       default:
+               LBUG();
+       }
+       RETURN(rc);
+}
+
+/**
+ * Updates object attributes from a lock value block (lvb) received together
+ * with the DLM lock reply from the server.
+ * This can be optimized to not update attributes when lock is a result of a
+ * local match.
+ *
+ * Called under lock and resource spin-locks.
+ */
+static void mdc_lock_lvb_update(const struct lu_env *env,
+                               struct osc_object *osc,
+                               struct ldlm_lock *dlmlock,
+                               struct ost_lvb *lvb)
+{
+       struct cl_object *obj = osc2cl(osc);
+       struct lov_oinfo *oinfo = osc->oo_oinfo;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME |
+                        CAT_SIZE;
+
+       ENTRY;
+
+       if (lvb == NULL) {
+               LASSERT(dlmlock != NULL);
+               lvb = &dlmlock->l_ost_lvb;
+       }
+       cl_lvb2attr(attr, lvb);
+
+       cl_object_attr_lock(obj);
+       if (dlmlock != NULL) {
+               __u64 size;
+
+               check_res_locked(dlmlock->l_resource);
+               size = lvb->lvb_size;
+
+               if (size >= oinfo->loi_kms) {
+                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu,"
+                                  " kms=%llu", lvb->lvb_size, size);
+                       valid |= CAT_KMS;
+                       attr->cat_kms = size;
+               } else {
+                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu,"
+                                  " leaving kms=%llu, end=%llu",
+                                  lvb->lvb_size, oinfo->loi_kms,
+                                  dlmlock->l_policy_data.l_extent.end);
+               }
+       }
+       cl_object_attr_update(env, obj, attr, valid);
+       cl_object_attr_unlock(obj);
+       EXIT;
+}
+
+static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
+                            struct lustre_handle *lockh, bool lvb_update)
+{
+       struct ldlm_lock *dlmlock;
+
+       ENTRY;
+
+       dlmlock = ldlm_handle2lock_long(lockh, 0);
+       LASSERT(dlmlock != NULL);
+
+       /* lock reference taken by ldlm_handle2lock_long() is
+        * owned by osc_lock and released in osc_lock_detach()
+        */
+       lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
+       oscl->ols_has_ref = 1;
+
+       LASSERT(oscl->ols_dlmlock == NULL);
+       oscl->ols_dlmlock = dlmlock;
+
+       /* This may be a matched lock for glimpse request, do not hold
+        * lock reference in that case. */
+       if (!oscl->ols_glimpse) {
+               /* hold a refc for non glimpse lock which will
+                * be released in osc_lock_cancel() */
+               lustre_handle_copy(&oscl->ols_handle, lockh);
+               ldlm_lock_addref(lockh, oscl->ols_einfo.ei_mode);
+               oscl->ols_hold = 1;
+       }
+
+       /* Lock must have been granted. */
+       lock_res_and_lock(dlmlock);
+       if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+               struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
+
+               /* extend the lock extent, otherwise it will have problem when
+                * we decide whether to grant a lockless lock. */
+               descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
+               descr->cld_start = cl_index(descr->cld_obj, 0);
+               descr->cld_end = CL_PAGE_EOF;
+
+               /* no lvb update for matched lock */
+               if (lvb_update) {
+                       LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
+                       mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
+                                           dlmlock, NULL);
+               }
+       }
+       unlock_res_and_lock(dlmlock);
+
+       LASSERT(oscl->ols_state != OLS_GRANTED);
+       oscl->ols_state = OLS_GRANTED;
+       EXIT;
+}
+
+/**
+ * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
+ * received from a server, or after osc_enqueue_base() matched a local DLM
+ * lock.
+ */
+static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
+                          int errcode)
+{
+       struct osc_lock *oscl = cookie;
+       struct cl_lock_slice *slice = &oscl->ols_cl;
+       struct lu_env *env;
+       int rc;
+
+       ENTRY;
+
+       env = cl_env_percpu_get();
+       /* should never happen, similar to osc_ldlm_blocking_ast(). */
+       LASSERT(!IS_ERR(env));
+
+       rc = ldlm_error2errno(errcode);
+       if (oscl->ols_state == OLS_ENQUEUED) {
+               oscl->ols_state = OLS_UPCALL_RECEIVED;
+       } else if (oscl->ols_state == OLS_CANCELLED) {
+               rc = -EIO;
+       } else {
+               CERROR("Impossible state: %d\n", oscl->ols_state);
+               LBUG();
+       }
+
+       CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode);
+       if (rc == 0)
+               mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+
+       /* Error handling, some errors are tolerable. */
+       if (oscl->ols_locklessable && rc == -EUSERS) {
+               /* This is a tolerable error, turn this lock into
+                * lockless lock.
+                */
+               osc_object_set_contended(cl2osc(slice->cls_obj));
+               LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
+
+               /* Change this lock to ldlmlock-less lock. */
+               osc_lock_to_lockless(env, oscl, 1);
+               oscl->ols_state = OLS_GRANTED;
+               rc = 0;
+       } else if (oscl->ols_glimpse && rc == -ENAVAIL) {
+               LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
+               mdc_lock_lvb_update(env, cl2osc(slice->cls_obj),
+                                   NULL, &oscl->ols_lvb);
+               /* Hide the error. */
+               rc = 0;
+       }
+
+       if (oscl->ols_owner != NULL)
+               cl_sync_io_note(env, oscl->ols_owner, rc);
+       cl_env_percpu_put(env);
+
+       RETURN(rc);
+}
+
+int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb)
+{
+       struct mdt_body *body;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (!body)
+               RETURN(-EPROTO);
+
+       lvb->lvb_mtime = body->mbo_mtime;
+       lvb->lvb_atime = body->mbo_atime;
+       lvb->lvb_ctime = body->mbo_ctime;
+       lvb->lvb_blocks = body->mbo_blocks;
+       lvb->lvb_size = body->mbo_size;
+       RETURN(0);
+}
+
+int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
+                    void *cookie, struct lustre_handle *lockh,
+                    enum ldlm_mode mode, __u64 *flags, int errcode)
+{
+       struct osc_lock *ols = cookie;
+       struct ldlm_lock *lock;
+       int rc = 0;
+
+       ENTRY;
+
+       /* The request was created before ldlm_cli_enqueue call. */
+       if (errcode == ELDLM_LOCK_ABORTED) {
+               struct ldlm_reply *rep;
+
+               rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+               LASSERT(rep != NULL);
+
+               rep->lock_policy_res2 =
+                       ptlrpc_status_ntoh(rep->lock_policy_res2);
+               if (rep->lock_policy_res2)
+                       errcode = rep->lock_policy_res2;
+
+               rc = mdc_fill_lvb(req, &ols->ols_lvb);
+               *flags |= LDLM_FL_LVB_READY;
+       } else if (errcode == ELDLM_OK) {
+               /* Callers have references, should be valid always */
+               lock = ldlm_handle2lock(lockh);
+               LASSERT(lock);
+
+               rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
+               LDLM_LOCK_PUT(lock);
+               *flags |= LDLM_FL_LVB_READY;
+       }
+
+       /* Call the update callback. */
+       rc = (*upcall)(cookie, lockh, rc < 0 ? rc : errcode);
+
+       /* release the reference taken in ldlm_cli_enqueue() */
+       if (errcode == ELDLM_LOCK_MATCHED)
+               errcode = ELDLM_OK;
+       if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
+               ldlm_lock_decref(lockh, mode);
+
+       RETURN(rc);
+}
+
+int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                         struct osc_enqueue_args *aa, int rc)
+{
+       struct ldlm_lock *lock;
+       struct lustre_handle *lockh = &aa->oa_lockh;
+       enum ldlm_mode mode = aa->oa_mode;
+
+       ENTRY;
+
+       LASSERT(!aa->oa_speculative);
+
+       /* ldlm_cli_enqueue is holding a reference on the lock, so it must
+        * be valid. */
+       lock = ldlm_handle2lock(lockh);
+       LASSERTF(lock != NULL,
+                "lockh %#llx, req %p, aa %p - client evicted?\n",
+                lockh->cookie, req, aa);
+
+       /* Take an additional reference so that a blocking AST that
+        * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
+        * to arrive after an upcall has been executed by
+        * osc_enqueue_fini(). */
+       ldlm_lock_addref(lockh, mode);
+
+       /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
+
+       /* Let CP AST to grant the lock first. */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+
+       /* Complete obtaining the lock procedure. */
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
+                                  aa->oa_mode, aa->oa_flags, NULL, 0,
+                                  lockh, rc);
+       /* Complete mdc stuff. */
+       rc = mdc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
+                             aa->oa_flags, rc);
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+
+       ldlm_lock_decref(lockh, mode);
+       LDLM_LOCK_PUT(lock);
+       RETURN(rc);
+}
+
+/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
+ * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
+ * other synchronous requests, however keeping some locks and trying to obtain
+ * others may take a considerable amount of time in a case of ost failure; and
+ * when other sync requests do not get released lock from a client, the client
+ * is excluded from the cluster -- such scenarious make the life difficult, so
+ * release locks just after they are obtained. */
+int mdc_enqueue_send(struct obd_export *exp, struct ldlm_res_id *res_id,
+                    __u64 *flags, union ldlm_policy_data *policy,
+                    struct ost_lvb *lvb, int kms_valid,
+                    osc_enqueue_upcall_f upcall, void *cookie,
+                    struct ldlm_enqueue_info *einfo, int async)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct lustre_handle lockh = { 0 };
+       struct ptlrpc_request *req = NULL;
+       struct ldlm_intent *lit;
+       enum ldlm_mode mode;
+       bool glimpse = *flags & LDLM_FL_HAS_INTENT;
+       __u64 match_flags = *flags;
+       int rc;
+
+       ENTRY;
+
+       if (!kms_valid)
+               goto no_match;
+
+       mode = einfo->ei_mode;
+       if (einfo->ei_mode == LCK_PR)
+               mode |= LCK_PW;
+
+       if (!glimpse)
+               match_flags |= LDLM_FL_BLOCK_GRANTED;
+       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
+                              einfo->ei_type, policy, mode, &lockh, 0);
+       if (mode) {
+               struct ldlm_lock *matched;
+
+               if (*flags & LDLM_FL_TEST_LOCK)
+                       RETURN(ELDLM_OK);
+
+               matched = ldlm_handle2lock(&lockh);
+               if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) {
+                       *flags |= LDLM_FL_LVB_READY;
+
+                       /* We already have a lock, and it's referenced. */
+                       (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
+
+                       ldlm_lock_decref(&lockh, mode);
+                       LDLM_LOCK_PUT(matched);
+                       RETURN(ELDLM_OK);
+               } else {
+                       ldlm_lock_decref(&lockh, mode);
+                       LDLM_LOCK_PUT(matched);
+               }
+       }
+
+no_match:
+       if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
+               RETURN(-ENOLCK);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       /* pack the intent */
+       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+       lit->opc = glimpse ? IT_GLIMPSE : IT_BRW;
+
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
+       ptlrpc_request_set_replen(req);
+
+       /* users of mdc_enqueue() can pass this flag for ldlm_lock_match() */
+       *flags &= ~LDLM_FL_BLOCK_GRANTED;
+       /* All MDC IO locks are intents */
+       *flags |= LDLM_FL_HAS_INTENT;
+       rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, NULL,
+                             0, LVB_T_NONE, &lockh, async);
+       if (async) {
+               if (!rc) {
+                       struct osc_enqueue_args *aa;
+
+                       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+                       aa = ptlrpc_req_async_args(req);
+                       aa->oa_exp = exp;
+                       aa->oa_mode = einfo->ei_mode;
+                       aa->oa_type = einfo->ei_type;
+                       lustre_handle_copy(&aa->oa_lockh, &lockh);
+                       aa->oa_upcall = upcall;
+                       aa->oa_cookie = cookie;
+                       aa->oa_speculative = false;
+                       aa->oa_flags = flags;
+                       aa->oa_lvb = lvb;
+
+                       req->rq_interpret_reply =
+                               (ptlrpc_interpterer_t)mdc_enqueue_interpret;
+                       ptlrpcd_add_req(req);
+               } else {
+                       ptlrpc_req_finished(req);
+               }
+               RETURN(rc);
+       }
+
+       rc = mdc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
+                             flags, rc);
+       ptlrpc_req_finished(req);
+       RETURN(rc);
+}
+
+/**
+ * Implementation of cl_lock_operations::clo_enqueue() method for osc
+ * layer. This initiates ldlm enqueue:
+ *
+ *     - cancels conflicting locks early (osc_lock_enqueue_wait());
+ *
+ *     - calls osc_enqueue_base() to do actual enqueue.
+ *
+ * osc_enqueue_base() is supplied with an upcall function that is executed
+ * when lock is received either after a local cached ldlm lock is matched, or
+ * when a reply from the server is received.
+ *
+ * This function does not wait for the network communication to complete.
+ */
+static int mdc_lock_enqueue(const struct lu_env *env,
+                           const struct cl_lock_slice *slice,
+                           struct cl_io *unused, struct cl_sync_io *anchor)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct osc_io *oio = osc_env_io(env);
+       struct osc_object *osc = cl2osc(slice->cls_obj);
+       struct osc_lock *oscl = cl2osc_lock(slice);
+       struct cl_lock *lock = slice->cls_lock;
+       struct ldlm_res_id *resname = &info->oti_resname;
+       union ldlm_policy_data *policy = &info->oti_policy;
+       osc_enqueue_upcall_f upcall = mdc_lock_upcall;
+       void *cookie = (void *)oscl;
+       bool async = false;
+       int result;
+
+       ENTRY;
+
+       LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
+               "lock = %p, ols = %p\n", lock, oscl);
+
+       if (oscl->ols_state == OLS_GRANTED)
+               RETURN(0);
+
+       /* Lockahead is not supported on MDT yet */
+       if (oscl->ols_flags & LDLM_FL_NO_EXPANSION) {
+               result = -EOPNOTSUPP;
+               RETURN(result);
+       }
+
+       if (oscl->ols_flags & LDLM_FL_TEST_LOCK)
+               GOTO(enqueue_base, 0);
+
+       if (oscl->ols_glimpse) {
+               LASSERT(equi(oscl->ols_speculative, anchor == NULL));
+               async = true;
+               GOTO(enqueue_base, 0);
+       }
+
+       result = osc_lock_enqueue_wait(env, osc, oscl);
+       if (result < 0)
+               GOTO(out, result);
+
+       /* we can grant lockless lock right after all conflicting locks
+        * are canceled. */
+       if (osc_lock_is_lockless(oscl)) {
+               oscl->ols_state = OLS_GRANTED;
+               oio->oi_lockless = 1;
+               RETURN(0);
+       }
+
+enqueue_base:
+       oscl->ols_state = OLS_ENQUEUED;
+       if (anchor != NULL) {
+               atomic_inc(&anchor->csi_sync_nr);
+               oscl->ols_owner = anchor;
+       }
+
+       /**
+        * DLM lock's ast data must be osc_object;
+        * DLM's enqueue callback set to osc_lock_upcall() with cookie as
+        * osc_lock.
+        */
+       fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname);
+       mdc_lock_build_policy(env, policy);
+       LASSERT(!oscl->ols_speculative);
+       result = mdc_enqueue_send(osc_export(osc), resname, &oscl->ols_flags,
+                                 policy, &oscl->ols_lvb,
+                                 osc->oo_oinfo->loi_kms_valid,
+                                 upcall, cookie, &oscl->ols_einfo, async);
+       if (result == 0) {
+               if (osc_lock_is_lockless(oscl)) {
+                       oio->oi_lockless = 1;
+               } else if (!async) {
+                       LASSERT(oscl->ols_state == OLS_GRANTED);
+                       LASSERT(oscl->ols_hold);
+                       LASSERT(oscl->ols_dlmlock != NULL);
+               }
+       }
+out:
+       if (result < 0) {
+               oscl->ols_state = OLS_CANCELLED;
+               osc_lock_wake_waiters(env, osc, oscl);
+
+               if (anchor != NULL)
+                       cl_sync_io_note(env, anchor, result);
+       }
+       RETURN(result);
+}
+
+static const struct cl_lock_operations mdc_lock_lockless_ops = {
+       .clo_fini = osc_lock_fini,
+       .clo_enqueue = mdc_lock_enqueue,
+       .clo_cancel = mdc_lock_lockless_cancel,
+       .clo_print = osc_lock_print
+};
+
+static const struct cl_lock_operations mdc_lock_ops = {
+       .clo_fini       = osc_lock_fini,
+       .clo_enqueue    = mdc_lock_enqueue,
+       .clo_cancel     = osc_lock_cancel,
+       .clo_print      = osc_lock_print,
+};
+
+int mdc_lock_init(const struct lu_env *env, struct cl_object *obj,
+                 struct cl_lock *lock, const struct cl_io *io)
+{
+       struct osc_lock *ols;
+       __u32 enqflags = lock->cll_descr.cld_enq_flags;
+       __u64 flags = osc_enq2ldlm_flags(enqflags);
+
+       ENTRY;
+
+       /* Ignore AGL for Data-on-MDT, stat returns size data */
+       if ((enqflags & CEF_SPECULATIVE) != 0)
+               RETURN(0);
+
+       OBD_SLAB_ALLOC_PTR_GFP(ols, osc_lock_kmem, GFP_NOFS);
+       if (unlikely(ols == NULL))
+               RETURN(-ENOMEM);
+
+       ols->ols_state = OLS_NEW;
+       spin_lock_init(&ols->ols_lock);
+       INIT_LIST_HEAD(&ols->ols_waiting_list);
+       INIT_LIST_HEAD(&ols->ols_wait_entry);
+       INIT_LIST_HEAD(&ols->ols_nextlock_oscobj);
+       ols->ols_lockless_ops = &mdc_lock_lockless_ops;
+
+       ols->ols_flags = flags;
+       ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+
+       if (ols->ols_flags & LDLM_FL_HAS_INTENT) {
+               ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
+               ols->ols_glimpse = 1;
+       }
+       mdc_lock_build_einfo(env, lock, cl2osc(obj), &ols->ols_einfo);
+
+       cl_lock_slice_add(lock, &ols->ols_cl, obj, &mdc_lock_ops);
+
+       if (!(enqflags & CEF_MUST))
+               osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+       if (ols->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
+               ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+       if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
+               osc_lock_set_writer(env, io, obj, ols);
+
+       LDLM_DEBUG_NOLOCK("lock %p, mdc lock %p, flags %llx\n",
+                         lock, ols, ols->ols_flags);
+       RETURN(0);
 }
 
 /**
@@ -139,6 +1029,35 @@ static int mdc_io_setattr_start(const struct lu_env *env,
        return rc;
 }
 
+static int mdc_io_read_ahead(const struct lu_env *env,
+                            const struct cl_io_slice *ios,
+                            pgoff_t start, struct cl_read_ahead *ra)
+{
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct ldlm_lock *dlmlock;
+
+       ENTRY;
+
+       dlmlock = mdc_dlmlock_at_pgoff(env, osc, start, 0);
+       if (dlmlock == NULL)
+               RETURN(-ENODATA);
+
+       if (dlmlock->l_req_mode != LCK_PR) {
+               struct lustre_handle lockh;
+
+               ldlm_lock2handle(dlmlock, &lockh);
+               ldlm_lock_addref(&lockh, LCK_PR);
+               ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
+       }
+
+       ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc;
+       ra->cra_end = CL_PAGE_EOF;
+       ra->cra_release = osc_read_ahead_release;
+       ra->cra_cbdata = dlmlock;
+
+       RETURN(0);
+}
+
 static struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
@@ -174,6 +1093,7 @@ static struct cl_io_operations mdc_io_ops = {
                        .cio_end   = osc_io_fsync_end,
                },
        },
+       .cio_read_ahead   = mdc_io_read_ahead,
        .cio_submit       = osc_io_submit,
        .cio_commit_async = osc_io_commit_async,
 };
@@ -188,6 +1108,12 @@ int mdc_io_init(const struct lu_env *env, struct cl_object *obj,
        return 0;
 }
 
+static void mdc_build_res_name(struct osc_object *osc,
+                                  struct ldlm_res_id *resname)
+{
+       fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname);
+}
+
 /**
  * Implementation of struct cl_req_operations::cro_attr_set() for MDC
  * layer. MDC is responsible for struct obdo::o_id and struct obdo::o_seq
@@ -206,16 +1132,66 @@ static void mdc_req_attr_set(const struct lu_env *env, struct cl_object *obj,
 
        if (flags & OBD_MD_FLID)
                attr->cra_oa->o_valid |= OBD_MD_FLID;
+
+       if (flags & OBD_MD_FLHANDLE) {
+               struct ldlm_lock *lock;  /* _some_ lock protecting @apage */
+               struct osc_page *opg;
+
+               opg = osc_cl_page_osc(attr->cra_page, cl2osc(obj));
+               lock = mdc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
+                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
+               if (lock == NULL && !opg->ops_srvlock) {
+                       struct ldlm_resource *res;
+                       struct ldlm_res_id *resname;
+
+                       CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page,
+                                     "uncovered page!\n");
+
+                       resname = &osc_env_info(env)->oti_resname;
+                       mdc_build_res_name(cl2osc(obj), resname);
+                       res = ldlm_resource_get(
+                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
+                               NULL, resname, LDLM_IBITS, 0);
+                       ldlm_resource_dump(D_ERROR, res);
+
+                       libcfs_debug_dumpstack(NULL);
+                       LBUG();
+               }
+
+               /* check for lockless io. */
+               if (lock != NULL) {
+                       attr->cra_oa->o_handle = lock->l_remote_handle;
+                       attr->cra_oa->o_valid |= OBD_MD_FLHANDLE;
+                       LDLM_LOCK_PUT(lock);
+               }
+       }
+}
+
+static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj,
+                       struct cl_attr *attr)
+{
+       struct lov_oinfo *oinfo = cl2osc(obj)->oo_oinfo;
+
+       if (OST_LVB_IS_ERR(oinfo->loi_lvb.lvb_blocks))
+               return OST_LVB_GET_ERR(oinfo->loi_lvb.lvb_blocks);
+
+       return osc_attr_get(env, obj, attr);
 }
 
 static const struct cl_object_operations mdc_ops = {
        .coo_page_init = osc_page_init,
        .coo_lock_init = mdc_lock_init,
        .coo_io_init = mdc_io_init,
-       .coo_attr_get = osc_attr_get,
+       .coo_attr_get = mdc_attr_get,
        .coo_attr_update = osc_attr_update,
        .coo_glimpse = osc_object_glimpse,
        .coo_req_attr_set = mdc_req_attr_set,
+       .coo_prune = osc_object_prune,
+};
+
+static const struct osc_object_operations mdc_object_ops = {
+       .oto_build_res_name = mdc_build_res_name,
+       .oto_dlmlock_at_pgoff = mdc_dlmlock_at_pgoff,
 };
 
 static int mdc_object_init(const struct lu_env *env, struct lu_object *obj,
@@ -258,6 +1234,7 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env,
                lu_object_init(obj, NULL, dev);
                osc->oo_cl.co_ops = &mdc_ops;
                obj->lo_ops = &mdc_lu_obj_ops;
+               osc->oo_obj_ops = &mdc_object_ops;
                osc->oo_initialized = false;
        } else {
                obj = NULL;
index 4dc7a5e..69b6ed2 100644 (file)
@@ -167,5 +167,7 @@ static inline unsigned long hash_x_index(__u64 hash, int hash64)
 
 /* mdc_dev.c */
 extern struct lu_device_type mdc_device_type;
+int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
+                         struct ldlm_lock_desc *new, void *data, int flag);
 
 #endif
index fb59428..7d12863 100644 (file)
@@ -273,9 +273,10 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
                                                MDS_INODELOCK_UPDATE);
        if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
            (fid_is_sane(&op_data->op_fid3)))
+               /* don't cancel DoM lock which may cause data flush */
                count += mdc_resource_get_unused(exp, &op_data->op_fid3,
                                                 &cancels, LCK_EX,
-                                                MDS_INODELOCK_FULL);
+                                                MDS_INODELOCK_ELC);
         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                    &RQF_MDS_REINT_UNLINK);
         if (req == NULL) {
@@ -376,11 +377,11 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_LOOKUP);
-        if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
-             (fid_is_sane(&op_data->op_fid4)))
-                count += mdc_resource_get_unused(exp, &op_data->op_fid4,
-                                                 &cancels, LCK_EX,
-                                                 MDS_INODELOCK_FULL);
+       if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
+           (fid_is_sane(&op_data->op_fid4)))
+               count += mdc_resource_get_unused(exp, &op_data->op_fid4,
+                                                &cancels, LCK_EX,
+                                                MDS_INODELOCK_ELC);
 
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                           op_data->op_cli_flags & CLI_MIGRATE ?
index 4916d40..7bd6531 100644 (file)
@@ -676,9 +676,6 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                        else
                                b->mbo_blocks = 1;
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-               } else if (lov_pattern(ma->ma_lmm->lmm_pattern) ==
-                          LOV_PATTERN_MDT) {
-                       b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
                }
        }
 
@@ -3275,13 +3272,14 @@ enum mdt_it_code {
         MDT_IT_GETXATTR,
         MDT_IT_LAYOUT,
        MDT_IT_QUOTA,
-        MDT_IT_NR
+       MDT_IT_GLIMPSE,
+       MDT_IT_BRW,
+       MDT_IT_NR
 };
 
 static int mdt_intent_getattr(enum mdt_it_code opcode,
-                              struct mdt_thread_info *info,
-                              struct ldlm_lock **,
-                             __u64);
+                             struct mdt_thread_info *info,
+                             struct ldlm_lock **, __u64);
 
 static int mdt_intent_getxattr(enum mdt_it_code opcode,
                                struct mdt_thread_info *info,
@@ -3296,6 +3294,20 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                             struct mdt_thread_info *info,
                             struct ldlm_lock **,
                            __u64);
+static int mdt_intent_glimpse(enum mdt_it_code opcode,
+                             struct mdt_thread_info *info,
+                             struct ldlm_lock **lockp, __u64 flags)
+{
+       return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace,
+                                  lockp, flags);
+}
+static int mdt_intent_brw(enum mdt_it_code opcode,
+                         struct mdt_thread_info *info,
+                         struct ldlm_lock **lockp, __u64 flags)
+{
+       return mdt_brw_enqueue(info, info->mti_mdt->mdt_namespace,
+                              lockp, flags);
+}
 
 static struct mdt_it_flavor {
         const struct req_format *it_fmt;
@@ -3367,14 +3379,24 @@ static struct mdt_it_flavor {
                .it_fmt   = &RQF_LDLM_INTENT_LAYOUT,
                .it_flags = 0,
                .it_act   = mdt_intent_layout
-       }
+       },
+       [MDT_IT_GLIMPSE] = {
+               .it_fmt = &RQF_LDLM_INTENT,
+               .it_flags = 0,
+               .it_act = mdt_intent_glimpse,
+       },
+       [MDT_IT_BRW] = {
+               .it_fmt = &RQF_LDLM_INTENT,
+               .it_flags = 0,
+               .it_act = mdt_intent_brw,
+       },
+
 };
 
-static int
-mdt_intent_lock_replace(struct mdt_thread_info *info,
-                       struct ldlm_lock **lockp,
-                       struct mdt_lock_handle *lh,
-                       __u64 flags, int result)
+int mdt_intent_lock_replace(struct mdt_thread_info *info,
+                           struct ldlm_lock **lockp,
+                           struct mdt_lock_handle *lh,
+                           __u64 flags, int result)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct ldlm_lock       *lock = *lockp;
@@ -3450,6 +3472,8 @@ mdt_intent_lock_replace(struct mdt_thread_info *info,
         new_lock->l_export = class_export_lock_get(req->rq_export, new_lock);
         new_lock->l_blocking_ast = lock->l_blocking_ast;
         new_lock->l_completion_ast = lock->l_completion_ast;
+       if (ldlm_has_dom(new_lock))
+               new_lock->l_glimpse_ast = ldlm_server_glimpse_ast;
         new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
@@ -3465,10 +3489,9 @@ mdt_intent_lock_replace(struct mdt_thread_info *info,
         RETURN(ELDLM_LOCK_REPLACED);
 }
 
-static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
-                                   struct ldlm_lock *new_lock,
-                                   struct mdt_lock_handle *lh,
-                                   __u64 flags)
+void mdt_intent_fixup_resent(struct mdt_thread_info *info,
+                            struct ldlm_lock *new_lock,
+                            struct mdt_lock_handle *lh, __u64 flags)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct ldlm_request    *dlmreq;
@@ -3883,6 +3906,12 @@ static int mdt_intent_code(enum ldlm_intent_flags itcode)
        case IT_QUOTA_CONN:
                rc = MDT_IT_QUOTA;
                break;
+       case IT_GLIMPSE:
+               rc = MDT_IT_GLIMPSE;
+               break;
+       case IT_BRW:
+               rc = MDT_IT_BRW;
+               break;
        default:
                CERROR("Unknown intent opcode: 0x%08x\n", itcode);
                rc = -EINVAL;
@@ -3963,6 +3992,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
        struct ptlrpc_request   *req  =  req_cookie;
        struct ldlm_intent      *it;
        struct req_capsule      *pill;
+       const struct ldlm_lock_desc *ldesc;
        int rc;
 
        ENTRY;
@@ -3972,11 +4002,12 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
        tsi = tgt_ses_info(req->rq_svc_thread->t_env);
 
        info = tsi2mdt_info(tsi);
-        LASSERT(info != NULL);
-        pill = info->mti_pill;
-        LASSERT(pill->rc_req == req);
+       LASSERT(info != NULL);
+       pill = info->mti_pill;
+       LASSERT(pill->rc_req == req);
+       ldesc = &info->mti_dlm_req->lock_desc;
 
-        if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
+       if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
                req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
                 if (it != NULL) {
@@ -3989,20 +4020,18 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
                          * ibits corrupted somewhere in mdt_intent_opc().
                          * The case for client miss to set ibits has been
                          * processed by others. */
-                        LASSERT(ergo(info->mti_dlm_req->lock_desc.l_resource.\
-                                        lr_type == LDLM_IBITS,
-                                     info->mti_dlm_req->lock_desc.\
-                                        l_policy_data.l_inodebits.bits != 0));
-                } else
-                        rc = err_serious(-EFAULT);
-        } else {
-                /* No intent was provided */
-                LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
+                       LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS,
+                               ldesc->l_policy_data.l_inodebits.bits != 0));
+               } else {
+                       rc = err_serious(-EFAULT);
+               }
+       } else {
+               /* No intent was provided */
                req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
-                rc = req_capsule_server_pack(pill);
-                if (rc)
-                        rc = err_serious(rc);
-        }
+               rc = req_capsule_server_pack(pill);
+               if (rc)
+                       rc = err_serious(rc);
+       }
        mdt_thread_info_fini(info);
        RETURN(rc);
 }
index bb8e2df..f81cfa1 100644 (file)
@@ -767,6 +767,13 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
                          struct mdt_thread_info *mti);
 void mdt_thread_info_fini(struct mdt_thread_info *mti);
 struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi);
+void mdt_intent_fixup_resent(struct mdt_thread_info *info,
+                            struct ldlm_lock *new_lock,
+                            struct mdt_lock_handle *lh, __u64 flags);
+int mdt_intent_lock_replace(struct mdt_thread_info *info,
+                           struct ldlm_lock **lockp,
+                           struct mdt_lock_handle *lh,
+                           __u64 flags, int result);
 
 int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
                     const struct md_hsm *mh);
@@ -1010,6 +1017,11 @@ static inline int is_identity_get_disabled(struct upcall_cache *cache)
 
 int mdt_blocking_ast(struct ldlm_lock*, struct ldlm_lock_desc*, void*, int);
 
+static int mdt_dom_glimpse_ast(struct ldlm_lock *lock, void *reqp)
+{
+       return -ELDLM_NO_LOCK_DATA;
+}
+
 /* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
 static inline int mdt_fid_lock(struct ldlm_namespace *ns,
                               struct lustre_handle *lh, enum ldlm_mode mode,
@@ -1018,14 +1030,16 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns,
                               __u64 flags, const __u64 *client_cookie)
 {
        int rc;
+       bool glimpse = policy->l_inodebits.bits & MDS_INODELOCK_DOM;
 
        LASSERT(ns != NULL);
        LASSERT(lh != NULL);
 
        rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                    mode, &flags, mdt_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE, client_cookie, lh);
+                                   ldlm_completion_ast,
+                                   glimpse ? mdt_dom_glimpse_ast : NULL,
+                                   NULL, 0, LVB_T_NONE, client_cookie, lh);
        return rc == ELDLM_OK ? 0 : -EIO;
 }
 
@@ -1058,6 +1072,9 @@ static inline enum ldlm_mode mdt_mdl_mode2dlm_mode(mdl_mode_t mode)
 
 /* mdt_lvb.c */
 extern struct ldlm_valblock_ops mdt_lvbo;
+int mdt_dom_lvb_is_valid(struct ldlm_resource *res);
+int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
+                       struct ptlrpc_request *req, bool increase_only);
 
 void mdt_enable_cos(struct mdt_device *, int);
 int mdt_cos_is_enabled(struct mdt_device *);
@@ -1139,9 +1156,29 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                     struct niobuf_remote *rnb, int npages,
                     struct niobuf_local *lnb, int old_rc);
 int mdt_punch_hdl(struct tgt_session_info *tsi);
+int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
+                       struct ldlm_lock **lockp, __u64 flags);
+int mdt_brw_enqueue(struct mdt_thread_info *info, struct ldlm_namespace *ns,
+                   struct ldlm_lock **lockp, __u64 flags);
+void mdt_dom_discard_data(struct mdt_thread_info *info,
+                         const struct lu_fid *fid);
+int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
+                            struct ldlm_resource *res, bool increase_only);
+void mdt_dom_obj_lvb_update(const struct lu_env *env, struct mdt_object *mo,
+                           bool increase_only);
+int mdt_dom_lvb_alloc(struct ldlm_resource *res);
+
+static inline void mdt_dom_check_and_discard(struct mdt_thread_info *mti,
+                                            struct mdt_object *mo)
+{
+       if (lu_object_is_dying(&mo->mot_header) &&
+           S_ISREG(lu_object_attr(&mo->mot_obj)))
+               mdt_dom_discard_data(mti, mdt_object_fid(mo));
+}
 
 /* grants */
 long mdt_grant_connect(const struct lu_env *env, struct obd_export *exp,
                       u64 want, bool conservative);
+extern struct kmem_cache *ldlm_glimpse_work_kmem;
 
 #endif /* _MDT_INTERNAL_H */
index 954713e..2548612 100644 (file)
@@ -383,6 +383,25 @@ out:
        RETURN(rc);
 }
 
+void mdt_dom_obj_lvb_update(const struct lu_env *env, struct mdt_object *mo,
+                           bool increase_only)
+{
+       struct mdt_device *mdt = mdt_dev(mo->mot_obj.lo_dev);
+       struct ldlm_res_id resid;
+       struct ldlm_resource *res;
+
+       fid_build_reg_res_name(mdt_object_fid(mo), &resid);
+       res = ldlm_resource_get(mdt->mdt_namespace, NULL, &resid,
+                               LDLM_IBITS, 1);
+       if (IS_ERR(res))
+               return;
+
+       /* Update lvbo data if exists. */
+       if (mdt_dom_lvb_is_valid(res))
+               mdt_dom_disk_lvbo_update(env, mo, res, increase_only);
+       ldlm_resource_putref(res);
+}
+
 int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                     struct obdo *oa, int objcount, struct obd_ioobj *obj,
                     struct niobuf_remote *rnb, int npages,
@@ -423,6 +442,7 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                else
                        obdo_from_la(oa, la, LA_GID | LA_UID);
 
+               mdt_dom_obj_lvb_update(env, mo, false);
                /* don't report overquota flag if we failed before reaching
                 * commit */
                if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) {
@@ -445,6 +465,11 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                                       OBD_MD_FLGRPQUOTA;
                }
        } else if (cmd == OBD_BRW_READ) {
+               /* If oa != NULL then mdt_preprw_read updated the inode
+                * atime and we should update the lvb so that other glimpses
+                * will also get the updated value. bug 5972 */
+               if (oa)
+                       mdt_dom_obj_lvb_update(env, mo, true);
                rc = mdt_commitrw_read(env, mdt, mo, objcount, npages, lnb);
                if (old_rc)
                        rc = old_rc;
@@ -584,6 +609,7 @@ int mdt_punch_hdl(struct tgt_session_info *tsi)
        if (rc)
                GOTO(out_put, rc);
 
+       mdt_dom_obj_lvb_update(tsi->tsi_env, mo, false);
        mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH,
                            tsi->tsi_jobid, 1);
        EXIT;
@@ -594,21 +620,347 @@ out_unlock:
                mdt_save_lock(info, &lh, LCK_PW, rc);
 out:
        mdt_thread_info_fini(info);
-       if (rc == 0) {
-               struct ldlm_resource *res;
-
-               /* we do not call this before to avoid lu_object_find() in
-                *  ->lvbo_update() holding another reference on the object.
-                * otherwise concurrent destroy can make the object unavailable
-                * for 2nd lu_object_find() waiting for the first reference
-                * to go... deadlock! */
-               res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
-                                       LDLM_IBITS, 0);
-               if (!IS_ERR(res)) {
-                       ldlm_res_lvbo_update(res, NULL, 0);
-                       ldlm_resource_putref(res);
-               }
+       return rc;
+}
+
+/**
+ * MDT glimpse for Data-on-MDT
+ *
+ * If there is write lock on client then function issues glimpse_ast to get
+ * an actual size from that client.
+ *
+ */
+int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns,
+                  struct ldlm_resource *res)
+{
+       union ldlm_policy_data policy;
+       struct lustre_handle lockh;
+       enum ldlm_mode mode;
+       struct ldlm_lock *lock;
+       struct ldlm_glimpse_work *gl_work;
+       struct list_head gl_list;
+       int rc;
+
+       ENTRY;
+
+       /* There can be only one write lock covering data, try to match it. */
+       policy.l_inodebits.bits = MDS_INODELOCK_DOM;
+       mode = ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK,
+                              &res->lr_name, LDLM_IBITS, &policy,
+                              LCK_PW, &lockh, 0);
+
+       /* There is no PW lock on this object; finished. */
+       if (mode == 0)
+               RETURN(0);
+
+       lock = ldlm_handle2lock(&lockh);
+       if (lock == NULL)
+               RETURN(0);
+
+       /*
+        * This check is for lock taken in mdt_reint_unlink() that does
+        * not have l_glimpse_ast set. So the logic is: if there is a lock
+        * with no l_glimpse_ast set, this object is being destroyed already.
+        * Hence, if you are grabbing DLM locks on the server, always set
+        * non-NULL glimpse_ast (e.g., ldlm_request.c::ldlm_glimpse_ast()).
+        */
+       if (lock->l_glimpse_ast == NULL) {
+               LDLM_DEBUG(lock, "no l_glimpse_ast");
+               GOTO(out, rc = -ENOENT);
        }
+
+       OBD_SLAB_ALLOC_PTR_GFP(gl_work, ldlm_glimpse_work_kmem, GFP_ATOMIC);
+       if (!gl_work)
+               GOTO(out, rc = -ENOMEM);
+
+       /* Populate the gl_work structure.
+        * Grab additional reference on the lock which will be released in
+        * ldlm_work_gl_ast_lock() */
+       gl_work->gl_lock = LDLM_LOCK_GET(lock);
+       /* The glimpse callback is sent to one single IO lock. As a result,
+        * the gl_work list is just composed of one element */
+       INIT_LIST_HEAD(&gl_list);
+       list_add_tail(&gl_work->gl_list, &gl_list);
+       /* There is actually no need for a glimpse descriptor when glimpsing
+        * IO locks */
+       gl_work->gl_desc = NULL;
+       /* the ldlm_glimpse_work structure is allocated on the stack */
+       gl_work->gl_flags = LDLM_GL_WORK_SLAB_ALLOCATED;
+
+       ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
+
+       /* If the list is not empty, we failed to glimpse a lock and
+        * must clean it up. Usually due to a race with unlink.*/
+       if (!list_empty(&gl_list)) {
+               LDLM_LOCK_RELEASE(lock);
+               OBD_SLAB_FREE_PTR(gl_work, ldlm_glimpse_work_kmem);
+       }
+       rc = 0;
+       EXIT;
+out:
+       LDLM_LOCK_PUT(lock);
        return rc;
 }
 
+static void mdt_lvb2body(struct ldlm_resource *res, struct mdt_body *mb)
+{
+       struct ost_lvb *res_lvb;
+
+       lock_res(res);
+       res_lvb = res->lr_lvb_data;
+       mb->mbo_size = res_lvb->lvb_size;
+       mb->mbo_blocks = res_lvb->lvb_blocks;
+       mb->mbo_mtime = res_lvb->lvb_mtime;
+       mb->mbo_ctime = res_lvb->lvb_ctime;
+       mb->mbo_atime = res_lvb->lvb_atime;
+
+       CDEBUG(D_DLMTRACE, "size %llu\n", res_lvb->lvb_size);
+
+       mb->mbo_valid |= OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME |
+                        OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+       unlock_res(res);
+}
+
+/**
+ * MDT glimpse for Data-on-MDT
+ *
+ * This function is called when MDT get attributes for the DoM object.
+ * If there is write lock on client then function issues glimpse_ast to get
+ * an actual size from that client.
+ */
+int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt,
+                       const struct lu_fid *fid, struct mdt_body *mb,
+                       bool dom_lock)
+{
+       struct ldlm_res_id resid;
+       struct ldlm_resource *res;
+       int rc = 0;
+
+       ENTRY;
+
+       fid_build_reg_res_name(fid, &resid);
+       res = ldlm_resource_get(mdt->mdt_namespace, NULL, &resid,
+                               LDLM_IBITS, 1);
+       if (IS_ERR(res) || res->lr_lvb_data == NULL)
+               RETURN(-ENOENT);
+
+       /* if there is no DOM bit in the lock then glimpse is needed
+        * to return valid size */
+       if (!dom_lock) {
+               rc = mdt_do_glimpse(env, mdt->mdt_namespace, res);
+               if (rc < 0)
+                       GOTO(out, rc);
+       }
+
+       /* Update lvbo data if DoM lock returned or if LVB is not yet valid. */
+       if (dom_lock || !mdt_dom_lvb_is_valid(res))
+               mdt_dom_lvbo_update(res, NULL, NULL, false);
+
+       mdt_lvb2body(res, mb);
+out:
+       ldlm_resource_putref(res);
+       RETURN(rc);
+}
+
+/**
+ * MDT DoM lock intent policy (glimpse)
+ *
+ * Intent policy is called when lock has an intent, for DoM file that
+ * means glimpse lock and policy fills Lock Value Block (LVB).
+ *
+ * If already granted lock is found it will be placed in \a lockp and
+ * returned back to caller function.
+ *
+ * \param[in] tsi       session info
+ * \param[in,out] lockp         pointer to the lock
+ * \param[in] flags     LDLM flags
+ *
+ * \retval             ELDLM_LOCK_REPLACED if already granted lock was found
+ *                     and placed in \a lockp
+ * \retval             ELDLM_LOCK_ABORTED in other cases except error
+ * \retval             negative value on error
+ */
+int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
+                       struct ldlm_lock **lockp, __u64 flags)
+{
+       struct ldlm_lock *lock = *lockp;
+       struct ldlm_resource *res = lock->l_resource;
+       ldlm_processing_policy policy;
+       struct ldlm_reply *rep;
+       struct mdt_body *mbo;
+       int rc;
+
+       ENTRY;
+
+       policy = ldlm_get_processing_policy(res);
+       LASSERT(policy != NULL);
+
+       req_capsule_set_size(mti->mti_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+       req_capsule_set_size(mti->mti_pill, &RMF_ACL, RCL_SERVER, 0);
+       rc = req_capsule_server_pack(mti->mti_pill);
+       if (rc)
+               RETURN(err_serious(rc));
+
+       rep = req_capsule_server_get(mti->mti_pill, &RMF_DLM_REP);
+       if (rep == NULL)
+               RETURN(-EPROTO);
+
+       mbo = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
+       if (mbo == NULL)
+               RETURN(-EPROTO);
+
+       lock_res(res);
+       /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
+        * lock was found by ldlm_handle_enqueue(); if so no need to grant
+        * it again. */
+       if (flags & LDLM_FL_RESENT) {
+               rc = LDLM_ITER_CONTINUE;
+       } else {
+               __u64 tmpflags = 0;
+               enum ldlm_error err;
+
+               rc = policy(lock, &tmpflags, 0, &err, NULL);
+               check_res_locked(res);
+       }
+       unlock_res(res);
+
+       /* The lock met with no resistance; we're finished. */
+       if (rc == LDLM_ITER_CONTINUE) {
+               GOTO(fill_mbo, rc = ELDLM_LOCK_REPLACED);
+       } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+               /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
+                * callback for glimpse size. The real size user will trigger
+                * the glimpse callback when necessary. */
+               GOTO(fill_mbo, rc = ELDLM_LOCK_ABORTED);
+       }
+
+       rc = mdt_do_glimpse(mti->mti_env, ns, res);
+       if (rc == -ENOENT) {
+               /* We are racing with unlink(); just return -ENOENT */
+               rep->lock_policy_res2 = ptlrpc_status_hton(-ENOENT);
+               rc = 0;
+       } else if (rc == -EINVAL) {
+               /* this is possible is client lock has been cancelled but
+                * still exists on server. If that lock was found on server
+                * as only conflicting lock then the client has already
+                * size authority and glimpse is not needed. */
+               CDEBUG(D_DLMTRACE, "Glimpse from the client owning lock\n");
+               rc = 0;
+       } else if (rc < 0) {
+               RETURN(rc);
+       }
+       rc = ELDLM_LOCK_ABORTED;
+fill_mbo:
+       /* LVB can be without valid data in case of DOM */
+       if (!mdt_dom_lvb_is_valid(res))
+               mdt_dom_lvbo_update(res, lock, NULL, false);
+       mdt_lvb2body(res, mbo);
+       RETURN(rc);
+}
+
+int mdt_brw_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
+                   struct ldlm_lock **lockp, __u64 flags)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(mti->mti_env);
+       struct lu_fid *fid = &tsi->tsi_fid;
+       struct ldlm_lock *lock = *lockp;
+       struct ldlm_resource *res = lock->l_resource;
+       struct ldlm_reply *rep;
+       struct mdt_body *mbo;
+       struct mdt_lock_handle *lhc = &mti->mti_lh[MDT_LH_RMT];
+       struct mdt_object *mo;
+       int rc = 0;
+
+       ENTRY;
+
+       /* Get lock from request for possible resent case. */
+       mdt_intent_fixup_resent(mti, *lockp, lhc, flags);
+       req_capsule_set_size(mti->mti_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+       req_capsule_set_size(mti->mti_pill, &RMF_ACL, RCL_SERVER, 0);
+       rc = req_capsule_server_pack(mti->mti_pill);
+       if (rc)
+               RETURN(err_serious(rc));
+
+       rep = req_capsule_server_get(mti->mti_pill, &RMF_DLM_REP);
+       if (rep == NULL)
+               RETURN(-EPROTO);
+
+       mbo = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
+       if (mbo == NULL)
+               RETURN(-EPROTO);
+
+       fid_extract_from_res_name(fid, &res->lr_name);
+       mo = mdt_object_find(mti->mti_env, mti->mti_mdt, fid);
+       if (unlikely(IS_ERR(mo)))
+               RETURN(PTR_ERR(mo));
+
+       if (!mdt_object_exists(mo))
+               GOTO(out, rc = -ENOENT);
+
+       if (mdt_object_remote(mo))
+               GOTO(out, rc = -EPROTO);
+
+       /* resent case */
+       if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               mdt_lock_handle_init(lhc);
+               mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
+               /* This will block MDT thread but it should be fine until
+                * client caches small amount of data for DoM, which should be
+                * smaller than one BRW RPC and should be able to be
+                * piggybacked by lock cancel RPC.
+                * If the client could hold the lock too long, this code can be
+                * revised to call mdt_object_lock_try(). And if fails, it will
+                * return ELDLM_OK here and fall back into normal lock enqueue
+                * process.
+                */
+               rc = mdt_object_lock(mti, mo, lhc, MDS_INODELOCK_DOM);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       if (!mdt_dom_lvb_is_valid(res)) {
+               rc = mdt_dom_lvb_alloc(res);
+               if (rc)
+                       GOTO(out_fail, rc);
+               mdt_dom_disk_lvbo_update(mti->mti_env, mo, res, false);
+       }
+       mdt_lvb2body(res, mbo);
+out_fail:
+       rep->lock_policy_res2 = clear_serious(rc);
+       if (rep->lock_policy_res2) {
+               lhc->mlh_reg_lh.cookie = 0ull;
+               GOTO(out, rc = ELDLM_LOCK_ABORTED);
+       }
+
+       rc = mdt_intent_lock_replace(mti, lockp, lhc, flags, rc);
+out:
+       mdt_object_put(mti->mti_env, mo);
+       RETURN(rc);
+}
+
+void mdt_dom_discard_data(struct mdt_thread_info *info,
+                         const struct lu_fid *fid)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       union ldlm_policy_data *policy = &info->mti_policy;
+       struct ldlm_res_id *res_id = &info->mti_res_id;
+       struct lustre_handle dom_lh;
+       __u64 flags = LDLM_FL_AST_DISCARD_DATA;
+       __u64 rc = 0;
+
+       policy->l_inodebits.bits = MDS_INODELOCK_DOM;
+       policy->l_inodebits.try_bits = 0;
+       fid_build_reg_res_name(fid, res_id);
+
+       /* Tell the clients that the object is gone now and that they should
+        * throw away any cached pages. */
+       rc = ldlm_cli_enqueue_local(mdt->mdt_namespace, res_id, LDLM_IBITS,
+                                   policy, LCK_PW, &flags, ldlm_blocking_ast,
+                                   ldlm_completion_ast, NULL, NULL, 0,
+                                   LVB_T_NONE, NULL, &dom_lh);
+
+       /* We only care about the side-effects, just drop the lock. */
+       if (rc == ELDLM_OK)
+               ldlm_lock_decref(&dom_lh, LCK_PW);
+}
+
index 31c2438..1896039 100644 (file)
@@ -30,7 +30,7 @@
  */
 
 #define DEBUG_SUBSYSTEM S_MDS
-
+#include <lustre_swab.h>
 #include "mdt_internal.h"
 
 /* Called with res->lr_lvb_sem held */
@@ -46,17 +46,216 @@ static int mdt_lvbo_init(struct ldlm_resource *res)
                /* call lvbo init function of quota master */
                return qmt_hdls.qmth_lvbo_init(mdt->mdt_qmt_dev, res);
        }
+       return 0;
+}
+
+int mdt_dom_lvb_alloc(struct ldlm_resource *res)
+{
+       struct ost_lvb *lvb;
+
+       mutex_lock(&res->lr_lvb_mutex);
+       if (res->lr_lvb_data == NULL) {
+               OBD_ALLOC_PTR(lvb);
+               if (lvb == NULL) {
+                       mutex_unlock(&res->lr_lvb_mutex);
+                       return -ENOMEM;
+               }
 
+               res->lr_lvb_data = lvb;
+               res->lr_lvb_len = sizeof(*lvb);
+
+               /* Store error in LVB to inidicate it has no data yet.
+                */
+               OST_LVB_SET_ERR(lvb->lvb_blocks, -ENODATA);
+       }
+       mutex_unlock(&res->lr_lvb_mutex);
        return 0;
 }
 
-static int mdt_lvbo_update(struct ldlm_resource *res,
-                          struct ldlm_lock *lock,
-                          struct ptlrpc_request *req,
-                          int increase_only)
+int mdt_dom_lvb_is_valid(struct ldlm_resource *res)
+{
+       struct ost_lvb *res_lvb = res->lr_lvb_data;
+
+       return !(res_lvb == NULL || OST_LVB_IS_ERR(res_lvb->lvb_blocks));
+}
+
+int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
+                            struct ldlm_resource *res, bool increase_only)
+{
+       struct mdt_thread_info *info = mdt_th_info(env);
+       const struct lu_fid *fid = mdt_object_fid(mo);
+       struct ost_lvb *lvb;
+       struct md_attr *ma;
+       int rc = 0;
+
+       ENTRY;
+
+       lvb = res->lr_lvb_data;
+       LASSERT(lvb);
+
+       if (!mdt_object_exists(mo) || mdt_object_remote(mo))
+               RETURN(-ENOENT);
+
+       ma = &info->mti_attr;
+       ma->ma_valid = 0;
+       ma->ma_need = MA_INODE;
+       rc = mo_attr_get(env, mdt_object_child(mo), ma);
+       if (rc)
+               RETURN(rc);
+
+       lock_res(res);
+       if (ma->ma_attr.la_size > lvb->lvb_size || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
+                      "%llu -> %llu\n", PFID(fid),
+                      lvb->lvb_size, ma->ma_attr.la_size);
+               lvb->lvb_size = ma->ma_attr.la_size;
+       }
+
+       if (ma->ma_attr.la_mtime > lvb->lvb_mtime || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
+                      "%llu -> %llu\n", PFID(fid),
+                      lvb->lvb_mtime, ma->ma_attr.la_mtime);
+               lvb->lvb_mtime = ma->ma_attr.la_mtime;
+       }
+       if (ma->ma_attr.la_atime > lvb->lvb_atime || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
+                      "%llu -> %llu\n", PFID(fid),
+                      lvb->lvb_atime, ma->ma_attr.la_atime);
+               lvb->lvb_atime = ma->ma_attr.la_atime;
+       }
+       if (ma->ma_attr.la_ctime > lvb->lvb_ctime || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
+                      "%llu -> %llu\n", PFID(fid),
+                      lvb->lvb_ctime, ma->ma_attr.la_ctime);
+               lvb->lvb_ctime = ma->ma_attr.la_ctime;
+       }
+       if (ma->ma_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
+                      "%llu -> %llu\n", PFID(fid), lvb->lvb_blocks,
+                      (unsigned long long)ma->ma_attr.la_blocks);
+               lvb->lvb_blocks = ma->ma_attr.la_blocks;
+       }
+       unlock_res(res);
+
+       RETURN(rc);
+}
+
+int mdt_dom_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
+                       struct ptlrpc_request *req, bool increase_only)
 {
+       struct obd_export *exp = lock ? lock->l_export : NULL;
+       struct mdt_device *mdt;
+       struct mdt_object *mo;
+       struct mdt_thread_info *info;
+       struct ost_lvb *lvb;
+       struct lu_env env;
+       struct lu_fid *fid;
+       int rc = 0;
+
+       ENTRY;
+
+       /* Before going further let's check that OBD and export are healthy.
+        */
+       if (exp != NULL &&
+           (exp->exp_disconnected || exp->exp_failed ||
+            exp->exp_obd->obd_stopping)) {
+               CDEBUG(D_INFO, "Skip LVB update, export is %s, obd is %s\n",
+                      exp->exp_failed ? "failed" : "disconnected",
+                      exp->exp_obd->obd_stopping ? "stopping" : "OK");
+               RETURN(0);
+       }
+
+       rc = mdt_dom_lvb_alloc(res);
+       if (rc < 0)
+               RETURN(rc);
+
+       mdt = ldlm_res_to_ns(res)->ns_lvbp;
+       if (mdt == NULL)
+               RETURN(-ENOENT);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc)
+               RETURN(rc);
+
+       info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+       if (info == NULL)
+               GOTO(out_env, rc = -ENOMEM);
+
+       memset(info, 0, sizeof *info);
+       info->mti_env = &env;
+       info->mti_exp = req ? req->rq_export : NULL;
+       info->mti_mdt = mdt;
+
+       fid = &info->mti_tmp_fid2;
+       fid_extract_from_res_name(fid, &res->lr_name);
+
+       lvb = res->lr_lvb_data;
+       LASSERT(lvb);
+
+       /* Update the LVB from the network message */
+       if (req != NULL) {
+               struct ost_lvb *rpc_lvb;
+
+               rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                     &RMF_DLM_LVB,
+                                                     lustre_swab_ost_lvb);
+               if (rpc_lvb == NULL)
+                       goto disk_update;
+
+               lock_res(res);
+               if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
+                              "%llu -> %llu\n", PFID(fid),
+                              lvb->lvb_size, rpc_lvb->lvb_size);
+                       lvb->lvb_size = rpc_lvb->lvb_size;
+               }
+               if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
+                              "%llu -> %llu\n", PFID(fid),
+                              lvb->lvb_mtime, rpc_lvb->lvb_mtime);
+                       lvb->lvb_mtime = rpc_lvb->lvb_mtime;
+               }
+               if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
+                              "%llu -> %llu\n", PFID(fid),
+                              lvb->lvb_atime, rpc_lvb->lvb_atime);
+                       lvb->lvb_atime = rpc_lvb->lvb_atime;
+               }
+               if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
+                              "%llu -> %llu\n", PFID(fid),
+                              lvb->lvb_ctime, rpc_lvb->lvb_ctime);
+                       lvb->lvb_ctime = rpc_lvb->lvb_ctime;
+               }
+               if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
+                              "%llu -> %llu\n", PFID(fid),
+                              lvb->lvb_blocks, rpc_lvb->lvb_blocks);
+                       lvb->lvb_blocks = rpc_lvb->lvb_blocks;
+               }
+               unlock_res(res);
+       }
+
+disk_update:
+       /* Update the LVB from the disk inode */
+       mo = mdt_object_find(&env, mdt, fid);
+       if (IS_ERR(mo))
+               GOTO(out_env, rc = PTR_ERR(mo));
+
+       rc = mdt_dom_disk_lvbo_update(&env, mo, res, !!increase_only);
+       mdt_object_put(&env, mo);
+out_env:
+       lu_env_fini(&env);
+       return rc;
+}
+
+static int mdt_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
+                          struct ptlrpc_request *req, int increase_only)
+{
+       ENTRY;
+
        if (IS_LQUOTA_RES(res)) {
-               struct mdt_device       *mdt;
+               struct mdt_device *mdt;
 
                mdt = ldlm_res_to_ns(res)->ns_lvbp;
                if (mdt->mdt_qmt_dev == NULL)
@@ -67,6 +266,13 @@ static int mdt_lvbo_update(struct ldlm_resource *res,
                                                 increase_only);
        }
 
+       /* Data-on-MDT lvbo update.
+        * Like a ldlm_lock_init() the lock can be skipped and that means
+        * it is DOM resource because lvbo_update() without lock is called
+        * by MDT for DOM objects only.
+        */
+       if (lock == NULL || ldlm_has_dom(lock))
+               return mdt_dom_lvbo_update(res, lock, req, !!increase_only);
        return 0;
 }
 
@@ -87,6 +293,9 @@ static int mdt_lvbo_size(struct ldlm_lock *lock)
                return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
        }
 
+       if (ldlm_has_dom(lock))
+               return sizeof(struct ost_lvb);
+
        if (ldlm_has_layout(lock))
                return mdt->mdt_max_mdsize;
 
@@ -115,12 +324,29 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
                RETURN(rc);
        }
 
+       /* LVB for DoM lock is needed only for glimpse,
+        * don't fill DoM data if there is layout lock */
+       if (ldlm_has_dom(lock)) {
+               struct ldlm_resource *res = lock->l_resource;
+               int lvb_len = sizeof(struct ost_lvb);
+
+               if (!mdt_dom_lvb_is_valid(res))
+                       mdt_dom_lvbo_update(lock->l_resource, lock, NULL, 0);
+
+               if (lvb_len > lvblen)
+                       lvb_len = lvblen;
+
+               lock_res(res);
+               memcpy(lvb, res->lr_lvb_data, lvb_len);
+               unlock_res(res);
+
+               RETURN(lvb_len);
+       }
+
        /* Only fill layout if layout lock is granted */
        if (!ldlm_has_layout(lock) || lock->l_granted_mode != lock->l_req_mode)
                RETURN(0);
 
-       /* layout lock will be granted to client, fill in lvb with layout */
-
        /* XXX create an env to talk to mdt stack. We should get this env from
         * ptlrpc_thread->t_env. */
        rc = lu_env_init(&env, LCT_MD_THREAD);
@@ -155,20 +381,19 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
        rc = mo_xattr_get(&env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
        if (rc < 0)
                GOTO(out, rc);
-
        if (rc > 0) {
                struct lu_buf *lmm = NULL;
-
                if (lvblen < rc) {
                        CERROR("%s: expected %d actual %d.\n",
-                               mdt_obd_name(mdt), rc, lvblen);
+                              mdt_obd_name(mdt), rc, lvblen);
+                       /* if layout size is bigger then update max_mdsize */
+                       if (rc > info->mti_mdt->mdt_max_mdsize)
+                               info->mti_mdt->mdt_max_mdsize = rc;
                        GOTO(out, rc = -ERANGE);
                }
-
                lmm = &info->mti_buf;
                lmm->lb_buf = lvb;
                lmm->lb_len = rc;
-
                rc = mo_xattr_get(&env, child, lmm, XATTR_NAME_LOV);
                if (rc < 0)
                        GOTO(out, rc);
@@ -194,6 +419,9 @@ static int mdt_lvbo_free(struct ldlm_resource *res)
                return qmt_hdls.qmth_lvbo_free(mdt->mdt_qmt_dev, res);
        }
 
+       /* Data-on-MDT lvbo free */
+       if (res->lr_lvb_data != NULL)
+               OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
        return 0;
 }
 
index e04c202..e7c68df 100644 (file)
@@ -2074,8 +2074,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        atomic_dec(&o->mot_open_count);
        mdt_handle_last_unlink(info, o, ma);
 
-        if (!MFD_CLOSED(mode))
-                rc = mo_close(info->mti_env, next, ma, mode);
+       if (!MFD_CLOSED(mode)) {
+               rc = mo_close(info->mti_env, next, ma, mode);
+               mdt_dom_check_and_discard(info, o);
+       }
 
        /* adjust open and lease count */
        if (mode & MDS_OPEN_LEASE) {
index a252b5f..1f079be 100644 (file)
@@ -660,7 +660,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
 
         if (rc != 0)
                 GOTO(out_unlock, rc);
-
+       mdt_dom_obj_lvb_update(info->mti_env, mo, false);
         EXIT;
 out_unlock:
        mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
@@ -795,11 +795,11 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
        mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
 
-        EXIT;
+       EXIT;
 out_put:
-        mdt_object_put(info->mti_env, mo);
+       mdt_object_put(info->mti_env, mo);
 out:
-        if (rc == 0)
+       if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_SETATTR);
 
         mdt_client_compatibility(info);
@@ -873,6 +873,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        bool cos_incompat = false;
        int no_name = 0;
        int rc;
+
        ENTRY;
 
        DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
@@ -1044,32 +1045,39 @@ relock:
                        mdt_object_child(mc), &rr->rr_name, ma, no_name);
 
        mutex_unlock(&mc->mot_lov_mutex);
+       if (rc != 0)
+               GOTO(unlock_child, rc);
 
-       if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+       if (!lu_object_is_dying(&mc->mot_header)) {
                rc = mdt_attr_get_complex(info, mc, ma);
-       if (rc == 0)
-               mdt_handle_last_unlink(info, mc, ma);
+               if (rc)
+                       GOTO(out_stat, rc);
+       } else {
+               mdt_dom_check_and_discard(info, mc);
+       }
+       mdt_handle_last_unlink(info, mc, ma);
 
-        if (ma->ma_valid & MA_INODE) {
-                switch (ma->ma_attr.la_mode & S_IFMT) {
-                case S_IFDIR:
+out_stat:
+       if (ma->ma_valid & MA_INODE) {
+               switch (ma->ma_attr.la_mode & S_IFMT) {
+               case S_IFDIR:
                        mdt_counter_incr(req, LPROC_MDT_RMDIR);
-                        break;
-                case S_IFREG:
-                case S_IFLNK:
-                case S_IFCHR:
-                case S_IFBLK:
-                case S_IFIFO:
-                case S_IFSOCK:
+                       break;
+               case S_IFREG:
+               case S_IFLNK:
+               case S_IFCHR:
+               case S_IFBLK:
+               case S_IFIFO:
+               case S_IFSOCK:
                        mdt_counter_incr(req, LPROC_MDT_UNLINK);
-                        break;
-                default:
-                        LASSERTF(0, "bad file type %o unlinking\n",
-                                 ma->ma_attr.la_mode);
-                }
-        }
+                       break;
+               default:
+                       LASSERTF(0, "bad file type %o unlinking\n",
+                               ma->ma_attr.la_mode);
+               }
+       }
 
-        EXIT;
+       EXIT;
 
 unlock_child:
        mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
@@ -2106,8 +2114,10 @@ relock:
        /* handle last link of tgt object */
        if (rc == 0) {
                mdt_counter_incr(req, LPROC_MDT_RENAME);
-               if (mnew)
+               if (mnew) {
                        mdt_handle_last_unlink(info, mnew, ma);
+                       mdt_dom_check_and_discard(info, mnew);
+               }
 
                mdt_rename_counter_tally(info, info->mti_mdt, req,
                                         msrcdir, mtgtdir);
index 3bb3d59..43e9acc 100644 (file)
@@ -3260,13 +3260,6 @@ static int __init ofd_init(void)
                return(rc);
        }
 
-       rc = ofd_dlm_init();
-       if (rc) {
-               lu_kmem_fini(ofd_caches);
-               ofd_fmd_exit();
-               return rc;
-       }
-
        rc = class_register_type(&ofd_obd_ops, NULL, true, NULL,
                                 LUSTRE_OST_NAME, &ofd_device_type);
        return rc;
@@ -3281,7 +3274,6 @@ static int __init ofd_init(void)
 static void __exit ofd_exit(void)
 {
        ofd_fmd_exit();
-       ofd_dlm_exit();
        lu_kmem_fini(ofd_caches);
        class_unregister_type(LUSTRE_OST_NAME);
 }
index c18ade0..a96a91d 100644 (file)
@@ -51,25 +51,6 @@ struct ofd_intent_args {
        int                     error;
 };
 
-int ofd_dlm_init(void)
-{
-       ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
-                                            sizeof(struct ldlm_glimpse_work),
-                                            0, 0, NULL);
-       if (ldlm_glimpse_work_kmem == NULL)
-               return -ENOMEM;
-       else
-               return 0;
-}
-
-void ofd_dlm_exit(void)
-{
-       if (ldlm_glimpse_work_kmem) {
-               kmem_cache_destroy(ldlm_glimpse_work_kmem);
-               ldlm_glimpse_work_kmem = NULL;
-       }
-}
-
 /**
  * OFD interval callback.
  *
index 611c8db..6cc7952 100644 (file)
@@ -419,8 +419,7 @@ extern struct ldlm_valblock_ops ofd_lvbo;
 
 /* ofd_dlm.c */
 extern struct kmem_cache *ldlm_glimpse_work_kmem;
-int ofd_dlm_init(void);
-void ofd_dlm_exit(void);
+
 int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                      void *req_cookie, enum ldlm_mode mode, __u64 flags,
                      void *data);
index 2752009..814af07 100644 (file)
@@ -226,7 +226,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
        if (ext->oe_sync && ext->oe_grants > 0)
                GOTO(out, rc = 90);
 
-       if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
+       if (ext->oe_dlmlock != NULL &&
+           ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT &&
+           !ldlm_is_failed(ext->oe_dlmlock)) {
                struct ldlm_extent *extent;
 
                extent = &ext->oe_dlmlock->l_policy_data.l_extent;
@@ -1295,7 +1297,6 @@ static int osc_refresh_count(const struct lu_env *env,
        pgoff_t index = osc_index(oap2osc(oap));
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
-
        int result;
        loff_t kms;
 
@@ -3206,6 +3207,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
        RETURN(res);
 }
+EXPORT_SYMBOL(osc_page_gang_lookup);
 
 /**
  * Check if page @page is covered by an extra lock or discard it.
@@ -3248,8 +3250,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
        return CLP_GANG_OKAY;
 }
 
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
-                     struct osc_page *ops, void *cbdata)
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+                  struct osc_page *ops, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct cl_page *page = ops->ops_cl.cpl_page;
@@ -3271,6 +3273,7 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
 
        return CLP_GANG_OKAY;
 }
+EXPORT_SYMBOL(osc_discard_cb);
 
 /**
  * Discard pages protected by the given lock. This function traverses radix
@@ -3297,7 +3300,7 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
        if (result != 0)
                GOTO(out, result);
 
-       cb = discard ? discard_cb : check_and_discard_cb;
+       cb = discard ? osc_discard_cb : check_and_discard_cb;
        info->oti_fn_index = info->oti_next_index = start;
        do {
                res = osc_page_gang_lookup(env, io, osc,
index 7a86744..4fdfb13 100644 (file)
@@ -45,12 +45,14 @@ void osc_wake_cache_waiters(struct client_obd *cli);
 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes);
 void osc_update_next_shrink(struct client_obd *cli);
 int lru_queue_work(const struct lu_env *env, void *data);
+int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
+                     int sent, int rc);
+int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
+                          pgoff_t start, pgoff_t end, bool discard);
 
 extern struct ptlrpc_request_set *PTLRPCD_SET;
 
-typedef int (*osc_enqueue_upcall_f)(void *cookie, struct lustre_handle *lockh,
-                                   int rc);
-
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
@@ -151,24 +153,12 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
 void osc_inc_unstable_pages(struct ptlrpc_request *req);
 void osc_dec_unstable_pages(struct ptlrpc_request *req);
 bool osc_over_unstable_soft_limit(struct client_obd *cli);
-/**
- * Bit flags for osc_dlm_lock_at_pageoff().
- */
-enum osc_dap_flags {
-       /**
-        * Just check if the desired lock exists, it won't hold reference
-        * count on lock.
-        */
-       OSC_DAP_FL_TEST_LOCK = 1 << 0,
-       /**
-        * Return the lock even if it is being canceled.
-        */
-       OSC_DAP_FL_CANCELING = 1 << 1
-};
-struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
-                                      struct osc_object *obj, pgoff_t index,
-                                      enum osc_dap_flags flags);
-void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa);
+
+struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
+                                          struct osc_object *obj,
+                                          pgoff_t index,
+                                          enum osc_dap_flags flags);
+
 int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc);
 
 /** osc shrink list to link all osc client obd */
index 6e25563..e959ae0 100644 (file)
@@ -56,8 +56,7 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
 {
 }
 
-static void osc_read_ahead_release(const struct lu_env *env,
-                                  void *cbdata)
+void osc_read_ahead_release(const struct lu_env *env, void *cbdata)
 {
        struct ldlm_lock *dlmlock = cbdata;
        struct lustre_handle lockh;
@@ -66,6 +65,7 @@ static void osc_read_ahead_release(const struct lu_env *env,
        ldlm_lock_decref(&lockh, LCK_PR);
        LDLM_LOCK_PUT(dlmlock);
 }
+EXPORT_SYMBOL(osc_read_ahead_release);
 
 static int osc_io_read_ahead(const struct lu_env *env,
                             const struct cl_io_slice *ios,
index 6fd75da..e9d5ae1 100644 (file)
  *  @{
  */
 
-/*****************************************************************************
- *
- * Type conversions.
- *
- */
-
-static const struct cl_lock_operations osc_lock_ops;
-static const struct cl_lock_operations osc_lock_lockless_ops;
-static void osc_lock_to_lockless(const struct lu_env *env,
-                                 struct osc_lock *ols, int force);
-
-int osc_lock_is_lockless(const struct osc_lock *olck)
-{
-        return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
-}
-
 /**
  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
  * pointer cannot be dereferenced, as lock is not protected from concurrent
@@ -135,8 +119,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
  *
  */
 
-static void osc_lock_fini(const struct lu_env *env,
-                          struct cl_lock_slice *slice)
+void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
 {
        struct osc_lock  *ols = cl2osc_lock(slice);
 
@@ -145,6 +128,7 @@ static void osc_lock_fini(const struct lu_env *env,
 
        OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
+EXPORT_SYMBOL(osc_lock_fini);
 
 static void osc_lock_build_policy(const struct lu_env *env,
                                  const struct cl_lock *lock,
@@ -156,31 +140,6 @@ static void osc_lock_build_policy(const struct lu_env *env,
        policy->l_extent.gid = d->cld_gid;
 }
 
-static __u64 osc_enq2ldlm_flags(__u32 enqflags)
-{
-       __u64 result = 0;
-
-       CDEBUG(D_DLMTRACE, "flags: %x\n", enqflags);
-
-       LASSERT((enqflags & ~CEF_MASK) == 0);
-
-       if (enqflags & CEF_NONBLOCK)
-               result |= LDLM_FL_BLOCK_NOWAIT;
-       if (enqflags & CEF_GLIMPSE)
-               result |= LDLM_FL_HAS_INTENT;
-       if (enqflags & CEF_DISCARD_DATA)
-               result |= LDLM_FL_AST_DISCARD_DATA;
-       if (enqflags & CEF_PEEK)
-               result |= LDLM_FL_TEST_LOCK;
-       if (enqflags & CEF_LOCK_MATCH)
-               result |= LDLM_FL_MATCH_LOCK;
-       if (enqflags & CEF_LOCK_NO_EXPAND)
-               result |= LDLM_FL_NO_EXPANSION;
-       if (enqflags & CEF_SPECULATIVE)
-               result |= LDLM_FL_SPECULATIVE;
-       return result;
-}
-
 /**
  * Updates object attributes from a lock value block (lvb) received together
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
@@ -335,7 +294,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
                 * lockless lock.
                 */
                osc_object_set_contended(cl2osc(slice->cls_obj));
-               LASSERT(slice->cls_ops == &osc_lock_ops);
+               LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
 
                /* Change this lock to ldlmlock-less lock. */
                osc_lock_to_lockless(env, oscl, 1);
@@ -582,7 +541,7 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
        RETURN(result);
 }
 
-static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
+int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 {
        struct ptlrpc_request   *req  = data;
        struct lu_env           *env;
@@ -644,6 +603,7 @@ out:
        req->rq_status = result;
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
 static int weigh_cb(const struct lu_env *env, struct cl_io *io,
                    struct osc_page *ops, void *cbdata)
@@ -777,46 +737,46 @@ static void osc_lock_build_einfo(const struct lu_env *env,
  *  Additional policy can be implemented here, e.g., never do lockless-io
  *  for large extents.
  */
-static void osc_lock_to_lockless(const struct lu_env *env,
-                                 struct osc_lock *ols, int force)
+void osc_lock_to_lockless(const struct lu_env *env,
+                         struct osc_lock *ols, int force)
 {
-        struct cl_lock_slice *slice = &ols->ols_cl;
-
-        LASSERT(ols->ols_state == OLS_NEW ||
-                ols->ols_state == OLS_UPCALL_RECEIVED);
-
-        if (force) {
-                ols->ols_locklessable = 1;
-                slice->cls_ops = &osc_lock_lockless_ops;
-        } else {
-                struct osc_io *oio     = osc_env_io(env);
-                struct cl_io  *io      = oio->oi_cl.cis_io;
-                struct cl_object *obj  = slice->cls_obj;
-                struct osc_object *oob = cl2osc(obj);
-                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
-                struct obd_connect_data *ocd;
-
-                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
-                        io->ci_lockreq == CILR_MAYBE ||
-                        io->ci_lockreq == CILR_NEVER);
-
-                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
-                ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
-                                (io->ci_lockreq == CILR_MAYBE) &&
-                                (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
-                if (io->ci_lockreq == CILR_NEVER ||
-                        /* lockless IO */
-                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
-                        /* lockless truncate */
-                    (cl_io_is_trunc(io) &&
-                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
-                      osd->od_lockless_truncate)) {
-                        ols->ols_locklessable = 1;
-                        slice->cls_ops = &osc_lock_lockless_ops;
-                }
-        }
-        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+       struct cl_lock_slice *slice = &ols->ols_cl;
+       struct osc_io *oio = osc_env_io(env);
+       struct cl_io *io = oio->oi_cl.cis_io;
+       struct cl_object *obj = slice->cls_obj;
+       struct osc_object *oob = cl2osc(obj);
+       const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+       struct obd_connect_data *ocd;
+
+       LASSERT(ols->ols_state == OLS_NEW ||
+               ols->ols_state == OLS_UPCALL_RECEIVED);
+
+       if (force) {
+               ols->ols_locklessable = 1;
+               slice->cls_ops = ols->ols_lockless_ops;
+       } else {
+               LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+                       io->ci_lockreq == CILR_MAYBE ||
+                       io->ci_lockreq == CILR_NEVER);
+
+               ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+               ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
+                                       (io->ci_lockreq == CILR_MAYBE) &&
+                                       (ocd->ocd_connect_flags &
+                                        OBD_CONNECT_SRVLOCK);
+               if (io->ci_lockreq == CILR_NEVER ||
+                   /* lockless IO */
+                   (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+                   /* lockless truncate */
+                   (cl_io_is_trunc(io) && osd->od_lockless_truncate &&
+                    (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) {
+                       ols->ols_locklessable = 1;
+                       slice->cls_ops = ols->ols_lockless_ops;
+               }
+       }
+       LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
 }
+EXPORT_SYMBOL(osc_lock_to_lockless);
 
 static bool osc_lock_compatible(const struct osc_lock *qing,
                                const struct osc_lock *qed)
@@ -841,9 +801,8 @@ static bool osc_lock_compatible(const struct osc_lock *qing,
        return false;
 }
 
-static void osc_lock_wake_waiters(const struct lu_env *env,
-                                 struct osc_object *osc,
-                                 struct osc_lock *oscl)
+void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
+                          struct osc_lock *oscl)
 {
        spin_lock(&osc->oo_ol_spin);
        list_del_init(&oscl->ols_nextlock_oscobj);
@@ -861,14 +820,16 @@ static void osc_lock_wake_waiters(const struct lu_env *env,
        }
        spin_unlock(&oscl->ols_lock);
 }
+EXPORT_SYMBOL(osc_lock_wake_waiters);
 
-static int osc_lock_enqueue_wait(const struct lu_env *env,
-               struct osc_object *obj, struct osc_lock *oscl)
+int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
+                         struct osc_lock *oscl)
 {
        struct osc_lock         *tmp_oscl;
        struct cl_lock_descr    *need = &oscl->ols_cl.cls_lock->cll_descr;
        struct cl_sync_io       *waiter = &osc_env_info(env)->oti_anchor;
        int rc = 0;
+
        ENTRY;
 
        spin_lock(&obj->oo_ol_spin);
@@ -919,6 +880,7 @@ restart:
 
        RETURN(rc);
 }
+EXPORT_SYMBOL(osc_lock_enqueue_wait);
 
 /**
  * Implementation of cl_lock_operations::clo_enqueue() method for osc
@@ -1096,8 +1058,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
  *
  *     - cancels ldlm lock (ldlm_cli_cancel()).
  */
-static void osc_lock_cancel(const struct lu_env *env,
-                            const struct cl_lock_slice *slice)
+void osc_lock_cancel(const struct lu_env *env,
+                    const struct cl_lock_slice *slice)
 {
        struct osc_object *obj  = cl2osc(slice->cls_obj);
        struct osc_lock   *oscl = cl2osc_lock(slice);
@@ -1113,9 +1075,10 @@ static void osc_lock_cancel(const struct lu_env *env,
        osc_lock_wake_waiters(env, obj, oscl);
        EXIT;
 }
+EXPORT_SYMBOL(osc_lock_cancel);
 
-static int osc_lock_print(const struct lu_env *env, void *cookie,
-                         lu_printer_t p, const struct cl_lock_slice *slice)
+int osc_lock_print(const struct lu_env *env, void *cookie,
+                  lu_printer_t p, const struct cl_lock_slice *slice)
 {
        struct osc_lock *lock = cl2osc_lock(slice);
 
@@ -1125,6 +1088,7 @@ static int osc_lock_print(const struct lu_env *env, void *cookie,
        osc_lvb_print(env, cookie, p, &lock->ols_lvb);
        return 0;
 }
+EXPORT_SYMBOL(osc_lock_print);
 
 static const struct cl_lock_operations osc_lock_ops = {
         .clo_fini    = osc_lock_fini,
@@ -1158,9 +1122,8 @@ static const struct cl_lock_operations osc_lock_lockless_ops = {
         .clo_print     = osc_lock_print
 };
 
-static void osc_lock_set_writer(const struct lu_env *env,
-                               const struct cl_io *io,
-                               struct cl_object *obj, struct osc_lock *oscl)
+void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
+                        struct cl_object *obj, struct osc_lock *oscl)
 {
        struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
        pgoff_t io_start;
@@ -1188,6 +1151,7 @@ static void osc_lock_set_writer(const struct lu_env *env,
                oio->oi_write_osclock = oscl;
        }
 }
+EXPORT_SYMBOL(osc_lock_set_writer);
 
 int osc_lock_init(const struct lu_env *env,
                  struct cl_object *obj, struct cl_lock *lock,
@@ -1205,6 +1169,7 @@ int osc_lock_init(const struct lu_env *env,
        INIT_LIST_HEAD(&oscl->ols_waiting_list);
        INIT_LIST_HEAD(&oscl->ols_wait_entry);
        INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj);
+       oscl->ols_lockless_ops = &osc_lock_lockless_ops;
 
        /* Speculative lock requests must be either no_expand or glimpse
         * request (CEF_GLIMPSE).  non-glimpse no_expand speculative extent
@@ -1242,9 +1207,10 @@ int osc_lock_init(const struct lu_env *env,
  * Finds an existing lock covering given index and optionally different from a
  * given \a except lock.
  */
-struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
-                                      struct osc_object *obj, pgoff_t index,
-                                      enum osc_dap_flags dap_flags)
+struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
+                                          struct osc_object *obj,
+                                          pgoff_t index,
+                                          enum osc_dap_flags dap_flags)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct ldlm_res_id *resname = &info->oti_resname;
index 4597201..1998275 100644 (file)
  * Object operations.
  *
  */
+static void osc_obj_build_res_name(struct osc_object *osc,
+                                  struct ldlm_res_id *resname)
+{
+       ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
+}
+
+static const struct osc_object_operations osc_object_ops = {
+       .oto_build_res_name = osc_obj_build_res_name,
+       .oto_dlmlock_at_pgoff = osc_obj_dlmlock_at_pgoff,
+};
+
 int osc_object_init(const struct lu_env *env, struct lu_object *obj,
                    const struct lu_object_conf *conf)
 {
@@ -79,6 +90,8 @@ int osc_object_init(const struct lu_env *env, struct lu_object *obj,
        atomic_set(&osc->oo_nr_ios, 0);
        init_waitqueue_head(&osc->oo_io_waitq);
 
+       LASSERT(osc->oo_obj_ops != NULL);
+
        cl_object_page_init(lu2cl(obj), sizeof(struct osc_page));
 
        return 0;
@@ -189,23 +202,28 @@ static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
        ENTRY;
 
+       CDEBUG(D_DLMTRACE, "obj: %p/%p, lock %p\n",
+               data, lock->l_ast_data, lock);
+
+       LASSERT(lock->l_granted_mode == lock->l_req_mode);
        if (lock->l_ast_data == data)
                lock->l_ast_data = NULL;
        RETURN(LDLM_ITER_CONTINUE);
 }
 
-static int osc_object_prune(const struct lu_env *env, struct cl_object *obj)
+int osc_object_prune(const struct lu_env *env, struct cl_object *obj)
 {
-       struct osc_object       *osc = cl2osc(obj);
-       struct ldlm_res_id      *resname = &osc_env_info(env)->oti_resname;
+       struct osc_object  *osc = cl2osc(obj);
+       struct ldlm_res_id *resname = &osc_env_info(env)->oti_resname;
 
        /* DLM locks don't hold a reference of osc_object so we have to
         * clear it before the object is being destroyed. */
-       ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
+       osc_build_res_name(osc, resname);
        ldlm_resource_iterate(osc_export(osc)->exp_obd->obd_namespace, resname,
                              osc_object_ast_clear, osc);
        return 0;
 }
+EXPORT_SYMBOL(osc_object_prune);
 
 static int osc_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                             struct ll_fiemap_info_key *fmkey,
@@ -292,18 +310,6 @@ drop_lock:
        RETURN(rc);
 }
 
-void osc_object_set_contended(struct osc_object *obj)
-{
-        obj->oo_contention_time = cfs_time_current();
-        /* mb(); */
-        obj->oo_contended = 1;
-}
-
-void osc_object_clear_contended(struct osc_object *obj)
-{
-        obj->oo_contended = 0;
-}
-
 int osc_object_is_contended(struct osc_object *obj)
 {
         struct osc_device *dev  = lu2osc_dev(obj->oo_cl.co_lu.lo_dev);
@@ -329,6 +335,7 @@ int osc_object_is_contended(struct osc_object *obj)
         }
         return 1;
 }
+EXPORT_SYMBOL(osc_object_is_contended);
 
 /**
  * Implementation of struct cl_object_operations::coo_req_attr_set() for osc
@@ -441,6 +448,7 @@ struct lu_object *osc_object_alloc(const struct lu_env *env,
                lu_object_init(obj, NULL, dev);
                osc->oo_cl.co_ops = &osc_ops;
                obj->lo_ops = &osc_lu_obj_ops;
+               osc->oo_obj_ops = &osc_object_ops;
        } else
                obj = NULL;
        return obj;
index 85df555..87b6c22 100644 (file)
@@ -91,18 +91,6 @@ struct osc_ladvise_args {
        void                    *la_cookie;
 };
 
-struct osc_enqueue_args {
-       struct obd_export       *oa_exp;
-       enum ldlm_type          oa_type;
-       enum ldlm_mode          oa_mode;
-       __u64                   *oa_flags;
-       osc_enqueue_upcall_f    oa_upcall;
-       void                    *oa_cookie;
-       struct ost_lvb          *oa_lvb;
-       struct lustre_handle    oa_lockh;
-       bool                    oa_speculative;
-};
-
 static void osc_release_ppga(struct brw_page **ppga, size_t count);
 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
@@ -2031,10 +2019,10 @@ static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
        return set;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req,
-                           osc_enqueue_upcall_f upcall, void *cookie,
-                           struct lustre_handle *lockh, enum ldlm_mode mode,
-                           __u64 *flags, bool speculative, int errcode)
+int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
+                    void *cookie, struct lustre_handle *lockh,
+                    enum ldlm_mode mode, __u64 *flags, bool speculative,
+                    int errcode)
 {
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
@@ -2066,12 +2054,11 @@ static int osc_enqueue_fini(struct ptlrpc_request *req,
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
-static int osc_enqueue_interpret(const struct lu_env *env,
-                                struct ptlrpc_request *req,
-                                struct osc_enqueue_args *aa, int rc)
+int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                         struct osc_enqueue_args *aa, int rc)
 {
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
@@ -2115,7 +2102,7 @@ static int osc_enqueue_interpret(const struct lu_env *env,
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_speculative, rc);
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);