Whamcloud - gitweb
LU-6271 osc: further OSC cleanup after eviction 56/16456/8
authorJinshan Xiong <jinshan.xiong@intel.com>
Wed, 16 Sep 2015 18:47:20 +0000 (11:47 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 1 Oct 2015 14:22:20 +0000 (14:22 +0000)
A few problems are fixed in this patch:
1. a ldlm lock could be canceled simutaneously by ldlm bl thread and
  cleanup_resource(). In this case, only one side will win the race
  and the other side should wait for the work to complete;
2. in lov_io_iter_init(), if cl_io_iter_init() against sub io fails,
  it should call cl_io_iter_fini() to cleanup leftover information;
3. define osc_lru_reserve() and osc_lru_unreserve() to reserve LRU
  slots in osc_io_write_iter_init() and unreserve them in fini();
4. eviction on group lock is well supported;
5. cleanups

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I293770b62e177a9ecefe0b4e05f3a8f44b1c831d
Reviewed-on: http://review.whamcloud.com/16456
Tested-by: Jenkins
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
15 files changed:
lustre/include/cl_object.h
lustre/include/lustre_dlm_flags.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/llite/vvp_io.c
lustre/lov/lov_io.c
lustre/osc/osc_cache.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/osc/osc_object.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c

index a9543a1..6c4db88 100644 (file)
@@ -1642,9 +1642,14 @@ enum cl_enq_flags {
         */
        CEF_PEEK        = 0x00000040,
        /**
         */
        CEF_PEEK        = 0x00000040,
        /**
+        * Lock match only. Used by group lock in I/O as group lock
+        * is known to exist.
+        */
+       CEF_LOCK_MATCH  = 0x00000080,
+       /**
         * mask of enq_flags.
         */
         * mask of enq_flags.
         */
-       CEF_MASK         = 0x0000007f,
+       CEF_MASK         = 0x000000ff,
 };
 
 /**
 };
 
 /**
index 5357746..5f7206d 100644 (file)
 #define ldlm_set_test_lock(_l)          LDLM_SET_FLAG((  _l), 1ULL << 19)
 #define ldlm_clear_test_lock(_l)        LDLM_CLEAR_FLAG((_l), 1ULL << 19)
 
 #define ldlm_set_test_lock(_l)          LDLM_SET_FLAG((  _l), 1ULL << 19)
 #define ldlm_clear_test_lock(_l)        LDLM_CLEAR_FLAG((_l), 1ULL << 19)
 
+/** match lock only */
+#define LDLM_FL_MATCH_LOCK               0x0000000000100000ULL // bit  20
+
 /**
  * Immediatelly cancel such locks when they block some other locks. Send
  * cancel notification to original lock holder, but expect no reply. This
 /**
  * Immediatelly cancel such locks when they block some other locks. Send
  * cancel notification to original lock holder, but expect no reply. This
index 39abaef..13741a0 100644 (file)
@@ -832,24 +832,23 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 
         ldlm_lock_decref_internal_nolock(lock, mode);
 
 
         ldlm_lock_decref_internal_nolock(lock, mode);
 
-       if (ldlm_is_local(lock) &&
-            !lock->l_readers && !lock->l_writers) {
-                /* If this is a local lock on a server namespace and this was
-                 * the last reference, cancel the lock. */
-                CDEBUG(D_INFO, "forcing cancel of local lock\n");
-               ldlm_set_cbpending(lock);
-        }
-
-       if (!lock->l_readers && !lock->l_writers &&
-           (ldlm_is_cbpending(lock) || lock->l_req_mode == LCK_GROUP)) {
-               /* If we received a blocked AST and this was the last reference,
-                * run the callback.
+       if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
+           !lock->l_readers && !lock->l_writers) {
+               /* If this is a local lock on a server namespace and this was
+                * the last reference, cancel the lock.
+                *
                 * Group locks are special:
                 * They must not go in LRU, but they are not called back
                 * like non-group locks, instead they are manually released.
                 * They have an l_writers reference which they keep until
                 * they are manually released, so we remove them when they have
                 * no more reader or writer references. - LU-6368 */
                 * Group locks are special:
                 * They must not go in LRU, but they are not called back
                 * like non-group locks, instead they are manually released.
                 * They have an l_writers reference which they keep until
                 * they are manually released, so we remove them when they have
                 * no more reader or writer references. - LU-6368 */
+               ldlm_set_cbpending(lock);
+       }
+
+       if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+               /* If we received a blocked AST and this was the last reference,
+                * run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                         CERROR("FL_CBPENDING set on non-local lock--just a "
                                "warning\n");
@@ -2169,6 +2168,19 @@ restart:
 }
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
 }
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
+static bool is_bl_done(struct ldlm_lock *lock)
+{
+       bool bl_done = true;
+
+       if (!ldlm_is_bl_done(lock)) {
+               lock_res_and_lock(lock);
+               bl_done = ldlm_is_bl_done(lock);
+               unlock_res_and_lock(lock);
+       }
+
+       return bl_done;
+}
+
 /**
  * Helper function to call blocking AST for LDLM lock \a lock in a
  * "cancelling" mode.
 /**
  * Helper function to call blocking AST for LDLM lock \a lock in a
  * "cancelling" mode.
@@ -2186,8 +2198,19 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
                 } else {
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
                 } else {
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
-        }
-       ldlm_set_bl_done(lock);
+
+               /* only canceller can set bl_done bit */
+               ldlm_set_bl_done(lock);
+               wake_up_all(&lock->l_waitq);
+       } else if (!ldlm_is_bl_done(lock)) {
+               struct l_wait_info lwi = { 0 };
+
+               /* The lock is guaranteed to have been canceled once
+                * returning from this function. */
+               unlock_res_and_lock(lock);
+               l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+               lock_res_and_lock(lock);
+       }
 }
 
 /**
 }
 
 /**
index ef2fb06..53d2f43 100644 (file)
@@ -1342,13 +1342,23 @@ int ldlm_cli_cancel(struct lustre_handle *lockh,
        struct list_head cancels = LIST_HEAD_INIT(cancels);
        ENTRY;
 
        struct list_head cancels = LIST_HEAD_INIT(cancels);
        ENTRY;
 
-       /* concurrent cancels on the same handle can happen */
-       lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
+       lock = ldlm_handle2lock_long(lockh, 0);
        if (lock == NULL) {
                LDLM_DEBUG_NOLOCK("lock is already being destroyed");
                RETURN(0);
        }
 
        if (lock == NULL) {
                LDLM_DEBUG_NOLOCK("lock is already being destroyed");
                RETURN(0);
        }
 
