Whamcloud - gitweb
LU-6271 osc: handle osc eviction correctly
[fs/lustre-release.git] / lustre / osc / osc_cl_internal.h
index 0dea7b5..65658f6 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #ifndef OSC_CL_INTERNAL_H
 #define OSC_CL_INTERNAL_H
 
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else
-# include <liblustre.h>
-#endif
-
+#include <libcfs/libcfs.h>
 #include <obd.h>
 /* osc_build_res_name() */
-#include <obd_ost.h>
 #include <cl_object.h>
-#include <lclient.h>
 #include "osc_internal.h"
 
 /** \defgroup osc osc
@@ -73,14 +66,17 @@ struct osc_io {
         struct cl_io_slice oi_cl;
         /** true if this io is lockless. */
         int                oi_lockless;
+       /** how many LRU pages are reserved for this IO */
+       unsigned long      oi_lru_reserved;
+
        /** active extents, we know how many bytes is going to be written,
         * so having an active extent will prevent it from being fragmented */
        struct osc_extent *oi_active;
        /** partially truncated extent, we need to hold this extent to prevent
         * page writeback from happening. */
        struct osc_extent *oi_trunc;
-
-       struct obd_info    oi_info;
+       /** write osc_lock for this IO, used by osc_extent_find(). */
+       struct osc_lock   *oi_write_osclock;
        struct obdo        oi_oa;
        struct osc_async_cbargs {
                bool              opc_rpc_sent;
@@ -103,16 +99,22 @@ struct osc_session {
         struct osc_io       os_io;
 };
 
-#define OTI_PVEC_SIZE 64
+#define OTI_PVEC_SIZE 256
 struct osc_thread_info {
-        struct ldlm_res_id      oti_resname;
-        ldlm_policy_data_t      oti_policy;
-        struct cl_lock_descr    oti_descr;
-        struct cl_attr          oti_attr;
-        struct lustre_handle    oti_handle;
-        struct cl_page_list     oti_plist;
+       struct ldlm_res_id      oti_resname;
+       union ldlm_policy_data  oti_policy;
+       struct cl_lock_descr    oti_descr;
+       struct cl_attr          oti_attr;
+       struct lustre_handle    oti_handle;
+       struct cl_page_list     oti_plist;
        struct cl_io            oti_io;
-       struct cl_page         *oti_pvec[OTI_PVEC_SIZE];
+       void                    *oti_pvec[OTI_PVEC_SIZE];
+       /**
+        * Fields used by cl_lock_discard_pages().
+        */
+       pgoff_t                 oti_next_index;
+       pgoff_t                 oti_fn_index; /* first non-overlapped index */
+       struct cl_sync_io       oti_anchor;
 };
 
 struct osc_object {
@@ -123,7 +125,7 @@ struct osc_object {
          */
         int                oo_contended;
         cfs_time_t         oo_contention_time;
-#ifdef INVARIANT_CHECK
+#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
         /**
          * IO context used for invariant checks in osc_lock_has_pages().
          */
@@ -131,24 +133,14 @@ struct osc_object {
         /** Serialization object for osc_object::oo_debug_io. */
        struct mutex       oo_debug_mutex;
 #endif
-        /**
-         * List of pages in transfer.
-         */
-        cfs_list_t         oo_inflight[CRT_NR];
-        /**
-         * Lock, protecting ccc_object::cob_inflight, because a seat-belt is
-         * locked during take-off and landing.
-         */
-       spinlock_t         oo_seatbelt;
-
        /**
         * used by the osc to keep track of what objects to build into rpcs.
         * Protected by client_obd->cli_loi_list_lock.
         */
-       cfs_list_t         oo_ready_item;
-       cfs_list_t         oo_hp_ready_item;
-       cfs_list_t         oo_write_item;
-       cfs_list_t         oo_read_item;
+       struct list_head        oo_ready_item;
+       struct list_head        oo_hp_ready_item;
+       struct list_head        oo_write_item;
+       struct list_head        oo_read_item;
 
        /**
         * extent is a red black tree to manage (async) dirty pages.
@@ -157,18 +149,33 @@ struct osc_object {
        /**
         * Manage write(dirty) extents.
         */
-       cfs_list_t         oo_hp_exts; /* list of hp extents */
-       cfs_list_t         oo_urgent_exts; /* list of writeback extents */
-       cfs_list_t         oo_rpc_exts;
+       struct list_head        oo_hp_exts;     /* list of hp extents */
+       struct list_head        oo_urgent_exts; /* list of writeback extents */
+       struct list_head        oo_rpc_exts;
 
-       cfs_list_t         oo_reading_exts;
+       struct list_head        oo_reading_exts;
 
-       cfs_atomic_t     oo_nr_reads;
-       cfs_atomic_t     oo_nr_writes;
+       atomic_t         oo_nr_reads;
+       atomic_t         oo_nr_writes;
 
        /** Protect extent tree. Will be used to protect
         * oo_{read|write}_pages soon. */
        spinlock_t          oo_lock;
+
+       /**
+        * Radix tree for caching pages
+        */
+       struct radix_tree_root  oo_tree;
+       spinlock_t              oo_tree_lock;
+       unsigned long           oo_npages;
+
+       /* Protect osc_lock this osc_object has */
+       spinlock_t              oo_ol_spin;
+       struct list_head        oo_ol_list;
+
+       /** number of active IOs of this object */
+       atomic_t                oo_nr_ios;
+       wait_queue_head_t       oo_io_waitq;
 };
 
 static inline void osc_object_lock(struct osc_object *obj)
@@ -188,7 +195,16 @@ static inline void osc_object_unlock(struct osc_object *obj)
 
 static inline int osc_object_is_locked(struct osc_object *obj)
 {
+#if defined(CONFIG_SMP) || defined(CONFIG_DEBUG_SPINLOCK)
        return spin_is_locked(&obj->oo_lock);
+#else
+       /*
+        * It is not perfect to return true all the time.
+        * But since this function is only used for assertion
+        * and checking, it seems OK.
+        */
+       return 1;
+#endif
 }
 
 /*
@@ -199,8 +215,6 @@ enum osc_lock_state {
         OLS_ENQUEUED,
         OLS_UPCALL_RECEIVED,
         OLS_GRANTED,
-        OLS_RELEASED,
-        OLS_BLOCKED,
         OLS_CANCELLED
 };
 
@@ -209,10 +223,8 @@ enum osc_lock_state {
  *
  * Interaction with DLM.
  *
- * CLIO enqueues all DLM locks through ptlrpcd (that is, in "async" mode).
- *
  * Once receive upcall is invoked, osc_lock remembers a handle of DLM lock in
- * osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_lock.
+ * osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_dlmlock.
  *
  * This pointer is protected through a reference, acquired by
  * osc_lock_upcall0(). Also, an additional reference is acquired by
@@ -249,27 +261,28 @@ enum osc_lock_state {
  * future.
  */
 struct osc_lock {
-        struct cl_lock_slice     ols_cl;
-        /** underlying DLM lock */
-        struct ldlm_lock        *ols_lock;
-        /** lock value block */
-        struct ost_lvb           ols_lvb;
-        /** DLM flags with which osc_lock::ols_lock was enqueued */
-       __u64                    ols_flags;
-        /** osc_lock::ols_lock handle */
-        struct lustre_handle     ols_handle;
-        struct ldlm_enqueue_info ols_einfo;
-        enum osc_lock_state      ols_state;
-
-        /**
-         * How many pages are using this lock for io, currently only used by
-         * read-ahead. If non-zero, the underlying dlm lock won't be cancelled
-         * during recovery to avoid deadlock. see bz16774.
-         *
-         * \see osc_page::ops_lock
-         * \see osc_page_addref_lock(), osc_page_putref_lock()
-         */
-        cfs_atomic_t             ols_pageref;
+       struct cl_lock_slice    ols_cl;
+       /** Internal lock to protect states, etc. */
+       spinlock_t              ols_lock;
+       /** Owner sleeps on this channel for state change */
+       struct cl_sync_io       *ols_owner;
+       /** waiting list for this lock to be cancelled */
+       struct list_head        ols_waiting_list;
+       /** wait entry of ols_waiting_list */
+       struct list_head        ols_wait_entry;
+       /** list entry for osc_object::oo_ol_list */
+       struct list_head        ols_nextlock_oscobj;
+
+       /** underlying DLM lock */
+       struct ldlm_lock        *ols_dlmlock;
+       /** DLM flags with which osc_lock::ols_lock was enqueued */
+       __u64                   ols_flags;
+       /** osc_lock::ols_lock handle */
+       struct lustre_handle     ols_handle;
+       struct ldlm_enqueue_info ols_einfo;
+       enum osc_lock_state      ols_state;
+       /** lock value block */
+       struct ost_lvb          ols_lvb;
 
         /**
          * true, if ldlm_lock_addref() was called against
@@ -300,16 +313,6 @@ struct osc_lock {
          */
                                  ols_locklessable:1,
         /**
-         * set by osc_lock_use() to wait until blocking AST enters into
-         * osc_ldlm_blocking_ast0(), so that cl_lock mutex can be used for
-         * further synchronization.
-         */
-                                 ols_ast_wait:1,
-        /**
-         * If the data of this lock has been flushed to server side.
-         */
-                                 ols_flush:1,
-        /**
          * if set, the osc_lock is a glimpse lock. For glimpse locks, we treat
          * the EVAVAIL error as torerable, this will make upper logic happy
          * to wait all glimpse locks to each OSTs to be completed.
@@ -322,15 +325,6 @@ struct osc_lock {
          * For async glimpse lock.
          */
                                  ols_agl:1;
-        /**
-         * IO that owns this lock. This field is used for a dead-lock
-         * avoidance by osc_lock_enqueue_wait().
-         *
-         * XXX: unfortunately, the owner of a osc_lock is not unique, 
-         * the lock may have multiple users, if the lock is granted and
-         * then matched.
-         */
-        struct osc_io           *ols_owner;
 };
 
 
@@ -368,47 +362,25 @@ struct osc_page {
         */
                              ops_in_lru:1,
        /**
-         * Set if the page must be transferred with OBD_BRW_SRVLOCK.
-         */
-                              ops_srvlock:1;
-       union {
-               /**
-                * lru page list. ops_inflight and ops_lru are exclusive so
-                * that they can share the same data.
-                */
-               cfs_list_t            ops_lru;
-               /**
-                * Linkage into a per-osc_object list of pages in flight. For
-                * debugging.
-                */
-               cfs_list_t            ops_inflight;
-       };
-        /**
-         * Thread that submitted this page for transfer. For debugging.
-         */
-        cfs_task_t           *ops_submitter;
-        /**
-         * Submit time - the time when the page is starting RPC. For debugging.
-         */
-        cfs_time_t            ops_submit_time;
-
-        /**
-         * A lock of which we hold a reference covers this page. Only used by
-         * read-ahead: for a readahead page, we hold it's covering lock to
-         * prevent it from being canceled during recovery.
-         *
-         * \see osc_lock::ols_pageref
-         * \see osc_page_addref_lock(), osc_page_putref_lock().
-         */
-        struct cl_lock       *ops_lock;
+        * Set if the page must be transferred with OBD_BRW_SRVLOCK.
+        */
+                             ops_srvlock:1;
+       /**
+        * lru page list. See osc_lru_{del|use}() in osc_page.c for usage.
+        */
+       struct list_head        ops_lru;
+       /**
+        * Submit time - the time when the page is starting RPC. For debugging.
+        */
+       cfs_time_t            ops_submit_time;
 };
 
-extern cfs_mem_cache_t *osc_lock_kmem;
-extern cfs_mem_cache_t *osc_object_kmem;
-extern cfs_mem_cache_t *osc_thread_kmem;
-extern cfs_mem_cache_t *osc_session_kmem;
-extern cfs_mem_cache_t *osc_req_kmem;
-extern cfs_mem_cache_t *osc_extent_kmem;
+extern struct kmem_cache *osc_lock_kmem;
+extern struct kmem_cache *osc_object_kmem;
+extern struct kmem_cache *osc_thread_kmem;
+extern struct kmem_cache *osc_session_kmem;
+extern struct kmem_cache *osc_req_kmem;
+extern struct kmem_cache *osc_extent_kmem;
 
 extern struct lu_device_type osc_device_type;
 extern struct lu_context_key osc_key;
@@ -427,38 +399,41 @@ struct lu_object *osc_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *dev);
 int osc_page_init(const struct lu_env *env, struct cl_object *obj,
-                 struct cl_page *page, cfs_page_t *vmpage);
+                 struct cl_page *page, pgoff_t ind);
 
-void osc_index2policy  (ldlm_policy_data_t *policy, const struct cl_object *obj,
-                        pgoff_t start, pgoff_t end);
-int  osc_lvb_print     (const struct lu_env *env, void *cookie,
-                        lu_printer_t p, const struct ost_lvb *lvb);
+void osc_index2policy(union ldlm_policy_data *policy,
+                     const struct cl_object *obj, pgoff_t start, pgoff_t end);
+int  osc_lvb_print(const struct lu_env *env, void *cookie,
+                  lu_printer_t p, const struct ost_lvb *lvb);
 
+void osc_lru_add_batch(struct client_obd *cli, struct list_head *list);
 void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
                     enum cl_req_type crt, int brw_flags);
 int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops);
 int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
