Whamcloud - gitweb
Branch HEAD
authorvitaly <vitaly>
Mon, 9 Mar 2009 19:14:38 +0000 (19:14 +0000)
committervitaly <vitaly>
Mon, 9 Mar 2009 19:14:38 +0000 (19:14 +0000)
b=17644
i=nikita
i=green

send 1 extra rpc in flight if this is a high priority request

14 files changed:
lustre/include/cl_object.h
lustre/include/obd.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/llite_cl.c
lustre/llite/rw.c
lustre/llite/rw26.c
lustre/llite/vvp_io.c
lustre/lov/lov_io.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_lock.c
lustre/obdecho/echo_client.c
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_request.c

index 5b922bd..c6c764e 100644 (file)
@@ -1916,6 +1916,11 @@ enum cl_io_state {
         CIS_FINI
 };
 
+enum cl_req_priority {
+        CRP_NORMAL,
+        CRP_CANCEL
+};
+
 /**
  * IO state private for a layer.
  *
@@ -2033,7 +2038,8 @@ struct cl_io_operations {
                 int  (*cio_submit)(const struct lu_env *env,
                                    const struct cl_io_slice *slice,
                                    enum cl_req_type crt,
-                                   struct cl_2queue *queue);
+                                   struct cl_2queue *queue,
+                                   enum cl_req_priority priority);
         } req_op[CRT_NR];
         /**
          * Read missing page.
@@ -2868,7 +2874,8 @@ int   cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
 int   cl_io_commit_write (const struct lu_env *env, struct cl_io *io,
                           struct cl_page *page, unsigned from, unsigned to);
 int   cl_io_submit_rw    (const struct lu_env *env, struct cl_io *io,
-                          enum cl_req_type iot, struct cl_2queue *queue);
+                          enum cl_req_type iot, struct cl_2queue *queue,
+                          enum cl_req_priority priority);
 void  cl_io_rw_advance   (const struct lu_env *env, struct cl_io *io,
                           size_t nob);
 int   cl_io_cancel       (const struct lu_env *env, struct cl_io *io,
index cb5babd..dc7745e 100644 (file)
@@ -97,8 +97,8 @@ struct lov_oinfo {                 /* per-stripe data structure */
         /* used by the osc to keep track of what objects to build into rpcs */
         struct loi_oap_pages loi_read_lop;
         struct loi_oap_pages loi_write_lop;
-        /* _cli_ is poorly named, it should be _ready_ */
-        struct list_head loi_cli_item;
+        struct list_head loi_ready_item;
+        struct list_head loi_hp_ready_item;
         struct list_head loi_write_item;
         struct list_head loi_read_item;
 
@@ -122,7 +122,8 @@ static inline void loi_init(struct lov_oinfo *loi)
         CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending);
         CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent);
         CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
-        CFS_INIT_LIST_HEAD(&loi->loi_cli_item);
+        CFS_INIT_LIST_HEAD(&loi->loi_ready_item);
+        CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item);
         CFS_INIT_LIST_HEAD(&loi->loi_write_item);
         CFS_INIT_LIST_HEAD(&loi->loi_read_item);
 }
@@ -439,6 +440,7 @@ struct client_obd {
          */
         client_obd_lock_t        cl_loi_list_lock;
         struct list_head         cl_loi_ready_list;
+        struct list_head         cl_loi_hp_ready_list;
         struct list_head         cl_loi_write_list;
         struct list_head         cl_loi_read_list;
         int                      cl_r_in_flight;
index dc1c7e2..2c498dc 100644 (file)
@@ -277,6 +277,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
                 cli->cl_dirty_max = num_physpages << (CFS_PAGE_SHIFT - 3);
         CFS_INIT_LIST_HEAD(&cli->cl_cache_waiters);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+        CFS_INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list);
         client_obd_list_lock_init(&cli->cl_loi_list_lock);