+       lock_res_and_lock(lock);
+       /* Lock is being canceled and the caller doesn't want to wait */
+       if (ldlm_is_canceling(lock) && (cancel_flags & LCF_ASYNC)) {
+               unlock_res_and_lock(lock);
+               LDLM_LOCK_RELEASE(lock);
+               RETURN(0);
+       }
+
+       ldlm_set_canceling(lock);
+       unlock_res_and_lock(lock);
+
        rc = ldlm_cli_cancel_local(lock);
        if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
                LDLM_LOCK_RELEASE(lock);
        rc = ldlm_cli_cancel_local(lock);
        if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
                LDLM_LOCK_RELEASE(lock);
index 0a81a87..1ec08e7 100644 (file)
@@ -849,7 +849,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
 
                         unlock_res(res);
                         ldlm_lock2handle(lock, &lockh);
 
                         unlock_res(res);
                         ldlm_lock2handle(lock, &lockh);
-                       rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
+                       rc = ldlm_cli_cancel(&lockh, LCF_LOCAL);
                         if (rc)
                                 CERROR("ldlm_cli_cancel: %d\n", rc);
                 } else {
                         if (rc)
                                 CERROR("ldlm_cli_cancel: %d\n", rc);
                 } else {
index 6b063fe..c2a088a 100644 (file)
@@ -238,6 +238,7 @@ static int vvp_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
        if (vio->vui_fd && (vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                descr->cld_mode = CLM_GROUP;
                descr->cld_gid  = vio->vui_fd->fd_grouplock.lg_gid;
        if (vio->vui_fd && (vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                descr->cld_mode = CLM_GROUP;
                descr->cld_gid  = vio->vui_fd->fd_grouplock.lg_gid;
+               enqflags |= CEF_LOCK_MATCH;
        } else {
                descr->cld_mode  = mode;
        }
        } else {
                descr->cld_mode  = mode;
        }
index cf1ccfe..00644f6 100644 (file)
@@ -439,24 +439,27 @@ static int lov_io_iter_init(const struct lu_env *env,
                        continue;
                }
 
                        continue;
                }
 
-                end = lov_offset_mod(end, +1);
-                sub = lov_sub_get(env, lio, stripe);
-                if (!IS_ERR(sub)) {
-                        lov_io_sub_inherit(sub->sub_io, lio, stripe,
-                                           start, end);
-                        rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
-                        lov_sub_put(sub);
-                        CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
-                               stripe, start, end);
-                } else
-                        rc = PTR_ERR(sub);
-
-                if (!rc)
-                       list_add_tail(&sub->sub_linkage, &lio->lis_active);
-                else
-                        break;
-        }
-        RETURN(rc);
+               end = lov_offset_mod(end, +1);
+               sub = lov_sub_get(env, lio, stripe);
+               if (IS_ERR(sub)) {
+                       rc = PTR_ERR(sub);
+                       break;
+               }
+
+               lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
+               rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
+               if (rc != 0)
+                       cl_io_iter_fini(sub->sub_env, sub->sub_io);
+               lov_sub_put(sub);
+               if (rc != 0)
+                       break;
+
+               CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
+                      stripe, start, end);
+
+               list_add_tail(&sub->sub_linkage, &lio->lis_active);
+       }
+       RETURN(rc);
 }
 
 static int lov_io_rw_iter_init(const struct lu_env *env,
 }
 
 static int lov_io_rw_iter_init(const struct lu_env *env,
index 0b01b36..e70d61f 100644 (file)
@@ -2694,7 +2694,6 @@ again:
                         * a page already having been flushed by write_page().
                         * We have to wait for this extent because we can't
                         * truncate that page. */
                         * a page already having been flushed by write_page().
                         * We have to wait for this extent because we can't
                         * truncate that page. */
-                       LASSERT(!ext->oe_hp);
                        OSC_EXTENT_DUMP(D_CACHE, ext,
                                        "waiting for busy extent\n");
                        waiting = osc_extent_get(ext);
                        OSC_EXTENT_DUMP(D_CACHE, ext,
                                        "waiting for busy extent\n");
                        waiting = osc_extent_get(ext);
index e5caf37..7ca58c5 100644 (file)
@@ -62,10 +62,12 @@ struct osc_extent;
  * State maintained by osc layer for each IO context.
  */
 struct osc_io {
  * State maintained by osc layer for each IO context.
  */
 struct osc_io {
-        /** super class */
-        struct cl_io_slice oi_cl;
-        /** true if this io is lockless. */
-        int                oi_lockless;
+       /** super class */
+       struct cl_io_slice oi_cl;
+       /** true if this io is lockless. */
+       int                oi_lockless:1,
+       /** true if this io is counted as active IO */
+                          oi_is_active:1;
        /** how many LRU pages are reserved for this IO */
        unsigned long      oi_lru_reserved;
 
        /** how many LRU pages are reserved for this IO */
        unsigned long      oi_lru_reserved;
 
index 9a03ba3..636a315 100644 (file)
@@ -135,7 +135,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd);
 long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                   long target, bool force);
                  struct list_head *ext_list, int cmd);
 long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                   long target, bool force);
