Whamcloud - gitweb
LU-884 osc: async osc_check_rpcs()
authorJinshan Xiong <jinshan.xiong@whamcloud.com>
Wed, 4 Jan 2012 18:28:29 +0000 (10:28 -0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 10 Jan 2012 18:17:46 +0000 (13:17 -0500)
Add a new "async" parameter to osc_check_rpcs(); if it is called with
async, it will compose a fake ptlrpc_request so that RPCs will be
composed and issued in ptlrpcd context.

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I1c2f8ae43da8146428c474f17ddf3dc23a2df9ef
Reviewed-on: http://review.whamcloud.com/1825
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/obd.h
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ptlrpc/pack_generic.c
lustre/tests/sanity.sh

index 58fdbd7..03e5921 100644 (file)
@@ -550,6 +550,9 @@ struct client_obd {
         struct lu_client_seq    *cl_seq;
 
         cfs_atomic_t             cl_resends; /* resend count */
         struct lu_client_seq    *cl_seq;
 
         cfs_atomic_t             cl_resends; /* resend count */
+
+        /* ptlrpc work for writeback in ptlrpcd context */
+        void                    *cl_writeback_work;
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
index 1236240..2648c5c 100644 (file)
@@ -157,7 +157,6 @@ void osc_oap_to_pending(struct osc_async_page *oap);
 int  osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap);
 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
 int  osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap);
 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
-
 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
                        struct osc_async_page *oap, int cmd, int off,
 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
                        struct osc_async_page *oap, int cmd, int off,
index 1f25a97..eae47c2 100644 (file)
@@ -69,6 +69,8 @@
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc);
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc);
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
+                            int ptlrpc);
 int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
 int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
@@ -1965,7 +1967,6 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                          int cmd)
 {
 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                          int cmd)
 {
-        int optimal;
         ENTRY;
 
         if (lop->lop_num_pending == 0)
         ENTRY;
 
         if (lop->lop_num_pending == 0)
@@ -1986,8 +1987,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                 RETURN(1);
         }
                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                 RETURN(1);
         }
-        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
-        optimal = cli->cl_max_pages_per_rpc;
+
         if (cmd & OBD_BRW_WRITE) {
                 /* trigger a write rpc stream as long as there are dirtiers
                  * waiting for space.  as they're waiting, they're not going to
         if (cmd & OBD_BRW_WRITE) {
                 /* trigger a write rpc stream as long as there are dirtiers
                  * waiting for space.  as they're waiting, they're not going to
@@ -1996,13 +1996,8 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                         RETURN(1);
                 }
                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                         RETURN(1);
                 }
-                /* +16 to avoid triggering rpcs that would want to include pages
-                 * that are being queued but which can't be made ready until
-                 * the queuer finishes with the page. this is a wart for
-                 * llite::commit_write() */
-                optimal += 16;
         }
         }
-        if (lop->lop_num_pending >= optimal)
+        if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
                 RETURN(1);
 
         RETURN(0);
                 RETURN(1);
 
         RETURN(0);
@@ -2204,6 +2199,18 @@ static void osc_ap_completion(const struct lu_env *env,
         EXIT;
 }
 
         EXIT;
 }
 
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+        struct client_obd *cli = data;
+
+        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        osc_check_rpcs0(env, cli, 1);
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+        RETURN(0);
+}
+
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc)
 {
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc)
 {
@@ -2226,7 +2233,6 @@ static int brw_interpret(const struct lu_env *env,
         }
 
         cli = aa->aa_cli;
         }
 
         cli = aa->aa_cli;
-
         client_obd_list_lock(&cli->cl_loi_list_lock);
 
         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         client_obd_list_lock(&cli->cl_loi_list_lock);
 
         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
@@ -2254,8 +2260,9 @@ static int brw_interpret(const struct lu_env *env,
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
         }
         osc_wake_cache_waiters(cli);
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
         }
         osc_wake_cache_waiters(cli);
-        osc_check_rpcs(env, cli);
+        osc_check_rpcs0(env, cli, 1);
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         client_obd_list_unlock(&cli->cl_loi_list_lock);
+
         if (!async)
                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                                   req->rq_bulk->bd_nob_transferred);
         if (!async)
                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                                   req->rq_bulk->bd_nob_transferred);
