Whamcloud - gitweb
LU-4856 misc: Reduce exposure to overflow on page counters.
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index 2b49d0c..a2779d1 100644 (file)
@@ -426,7 +426,7 @@ static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext)
                else if (ext->oe_start > tmp->oe_end)
                        n = &(*n)->rb_right;
                else
-                       EASSERTF(0, tmp, EXTSTR, EXTPARA(ext));
+                       EASSERTF(0, tmp, EXTSTR"\n", EXTPARA(ext));
        }
        rb_link_node(&ext->oe_node, parent, n);
        rb_insert_color(&ext->oe_node, &obj->oo_root);
@@ -639,7 +639,8 @@ struct osc_extent *osc_extent_find(const struct lu_env *env,
        /* grants has been allocated by caller */
        LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
                 "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
-       LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR, EXTPARA(cur));
+       LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR"\n",
+                EXTPARA(cur));
 
 restart:
        osc_object_lock(obj);
@@ -657,7 +658,7 @@ restart:
                /* if covering by different locks, no chance to match */
                if (lock != ext->oe_osclock) {
                        EASSERTF(!overlapped(ext, cur), ext,
-                                EXTSTR, EXTPARA(cur));
+                                EXTSTR"\n", EXTPARA(cur));
 
                        ext = next_extent(ext);
                        continue;
@@ -678,7 +679,7 @@ restart:
                         * full contain. */
                        EASSERTF((ext->oe_start <= cur->oe_start &&
                                  ext->oe_end >= cur->oe_end),
-                                ext, EXTSTR, EXTPARA(cur));
+                                ext, EXTSTR"\n", EXTPARA(cur));
 
                        if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
                                /* for simplicity, we wait for this extent to
@@ -1318,16 +1319,21 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 
+/*
+ * Dump the dirty, grant and LRU counters of client 'cli' at debug level
+ * 'lvl', followed by the caller-supplied message 'fmt' and its arguments.
+ * The trailing newline is appended here, so 'fmt' must not contain one.
+ */
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                    \
        struct client_obd *__tmp = (cli);                               \
-       CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "    \
+       CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu "  \
               "dropped: %ld avail: %ld, reserved: %ld, flight: %d }"   \
-              "lru {in list: %d, left: %d, waiters: %d }" fmt,         \
+              "lru {in list: %ld, left: %ld, waiters: %d }" fmt "\n",  \
               __tmp->cl_import->imp_obd->obd_name,                     \
               __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages,        \
-              atomic_read(&obd_dirty_pages), obd_max_dirty_pages,      \
+              atomic_long_read(&obd_dirty_pages), obd_max_dirty_pages, \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,             \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,         \
-              atomic_read(&__tmp->cl_lru_in_list),                     \
-              atomic_read(&__tmp->cl_lru_busy),                        \
+              atomic_long_read(&__tmp->cl_lru_in_list),                \
+              atomic_long_read(&__tmp->cl_lru_busy),                   \
               atomic_read(&__tmp->cl_lru_shrinkers), ##args);          \
 } while (0)
 