index e3c340e..59c7d68 100644 (file)
@@ -580,10 +580,10 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
         /* printk("Inited anchor with %d pages\n", npages); */
 
         if (rc == 0) {
-                rc = cl_io_submit_rw(env, io,
-                                     io->ci_type == CIT_READ ? CRT_READ :
-                                                               CRT_WRITE,
-                                     queue);
+                enum cl_req_type crt;
+
+                crt = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
+                rc = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
                 if (rc == 0) {
                         /* If some pages weren't sent for any reason, count
                          * then as completed, to avoid infinite wait. */
index f7876ed..d3f59a6 100644 (file)
@@ -1116,7 +1116,8 @@ int ll_writepage(struct page *vmpage, struct writeback_control *_)
                          */
                         set_page_dirty(vmpage);
                         cl_2queue_init_page(queue, page);
-                        result = cl_io_submit_rw(env, io, CRT_WRITE, queue);
+                        result = cl_io_submit_rw(env, io, CRT_WRITE,
+                                                 queue, CRP_NORMAL);
                         cl_page_list_disown(env, io, &queue->c2_qin);
                         if (result != 0) {
                                 /*
index fac56d7..32bfa8a 100644 (file)
@@ -311,7 +311,7 @@ ssize_t ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io,
 
         if (rc == 0) {
                 rc = cl_io_submit_rw(env, io, rw == READ ? CRT_READ : CRT_WRITE,
-                                     queue);
+                                     queue, CRP_NORMAL);
                 if (rc == 0) {
                         /*
                          * If some pages weren't sent for any reason (e.g.,
index 5efbbf1..7adac17 100644 (file)
@@ -733,7 +733,7 @@ static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io,
         cl_sync_io_init(anchor, 1);
         cp->cpg_sync_io = anchor;
         cl_page_clip(env, page, 0, to);
-        result = cl_io_submit_rw(env, io, crt, queue);
+        result = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
         if (result == 0)
                 result = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
         else
index 4ab7d25..6fada6d 100644 (file)
@@ -551,7 +551,8 @@ static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
  */
 static int lov_io_submit(const struct lu_env *env,
                          const struct cl_io_slice *ios,
-                         enum cl_req_type crt, struct cl_2queue *queue)
+                         enum cl_req_type crt, struct cl_2queue *queue,
+                         enum cl_req_priority priority)
 {
         struct lov_io          *lio = cl2lov_io(env, ios);
         struct lov_object      *obj = lio->lis_object;
@@ -581,7 +582,8 @@ static int lov_io_submit(const struct lu_env *env,
                 sub = lov_sub_get(env, lio, idx);
                 LASSERT(!IS_ERR(sub));
                 LASSERT(sub->sub_io == &lio->lis_single_subio);
-                rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, crt, queue);
+                rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+                                     crt, queue, priority);
                 lov_sub_put(sub);
                 RETURN(rc);
         }
@@ -622,7 +624,7 @@ static int lov_io_submit(const struct lu_env *env,
                 sub = lov_sub_get(env, lio, stripe);
                 if (!IS_ERR(sub)) {
                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
-                                             crt, cl2q);
+                                             crt, cl2q, priority);
                         lov_sub_put(sub);
                 } else
                         rc = PTR_ERR(sub);
index 0e2f342..1b10e2a 100644 (file)
@@ -739,7 +739,7 @@ int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
                 }
         }
         if (result == 0)
-                result = cl_io_submit_rw(env, io, CRT_READ, queue);
+                result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL);
         /*
          * Unlock unsent pages in case of error.
          */
@@ -835,7 +835,8 @@ EXPORT_SYMBOL(cl_io_commit_write);
  * \see cl_io_operations::cio_submit()
  */
 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
-                    enum cl_req_type crt, struct cl_2queue *queue)
+                    enum cl_req_type crt, struct cl_2queue *queue,
+                    enum cl_req_priority priority)
 {
         const struct cl_io_slice *scan;
         int result = 0;
@@ -847,7 +848,7 @@ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
                 if (scan->cis_iop->req_op[crt].cio_submit == NULL)
                         continue;
                 result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
-                                                               queue);
+                                                               queue, priority);
                 if (result != 0)
                         break;
         }
