LU-904 ptlrpc: redo io on -EINPROGRESS
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 5429b14..995391d 100644
@@ -28,9 +28,8 @@
 /*
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -861,7 +860,16 @@ static void osc_release_write_grant(struct client_obd *cli,
                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
         }
         if (!sent) {
-                cli->cl_lost_grant += CFS_PAGE_SIZE;
+                /* Reclaim grant from truncated pages. This is used to solve
+                 * the problem where a write is truncated and all of the
+                 * grant is moved to lost_grant. For a vfs write this can
+                 * easily be worked around with a sync write; however, that
+                 * is not an option for page_mkwrite() because grant has to
+                 * be allocated before a page becomes dirty. */
+                if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
+                        cli->cl_avail_grant += CFS_PAGE_SIZE;
+                else
+                        cli->cl_lost_grant += CFS_PAGE_SIZE;
                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
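
The hunk above bounds how much grant is reclaimed from truncated pages: grant flows back into cl_avail_grant only while the client holds less than one full RPC's worth (PTLRPC_MAX_BRW_SIZE); beyond that it is still written off to cl_lost_grant. A minimal standalone sketch of that decision, with illustrative stand-in types and sizes (struct client_grant, PAGE_SZ and MAX_BRW_SZ are not the Lustre definitions):

    #include <stdio.h>

    /* Illustrative stand-ins for the client_obd grant counters. */
    #define PAGE_SZ     4096UL
    #define MAX_BRW_SZ  (1024UL * 1024UL)    /* one full bulk RPC */

    struct client_grant {
            unsigned long avail;   /* grant usable for new dirty pages */
            unsigned long lost;    /* grant written off, reported to OST */
    };

    /* Called for a page that was truncated before it was ever sent. */
    static void release_unsent_page(struct client_grant *g)
    {
            if (g->avail < MAX_BRW_SZ)
                    g->avail += PAGE_SZ;  /* reclaim: page_mkwrite() keeps going */
            else
                    g->lost += PAGE_SZ;   /* a full RPC's worth on hand already */
    }

    int main(void)
    {
            struct client_grant g = { .avail = MAX_BRW_SZ - PAGE_SZ, .lost = 0 };

            release_unsent_page(&g);   /* reclaimed: avail reaches the cap */
            release_unsent_page(&g);   /* at the cap, so this page is lost */
            printf("avail=%lu lost=%lu\n", g.avail, g.lost);
            return 0;
    }
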
@@ -925,6 +933,9 @@ void osc_wake_cache_waiters(struct client_obd *cli)
                                                 &ocw->ocw_oap->oap_brw_page);
                 }
 
+                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
+                       ocw, ocw->ocw_oap, cli->cl_avail_grant);
+
                 cfs_waitq_signal(&ocw->ocw_waitq);
         }
 
@@ -1267,7 +1278,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                 cksum++;
 
-        return cksum;
+        return fini_checksum(cksum, cksum_type);
 }
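
With this change the per-page checksum loop no longer returns the raw accumulator; fini_checksum() presumably applies whatever final transform the configured cksum_type requires. A hedged sketch of the same accumulate-then-finalize pattern using a plain bitwise CRC32, where the finalize step is the usual bit inversion (crc32_update/crc32_fini are illustrative helpers, not Lustre code):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Bitwise CRC32 (IEEE polynomial, reflected). The running value must
     * be inverted once at the end -- the same kind of final transform
     * that fini_checksum() applies to the accumulated bulk checksum. */
    static uint32_t crc32_update(uint32_t crc, const void *buf, size_t len)
    {
            const uint8_t *p = buf;
            while (len--) {
                    crc ^= *p++;
                    for (int i = 0; i < 8; i++)
                            crc = (crc >> 1) ^ (0xedb88320u & -(crc & 1));
            }
            return crc;
    }

    static uint32_t crc32_fini(uint32_t crc)
    {
            return ~crc;            /* final inversion: the "fini" step */
    }

    int main(void)
    {
            /* Checksum two "pages" incrementally, then finalize once. */
            const char page1[] = "hello ", page2[] = "world";
            uint32_t crc = ~0u;     /* init */

            crc = crc32_update(crc, page1, strlen(page1));
            crc = crc32_update(crc, page2, strlen(page2));
            printf("crc32=%08x\n", crc32_fini(crc));
            return 0;
    }
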
 
 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
@@ -1409,7 +1420,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
 
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
-                if (unlikely(cli->cl_checksum) &&
+                if (cli->cl_checksum &&
                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         /* store cl_cksum_type in a local variable since
                          * it can be changed via lprocfs */
@@ -1440,7 +1451,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                      sizeof(__u32) * niocount);
         } else {
-                if (unlikely(cli->cl_checksum) &&
+                if (cli->cl_checksum &&
                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
@@ -1685,12 +1696,13 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
         struct ptlrpc_request *req;
         int                    rc;
         cfs_waitq_t            waitq;
-        int                    resends = 0;
+        int                    generation, resends = 0;
         struct l_wait_info     lwi;
 
         ENTRY;
 
         cfs_waitq_init(&waitq);
+        generation = exp->exp_obd->u.cli.cl_import->imp_generation;
 
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
@@ -1698,6 +1710,11 @@ restart_bulk:
         if (rc != 0)
                 return (rc);
 
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+        }
+
         rc = ptlrpc_queue_wait(req);
 
         if (rc == -ETIMEDOUT && req->rq_resend) {
@@ -1709,19 +1726,34 @@ restart_bulk:
         rc = osc_brw_fini_request(req, rc);
 
         ptlrpc_req_finished(req);
+        /* When the server returns -EINPROGRESS, the client should always
+         * retry regardless of how many times the bulk was already resent. */
         if (osc_recoverable_error(rc)) {
                 resends++;
-                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
-                        CERROR("too many resend retries, returning error\n");
-                        RETURN(-EIO);
+                if (rc != -EINPROGRESS &&
+                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("%s: too many resend retries for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+                        goto out;
+                }
+                if (generation !=
+                    exp->exp_obd->u.cli.cl_import->imp_generation) {
+                        CDEBUG(D_HA, "%s: resend crossed eviction for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+                        goto out;
                 }
 
-                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
+                                       NULL);
                 l_wait_event(waitq, 0, &lwi);
 
                 goto restart_bulk;
         }
-
+out:
+        if (rc == -EAGAIN || rc == -EINPROGRESS)
+                rc = -EIO;
         RETURN (rc);
 }
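
The synchronous path now snapshots the import generation before the first attempt: -EINPROGRESS resends are unbounded, other recoverable errors stay capped by client_should_resend(), and any resend that would cross an eviction (generation mismatch) is abandoned; transient codes are mapped to -EIO before returning. A simplified model of that control flow (send_once(), recoverable() and the counters are illustrative stand-ins, not the real RPC machinery):

    #include <errno.h>
    #include <stdio.h>

    #define MAX_RESENDS 10

    static int import_generation = 1;   /* bumped on eviction/reconnect */

    /* Illustrative transport: fail twice with -EINPROGRESS, then succeed. */
    static int send_once(int attempt)
    {
            return attempt < 2 ? -EINPROGRESS : 0;
    }

    static int recoverable(int rc)
    {
            return rc == -EAGAIN || rc == -EINPROGRESS;
    }

    static int brw_sync(void)
    {
            int generation = import_generation;   /* snapshot before 1st try */
            int resends = 0, rc;

    restart:
            rc = send_once(resends);
            if (recoverable(rc)) {
                    resends++;
                    /* -EINPROGRESS may retry forever; others are capped. */
                    if (rc != -EINPROGRESS && resends > MAX_RESENDS)
                            goto out;
                    /* Never resend across an eviction: the pages may no
                     * longer be covered by a valid lock. */
                    if (generation != import_generation)
                            goto out;
                    goto restart;
            }
    out:
            /* Don't leak transient codes to the caller. */
            if (rc == -EAGAIN || rc == -EINPROGRESS)
                    rc = -EIO;
            return rc;
    }

    int main(void)
    {
            printf("rc = %d\n", brw_sync());   /* prints rc = 0 */
            return 0;
    }
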
 
@@ -1735,11 +1767,6 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         int rc = 0;
         ENTRY;
 
-        if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
-                CERROR("too many resent retries, returning error\n");
-                RETURN(-EIO);
-        }
-
         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
 
         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
@@ -1771,6 +1798,8 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         new_req->rq_interpret_reply = request->rq_interpret_reply;
         new_req->rq_async_args = request->rq_async_args;
         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+        new_req->rq_generation_set = 1;
+        new_req->rq_import_generation = request->rq_import_generation;
 
         new_aa = ptlrpc_req_async_args(new_req);
 
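
A resent request keeps the generation of the original IO rather than picking up the import's current one, and rq_generation_set tells the send path not to overwrite it. A tiny sketch of that tagging, with an illustrative request struct (not the real ptlrpc_request):

    #include <stdio.h>

    /* Illustrative request descriptor, not the real ptlrpc_request. */
    struct req {
            int rq_import_generation; /* generation the IO was prepared under */
            int rq_generation_set;    /* keep ptlrpc from overwriting it */
    };

    static void redo_request(const struct req *old, struct req *new_req)
    {
            /* Carry the ORIGINAL generation on the retry: if the import
             * reconnected in between, the send/interpret path sees the
             * mismatch and fails the resend instead of redoing stale IO. */
            new_req->rq_import_generation = old->rq_import_generation;
            new_req->rq_generation_set = 1;
    }

    int main(void)
    {
            struct req old = { .rq_import_generation = 7 }, copy = { 0 };

            redo_request(&old, &copy);
            printf("gen=%d set=%d\n", copy.rq_import_generation,
                   copy.rq_generation_set);
            return 0;
    }
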
@@ -2221,10 +2250,29 @@ static int brw_interpret(const struct lu_env *env,
 
         rc = osc_brw_fini_request(req, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+        /* When the server returns -EINPROGRESS, the client should always
+         * retry regardless of how many times the bulk was already resent. */
         if (osc_recoverable_error(rc)) {
-                rc = osc_brw_redo_request(req, aa);
+                if (req->rq_import_generation !=
+                    req->rq_import->imp_generation) {
+                        CDEBUG(D_HA, "%s: resend crossed eviction for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               req->rq_import->imp_obd->obd_name,
+                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+                } else if (rc == -EINPROGRESS ||
+                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
+                        rc = osc_brw_redo_request(req, aa);
+                } else {
+                        CERROR("%s: too many resend retries for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               req->rq_import->imp_obd->obd_name,
+                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+                }
+
                 if (rc == 0)
                         RETURN(0);
+                else if (rc == -EAGAIN || rc == -EINPROGRESS)
+                        rc = -EIO;
         }
 
         if (aa->aa_ocapa) {
@@ -2432,10 +2480,14 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
          * with ASYNC_HP. We have to send out them as soon as possible. */
         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
                 if (oap->oap_async_flags & ASYNC_HP)
-                        cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
+                        cfs_list_move(&oap->oap_pending_item, &rpc_list);
+                else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
+                        /* only do this for writeback pages. */
+                        cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
                 if (++page_count >= cli->cl_max_pages_per_rpc)
                         break;
         }
+        cfs_list_splice_init(&rpc_list, &lop->lop_pending);
         page_count = 0;
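
The urgent scan now collects ASYNC_HP and non-OBD_BRW_SYNC (writeback) pages on a local rpc_list and splices them back onto the head of lop_pending, so they are picked up before anything already queued. A minimal intrusive-list sketch of that head-splice, matching the cfs_list_splice_init() semantics assumed here (struct list and the helpers are illustrative, not libcfs):

    #include <stddef.h>
    #include <stdio.h>

    /* Minimal intrusive list, standing in for cfs_list_t. */
    struct list { struct list *prev, *next; };

    static void list_init(struct list *h) { h->prev = h->next = h; }

    static void list_del(struct list *e)
    {
            e->prev->next = e->next;
            e->next->prev = e->prev;
    }

    static void list_add_tail(struct list *e, struct list *h)
    {
            e->prev = h->prev; e->next = h;
            h->prev->next = e; h->prev = e;
    }

    /* Move everything on @src to the HEAD of @dst and re-init @src, so
     * the spliced entries are walked before anything already on @dst. */
    static void list_splice_init(struct list *src, struct list *dst)
    {
            if (src->next == src)
                    return;                 /* nothing to move */
            src->next->prev = dst;
            src->prev->next = dst->next;
            dst->next->prev = src->prev;
            dst->next = src->next;
            list_init(src);
    }

    struct page { int id; struct list item; };

    int main(void)
    {
            struct list pending, rpc;
            struct page p[4];

            list_init(&pending);
            list_init(&rpc);
            for (int i = 0; i < 4; i++) {   /* pages arrive in order */
                    p[i].id = i;
                    list_add_tail(&p[i].item, &pending);
            }
            /* Page 2 turns urgent: pull it onto the local rpc list ... */
            list_del(&p[2].item);
            list_add_tail(&p[2].item, &rpc);
            /* ... and splice back at the head, ahead of pages 0, 1, 3. */
            list_splice_init(&rpc, &pending);

            for (struct list *e = pending.next; e != &pending; e = e->next) {
                    struct page *pg = (struct page *)
                            ((char *)e - offsetof(struct page, item));
                    printf("%d ", pg->id);  /* prints: 2 0 1 3 */
            }
            printf("\n");
            return 0;
    }
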
 
         /* first we find the pages we're allowed to work with */
@@ -2569,8 +2621,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                         break;
         }
 
-        osc_wake_cache_waiters(cli);
-
         loi_list_maint(cli, loi);
 
         client_obd_list_unlock(&cli->cl_loi_list_lock);
@@ -2811,24 +2861,6 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
         osc_check_rpcs0(env, cli, 0);
 }
 
-/* we're trying to queue a page in the osc so we're subject to the
- * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
- * If the osc's queued pages are already at that limit, then we want to sleep
- * until there is space in the osc's queue for us.  We also may be waiting for
- * write credits from the OST if there are RPCs in flight that may return some
- * before we fall back to sync writes.
- *
- * We need this know our allocation was granted in the presence of signals */
-static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
-{
-        int rc;
-        ENTRY;
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(rc);
-};
-
 /**
  * Non-blocking version of osc_enter_cache() that consumes grant only when it
  * is available.
@@ -2859,7 +2891,7 @@ static int osc_enter_cache(const struct lu_env *env,
 {
         struct osc_cache_waiter ocw;
         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
+        int rc = -EDQUOT;
         ENTRY;
 
         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
@@ -2880,35 +2912,38 @@ static int osc_enter_cache(const struct lu_env *env,
             osc_enter_cache_try(env, cli, loi, oap, 0))
                 RETURN(0);
 
-        /* It is safe to block as a cache waiter as long as there is grant
-         * space available or the hope of additional grant being returned
-         * when an in flight write completes.  Using the write back cache
-         * if possible is preferable to sending the data synchronously
-         * because write pages can then be merged in to large requests.
-         * The addition of this cache waiter will causing pending write
-         * pages to be sent immediately. */
-        if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
+        /* We can get here for two reasons: too many dirty pages in the
+         * cache, or we have run out of grant. In both cases we should
+         * write dirty pages out. Adding a cache waiter will trigger
+         * urgent write-out no matter what the RPC size will be.
+         * The exit condition is no available grant and no dirty pages
+         * cached, which really means there is no space on the OST. */
+        cfs_waitq_init(&ocw.ocw_waitq);
+        ocw.ocw_oap = oap;
+        while (cli->cl_dirty > 0) {
                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
-                cfs_waitq_init(&ocw.ocw_waitq);
-                ocw.ocw_oap = oap;
                 ocw.ocw_rc = 0;
 
                 loi_list_maint(cli, loi);
                 osc_check_rpcs(env, cli);
                 client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-                CDEBUG(D_CACHE, "sleeping for cache space\n");
-                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
+                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
+                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
+
+                rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
 
                 client_obd_list_lock(&cli->cl_loi_list_lock);
-                if (!cfs_list_empty(&ocw.ocw_entry)) {
-                        cfs_list_del(&ocw.ocw_entry);
-                        RETURN(-EINTR);
-                }
-                RETURN(ocw.ocw_rc);
+                cfs_list_del_init(&ocw.ocw_entry);
+                if (rc < 0)
+                        break;
+
+                rc = ocw.ocw_rc;
+                if (rc != -EDQUOT)
+                        break;
         }
 
-        RETURN(-EDQUOT);
+        RETURN(rc);
 }
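
osc_enter_cache() now loops instead of waiting once: as long as dirty pages remain, a completed write-out may hand grant back, so the waiter re-queues itself until it either gets grant or the cache drains with none available, which is taken to mean the OST is genuinely out of space and -EDQUOT is returned. A simplified single-threaded model of that loop (the counters and wait_for_writeout() are illustrative; the real code sleeps in l_wait_event()):

    #include <errno.h>
    #include <stdio.h>

    #define PAGE_SZ 4096L

    /* Illustrative client state: grant and dirty-page accounting. */
    static long avail_grant = 0;
    static long dirty = 3 * PAGE_SZ;

    /* Consume one page of grant if any is available. */
    static int enter_cache_try(void)
    {
            if (avail_grant >= PAGE_SZ) {
                    avail_grant -= PAGE_SZ;
                    dirty += PAGE_SZ;
                    return 1;
            }
            return 0;
    }

    /* Stand-in for one cache-waiter wakeup: a write RPC completed and
     * the OST handed back one page worth of grant. */
    static void wait_for_writeout(void)
    {
            if (dirty >= PAGE_SZ) {
                    dirty -= PAGE_SZ;
                    avail_grant += PAGE_SZ;
            }
    }

    static int enter_cache(void)
    {
            int rc = -EDQUOT;

            if (enter_cache_try())
                    return 0;

            /* While dirty pages exist, a completed write-out may return
             * grant, so keep waiting.  Only when the cache has drained
             * and grant is still short do we give up: the OST is full. */
            while (dirty > 0) {
                    wait_for_writeout();  /* l_wait_event() in real code */
                    if (enter_cache_try()) {
                            rc = 0;
                            break;
                    }
            }
            return rc;
    }

    int main(void)
    {
            printf("rc = %d\n", enter_cache());   /* prints rc = 0 */
            return 0;
    }
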
 
 
@@ -3328,7 +3363,7 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp,
 
         if (lock != NULL) {
                 if (rc != ELDLM_OK)
-                        ldlm_lock_fail_match(lock, rc);
+                        ldlm_lock_fail_match(lock);
 
                 LDLM_LOCK_PUT(lock);
         }
@@ -4224,7 +4259,7 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
 
         LASSERT(olg == &obd->obd_olg);
 
-        cfs_mutex_down(&olg->olg_cat_processing);
+        cfs_mutex_lock(&olg->olg_cat_processing);
         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
         if (rc) {
                 CERROR("rc: %d\n", rc);
@@ -4248,7 +4283,7 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
         }
 
  out:
-        cfs_mutex_up(&olg->olg_cat_processing);
+        cfs_mutex_unlock(&olg->olg_cat_processing);
 
         return rc;
 }
@@ -4507,7 +4542,6 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                             ptlrpc_add_rqs_to_pool);
 
                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
-                cfs_sema_init(&cli->cl_grant_sem, 1);
 
                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
         }