-                       obd_flag async_flags);
+                       u32 async_flags);
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
-                       cfs_page_t *page, loff_t offset);
+                       struct page *page, loff_t offset);
 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                       struct osc_page *ops);
+int osc_page_cache_add(const struct lu_env *env,
+                      const struct cl_page_slice *slice, struct cl_io *io);
 int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
                            struct osc_page *ops);
 int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
                         struct osc_page *ops);
 int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        cfs_list_t *list, int cmd, int brw_flags);
-int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
-                            struct osc_object *obj, __u64 size);
-void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
-                           struct osc_object *obj);
+                        struct list_head *list, int cmd, int brw_flags);
+int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
+                            __u64 size, struct osc_extent **extp);
+void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
 int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                              pgoff_t start, pgoff_t end, int hp, int discard);
 int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
                         pgoff_t start, pgoff_t end);
 void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
-                  struct osc_object *osc, pdl_policy_t pol);
+                  struct osc_object *osc);
+int lru_queue_work(const struct lu_env *env, void *data);
 
 void osc_object_set_contended  (struct osc_object *obj);
 void osc_object_clear_contended(struct osc_object *obj);
@@ -527,26 +502,24 @@ static inline struct cl_object *osc2cl(const struct osc_object *obj)
        return (struct cl_object *)&obj->oo_cl;
 }
 
