From b918d20c1073fbb29de95dccec018b1b992cf81d Mon Sep 17 00:00:00 2001 From: vitaly Date: Mon, 9 Mar 2009 19:14:38 +0000 Subject: [PATCH] Branch HEAD b=17644 i=nikita i=green send 1 extra rpc in flight if this is a high priority request --- lustre/include/cl_object.h | 11 ++++- lustre/include/obd.h | 8 ++-- lustre/ldlm/ldlm_lib.c | 1 + lustre/liblustre/llite_cl.c | 8 ++-- lustre/llite/rw.c | 3 +- lustre/llite/rw26.c | 2 +- lustre/llite/vvp_io.c | 2 +- lustre/lov/lov_io.c | 8 ++-- lustre/obdclass/cl_io.c | 7 +-- lustre/obdclass/cl_lock.c | 4 +- lustre/obdecho/echo_client.c | 6 ++- lustre/osc/osc_internal.h | 1 + lustre/osc/osc_io.c | 5 ++- lustre/osc/osc_request.c | 101 ++++++++++++++++++++++++++++++++++++------- 14 files changed, 128 insertions(+), 39 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 5b922bd..c6c764e 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1916,6 +1916,11 @@ enum cl_io_state { CIS_FINI }; +enum cl_req_priority { + CRP_NORMAL, + CRP_CANCEL +}; + /** * IO state private for a layer. * @@ -2033,7 +2038,8 @@ struct cl_io_operations { int (*cio_submit)(const struct lu_env *env, const struct cl_io_slice *slice, enum cl_req_type crt, - struct cl_2queue *queue); + struct cl_2queue *queue, + enum cl_req_priority priority); } req_op[CRT_NR]; /** * Read missing page. 
@@ -2868,7 +2874,8 @@ int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io, int cl_io_commit_write (const struct lu_env *env, struct cl_io *io, struct cl_page *page, unsigned from, unsigned to); int cl_io_submit_rw (const struct lu_env *env, struct cl_io *io, - enum cl_req_type iot, struct cl_2queue *queue); + enum cl_req_type iot, struct cl_2queue *queue, + enum cl_req_priority priority); void cl_io_rw_advance (const struct lu_env *env, struct cl_io *io, size_t nob); int cl_io_cancel (const struct lu_env *env, struct cl_io *io, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index cb5babd..dc7745e 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -97,8 +97,8 @@ struct lov_oinfo { /* per-stripe data structure */ /* used by the osc to keep track of what objects to build into rpcs */ struct loi_oap_pages loi_read_lop; struct loi_oap_pages loi_write_lop; - /* _cli_ is poorly named, it should be _ready_ */ - struct list_head loi_cli_item; + struct list_head loi_ready_item; + struct list_head loi_hp_ready_item; struct list_head loi_write_item; struct list_head loi_read_item; @@ -122,7 +122,8 @@ static inline void loi_init(struct lov_oinfo *loi) CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending); CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent); CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group); - CFS_INIT_LIST_HEAD(&loi->loi_cli_item); + CFS_INIT_LIST_HEAD(&loi->loi_ready_item); + CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item); CFS_INIT_LIST_HEAD(&loi->loi_write_item); CFS_INIT_LIST_HEAD(&loi->loi_read_item); } @@ -439,6 +440,7 @@ struct client_obd { */ client_obd_lock_t cl_loi_list_lock; struct list_head cl_loi_ready_list; + struct list_head cl_loi_hp_ready_list; struct list_head cl_loi_write_list; struct list_head cl_loi_read_list; int cl_r_in_flight; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index dc1c7e2..2c498dc 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -277,6 +277,7 @@ int 
client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) cli->cl_dirty_max = num_physpages << (CFS_PAGE_SHIFT - 3); CFS_INIT_LIST_HEAD(&cli->cl_cache_waiters); CFS_INIT_LIST_HEAD(&cli->cl_loi_ready_list); + CFS_INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list); CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list); CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list); client_obd_list_lock_init(&cli->cl_loi_list_lock); diff --git a/lustre/liblustre/llite_cl.c b/lustre/liblustre/llite_cl.c index e3c340e..59c7d68 100644 --- a/lustre/liblustre/llite_cl.c +++ b/lustre/liblustre/llite_cl.c @@ -580,10 +580,10 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io, /* printk("Inited anchor with %d pages\n", npages); */ if (rc == 0) { - rc = cl_io_submit_rw(env, io, - io->ci_type == CIT_READ ? CRT_READ : - CRT_WRITE, - queue); + enum cl_req_type crt; + + crt = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE; + rc = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL); if (rc == 0) { /* If some pages weren't sent for any reason, count * then as completed, to avoid infinite wait. */ diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index f7876ed..d3f59a6 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -1116,7 +1116,8 @@ int ll_writepage(struct page *vmpage, struct writeback_control *_) */ set_page_dirty(vmpage); cl_2queue_init_page(queue, page); - result = cl_io_submit_rw(env, io, CRT_WRITE, queue); + result = cl_io_submit_rw(env, io, CRT_WRITE, + queue, CRP_NORMAL); cl_page_list_disown(env, io, &queue->c2_qin); if (result != 0) { /* diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index fac56d7..32bfa8a 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -311,7 +311,7 @@ ssize_t ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, if (rc == 0) { rc = cl_io_submit_rw(env, io, rw == READ ? 
CRT_READ : CRT_WRITE, - queue); + queue, CRP_NORMAL); if (rc == 0) { /* * If some pages weren't sent for any reason (e.g., diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index 5efbbf1..7adac17 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -733,7 +733,7 @@ static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io, cl_sync_io_init(anchor, 1); cp->cpg_sync_io = anchor; cl_page_clip(env, page, 0, to); - result = cl_io_submit_rw(env, io, crt, queue); + result = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL); if (result == 0) result = cl_sync_io_wait(env, io, &queue->c2_qout, anchor); else diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 4ab7d25..6fada6d 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -551,7 +551,8 @@ static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld, */ static int lov_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { struct lov_io *lio = cl2lov_io(env, ios); struct lov_object *obj = lio->lis_object; @@ -581,7 +582,8 @@ static int lov_io_submit(const struct lu_env *env, sub = lov_sub_get(env, lio, idx); LASSERT(!IS_ERR(sub)); LASSERT(sub->sub_io == &lio->lis_single_subio); - rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, crt, queue); + rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, + crt, queue, priority); lov_sub_put(sub); RETURN(rc); } @@ -622,7 +624,7 @@ static int lov_io_submit(const struct lu_env *env, sub = lov_sub_get(env, lio, stripe); if (!IS_ERR(sub)) { rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, - crt, cl2q); + crt, cl2q, priority); lov_sub_put(sub); } else rc = PTR_ERR(sub); diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 0e2f342..1b10e2a 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -739,7 +739,7 @@ int cl_io_read_page(const struct lu_env *env, struct 
cl_io *io, } } if (result == 0) - result = cl_io_submit_rw(env, io, CRT_READ, queue); + result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL); /* * Unlock unsent pages in case of error. */ @@ -835,7 +835,8 @@ EXPORT_SYMBOL(cl_io_commit_write); * \see cl_io_operations::cio_submit() */ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { const struct cl_io_slice *scan; int result = 0; @@ -847,7 +848,7 @@ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io, if (scan->cis_iop->req_op[crt].cio_submit == NULL) continue; result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt, - queue); + queue, priority); if (result != 0) break; } diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index c3dd3c4..e087eca 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -1741,8 +1741,8 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, if (queue->c2_qin.pl_nr > 0) { result = cl_page_list_unmap(env, io, &queue->c2_qin); if (!discard) { - rc0 = cl_io_submit_rw(env, io, - CRT_WRITE, queue); + rc0 = cl_io_submit_rw(env, io, CRT_WRITE, + queue, CRP_CANCEL); rc1 = cl_page_list_own(env, io, &queue->c2_qout); result = result ?: rc0 ?: rc1; diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index fa00083..f5b47d1 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1135,8 +1135,10 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE; async = async && (typ == CRT_WRITE); - rc = (async ? cl_echo_async_brw : cl_io_submit_rw)(env, io, - typ, queue); + if (async) + rc = cl_echo_async_brw(env, io, typ, queue); + else + rc = cl_io_submit_rw(env, io,typ, queue, CRP_NORMAL); CDEBUG(D_INFO, "echo_client %s write returns %d\n", async ? 
"async" : "sync", rc); if (rc == 0) { diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index b7a5143..545b5c1 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -48,6 +48,7 @@ enum async_flags { ASYNC_COUNT_STABLE = 0x4, /* ap_refresh_count will not be called to give the caller a chance to update or cancel the size of the io */ + ASYNC_HP = 0x10, }; struct obd_async_page_ops { diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 594e3a0..065c808 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -113,7 +113,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { struct cl_page *page; struct cl_page *tmp; @@ -148,6 +149,8 @@ static int osc_io_submit(const struct lu_env *env, osc = cl2osc(opg->ops_cl.cpl_obj); exp = osc_export(osc); + if (priority > CRP_NORMAL) + oap->oap_async_flags |= ASYNC_HP; /* * This can be checked without cli->cl_loi_list_lock, because * ->oap_*_item are always manipulated when the page is owned. 
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index e041b19..6d2b1ce 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1904,6 +1904,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, RETURN(0); } +static int lop_makes_hprpc(struct loi_oap_pages *lop) +{ + struct osc_async_page *oap; + ENTRY; + + if (list_empty(&lop->lop_urgent)) + RETURN(0); + + oap = list_entry(lop->lop_urgent.next, + struct osc_async_page, oap_urgent_item); + + if (oap->oap_async_flags & ASYNC_HP) { + CDEBUG(D_CACHE, "hp request forcing RPC\n"); + RETURN(1); + } + + RETURN(0); +} + static void on_list(struct list_head *item, struct list_head *list, int should_be_on) { @@ -1917,9 +1936,17 @@ static void on_list(struct list_head *item, struct list_head *list, * can find pages to build into rpcs quickly */ void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi) { - on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, - lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) || - lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)); + if (lop_makes_hprpc(&loi->loi_write_lop) || + lop_makes_hprpc(&loi->loi_read_lop)) { + /* HP rpc */ + on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0); + on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1); + } else { + on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0); + on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, + lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)|| + lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)); + } on_list(&loi->loi_write_item, &cli->cl_loi_write_list, loi->loi_write_lop.lop_num_pending); @@ -2015,8 +2042,10 @@ void osc_oap_to_pending(struct osc_async_page *oap) else lop = &oap->oap_loi->loi_read_lop; - if (oap->oap_async_flags & ASYNC_URGENT) + if (oap->oap_async_flags & ASYNC_HP) list_add(&oap->oap_urgent_item, &lop->lop_urgent); + else if (oap->oap_async_flags & ASYNC_URGENT) + 
list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent); list_add_tail(&oap->oap_pending_item, &lop->lop_pending); lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1); } @@ -2276,6 +2305,15 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, struct cl_object *clob = NULL; ENTRY; + /* If there are HP OAPs we need to handle at least 1 of them, + * move it to the beginning of the pending list for that. */ + if (!list_empty(&lop->lop_urgent)) { + oap = list_entry(lop->lop_urgent.next, + struct osc_async_page, oap_urgent_item); + if (oap->oap_async_flags & ASYNC_HP) + list_move(&oap->oap_pending_item, &lop->lop_pending); + } + /* first we find the pages we're allowed to work with */ list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item) { @@ -2486,7 +2524,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, #define LOI_DEBUG(LOI, STR, args...) \ CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \ - !list_empty(&(LOI)->loi_cli_item), \ + !list_empty(&(LOI)->loi_ready_item) || \ + !list_empty(&(LOI)->loi_hp_ready_item), \ (LOI)->loi_write_lop.lop_num_pending, \ !list_empty(&(LOI)->loi_write_lop.lop_urgent), \ (LOI)->loi_read_lop.lop_num_pending, \ @@ -2498,11 +2537,16 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, struct lov_oinfo *osc_next_loi(struct client_obd *cli) { ENTRY; - /* first return all objects which we already know to have - * pages ready to be stuffed into rpcs */ + + /* First return objects that have blocked locks so that they + * will be flushed quickly and other clients can get the lock, + * then objects which have pages ready to be stuffed into RPCs */ + if (!list_empty(&cli->cl_loi_hp_ready_list)) + RETURN(list_entry(cli->cl_loi_hp_ready_list.next, + struct lov_oinfo, loi_hp_ready_item)); if (!list_empty(&cli->cl_loi_ready_list)) RETURN(list_entry(cli->cl_loi_ready_list.next, - struct lov_oinfo, loi_cli_item)); + struct lov_oinfo, loi_ready_item)); /* then if we have cache
waiters, return all objects with queued * writes. This is especially important when many small files @@ -2526,6 +2570,26 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli) RETURN(NULL); } +static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi) +{ + struct osc_async_page *oap; + int hprpc = 0; + + if (!list_empty(&loi->loi_write_lop.lop_urgent)) { + oap = list_entry(loi->loi_write_lop.lop_urgent.next, + struct osc_async_page, oap_urgent_item); + hprpc = !!(oap->oap_async_flags & ASYNC_HP); + } + + if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) { + oap = list_entry(loi->loi_read_lop.lop_urgent.next, + struct osc_async_page, oap_urgent_item); + hprpc = !!(oap->oap_async_flags & ASYNC_HP); + } + + return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc; +} + /* called with the loi list lock held */ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) { @@ -2536,7 +2600,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) while ((loi = osc_next_loi(cli)) != NULL) { LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); - if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight) + if (osc_max_rpc_in_flight(cli, loi)) break; /* attempt some read/write balancing by alternating between @@ -2568,8 +2632,10 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) /* attempt some inter-object balancing by issueing rpcs * for each object in turn */ - if (!list_empty(&loi->loi_cli_item)) - list_del_init(&loi->loi_cli_item); + if (!list_empty(&loi->loi_hp_ready_item)) + list_del_init(&loi->loi_hp_ready_item); + if (!list_empty(&loi->loi_ready_item)) + list_del_init(&loi->loi_ready_item); if (!list_empty(&loi->loi_write_item)) list_del_init(&loi->loi_write_item); if (!list_empty(&loi->loi_read_item)) @@ -2832,11 +2898,14 @@ int osc_set_async_flags_base(struct client_obd *cli, if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY)) oap->oap_async_flags |= ASYNC_READY; - if
(SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) { - if (list_empty(&oap->oap_rpc_item)) { + if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) && + list_empty(&oap->oap_rpc_item)) { + if (oap->oap_async_flags & ASYNC_HP) list_add(&oap->oap_urgent_item, &lop->lop_urgent); - loi_list_maint(cli, loi); - } + else + list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent); + oap->oap_async_flags |= ASYNC_URGENT; + loi_list_maint(cli, loi); } LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page, @@ -2877,7 +2946,7 @@ int osc_teardown_async_page(struct obd_export *exp, if (!list_empty(&oap->oap_urgent_item)) { list_del_init(&oap->oap_urgent_item); - oap->oap_async_flags &= ~ASYNC_URGENT; + oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP); } if (!list_empty(&oap->oap_pending_item)) { list_del_init(&oap->oap_pending_item); -- 1.8.3.1