Whamcloud - gitweb
LU-6042 osc: osc_object_ast_clear() LBUG
[fs/lustre-release.git] / lustre / osc / osc_cl_internal.h
index be6badb..1518db4 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Internal interfaces of OSC layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  */
 
 #ifndef OSC_CL_INTERNAL_H
 #define OSC_CL_INTERNAL_H
 
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else
-# include <liblustre.h>
-#endif
-
+#include <libcfs/libcfs.h>
 #include <obd.h>
 /* osc_build_res_name() */
-#include <obd_ost.h>
 #include <cl_object.h>
 #include "osc_internal.h"
 
-/** \addtogroup osc osc @{ */
+/** \defgroup osc osc
+ *  @{
+ */
+
+struct osc_extent;
 
 /**
  * State maintained by osc layer for each IO context.
@@ -67,12 +66,25 @@ struct osc_io {
         struct cl_io_slice oi_cl;
         /** true if this io is lockless. */
         int                oi_lockless;
-
-        struct obdo        oi_oa;
-        struct osc_punch_cbargs {
-                int               opc_rc;
-                struct completion opc_sync;
-        } oi_punch_cbarg;
+       /** how many LRU pages are reserved for this IO */
+       unsigned long      oi_lru_reserved;
+
+       /** active extents, we know how many bytes is going to be written,
+        * so having an active extent will prevent it from being fragmented */
+       struct osc_extent *oi_active;
+       /** partially truncated extent, we need to hold this extent to prevent
+        * page writeback from happening. */
+       struct osc_extent *oi_trunc;
+       /** write osc_lock for this IO, used by osc_extent_find(). */
+       struct osc_lock   *oi_write_osclock;
+
+       struct obd_info    oi_info;
+       struct obdo        oi_oa;
+       struct osc_async_cbargs {
+               bool              opc_rpc_sent;
+               int               opc_rc;
+               struct completion       opc_sync;
+       } oi_cbarg;
 };
 
 /**
@@ -89,14 +101,22 @@ struct osc_session {
         struct osc_io       os_io;
 };
 
+#define OTI_PVEC_SIZE 256
 struct osc_thread_info {
         struct ldlm_res_id      oti_resname;
         ldlm_policy_data_t      oti_policy;
         struct cl_lock_descr    oti_descr;
         struct cl_attr          oti_attr;
         struct lustre_handle    oti_handle;
-        struct cl_lock_closure  oti_closure;
         struct cl_page_list     oti_plist;
+       struct cl_io            oti_io;
+       void                    *oti_pvec[OTI_PVEC_SIZE];
+       /**
+        * Fields used by cl_lock_discard_pages().
+        */
+       pgoff_t                 oti_next_index;
+       pgoff_t                 oti_fn_index; /* first non-overlapped index */
+       struct cl_sync_io       oti_anchor;
 };
 
 struct osc_object {
@@ -107,25 +127,94 @@ struct osc_object {
          */
         int                oo_contended;
         cfs_time_t         oo_contention_time;
-#ifdef INVARIANT_CHECK
+#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
         /**
          * IO context used for invariant checks in osc_lock_has_pages().
          */
         struct cl_io       oo_debug_io;
         /** Serialization object for osc_object::oo_debug_io. */
-        struct mutex       oo_debug_mutex;
+       struct mutex       oo_debug_mutex;
 #endif
         /**
          * List of pages in transfer.
          */
-        struct list_head   oo_inflight[CRT_NR];
+       struct list_head        oo_inflight[CRT_NR];
         /**
          * Lock, protecting ccc_object::cob_inflight, because a seat-belt is
          * locked during take-off and landing.
          */
-        spinlock_t         oo_seatbelt;
+       spinlock_t              oo_seatbelt;
+
+       /**
+        * used by the osc to keep track of what objects to build into rpcs.
+        * Protected by client_obd->cli_loi_list_lock.
+        */
+       struct list_head        oo_ready_item;
+       struct list_head        oo_hp_ready_item;
+       struct list_head        oo_write_item;
+       struct list_head        oo_read_item;
+
+       /**
+        * extent is a red black tree to manage (async) dirty pages.
+        */
+       struct rb_root       oo_root;
+       /**
+        * Manage write(dirty) extents.
+        */
+       struct list_head        oo_hp_exts;     /* list of hp extents */
+       struct list_head        oo_urgent_exts; /* list of writeback extents */
+       struct list_head        oo_rpc_exts;
+
+       struct list_head        oo_reading_exts;
+
+       atomic_t         oo_nr_reads;
+       atomic_t         oo_nr_writes;
+
+       /** Protect extent tree. Will be used to protect
+        * oo_{read|write}_pages soon. */
+       spinlock_t          oo_lock;
+
+       /**
+        * Radix tree for caching pages
+        */
+       struct radix_tree_root  oo_tree;
+       spinlock_t              oo_tree_lock;
+       unsigned long           oo_npages;
+
+       /* Protect osc_lock this osc_object has */
+       spinlock_t              oo_ol_spin;
+       struct list_head        oo_ol_list;
 };
 
+static inline void osc_object_lock(struct osc_object *obj)
+{
+       spin_lock(&obj->oo_lock);
+}
+
+static inline int osc_object_trylock(struct osc_object *obj)
+{
+       return spin_trylock(&obj->oo_lock);
+}
+
+static inline void osc_object_unlock(struct osc_object *obj)
+{
+       spin_unlock(&obj->oo_lock);
+}
+
+static inline int osc_object_is_locked(struct osc_object *obj)
+{
+#if defined(CONFIG_SMP) || defined(CONFIG_DEBUG_SPINLOCK)
+       return spin_is_locked(&obj->oo_lock);
+#else
+       /*
+        * It is not perfect to return true all the time.
+        * But since this function is only used for assertion
+        * and checking, it seems OK.
+        */
+       return 1;
+#endif
+}
+
 /*
  * Lock "micro-states" for osc layer.
  */
@@ -134,8 +223,6 @@ enum osc_lock_state {
         OLS_ENQUEUED,
         OLS_UPCALL_RECEIVED,
         OLS_GRANTED,
-        OLS_RELEASED,
-        OLS_BLOCKED,
         OLS_CANCELLED
 };
 
@@ -144,10 +231,8 @@ enum osc_lock_state {
  *
  * Interaction with DLM.
  *
- * CLIO enqueues all DLM locks through ptlrpcd (that is, in "async" mode).
- *
  * Once receive upcall is invoked, osc_lock remembers a handle of DLM lock in
- * osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_lock.
+ * osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_dlmlock.
  *
  * This pointer is protected through a reference, acquired by
  * osc_lock_upcall0(). Also, an additional reference is acquired by
@@ -184,17 +269,29 @@ enum osc_lock_state {
  * future.
  */
 struct osc_lock {
-        struct cl_lock_slice     ols_cl;
-        /** underlying DLM lock */
-        struct ldlm_lock        *ols_lock;
-        /** lock value block */
-        struct ost_lvb           ols_lvb;
-        /** DLM flags with which osc_lock::ols_lock was enqueued */
-        int                      ols_flags;
-        /** osc_lock::ols_lock handle */
-        struct lustre_handle     ols_handle;
-        struct ldlm_enqueue_info ols_einfo;
-        enum osc_lock_state      ols_state;
+       struct cl_lock_slice    ols_cl;
+       /** Internal lock to protect states, etc. */
+       spinlock_t              ols_lock;
+       /** Owner sleeps on this channel for state change */
+       struct cl_sync_io       *ols_owner;
+       /** waiting list for this lock to be cancelled */
+       struct list_head        ols_waiting_list;
+       /** wait entry of ols_waiting_list */
+       struct list_head        ols_wait_entry;
+       /** list entry for osc_object::oo_ol_list */
+       struct list_head        ols_nextlock_oscobj;
+
+       /** underlying DLM lock */
+       struct ldlm_lock        *ols_dlmlock;
+       /** DLM flags with which osc_lock::ols_lock was enqueued */
+       __u64                   ols_flags;
+       /** osc_lock::ols_lock handle */
+       struct lustre_handle     ols_handle;
+       struct ldlm_enqueue_info ols_einfo;
+       enum osc_lock_state      ols_state;
+       /** lock value block */
+       struct ost_lvb          ols_lvb;
+
         /**
          * true, if ldlm_lock_addref() was called against
          * osc_lock::ols_lock. This is used for sanity checking.
@@ -224,16 +321,6 @@ struct osc_lock {
          */
                                  ols_locklessable:1,
         /**
-         * set by osc_lock_use() to wait until blocking AST enters into
-         * osc_ldlm_blocking_ast0(), so that cl_lock mutex can be used for
-         * further synchronization.
-         */
-                                 ols_ast_wait:1,
-        /**
-         * If the data of this lock has been flushed to server side.
-         */
-                                 ols_flush:1,
-        /**
          * if set, the osc_lock is a glimpse lock. For glimpse locks, we treat
          * the EVAVAIL error as torerable, this will make upper logic happy
          * to wait all glimpse locks to each OSTs to be completed.
@@ -241,14 +328,11 @@ struct osc_lock {
          * granted.
          * Glimpse lock should be destroyed immediately after use.
          */
-                                 ols_glimpse:1;
+                                 ols_glimpse:1,
         /**
-         * IO that owns this lock. This field is used for a dead-lock
-         * avoidance by osc_lock_enqueue().
-         *
-         * \see osc_deadlock_is_possible()
+         * For async glimpse lock.
          */
-        struct osc_io           *ols_owner;
+                                 ols_agl:1;
 };
 
 
@@ -282,26 +366,38 @@ struct osc_page {
          */
                               ops_temp:1,
         /**
-         * True iff page was created by a user with `appropriate privileges'.
-         */
-                              ops_ignore_quota:1;
-        /**
-         * Linkage into a per-osc_object list of pages in flight. For
-         * debugging.
-         */
-        struct list_head      ops_inflight;
-        /**
-         * Thread that submitted this page for transfer. For debugging.
-         */
-        cfs_task_t           *ops_submitter;
+        * in LRU?
+        */
+                             ops_in_lru:1,
+       /**
+        * Set if the page must be transferred with OBD_BRW_SRVLOCK.
+        */
+                             ops_srvlock:1;
+       /**
+        * lru page list. See osc_lru_{del|use}() in osc_page.c for usage.
+        */
+       struct list_head        ops_lru;
+       /**
+        * Linkage into a per-osc_object list of pages in flight. For
+        * debugging.
+        */
+       struct list_head        ops_inflight;
+       /**
+        * Thread that submitted this page for transfer. For debugging.
+        */
+       struct task_struct           *ops_submitter;
+       /**
+        * Submit time - the time when the page is starting RPC. For debugging.
+        */
+       cfs_time_t            ops_submit_time;
 };
 
-extern cfs_mem_cache_t *osc_page_kmem;
-extern cfs_mem_cache_t *osc_lock_kmem;
-extern cfs_mem_cache_t *osc_object_kmem;
-extern cfs_mem_cache_t *osc_thread_kmem;
-extern cfs_mem_cache_t *osc_session_kmem;
-extern cfs_mem_cache_t *osc_req_kmem;
+extern struct kmem_cache *osc_lock_kmem;
+extern struct kmem_cache *osc_object_kmem;
+extern struct kmem_cache *osc_thread_kmem;
+extern struct kmem_cache *osc_session_kmem;
+extern struct kmem_cache *osc_req_kmem;
+extern struct kmem_cache *osc_extent_kmem;
 
 extern struct lu_device_type osc_device_type;
 extern struct lu_context_key osc_key;
@@ -319,19 +415,43 @@ int osc_req_init (const struct lu_env *env, struct cl_device *dev,
 struct lu_object *osc_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *dev);
-struct cl_page   *osc_page_init   (const struct lu_env *env,
-                                   struct cl_object *obj,
-                                   struct cl_page *page, cfs_page_t *vmpage);
+int osc_page_init(const struct lu_env *env, struct cl_object *obj,
+                 struct cl_page *page, pgoff_t ind);
 
-void osc_lock_build_res(const struct lu_env *env, const struct osc_object *obj,
-                        struct ldlm_res_id *resname);
 void osc_index2policy  (ldlm_policy_data_t *policy, const struct cl_object *obj,
                         pgoff_t start, pgoff_t end);
 int  osc_lvb_print     (const struct lu_env *env, void *cookie,
                         lu_printer_t p, const struct ost_lvb *lvb);
-void osc_io_submit_page(const struct lu_env *env,
-                        struct osc_io *oio, struct osc_page *opg,
-                        enum cl_req_type crt);
+
+void osc_lru_add_batch(struct client_obd *cli, struct list_head *list);
+void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
+                    enum cl_req_type crt, int brw_flags);
+int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops);
+int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
+                       obd_flag async_flags);
+int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
+                       struct page *page, loff_t offset);
+int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
+                      struct osc_page *ops);
+int osc_page_cache_add(const struct lu_env *env,
+                      const struct cl_page_slice *slice, struct cl_io *io);
+int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
+                           struct osc_page *ops);
+int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
+                        struct osc_page *ops);
+int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
+                        struct list_head *list, int cmd, int brw_flags);
+int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
+                            struct osc_object *obj, __u64 size);
+void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
+                           struct osc_object *obj);
+int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
+                             pgoff_t start, pgoff_t end, int hp, int discard);
+int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
+                        pgoff_t start, pgoff_t end);
+void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
+                  struct osc_object *osc, pdl_policy_t pol);
+int lru_queue_work(const struct lu_env *env, void *data);
 
 void osc_object_set_contended  (struct osc_object *obj);
 void osc_object_clear_contended(struct osc_object *obj);