index c3dd3c4..e087eca 100644 (file)
@@ -1741,8 +1741,8 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
                 if (queue->c2_qin.pl_nr > 0) {
                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
                         if (!discard) {
-                                rc0 = cl_io_submit_rw(env, io,
-                                                      CRT_WRITE, queue);
+                                rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
+                                                      queue, CRP_CANCEL);
                                 rc1 = cl_page_list_own(env, io,
                                                        &queue->c2_qout);
                                 result = result ?: rc0 ?: rc1;
index fa00083..f5b47d1 100644 (file)
@@ -1135,8 +1135,10 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
 
                 async = async && (typ == CRT_WRITE);
-                rc = (async ? cl_echo_async_brw : cl_io_submit_rw)(env, io,
-                                                                   typ, queue);
+                if (async)
+                        rc = cl_echo_async_brw(env, io, typ, queue);
+                else
+                        rc = cl_io_submit_rw(env, io,typ, queue, CRP_NORMAL);
                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
                        async ? "async" : "sync", rc);
                 if (rc == 0) {
index b7a5143..545b5c1 100644 (file)
@@ -48,6 +48,7 @@ enum async_flags {
         ASYNC_COUNT_STABLE = 0x4, /* ap_refresh_count will not be called
                                      to give the caller a chance to update
                                      or cancel the size of the io */
+        ASYNC_HP = 0x10,
 };
 
 struct obd_async_page_ops {
index 594e3a0..065c808 100644 (file)
@@ -113,7 +113,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
  */
 static int osc_io_submit(const struct lu_env *env,
                          const struct cl_io_slice *ios,
-                         enum cl_req_type crt, struct cl_2queue *queue)
+                         enum cl_req_type crt, struct cl_2queue *queue,
+                         enum cl_req_priority priority)
 {
         struct cl_page    *page;
         struct cl_page    *tmp;
@@ -148,6 +149,8 @@ static int osc_io_submit(const struct lu_env *env,
                 osc = cl2osc(opg->ops_cl.cpl_obj);
                 exp = osc_export(osc);
 
+                if (priority > CRP_NORMAL)
+                        oap->oap_async_flags |= ASYNC_HP;
                 /*
                  * This can be checked without cli->cl_loi_list_lock, because
                  * ->oap_*_item are always manipulated when the page is owned.
index e041b19..6d2b1ce 100644 (file)
@@ -1904,6 +1904,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+        struct osc_async_page *oap;
+        ENTRY;
+
+        if (list_empty(&lop->lop_urgent))
+                RETURN(0);
+
+        oap = list_entry(lop->lop_urgent.next,
+                         struct osc_async_page, oap_urgent_item);
+
+        if (oap->oap_async_flags & ASYNC_HP) {
+                CDEBUG(D_CACHE, "hp request forcing RPC\n");
+                RETURN(1);
+        }
+
+        RETURN(0);
+}
+
 static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
@@ -1917,9 +1936,17 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
-                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        if (lop_makes_hprpc(&loi->loi_write_lop) ||
+            lop_makes_hprpc(&loi->loi_read_lop)) {
+                /* HP rpc */
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+        } else {
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        }
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
@@ -2015,8 +2042,10 @@ void osc_oap_to_pending(struct osc_async_page *oap)
         else
                 lop = &oap->oap_loi->loi_read_lop;
 
-        if (oap->oap_async_flags & ASYNC_URGENT)
+        if (oap->oap_async_flags & ASYNC_HP)
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+        else if (oap->oap_async_flags & ASYNC_URGENT)
+                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
 }
@@ -2276,6 +2305,15 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         struct cl_object *clob = NULL;
         ENTRY;
 
+        /* If there are HP OAPs we need to handle at least 1 of them,
+         * move it the beginning of the pending list for that. */
+        if (!list_empty(&lop->lop_urgent)) {
+                oap = list_entry(lop->lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        list_move(&oap->oap_pending_item, &lop->lop_pending);
+        }
+
         /* first we find the pages we're allowed to work with */
         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                  oap_pending_item) {
@@ -2486,7 +2524,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
 #define LOI_DEBUG(LOI, STR, args...)                                     \
         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !list_empty(&(LOI)->loi_cli_item),                        \
+               !list_empty(&(LOI)->loi_ready_item) ||                    \
+               !list_empty(&(LOI)->loi_hp_ready_item),                   \
                (LOI)->loi_write_lop.lop_num_pending,                     \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
                (LOI)->loi_read_lop.lop_num_pending,                      \
@@ -2498,11 +2537,16 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
-        /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */
+
+        /* First return objects that have blocked locks so that they
+         * will be flushed quickly and other clients can get the lock,
+         * then objects which have pages ready to be stuffed into RPCs */
+        if (!list_empty(&cli->cl_loi_hp_ready_list))
+                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+                                  struct lov_oinfo, loi_hp_ready_item));
         if (!list_empty(&cli->cl_loi_ready_list))
                 RETURN(list_entry(cli->cl_loi_ready_list.next,
-                                  struct lov_oinfo, loi_cli_item));
+                                  struct lov_oinfo, loi_ready_item));
 
         /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
@@ -2526,6 +2570,26 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
         RETURN(NULL);
 }
 
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        struct osc_async_page *oap;
+        int hprpc = 0;
+
+        if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
 /* called with the loi list lock held */
 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 {
@@ -2536,7 +2600,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
-                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+                if (osc_max_rpc_in_flight(cli, loi))
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -2568,8 +2632,10 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
-                if (!list_empty(&loi->loi_cli_item))
-                        list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_hp_ready_item))
+                        list_del_init(&loi->loi_hp_ready_item);
+                if (!list_empty(&loi->loi_ready_item))
+                        list_del_init(&loi->loi_ready_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
                 if (!list_empty(&loi->loi_read_item))
@@ -2832,11 +2898,14 @@ int osc_set_async_flags_base(struct client_obd *cli,
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
 
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
-                if (list_empty(&oap->oap_rpc_item)) {
+        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+            list_empty(&oap->oap_rpc_item)) {
+                if (oap->oap_async_flags & ASYNC_HP)
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_list_maint(cli, loi);
-                }
+                else
+                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+                oap->oap_async_flags |= ASYNC_URGENT;
+                loi_list_maint(cli, loi);
         }
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
@@ -2877,7 +2946,7 @@ int osc_teardown_async_page(struct obd_export *exp,
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
-                oap->oap_async_flags &= ~ASYNC_URGENT;
+                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
         }
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);