-long osc_lru_reclaim(struct client_obd *cli, unsigned long npages);
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages);
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages);
 
 extern spinlock_t osc_ast_guard;
 extern struct lu_kmem_descr osc_caches[];
 
 extern spinlock_t osc_ast_guard;
 extern struct lu_kmem_descr osc_caches[];
index 2fb42ba..8df200b 100644 (file)
@@ -340,7 +340,10 @@ static int osc_io_iter_init(const struct lu_env *env,
 
        spin_lock(&imp->imp_lock);
        if (likely(!imp->imp_invalid)) {
 
        spin_lock(&imp->imp_lock);
        if (likely(!imp->imp_invalid)) {
+               struct osc_io *oio = osc_env_io(env);
+
                atomic_inc(&osc->oo_nr_ios);
                atomic_inc(&osc->oo_nr_ios);
+               oio->oi_is_active = 1;
                rc = 0;
        }
        spin_unlock(&imp->imp_lock);
                rc = 0;
        }
        spin_unlock(&imp->imp_lock);
@@ -354,9 +357,6 @@ static int osc_io_write_iter_init(const struct lu_env *env,
        struct cl_io *io = ios->cis_io;
        struct osc_io *oio = osc_env_io(env);
        struct osc_object *osc = cl2osc(ios->cis_obj);
        struct cl_io *io = ios->cis_io;
        struct osc_io *oio = osc_env_io(env);
        struct osc_object *osc = cl2osc(ios->cis_obj);
-       struct client_obd *cli = osc_cli(osc);
-       unsigned long c;
-       unsigned long max_pages;
        unsigned long npages;
        ENTRY;
 
        unsigned long npages;
        ENTRY;
 
@@ -367,29 +367,7 @@ static int osc_io_write_iter_init(const struct lu_env *env,
        if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
                ++npages;
 
        if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
                ++npages;
 
-       max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
-       if (npages > max_pages)
-               npages = max_pages;
-
-       c = atomic_long_read(cli->cl_lru_left);
-       if (c < npages && osc_lru_reclaim(cli, npages) > 0)
-               c = atomic_long_read(cli->cl_lru_left);
-       while (c >= npages) {
-               if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
-                       oio->oi_lru_reserved = npages;
-                       break;
-               }
-               c = atomic_long_read(cli->cl_lru_left);
-       }
-       if (atomic_long_read(cli->cl_lru_left) < max_pages) {
-               /* If there aren't enough pages in the per-OSC LRU then
-                * wake up the LRU thread to try and clear out space, so
-                * we don't block if pages are being dirtied quickly. */
-               CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
-                      cli_name(cli), atomic_long_read(cli->cl_lru_left),
-                      max_pages);
-               (void)ptlrpcd_queue_work(cli->cl_lru_work);
-       }
+       oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
 
        RETURN(osc_io_iter_init(env, ios));
 }
 
        RETURN(osc_io_iter_init(env, ios));
 }