-static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode)
+static inline enum ldlm_mode osc_cl_lock2ldlm(enum cl_lock_mode mode)
 {
-        LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
-        if (mode == CLM_READ)
-                return LCK_PR;
-        else if (mode == CLM_WRITE)
-                return LCK_PW;
-        else
-                return LCK_GROUP;
+       LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
+       if (mode == CLM_READ)
+               return LCK_PR;
+       if (mode == CLM_WRITE)
+               return LCK_PW;
+       return LCK_GROUP;
 }
 
-static inline enum cl_lock_mode osc_ldlm2cl_lock(ldlm_mode_t mode)
+static inline enum cl_lock_mode osc_ldlm2cl_lock(enum ldlm_mode mode)
 {
-        LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
-        if (mode == LCK_PR)
-                return CLM_READ;
-        else if (mode == LCK_PW)
-                return CLM_WRITE;
-        else
-                return CLM_GROUP;
+       LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
+       if (mode == LCK_PR)
+               return CLM_READ;
+       if (mode == LCK_PW)
+               return CLM_WRITE;
+       return CLM_GROUP;
 }
 
 static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
@@ -560,6 +533,11 @@ static inline struct osc_page *oap2osc(struct osc_async_page *oap)
        return container_of0(oap, struct osc_page, ops_oap);
 }
 