@@ -1337,7 +1338,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
 {
        assert_spin_locked(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
-       atomic_inc(&obd_dirty_pages);
+       atomic_long_inc(&obd_dirty_pages);
        cli->cl_dirty_pages++;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
@@ -1359,11 +1360,11 @@ static void osc_release_write_grant(struct client_obd *cli,
        }
 
        pga->flag &= ~OBD_BRW_FROM_GRANT;
-       atomic_dec(&obd_dirty_pages);
+       atomic_long_dec(&obd_dirty_pages);
        cli->cl_dirty_pages--;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
-               atomic_dec(&obd_dirty_transit_pages);
+               atomic_long_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit--;
        }
        EXIT;
@@ -1432,7 +1433,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
        int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
 
        client_obd_list_lock(&cli->cl_loi_list_lock);
-       atomic_sub(nr_pages, &obd_dirty_pages);
+       atomic_long_sub(nr_pages, &obd_dirty_pages);
        cli->cl_dirty_pages -= nr_pages;
        cli->cl_lost_grant += lost_grant;
        if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
@@ -1469,18 +1470,18 @@ static int osc_enter_cache_try(struct client_obd *cli,
 {
        int rc;
 
-       OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
+       OSC_DUMP_GRANT(D_CACHE, cli, "need:%d", bytes);
 
        rc = osc_reserve_grant(cli, bytes);
        if (rc < 0)
                return 0;
 
        if (cli->cl_dirty_pages < cli->cl_dirty_max_pages &&
-           1 + atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
+           1 + atomic_long_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit++;
-                       atomic_inc(&obd_dirty_transit_pages);
+                       atomic_long_inc(&obd_dirty_transit_pages);
                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
                }
                rc = 1;
@@ -1510,15 +1511,17 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                           struct osc_async_page *oap, int bytes)
 {
-       struct osc_object *osc = oap->oap_obj;
-       struct lov_oinfo  *loi = osc->oo_oinfo;
-       struct osc_cache_waiter ocw;
-       struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
-                                                 LWI_ON_SIGNAL_NOOP, NULL);
-       int rc = -EDQUOT;
+       struct osc_object       *osc = oap->oap_obj;
+       struct lov_oinfo        *loi = osc->oo_oinfo;
+       struct osc_cache_waiter  ocw;
+       struct l_wait_info       lwi;
+       int                      rc = -EDQUOT;
        ENTRY;
 
-       OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
+       lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(AT_OFF ? obd_timeout : at_max),
+                              NULL, LWI_ON_SIGNAL_NOOP, NULL);
+
+       OSC_DUMP_GRANT(D_CACHE, cli, "need:%d", bytes);
 
        client_obd_list_lock(&cli->cl_loi_list_lock);
 
@@ -1526,12 +1529,16 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
         * of queued writes and create a discontiguous rpc stream */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
            cli->cl_dirty_max_pages == 0 ||
-           cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
+           cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
+               OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o");
                GOTO(out, rc = -EDQUOT);
+       }
 
        /* Hopefully normal case - cache space and write credits available */
-       if (osc_enter_cache_try(cli, oap, bytes, 0))
+       if (osc_enter_cache_try(cli, oap, bytes, 0)) {
+               OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache");
                GOTO(out, rc = 0);
+       }
 
        /* We can get here for two reasons: too many dirty pages in cache, or
         * run out of grants. In both cases we should write dirty pages out.
@@ -1556,42 +1563,52 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
 
-               /* l_wait_event is interrupted by signal, or timed out */
                if (rc < 0) {
-                       switch (rc) {
-                       case -ETIMEDOUT:
-                               OSC_DUMP_GRANT(D_ERROR, cli,
-                                               "try to reserve %d.\n", bytes);
-                               osc_extent_tree_dump(D_ERROR, osc);
-                               rc = -EDQUOT;
-                               break;
-                       case -EINTR:
-                               /* Ensures restartability - LU-3581 */
-                               rc = -ERESTARTSYS;
-                               break;
-                       default:
-                               CDEBUG(D_CACHE, "%s: event for cache space @"
-                                      " %p never arrived due to %d\n",
-                                      cli->cl_import->imp_obd->obd_name,
-                                      &ocw, rc);
-                               break;
-                       }
+                       /* l_wait_event is interrupted by signal or timed out */
                        list_del_init(&ocw.ocw_entry);
-                       GOTO(out, rc);
+                       break;
                }
-
                LASSERT(list_empty(&ocw.ocw_entry));
                rc = ocw.ocw_rc;
 
                if (rc != -EDQUOT)