@@ -2404,8 +2411,8 @@ out:
  */
 static int
 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
  */
 static int
 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
-                 struct lov_oinfo *loi,
-                 int cmd, struct loi_oap_pages *lop)
+                 struct lov_oinfo *loi, int cmd,
+                 struct loi_oap_pages *lop, pdl_policy_t pol)
 {
         struct ptlrpc_request *req;
         obd_count page_count = 0;
 {
         struct ptlrpc_request *req;
         obd_count page_count = 0;
@@ -2639,7 +2646,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
          */
          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
          */
-        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+        ptlrpcd_add_req(req, pol, -1);
         RETURN(1);
 }
 
         RETURN(1);
 }
 
@@ -2713,12 +2720,15 @@ static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
 }
 
 /* called with the loi list lock held */
 }
 
 /* called with the loi list lock held */
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
 {
         struct lov_oinfo *loi;
         int rc = 0, race_counter = 0;
 {
         struct lov_oinfo *loi;
         int rc = 0, race_counter = 0;
+        pdl_policy_t pol;
         ENTRY;
 
         ENTRY;
 
+        pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
+
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
@@ -2733,7 +2743,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
                  * do io on writes while there are cache waiters */
                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                  * do io on writes while there are cache waiters */
                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
-                                              &loi->loi_write_lop);
+                                              &loi->loi_write_lop, pol);
                         if (rc < 0) {
                                 CERROR("Write request failed with %d\n", rc);
 
                         if (rc < 0) {
                                 CERROR("Write request failed with %d\n", rc);
 
@@ -2763,7 +2773,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
                 }
                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                 }
                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
-                                              &loi->loi_read_lop);
+                                              &loi->loi_read_lop, pol);
                         if (rc < 0)
                                 CERROR("Read request failed with %d\n", rc);
 
                         if (rc < 0)
                                 CERROR("Read request failed with %d\n", rc);
 
@@ -2794,7 +2804,11 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
                 if (race_counter == 10)
                         break;
         }
                 if (race_counter == 10)
                         break;
         }
-        EXIT;
+}
+
+void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+{
+        osc_check_rpcs0(env, cli, 0);
 }
 
 /* we're trying to queue a page in the osc so we're subject to the
 }
 
 /* we're trying to queue a page in the osc so we're subject to the
@@ -3003,13 +3017,19 @@ int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                 }
         }
 
                 }
         }
 
-        osc_oap_to_pending(oap);
-        loi_list_maint(cli, loi);
-
         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                   cmd);
 
         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                   cmd);
 
-        osc_check_rpcs(env, cli);
+        osc_oap_to_pending(oap);
+        loi_list_maint(cli, loi);
+        if (!osc_max_rpc_in_flight(cli, loi) &&
+            lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
+                LASSERT(cli->cl_writeback_work != NULL);
+                rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+
+                CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
+                       cli, rc);
+        }
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         RETURN(0);
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         RETURN(0);
@@ -4444,6 +4464,7 @@ static int osc_cancel_for_recovery(struct ldlm_lock *lock)
 
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
 
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
+        struct client_obd *cli = &obd->u.cli;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
@@ -4453,11 +4474,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(rc);
 
         rc = client_obd_setup(obd, lcfg);
                 RETURN(rc);
 
         rc = client_obd_setup(obd, lcfg);
-        if (rc) {
-                ptlrpcd_decref();
-        } else {
+        if (rc == 0) {
+                void *handler;
+                handler = ptlrpcd_alloc_work(cli->cl_import,
+                                             brw_queue_work, cli);
+                if (!IS_ERR(handler))
+                        cli->cl_writeback_work = handler;
+                else
+                        rc = PTR_ERR(handler);
+        }
+
+        if (rc == 0) {
                 struct lprocfs_static_vars lvars = { 0 };
                 struct lprocfs_static_vars lvars = { 0 };
-                struct client_obd *cli = &obd->u.cli;
 
                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
                 lprocfs_osc_init_vars(&lvars);
 
                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
                 lprocfs_osc_init_vars(&lvars);
@@ -4484,6 +4512,8 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
         }
 
                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
         }
 
+        if (rc)
+                ptlrpcd_decref();
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -4505,6 +4535,7 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                 break;
         }
         case OBD_CLEANUP_EXPORTS: {
                 break;
         }
         case OBD_CLEANUP_EXPORTS: {
+                struct client_obd *cli = &obd->u.cli;
                 /* LU-464
                  * for echo client, export may be on zombie list, wait for
                  * zombie thread to cull it, because cli.cl_import will be
                 /* LU-464
                  * for echo client, export may be on zombie list, wait for
                  * zombie thread to cull it, because cli.cl_import will be
@@ -4515,6 +4546,10 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                  *   client_disconnect_export()
                  */
                 obd_zombie_barrier();
                  *   client_disconnect_export()
                  */
                 obd_zombie_barrier();