+static inline pgoff_t osc_index(struct osc_page *opg)
+{
+       return opg->ops_cl.cpl_index;
+}
+
 static inline struct cl_page *oap2cl_page(struct osc_async_page *oap)
 {
        return oap2osc(oap)->ops_cl.cpl_page;
@@ -614,76 +592,86 @@ enum osc_extent_state {
  */
 struct osc_extent {
        /** red-black tree node */
-       struct rb_node     oe_node;
+       struct rb_node          oe_node;
        /** osc_object of this extent */
-       struct osc_object *oe_obj;
+       struct osc_object       *oe_obj;
        /** refcount, removed from red-black tree if reaches zero. */
-       cfs_atomic_t       oe_refc;
+       atomic_t                oe_refc;
        /** busy if non-zero */
-       cfs_atomic_t       oe_users;
+       atomic_t                oe_users;
        /** link list of osc_object's oo_{hp|urgent|locking}_exts. */
-       cfs_list_t       oe_link;
+       struct list_head        oe_link;
        /** state of this extent */
-       unsigned int       oe_state;
+       enum osc_extent_state   oe_state;
        /** flags for this extent. */
-       unsigned int       oe_intree:1,
+       unsigned int            oe_intree:1,
        /** 0 is write, 1 is read */
-                          oe_rw:1,
-                          oe_srvlock:1,
-                          oe_memalloc:1,
+                               oe_rw:1,
+       /** sync extent, queued by osc_queue_sync_pages() */
+                               oe_sync:1,
+                               oe_srvlock:1,
+                               oe_memalloc:1,
        /** an ACTIVE extent is going to be truncated, so when this extent
         * is released, it will turn into TRUNC state instead of CACHE. */
-                          oe_trunc_pending:1,
+                               oe_trunc_pending:1,
        /** this extent should be written asap and someone may wait for the
         * write to finish. This bit is usually set along with urgent if
         * the extent was CACHE state.
         * fsync_wait extent can't be merged because new extent region may
         * exceed fsync range. */
-                          oe_fsync_wait:1,
+                               oe_fsync_wait:1,
        /** covering lock is being canceled */
-                          oe_hp:1,
+                               oe_hp:1,
        /** this extent should be written back asap. set if one of pages is
         * called by page WB daemon, or sync write or reading requests. */
-                          oe_urgent:1;
+                               oe_urgent:1;
        /** how many grants allocated for this extent.
         *  Grant allocated for this extent. There is no grant allocated
         *  for reading extents and sync write extents. */
-       unsigned int       oe_grants;
+       unsigned int            oe_grants;
        /** # of dirty pages in this extent */
-       unsigned int       oe_nr_pages;
+       unsigned int            oe_nr_pages;
        /** list of pending oap pages. Pages in this list are NOT sorted. */
-       cfs_list_t         oe_pages;
+       struct list_head        oe_pages;
        /** Since an extent has to be written out in atomic, this is used to
         * remember the next page need to be locked to write this extent out.
         * Not used right now.
         */
-       struct osc_page   *oe_next_page;
+       struct osc_page         *oe_next_page;
        /** start and end index of this extent, include start and end
         * themselves. Page offset here is the page index of osc_pages.
         * oe_start is used as keyword for red-black tree. */
-       pgoff_t            oe_start;
-       pgoff_t            oe_end;
+       pgoff_t                 oe_start;
+       pgoff_t                 oe_end;
        /** maximum ending index of this extent, this is limited by
         * max_pages_per_rpc, lock extent and chunk size. */
-       pgoff_t            oe_max_end;
+       pgoff_t                 oe_max_end;
        /** waitqueue - for those who want to be notified if this extent's
         * state has changed. */
-       cfs_waitq_t        oe_waitq;
+       wait_queue_head_t       oe_waitq;
        /** lock covering this extent */
-       struct cl_lock    *oe_osclock;
+       struct ldlm_lock        *oe_dlmlock;
        /** terminator of this extent. Must be true if this extent is in IO. */
-       cfs_task_t        *oe_owner;
+       struct task_struct      *oe_owner;
        /** return value of writeback. If somebody is waiting for this extent,
         * this value can be known by outside world. */
-       int                oe_rc;
+       int                     oe_rc;
        /** max pages per rpc when this extent was created */
-       unsigned int       oe_mppr;
+       unsigned int            oe_mppr;
 };
 
 int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                      int sent, int rc);
 int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
 
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
+                          pgoff_t start, pgoff_t end, enum cl_lock_mode mode);
+
+typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
+                                struct osc_page *, void *);
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                        struct osc_object *osc, pgoff_t start, pgoff_t end,
+                        osc_page_gang_cbt cb, void *cbdata);
 /** @} osc */
 
 #endif /* OSC_CL_INTERNAL_H */