@@ -397,11 +375,16 @@ static int osc_io_write_iter_init(const struct lu_env *env,
 static void osc_io_iter_fini(const struct lu_env *env,
                             const struct cl_io_slice *ios)
 {
 static void osc_io_iter_fini(const struct lu_env *env,
                             const struct cl_io_slice *ios)
 {
-       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct osc_io *oio = osc_env_io(env);
 
 
-       LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
-       if (atomic_dec_and_test(&osc->oo_nr_ios))
-               wake_up_all(&osc->oo_io_waitq);
+       if (oio->oi_is_active) {
+               struct osc_object *osc = cl2osc(ios->cis_obj);
+
+               oio->oi_is_active = 0;
+               LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
+               if (atomic_dec_and_test(&osc->oo_nr_ios))
+                       wake_up_all(&osc->oo_io_waitq);
+       }
 }
 
 static void osc_io_write_iter_fini(const struct lu_env *env,
 }
 
 static void osc_io_write_iter_fini(const struct lu_env *env,
@@ -409,10 +392,9 @@ static void osc_io_write_iter_fini(const struct lu_env *env,
 {
        struct osc_io *oio = osc_env_io(env);
        struct osc_object *osc = cl2osc(ios->cis_obj);
 {
        struct osc_io *oio = osc_env_io(env);
        struct osc_object *osc = cl2osc(ios->cis_obj);
-       struct client_obd *cli = osc_cli(osc);
 
        if (oio->oi_lru_reserved > 0) {
 
        if (oio->oi_lru_reserved > 0) {
-               atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
+               osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
                oio->oi_lru_reserved = 0;
        }
        oio->oi_write_osclock = NULL;
                oio->oi_lru_reserved = 0;
        }
        oio->oi_write_osclock = NULL;
index b645379..eebc0d6 100644 (file)
@@ -173,6 +173,8 @@ static __u64 osc_enq2ldlm_flags(__u32 enqflags)
                result |= LDLM_FL_AST_DISCARD_DATA;
        if (enqflags & CEF_PEEK)
                result |= LDLM_FL_TEST_LOCK;
                result |= LDLM_FL_AST_DISCARD_DATA;
        if (enqflags & CEF_PEEK)
                result |= LDLM_FL_TEST_LOCK;
+       if (enqflags & CEF_LOCK_MATCH)
+               result |= LDLM_FL_MATCH_LOCK;
        return result;
 }
 
        return result;
 }
 
@@ -848,13 +850,14 @@ static void osc_lock_wake_waiters(const struct lu_env *env,
        spin_unlock(&oscl->ols_lock);
 }
 
        spin_unlock(&oscl->ols_lock);
 }
 
-static void osc_lock_enqueue_wait(const struct lu_env *env,
-                                 struct osc_object *obj,
-                                 struct osc_lock *oscl)
+static int osc_lock_enqueue_wait(const struct lu_env *env,
+               struct osc_object *obj, struct osc_lock *oscl)
 {
        struct osc_lock         *tmp_oscl;
        struct cl_lock_descr    *need = &oscl->ols_cl.cls_lock->cll_descr;
        struct cl_sync_io       *waiter = &osc_env_info(env)->oti_anchor;
 {
        struct osc_lock         *tmp_oscl;
        struct cl_lock_descr    *need = &oscl->ols_cl.cls_lock->cll_descr;
        struct cl_sync_io       *waiter = &osc_env_info(env)->oti_anchor;
+       int rc = 0;
+       ENTRY;
 
        spin_lock(&obj->oo_ol_spin);
        list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);
 
        spin_lock(&obj->oo_ol_spin);
        list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);
@@ -891,13 +894,18 @@ restart:
                spin_unlock(&tmp_oscl->ols_lock);
 
                spin_unlock(&obj->oo_ol_spin);
                spin_unlock(&tmp_oscl->ols_lock);
 
                spin_unlock(&obj->oo_ol_spin);
-               (void)cl_sync_io_wait(env, waiter, 0);
-
+               rc = cl_sync_io_wait(env, waiter, 0);
                spin_lock(&obj->oo_ol_spin);
                spin_lock(&obj->oo_ol_spin);
+
+               if (rc < 0)
+                       break;
+
                oscl->ols_owner = NULL;
                goto restart;
        }
        spin_unlock(&obj->oo_ol_spin);
                oscl->ols_owner = NULL;
                goto restart;
        }
        spin_unlock(&obj->oo_ol_spin);
+
+       RETURN(rc);
 }
 
 /**
 }
 
 /**
@@ -947,7 +955,9 @@ static int osc_lock_enqueue(const struct lu_env *env,
                GOTO(enqueue_base, 0);
        }
 
                GOTO(enqueue_base, 0);
        }
 
-       osc_lock_enqueue_wait(env, osc, oscl);
+       result = osc_lock_enqueue_wait(env, osc, oscl);
+       if (result < 0)
+               GOTO(out, result);
 
        /* we can grant lockless lock right after all conflicting locks
         * are canceled. */
 
        /* we can grant lockless lock right after all conflicting locks
         * are canceled. */