+                if (cli->cl_writeback_work) {
+                        ptlrpcd_destroy_work(cli->cl_writeback_work);
+                        cli->cl_writeback_work = NULL;
+                }
                 obd_cleanup_client_import(obd);
                 ptlrpc_lprocfs_unregister_obd(obd);
                 lprocfs_obd_cleanup(obd);
                 obd_cleanup_client_import(obd);
                 ptlrpc_lprocfs_unregister_obd(obd);
                 lprocfs_obd_cleanup(obd);
index be37b20..6c7ef8a 100644 (file)
@@ -2269,6 +2269,7 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
 {
         int req_ok = req->rq_reqmsg != NULL;
         int rep_ok = req->rq_repmsg != NULL;
 {
         int req_ok = req->rq_reqmsg != NULL;
         int rep_ok = req->rq_repmsg != NULL;
+        lnet_nid_t nid = LNET_NID_ANY;
         va_list args;
 
         if (ptlrpc_req_need_swab(req)) {
         va_list args;
 
         if (ptlrpc_req_need_swab(req)) {
@@ -2276,6 +2277,11 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
                 rep_ok = rep_ok && rep_ptlrpc_body_swabbed(req);
         }
 
                 rep_ok = rep_ok && rep_ptlrpc_body_swabbed(req);
         }
 
+        if (req->rq_import && req->rq_import->imp_connection)
+                nid = req->rq_import->imp_connection->c_peer.nid;
+        else if (req->rq_export && req->rq_export->exp_connection)
+                nid = req->rq_export->exp_connection->c_peer.nid;
+
         va_start(args, fmt);
         libcfs_debug_vmsg2(data->msg_cdls, data->msg_subsys,mask,data->msg_file,
                            data->msg_fn, data->msg_line, fmt, args,
         va_start(args, fmt);
         libcfs_debug_vmsg2(data->msg_cdls, data->msg_subsys,mask,data->msg_file,
                            data->msg_fn, data->msg_line, fmt, args,
@@ -2290,11 +2296,7 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
                                 req->rq_export ?
                                      req->rq_export->exp_client_uuid.uuid :
                                      "<?>",
                                 req->rq_export ?
                                      req->rq_export->exp_client_uuid.uuid :
                                      "<?>",
-                           libcfs_nid2str(req->rq_import ?
-                                req->rq_import->imp_connection->c_peer.nid :
-                                req->rq_export ?
-                                     req->rq_export->exp_connection->c_peer.nid:
-                                     LNET_NID_ANY),
+                           libcfs_nid2str(nid),
                            req->rq_request_portal, req->rq_reply_portal,
                            req->rq_reqlen, req->rq_replen,
                            req->rq_early_count, req->rq_timedout,
                            req->rq_request_portal, req->rq_reply_portal,
                            req->rq_reqlen, req->rq_replen,
                            req->rq_early_count, req->rq_timedout,
index a1c47f5..1988caf 100644 (file)
@@ -6903,7 +6903,7 @@ test_133c() {
        do_facet $SINGLEMDS $LCTL set_param mdt.*.md_stats=clear
        do_facet ost1 $LCTL set_param obdfilter.*.stats=clear
 
        do_facet $SINGLEMDS $LCTL set_param mdt.*.md_stats=clear
        do_facet ost1 $LCTL set_param obdfilter.*.stats=clear
 
-       dd if=/dev/zero of=${testdir}/${tfile} conv=notrunc bs=1024k count=1 || error "dd failed"
+       dd if=/dev/zero of=${testdir}/${tfile} conv=notrunc bs=512k count=1 || error "dd failed"
        sync
        cancel_lru_locks osc
        check_stats ost "write" 1
        sync
        cancel_lru_locks osc
        check_stats ost "write" 1