Whamcloud - gitweb
Revert "LU-462 Don't alloc/free client data for self export"
[fs/lustre-release.git] / lustre / osc / osc_page.c
index 62706e1..0a3e480 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
  */
 
-/** \addtogroup osc osc @{ */
-
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include "osc_cl_internal.h"
 
+/** \addtogroup osc 
+ *  @{ 
+ */
+
 /* 
  * Comment out osc_page_protected because it may sleep inside the
  * the client_obd_list_lock.
@@ -65,7 +70,7 @@ static int osc_page_is_dlocked(const struct lu_env *env,
         ldlm_mode_t             dlmmode;
         int                     flags;
 
-        might_sleep();
+        cfs_might_sleep();
 
         info = osc_env_info(env);
         resname = &info->oti_resname;
@@ -119,8 +124,8 @@ static int osc_page_protected(const struct lu_env *env,
                 descr->cld_mode = mode;
                 descr->cld_start = page->cp_index;
                 descr->cld_end   = page->cp_index;
-                spin_lock(&hdr->coh_lock_guard);
-                list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
+                cfs_spin_lock(&hdr->coh_lock_guard);
+                cfs_list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
                         /*
                          * Lock-less sub-lock has to be either in HELD state
                          * (when io is actively going on), or in CACHED state,
@@ -137,7 +142,7 @@ static int osc_page_protected(const struct lu_env *env,
                                 break;
                         }
                 }
-                spin_unlock(&hdr->coh_lock_guard);
+                cfs_spin_unlock(&hdr->coh_lock_guard);
         }
         return result;
 }
@@ -160,6 +165,7 @@ static void osc_page_fini(const struct lu_env *env,
 {
         struct osc_page *opg = cl2osc_page(slice);
         CDEBUG(D_TRACE, "%p\n", opg);
+        LASSERT(opg->ops_lock == NULL);
         OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
 }
 
@@ -198,10 +204,10 @@ static void osc_page_transfer_add(const struct lu_env *env,
         LINVRNT(cl_page_is_vmlocked(env, opg->ops_cl.cpl_page));
 
         obj = cl2osc(opg->ops_cl.cpl_obj);
-        spin_lock(&obj->oo_seatbelt);
-        list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
+        cfs_spin_lock(&obj->oo_seatbelt);
+        cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
         opg->ops_submitter = cfs_current();
-        spin_unlock(&obj->oo_seatbelt);
+        cfs_spin_unlock(&obj->oo_seatbelt);
 }
 
 static int osc_page_cache_add(const struct lu_env *env,
@@ -210,16 +216,16 @@ static int osc_page_cache_add(const struct lu_env *env,
 {
         struct osc_page   *opg = cl2osc_page(slice);
         struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
-        struct osc_io     *oio = osc_env_io(env);
         int result;
-        int brw_flags;
+        /* All cacheable IO is async-capable */
+        int brw_flags = OBD_BRW_ASYNC;
         int noquota = 0;
 
         LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
         ENTRY;
 
         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
-        brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0;
+        brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
         if (!client_is_remote(osc_export(obj)) &&
             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                 brw_flags |= OBD_BRW_NOQUOTA;
@@ -229,7 +235,8 @@ static int osc_page_cache_add(const struct lu_env *env,
         osc_page_transfer_get(opg, "transfer\0cache");
         result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo,
                                     &opg->ops_oap, OBD_BRW_WRITE | noquota,
-                                    0, 0, brw_flags, 0);
+                                    opg->ops_from, opg->ops_to - opg->ops_from,
+                                    brw_flags, 0);
         if (result != 0)
                 osc_page_transfer_put(env, opg);
         else
@@ -245,6 +252,48 @@ void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
         policy->l_extent.end   = cl_offset(obj, end + 1) - 1;
 }
 
+static int osc_page_addref_lock(const struct lu_env *env,
+                                struct osc_page *opg,
+                                struct cl_lock *lock)
+{
+        struct osc_lock *olock;
+        int              rc;
+
+        LASSERT(opg->ops_lock == NULL);
+
+        olock = osc_lock_at(lock);
+        if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
+                cfs_atomic_dec(&olock->ols_pageref);
+                cl_lock_put(env, lock);
+                rc = 1;
+        } else {
+                opg->ops_lock = lock;
+                rc = 0;
+        }
+        return rc;
+}
+
+static void osc_page_putref_lock(const struct lu_env *env,
+                                 struct osc_page *opg)
+{
+        struct cl_lock  *lock = opg->ops_lock;
+        struct osc_lock *olock;
+
+        LASSERT(lock != NULL);
+        olock = osc_lock_at(lock);
+
+        cfs_atomic_dec(&olock->ols_pageref);
+        opg->ops_lock = NULL;
+
+        /*
+         * Note: usually this won't be the last reference of the lock, but if
+         * it is, then all the lock_put do is at most just freeing some memory,
+         * so it would be OK that caller is holding spinlocks.
+         */
+        LASSERT(cfs_atomic_read(&lock->cll_ref) > 1 || olock->ols_hold == 0);
+        cl_lock_put(env, lock);
+}
+
 static int osc_page_is_under_lock(const struct lu_env *env,
                                   const struct cl_page_slice *slice,
                                   struct cl_io *unused)