-                       GOTO(out, rc);
-               if (osc_enter_cache_try(cli, oap, bytes, 0))
-                       GOTO(out, rc = 0);
+                       break;
+               if (osc_enter_cache_try(cli, oap, bytes, 0)) {
+                       rc = 0;
+                       break;
+               }
+       }
+
+       switch (rc) {
+       case 0:
+               OSC_DUMP_GRANT(D_CACHE, cli, "finally got grant space");
+               break;
+       case -ETIMEDOUT:
+               OSC_DUMP_GRANT(D_CACHE, cli, "timeout, fall back to sync i/o");
+               osc_extent_tree_dump(D_CACHE, osc);
+               /* fall back to synchronous I/O */
+               rc = -EDQUOT;
+               break;
+       case -EINTR:
+               /* Ensures restartability - LU-3581 */
+               OSC_DUMP_GRANT(D_CACHE, cli, "interrupted");
+               rc = -ERESTARTSYS;
+               break;
+       case -EDQUOT:
+               OSC_DUMP_GRANT(D_CACHE, cli, "no grant space, fall back to sync"
+                              " i/o");
+               break;
+       default:
+               CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived "
+                      "due to %d, fall back to sync i/o\n",
+                      cli->cl_import->imp_obd->obd_name, &ocw, rc);
+               /* fall back to synchronous I/O */
+               rc = -EDQUOT;
+               break;
        }
        EXIT;
 out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
-       OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
        RETURN(rc);
 }
 
@@ -1609,17 +1626,17 @@ void osc_wake_cache_waiters(struct client_obd *cli)
                ocw->ocw_rc = -EDQUOT;
                /* we can't dirty more */
                if ((cli->cl_dirty_pages  >= cli->cl_dirty_max_pages) ||
-                   (1 + atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
+                   (1 + atomic_long_read(&obd_dirty_pages) >
+                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
-                              "osc max %ld, sys max %d\n", cli->cl_dirty_pages,
-                              cli->cl_dirty_max_pages, obd_max_dirty_pages);
+                              "osc max %ld, sys max %ld\n",
+                              cli->cl_dirty_pages, cli->cl_dirty_max_pages,
+                              obd_max_dirty_pages);
                        goto wakeup;
                }
 
-               ocw->ocw_rc = 0;
-               if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
-                       ocw->ocw_rc = -EDQUOT;
-
+               if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
+                       ocw->ocw_rc = 0;
 wakeup:
                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
@@ -1826,6 +1843,9 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                                    int *pc, unsigned int *max_pages)
 {
        struct osc_extent *tmp;
+       struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
+                                                     struct osc_async_page,
+                                                     oap_pending_item);
        ENTRY;
 
        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
@@ -1836,6 +1856,9 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                RETURN(0);
 
        list_for_each_entry(tmp, rpclist, oe_link) {
+               struct osc_async_page *oap2;
+               oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
+                                       oap_pending_item);
                EASSERT(tmp->oe_owner == current, tmp);
 #if 0
                if (overlapped(tmp, ext)) {
@@ -1843,6 +1866,11 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                        EASSERT(0, ext);
                }
 #endif
+               if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
+                       CDEBUG(D_CACHE, "Do not permit different type of IO"
+                                       " for a same RPC\n");
+                       RETURN(0);
+               }
 
                if (tmp->oe_srvlock != ext->oe_srvlock ||
                    !tmp->oe_grants != !ext->oe_grants)
@@ -1936,6 +1964,7 @@ static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
 static int
 osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
                   struct osc_object *osc, pdl_policy_t pol)
+__must_hold(osc)
 {
        struct list_head   rpclist = LIST_HEAD_INIT(rpclist);
        struct osc_extent *ext;
@@ -2009,6 +2038,7 @@ osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
 static int
 osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct osc_object *osc, pdl_policy_t pol)
+__must_hold(osc)
 {
        struct osc_extent *ext;
        struct osc_extent *next;
@@ -2088,6 +2118,7 @@ static struct osc_object *osc_next_obj(struct client_obd *cli)
 /* called with the loi list lock held */
 static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                           pdl_policy_t pol)
+__must_hold(&cli->cl_loi_list_lock)
 {
        struct osc_object *osc;
        int rc = 0;