@@ -384,22 +504,42 @@ static inline struct obd_export *osc_export(const struct osc_object *obj)
         return lu2osc_dev(obj->oo_cl.co_lu.lo_dev)->od_exp;
 }
 
+static inline struct client_obd *osc_cli(const struct osc_object *obj)
+{
+       return &osc_export(obj)->exp_obd->u.cli;
+}
+
 static inline struct osc_object *cl2osc(const struct cl_object *obj)
 {
         LINVRNT(osc_is_object(&obj->co_lu));
         return container_of0(obj, struct osc_object, oo_cl);
 }
 
+static inline struct cl_object *osc2cl(const struct osc_object *obj)
+{
+       return (struct cl_object *)&obj->oo_cl;
+}
+
 static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode)
 {
-        LASSERT(mode == CLM_READ || mode == CLM_WRITE);
-        return mode == CLM_READ ? LCK_PR : LCK_PW;
+        LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
+        if (mode == CLM_READ)
+                return LCK_PR;
+        else if (mode == CLM_WRITE)
+                return LCK_PW;
+        else
+                return LCK_GROUP;
 }
 
 static inline enum cl_lock_mode osc_ldlm2cl_lock(ldlm_mode_t mode)
 {
-        LASSERT(mode == LCK_PR || mode == LCK_PW);
-        return mode == LCK_PR ? CLM_READ : CLM_WRITE;
+        LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
+        if (mode == LCK_PR)
+                return CLM_READ;
+        else if (mode == LCK_PW)
+                return CLM_WRITE;
+        else
+                return CLM_GROUP;
 }
 
 static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