@@ -971,7 +981,6 @@ enqueue_base:
         * osc_lock.
         */
        ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
         * osc_lock.
         */
        ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
-       osc_lock_build_einfo(env, lock, osc, &oscl->ols_einfo);
        osc_lock_build_policy(env, lock, policy);
        if (oscl->ols_agl) {
                oscl->ols_einfo.ei_cbdata = NULL;
        osc_lock_build_policy(env, lock, policy);
        if (oscl->ols_agl) {
                oscl->ols_einfo.ei_cbdata = NULL;
@@ -986,19 +995,7 @@ enqueue_base:
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_agl);
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_agl);
-       if (result != 0) {
-               oscl->ols_state = OLS_CANCELLED;
-               osc_lock_wake_waiters(env, osc, oscl);
-
-               /* hide error for AGL lock. */
-               if (oscl->ols_agl) {
-                       cl_object_put(env, osc2cl(osc));
-                       result = 0;
-               }
-
-               if (anchor != NULL)
-                       cl_sync_io_note(env, anchor, result);
-       } else {
+       if (result == 0) {
                if (osc_lock_is_lockless(oscl)) {
                        oio->oi_lockless = 1;
                } else if (!async) {
                if (osc_lock_is_lockless(oscl)) {
                        oio->oi_lockless = 1;
                } else if (!async) {
@@ -1006,6 +1003,18 @@ enqueue_base:
                        LASSERT(oscl->ols_hold);
                        LASSERT(oscl->ols_dlmlock != NULL);
                }
                        LASSERT(oscl->ols_hold);
                        LASSERT(oscl->ols_dlmlock != NULL);
                }
+       } else if (oscl->ols_agl) {
+               cl_object_put(env, osc2cl(osc));
+               result = 0;
+       }
+
+out:
+       if (result < 0) {
+               oscl->ols_state = OLS_CANCELLED;
+               osc_lock_wake_waiters(env, osc, oscl);
+
+               if (anchor != NULL)
+                       cl_sync_io_note(env, anchor, result);
        }
        RETURN(result);
 }
        }
        RETURN(result);
 }