@@ -255,14 +304,34 @@ static int osc_page_is_under_lock(const struct lu_env *env,
         ENTRY;
         lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
                                NULL, 1, 0);
-        if (lock != NULL) {
-                cl_lock_put(env, lock);
+        if (lock != NULL &&
+            osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
                 result = -EBUSY;
-        else
+        else
                 result = -ENODATA;
         RETURN(result);
 }
 
+static void osc_page_disown(const struct lu_env *env,
+                            const struct cl_page_slice *slice,
+                            struct cl_io *io)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (unlikely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
+static void osc_page_completion_read(const struct lu_env *env,
+                                     const struct cl_page_slice *slice,
+                                     int ioret)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (likely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
 static int osc_page_fail(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
@@ -275,9 +344,9 @@ static int osc_page_fail(const struct lu_env *env,
 }
 
 
-static const char *osc_list(struct list_head *head)
+static const char *osc_list(cfs_list_t *head)
 {
-        return list_empty(head) ? "-" : "+";
+        return cfs_list_empty(head) ? "-" : "+";
 }
 
 static inline cfs_time_t osc_submit_duration(struct osc_page *opg)
@@ -294,11 +363,16 @@ static int osc_page_print(const struct lu_env *env,
 {
         struct osc_page       *opg = cl2osc_page(slice);
         struct osc_async_page *oap = &opg->ops_oap;
+        struct osc_object     *obj = cl2osc(slice->cpl_obj);
+        struct client_obd     *cli = &osc_export(obj)->exp_obd->u.cli;
+        struct lov_oinfo      *loi = obj->oo_oinfo;
 
         return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
-                          "< %#x %d %u %s %s %s >"
-                          "< %llu %u %#x %#x %p %p %p %p %p >"
-                          "< %s %p %d %lu >\n",
+                          "1< %#x %d %u %s %s %s > "
+                          "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > "
+                          "3< %s %p %d %lu %d > "
+                          "4< %d %d %d %lu %s | %s %s %s %s > "
+                          "5< %s %s %s %s | %d %s %s | %d %s %s>\n",
                           opg,
                           /* 1 */
                           oap->oap_magic, oap->oap_cmd,
@@ -307,7 +381,7 @@ static int osc_page_print(const struct lu_env *env,
                           osc_list(&oap->oap_urgent_item),
                           osc_list(&oap->oap_rpc_item),
                           /* 2 */
-                          oap->oap_obj_off, oap->oap_page_off,
+                          oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
                           oap->oap_async_flags, oap->oap_brw_flags,
                           oap->oap_request,
                           oap->oap_cli, oap->oap_loi, oap->oap_caller_ops,
@@ -315,7 +389,27 @@ static int osc_page_print(const struct lu_env *env,
                           /* 3 */
                           osc_list(&opg->ops_inflight),
                           opg->ops_submitter, opg->ops_transfer_pinned,
-                          osc_submit_duration(opg));
+                          osc_submit_duration(opg), opg->ops_srvlock,
+                          /* 4 */
+                          cli->cl_r_in_flight, cli->cl_w_in_flight,
+                          cli->cl_max_rpcs_in_flight,
+                          cli->cl_avail_grant,
+                          osc_list(&cli->cl_cache_waiters),
+                          osc_list(&cli->cl_loi_ready_list),
+                          osc_list(&cli->cl_loi_hp_ready_list),
+                          osc_list(&cli->cl_loi_write_list),
+                          osc_list(&cli->cl_loi_read_list),
+                          /* 5 */
+                          osc_list(&loi->loi_ready_item),
+                          osc_list(&loi->loi_hp_ready_item),
+                          osc_list(&loi->loi_write_item),
+                          osc_list(&loi->loi_read_item),
+                          loi->loi_read_lop.lop_num_pending,
+                          osc_list(&loi->loi_read_lop.lop_pending),
+                          osc_list(&loi->loi_read_lop.lop_urgent),
+                          loi->loi_write_lop.lop_num_pending,
+                          osc_list(&loi->loi_write_lop.lop_pending),
+                          osc_list(&loi->loi_write_lop.lop_urgent));
 }
 
 static void osc_page_delete(const struct lu_env *env,
@@ -337,9 +431,9 @@ static void osc_page_delete(const struct lu_env *env,
                               "Trying to teardown failed: %d\n", rc);
                 LASSERT(0);
         }
-        spin_lock(&obj->oo_seatbelt);
-        list_del_init(&opg->ops_inflight);
-        spin_unlock(&obj->oo_seatbelt);
+        cfs_spin_lock(&obj->oo_seatbelt);
+        cfs_list_del_init(&opg->ops_inflight);
+        cfs_spin_unlock(&obj->oo_seatbelt);
         EXIT;
 }
 
@@ -353,9 +447,9 @@ void osc_page_clip(const struct lu_env *env, const struct cl_page_slice *slice,
 
         opg->ops_from = from;
         opg->ops_to   = to;
-        spin_lock(&oap->oap_lock);
+        cfs_spin_lock(&oap->oap_lock);
         oap->oap_async_flags |= ASYNC_COUNT_STABLE;
-        spin_unlock(&oap->oap_lock);
+        cfs_spin_unlock(&oap->oap_lock);
 }
 
 static int osc_page_cancel(const struct lu_env *env,
@@ -383,12 +477,14 @@ static const struct cl_page_operations osc_page_ops = {
         .cpo_print         = osc_page_print,
         .cpo_delete        = osc_page_delete,
         .cpo_is_under_lock = osc_page_is_under_lock,
+        .cpo_disown        = osc_page_disown,
         .io = {
                 [CRT_READ] = {
-                        .cpo_cache_add = osc_page_fail
+                        .cpo_cache_add  = osc_page_fail,
+                        .cpo_completion = osc_page_completion_read
                 },
                 [CRT_WRITE] = {
-                        .cpo_cache_add = osc_page_cache_add
+                        .cpo_cache_add  = osc_page_cache_add
                 }
         },
         .cpo_clip           = osc_page_clip,
@@ -473,19 +569,19 @@ static int osc_completion(const struct lu_env *env,
         LASSERT(page->cp_req == NULL);
 
         /* As the transfer for this page is being done, clear the flags */
-        spin_lock(&oap->oap_lock);
+        cfs_spin_lock(&oap->oap_lock);
         oap->oap_async_flags = 0;
-        spin_unlock(&oap->oap_lock);
+        cfs_spin_unlock(&oap->oap_lock);
 
         crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
         /* Clear opg->ops_transfer_pinned before VM lock is released. */
         opg->ops_transfer_pinned = 0;
 
-        spin_lock(&obj->oo_seatbelt);
+        cfs_spin_lock(&obj->oo_seatbelt);
         LASSERT(opg->ops_submitter != NULL);
-        LASSERT(!list_empty(&opg->ops_inflight));
-        list_del_init(&opg->ops_inflight);
-        spin_unlock(&obj->oo_seatbelt);
+        LASSERT(!cfs_list_empty(&opg->ops_inflight));
+        cfs_list_del_init(&opg->ops_inflight);
+        cfs_spin_unlock(&obj->oo_seatbelt);
 
         opg->ops_submit_time = 0;
 
@@ -495,7 +591,7 @@ static int osc_completion(const struct lu_env *env,
         if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
                 struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
-                int bytes = opg->ops_to - opg->ops_from;
+                int bytes = oap->oap_count;
 
                 if (crt == CRT_READ)
                         stats->os_lockless_reads += bytes;
@@ -544,9 +640,12 @@ struct cl_page *osc_page_init(const struct lu_env *env,
                                              cl_offset(obj, page->cp_index),
                                              &osc_async_page_ops,
                                              opg, (void **)&oap, 1, NULL);
-                if (result == 0)
+                if (result == 0) {
+                        struct osc_io *oio = osc_env_io(env);
+                        opg->ops_srvlock = osc_io_srvlock(oio);
                         cl_page_slice_add(page, &opg->ops_cl, obj,
                                           &osc_page_ops);
+                }
                 /*
                  * Cannot assert osc_page_protected() here as read-ahead
                  * creates temporary pages outside of a lock.
@@ -578,7 +677,7 @@ void osc_io_submit_page(const struct lu_env *env,
         oap->oap_page_off   = opg->ops_from;
         oap->oap_count      = opg->ops_to - opg->ops_from;
         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
-        if (libcfs_memory_pressure_get())
+        if (cfs_memory_pressure_get())
                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
         oap->oap_brw_flags |= OBD_BRW_SYNC;
         if (osc_io_srvlock(oio))
@@ -596,9 +695,9 @@ void osc_io_submit_page(const struct lu_env *env,
         else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
                 osc_enter_cache_try(env, cli, oap->oap_loi, oap, 1);
 
-        spin_lock(&oap->oap_lock);
+        cfs_spin_lock(&oap->oap_lock);
         oap->oap_async_flags |= OSC_FLAGS | flags;
-        spin_unlock(&oap->oap_lock);
+        cfs_spin_unlock(&oap->oap_lock);
 
         osc_oap_to_pending(oap);
         osc_page_transfer_get(opg, "transfer\0imm");