@@ -408,6 +548,26 @@ static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
         return container_of0(slice, struct osc_page, ops_cl);
 }
 
+static inline struct osc_page *oap2osc(struct osc_async_page *oap)
+{
+       return container_of0(oap, struct osc_page, ops_oap);
+}
+
+static inline pgoff_t osc_index(struct osc_page *opg)
+{
+       return opg->ops_cl.cpl_index;
+}
+
+static inline struct cl_page *oap2cl_page(struct osc_async_page *oap)
+{
+       return oap2osc(oap)->ops_cl.cpl_page;
+}
+
+static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
+{
+       return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
+}
+
 static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice)
 {
         LINVRNT(osc_is_object(&slice->cls_obj->co_lu));
@@ -419,6 +579,119 @@ static inline struct osc_lock *osc_lock_at(const struct cl_lock *lock)
         return cl2osc_lock(cl_lock_at(lock, &osc_device_type));
 }
 
+static inline int osc_io_srvlock(struct osc_io *oio)
+{
+        return (oio->oi_lockless && !oio->oi_cl.cis_io->ci_no_srvlock);
+}
+
+enum osc_extent_state {
+       OES_INV       = 0, /** extent is just initialized or destroyed */
+       OES_ACTIVE    = 1, /** process is using this extent */
+       OES_CACHE     = 2, /** extent is ready for IO */
+       OES_LOCKING   = 3, /** locking page to prepare IO */
+       OES_LOCK_DONE = 4, /** locking finished, ready to send */
+       OES_RPC       = 5, /** in RPC */
+       OES_TRUNC     = 6, /** being truncated */
+       OES_STATE_MAX
+};
+
+/**
+ * osc_extent data to manage dirty pages.
+ * osc_extent has the following attributes:
+ * 1. all pages in the same must be in one RPC in write back;
+ * 2. # of pages must be less than max_pages_per_rpc - implied by 1;
+ * 3. must be covered by only 1 osc_lock;
+ * 4. exclusive. It's impossible to have overlapped osc_extent.
+ *
+ * The lifetime of an extent is from when the 1st page is dirtied to when
+ * all pages inside it are written out.
+ *
+ * LOCKING ORDER
+ * =============
+ * page lock -> client_obd_list_lock -> object lock(osc_object::oo_lock)
+ */
+struct osc_extent {
+       /** red-black tree node */
+       struct rb_node          oe_node;
+       /** osc_object of this extent */
+       struct osc_object       *oe_obj;
+       /** refcount, removed from red-black tree if reaches zero. */
+       atomic_t                oe_refc;
+       /** busy if non-zero */
+       atomic_t                oe_users;
+       /** link list of osc_object's oo_{hp|urgent|locking}_exts. */
+       struct list_head        oe_link;
+       /** state of this extent */
+       unsigned int            oe_state;
+       /** flags for this extent. */
+       unsigned int            oe_intree:1,
+       /** 0 is write, 1 is read */
+                               oe_rw:1,
+       /** sync extent, queued by osc_queue_sync_pages() */
+                               oe_sync:1,
+                               oe_srvlock:1,
+                               oe_memalloc:1,
+       /** an ACTIVE extent is going to be truncated, so when this extent
+        * is released, it will turn into TRUNC state instead of CACHE. */
+                               oe_trunc_pending:1,
+       /** this extent should be written asap and someone may wait for the
+        * write to finish. This bit is usually set along with urgent if
+        * the extent was CACHE state.
+        * fsync_wait extent can't be merged because new extent region may
+        * exceed fsync range. */
+                               oe_fsync_wait:1,
+       /** covering lock is being canceled */
+                               oe_hp:1,
+       /** this extent should be written back asap. set if one of pages is
+        * called by page WB daemon, or sync write or reading requests. */
+                               oe_urgent:1;
+       /** how many grants allocated for this extent.
+        *  Grant allocated for this extent. There is no grant allocated
+        *  for reading extents and sync write extents. */
+       unsigned int            oe_grants;
+       /** # of dirty pages in this extent */
+       unsigned int            oe_nr_pages;
+       /** list of pending oap pages. Pages in this list are NOT sorted. */
+       struct list_head        oe_pages;
+       /** Since an extent has to be written out in atomic, this is used to
+        * remember the next page need to be locked to write this extent out.
+        * Not used right now.
+        */
+       struct osc_page         *oe_next_page;
+       /** start and end index of this extent, include start and end
+        * themselves. Page offset here is the page index of osc_pages.
+        * oe_start is used as keyword for red-black tree. */
+       pgoff_t                 oe_start;
+       pgoff_t                 oe_end;
+       /** maximum ending index of this extent, this is limited by
+        * max_pages_per_rpc, lock extent and chunk size. */
+       pgoff_t                 oe_max_end;
+       /** waitqueue - for those who want to be notified if this extent's
+        * state has changed. */
+       wait_queue_head_t       oe_waitq;
+       /** lock covering this extent */
+       struct ldlm_lock        *oe_dlmlock;
+       /** terminator of this extent. Must be true if this extent is in IO. */
+       struct task_struct      *oe_owner;
+       /** return value of writeback. If somebody is waiting for this extent,
+        * this value can be known by outside world. */
+       int                     oe_rc;
+       /** max pages per rpc when this extent was created */
+       unsigned int            oe_mppr;
+};
+
+int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
+                     int sent, int rc);
+int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
+
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
+                          pgoff_t start, pgoff_t end, enum cl_lock_mode mode);
+
+typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
+                                struct osc_page *, void *);
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                        struct osc_object *osc, pgoff_t start, pgoff_t end,
+                        osc_page_gang_cbt cb, void *cbdata);
 /** @} osc */
 
 #endif /* OSC_CL_INTERNAL_H */