@@ -1175,6 +1184,7 @@ int osc_lock_init(const struct lu_env *env,
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
                oscl->ols_glimpse = 1;
        }
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
                oscl->ols_glimpse = 1;
        }
+       osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo);
 
        cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
 
 
        cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
 
index 7337489..94b3cf5 100644 (file)
@@ -494,9 +494,15 @@ int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc)
 
        l_wait_event(osc->oo_io_waitq, atomic_read(&osc->oo_nr_ios) == 0, &lwi);
 
 
        l_wait_event(osc->oo_io_waitq, atomic_read(&osc->oo_nr_ios) == 0, &lwi);
 
-       /* Discard all pages of this object. */
+       /* Discard all dirty pages of this object. */
        osc_cache_truncate_start(env, osc, 0, NULL);
 
        osc_cache_truncate_start(env, osc, 0, NULL);
 
+       /* Discard all caching pages */
+       osc_lock_discard_pages(env, osc, 0, CL_PAGE_EOF, CLM_WRITE);
+
+       /* Clear ast data of dlm lock. Do this after discarding all pages */
+       osc_object_prune(env, osc2cl(osc));
+
        RETURN(0);
 }
 
        RETURN(0);
 }
 
index b861a9d..12380c6 100644 (file)
@@ -45,8 +45,8 @@
 
 static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
 static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
 
 static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
 static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
-static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
-                          struct osc_page *opg);
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+                        struct osc_page *opg);
 
 /** \addtogroup osc
  *  @{
 
 /** \addtogroup osc
  *  @{
@@ -406,7 +406,7 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
 
        /* reserve an LRU space for this page */
        if (page->cp_type == CPT_CACHEABLE && result == 0) {
 
        /* reserve an LRU space for this page */
        if (page->cp_type == CPT_CACHEABLE && result == 0) {
-               result = osc_lru_reserve(env, osc, opg);
+               result = osc_lru_alloc(env, osc_cli(osc), opg);
                if (result == 0) {
                        spin_lock(&osc->oo_tree_lock);
                        result = radix_tree_insert(&osc->oo_tree, index, opg);
                if (result == 0) {
                        spin_lock(&osc->oo_tree_lock);
                        result = radix_tree_insert(&osc->oo_tree, index, opg);
@@ -805,7 +805,7 @@ long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
  * LRU pages in batch. Therefore, the actual number is adjusted at least
  * max_pages_per_rpc.
  */
  * LRU pages in batch. Therefore, the actual number is adjusted at least
  * max_pages_per_rpc.
  */
-long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
+static long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
 {
        struct cl_env_nest nest;
        struct lu_env *env;
 {
        struct cl_env_nest nest;
        struct lu_env *env;
@@ -878,18 +878,17 @@ out:
 }
 
 /**
 }
 
 /**
- * osc_lru_reserve() is called to reserve an LRU slot for a cl_page.
+ * osc_lru_alloc() is called to allocate an LRU slot for a cl_page.
  *
  * Usually the LRU slots are reserved in osc_io_iter_rw_init().
  * Only in the case that the LRU slots are in extreme shortage, it should
  * have reserved enough slots for an IO.
  */
  *
  * Usually the LRU slots are reserved in osc_io_iter_rw_init().
  * Only in the case that the LRU slots are in extreme shortage, it should
  * have reserved enough slots for an IO.
  */
-static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
-                          struct osc_page *opg)
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+                        struct osc_page *opg)
 {
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        struct osc_io *oio = osc_env_io(env);
 {
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        struct osc_io *oio = osc_env_io(env);
-       struct client_obd *cli = osc_cli(obj);
        int rc = 0;
        ENTRY;
 
        int rc = 0;
        ENTRY;
 
@@ -929,6 +928,60 @@ out:
 }
 
 /**
 }
 
 /**
+ * osc_lru_reserve() is called to reserve enough LRU slots for I/O.
+ *
+ * The benefit of doing this is to reduce contention against atomic counter
+ * cl_lru_left by changing it from per-page access to per-IO access.
+ */
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
+{
+       unsigned long reserved = 0;
+       unsigned long max_pages;
+       unsigned long c;
+
+       /* reserve a full RPC window at most to avoid that a thread accidentally
+        * consumes too many LRU slots */
+       max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+       if (npages > max_pages)
+               npages = max_pages;
+
+       c = atomic_long_read(cli->cl_lru_left);
+       if (c < npages && osc_lru_reclaim(cli, npages) > 0)
+               c = atomic_long_read(cli->cl_lru_left);
+       while (c >= npages) {
+               if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+                       reserved = npages;
+                       break;
+               }
+               c = atomic_long_read(cli->cl_lru_left);
+       }
+       if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+               /* If there aren't enough pages in the per-OSC LRU then
+                * wake up the LRU thread to try and clear out space, so
+                * we don't block if pages are being dirtied quickly. */
+               CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
+                      cli_name(cli), atomic_long_read(cli->cl_lru_left),
+                      max_pages);
+               (void)ptlrpcd_queue_work(cli->cl_lru_work);
+       }
+
+       return reserved;
+}
+
+/**
+ * osc_lru_unreserve() is called to unreserve LRU slots.
+ *
+ * LRU slots reserved by osc_lru_reserve() may have entries left due to several
+ * reasons such as page already existing or I/O error. Those reserved slots
+ * should be freed by calling this function.
+ */
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
+{
+       atomic_long_add(npages, cli->cl_lru_left);
+       wake_up_all(&osc_lru_waitq);
+}
+
+/**
  * Atomic operations are expensive. We accumulate the accounting for the
  * same page zone to get better performance.
  * In practice this can work pretty good because the pages in the same RPC
  * Atomic operations are expensive. We accumulate the accounting for the
  * same page zone to get better performance.
  * In practice this can work pretty good because the pages in the same RPC
index 28b23da..060ef0d 100644 (file)
@@ -2011,7 +2011,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        }
 
 no_match:
        }
 
 no_match:
-       if (*flags & LDLM_FL_TEST_LOCK)
+       if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);
 
        if (intent) {
                RETURN(-ENOLCK);
 
        if (intent) {
@@ -2491,7 +2491,11 @@ static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }
-               lock->l_ast_data = NULL;
+
+               /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
+                * by the 2nd round of ldlm_namespace_clean() call in
+                * osc_import_event(). */
+               ldlm_clear_cleaned(lock);
        }
        unlock_res(res);
 
        }
        unlock_res(res);