Whamcloud - gitweb
Fix races in client write RPC generation when cache full
authorzab <zab>
Thu, 8 Jan 2004 07:24:03 +0000 (07:24 +0000)
committerzab <zab>
Thu, 8 Jan 2004 07:24:03 +0000 (07:24 +0000)
b=2482
(tested in buffalo against b1_0)

lustre/include/linux/obd.h
lustre/ldlm/ldlm_lib.c
lustre/osc/osc_request.c

index 42d823a..e3dedb4 100644 (file)
@@ -49,10 +49,12 @@ struct lov_oinfo {                 /* per-stripe data structure */
         int loi_ost_idx;           /* OST stripe index in lov_tgt_desc->tgts */
         int loi_ost_gen;           /* generation of this loi_ost_idx */
 
-        /* tracking offsets per file, per stripe.. */
+        /* used by the osc to keep track of what objects to build into rpcs */
         struct loi_oap_pages loi_read_lop;
         struct loi_oap_pages loi_write_lop;
+        /* _cli_ is poorly named, it should be _ready_ */
         struct list_head loi_cli_item;
+        struct list_head        loi_write_item;
 };
 
 static inline void loi_init(struct lov_oinfo *loi)
@@ -64,6 +66,7 @@ static inline void loi_init(struct lov_oinfo *loi)
         INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent);
         INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_sync);
         INIT_LIST_HEAD(&loi->loi_cli_item);
+        INIT_LIST_HEAD(&loi->loi_write_item);
 }
 
 struct lov_stripe_md {
@@ -209,6 +212,7 @@ struct client_obd {
          * lists of osc_client_pages that hang off of the loi */
         spinlock_t               cl_loi_list_lock;
         struct list_head         cl_loi_ready_list;
+        struct list_head         cl_loi_write_list;
         int                      cl_brw_in_flight;
         /* just a sum of the loi/lop pending numbers to be exported by /proc */
         int                      cl_pending_w_pages;
index 30d3ac5..1ca54d3 100644 (file)
@@ -103,6 +103,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         cli->cl_ost_can_grant = 1;
         INIT_LIST_HEAD(&cli->cl_cache_waiters);
         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+        INIT_LIST_HEAD(&cli->cl_loi_write_list);
         spin_lock_init(&cli->cl_loi_list_lock);
         cli->cl_brw_in_flight = 0;
         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
index 0fd35c6..962c20d 100644 (file)
@@ -1089,6 +1089,7 @@ static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
 
 static void osc_check_rpcs(struct client_obd *cli);
 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap);
+static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
 
 static void osc_complete_oap(struct client_obd *cli,
                              struct osc_async_page *oap, int rc)
@@ -1238,8 +1239,8 @@ static void lop_update_pending(struct client_obd *cli,
 
 /* the loi lock is held across this function but it's allowed to release
  * and reacquire it during its work */
-static int osc_send_oap_rpc(struct client_obd *cli, int cmd,
-                            struct loi_oap_pages *lop)
+static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
+                            int cmd, struct loi_oap_pages *lop)
 {
         struct ptlrpc_request *request;
         obd_count page_count = 0;
@@ -1326,6 +1327,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, int cmd,
         if (page_count == 0)
                 RETURN(0);
 
+        loi_list_maint(cli, loi);
         spin_unlock(&cli->cl_loi_list_lock);
 
         request = osc_build_req(cli, &rpc_list, page_count, cmd);
@@ -1344,6 +1346,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, int cmd,
                                 list_add(&oap->oap_urgent_item,
                                          &lop->lop_urgent);
                 }
+                loi_list_maint(cli, loi);
                 RETURN(PTR_ERR(request));
         }
 
@@ -1380,110 +1383,143 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         int optimal;
         ENTRY;
 
-        /* stream rpcs until we complete the urgent pages in the object */
+        if (lop->lop_num_pending == 0)
+                RETURN(0);
+
+        /* stream rpcs in queue order as long as as there is an urgent page
+         * queued.  this is our cheap solution for good batching in the case
+         * where writepage marks some random page in the middle of the file as
+         * urgent because of, say, memory pressure */
         if (!list_empty(&lop->lop_urgent))
                 RETURN(1);
 
         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
         optimal = cli->cl_max_pages_per_rpc;
-        /* *2 to avoid triggering rpcs that would want to include pages that
-         * are being queued but which can't be made ready until the queuer
-         * finishes with the page. this is a wart for llite::commit_write() */
-        if (cmd == OBD_BRW_WRITE)
+        if (cmd == OBD_BRW_WRITE) {
+                /* trigger a write rpc stream as long as there are dirtiers
+                 * waiting for space.  as they're waiting, they're not going to
+                 * create more pages to coallesce with what's waiting.. */
+                if (!list_empty(&cli->cl_cache_waiters))
+                        RETURN(1);
+
+                /* *2 to avoid triggering rpcs that would want to include pages
+                 * that are being queued but which can't be made ready until
+                 * the queuer finishes with the page. this is a wart for
+                 * llite::commit_write() */
                 optimal *= 2;
+        }
         if (lop->lop_num_pending >= optimal)
                 RETURN(1);
 
-        /* trigger a write rpc stream as long as there are dirtiers waiting
-         * for space.  as they're waiting, they're not going to create more
-         * pages to coallesce with what's waiting.. */
-        if (!list_empty(&cli->cl_cache_waiters))
-                RETURN(1);
-
         RETURN(0);
 }
 
-static int loi_makes_rpc(struct client_obd *cli, struct lov_oinfo *loi)
+static void on_list(struct list_head *item, struct list_head *list, 
+                    int should_be_on)
 {
-        return lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-               lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ);
+        if (list_empty(item) && should_be_on)
+                list_add_tail(item, list);
+        else if (!list_empty(item) && !should_be_on)
+                list_del_init(item);
 }
 
-static void loi_onto_ready_list(struct client_obd *cli, struct lov_oinfo *loi)
+/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
+ * can find pages to build into rpcs quickly */
+static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        if (list_empty(&loi->loi_cli_item) && loi_makes_rpc(cli, loi))
-                list_add_tail(&loi->loi_cli_item, &cli->cl_loi_ready_list);
+        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, 
+                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
+                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+
+        on_list(&loi->loi_write_item, &cli->cl_loi_write_list, 
+                loi->loi_write_lop.lop_num_pending);
 }
 
 #define LOI_DEBUG(LOI, STR, args...) \
-        CDEBUG(D_INODE, "loi rdy %d [%p,%p] wr %d:%d rd %d:%d " STR, \
+        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
                !list_empty(&(LOI)->loi_cli_item),                  \
-               (LOI)->loi_cli_item.next,                  \
-               (LOI)->loi_cli_item.prev,                  \
                (LOI)->loi_write_lop.lop_num_pending,                     \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent),         \
                (LOI)->loi_read_lop.lop_num_pending,                      \
                !list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
                args)                       \
 
+struct lov_oinfo *osc_next_loi(struct client_obd *cli)
+{
+        ENTRY;
+        /* first return all objects which we already know to have
+         * pages ready to be stuffed into rpcs */ 
+        if (!list_empty(&cli->cl_loi_ready_list))
+                RETURN(list_entry(cli->cl_loi_ready_list.next, 
+                                  struct lov_oinfo, loi_cli_item));
+        
+        /* then if we have cache waiters, return all objects with queued 
+         * writes.  This is especially important when many small files
+         * have filled up the cache and not been fired into rpcs because
+         * they don't pass the nr_pending/object threshhold */
+        if (!list_empty(&cli->cl_cache_waiters) &&
+            !list_empty(&cli->cl_loi_write_list))
+                RETURN(list_entry(cli->cl_loi_write_list.next, 
+                                  struct lov_oinfo, loi_write_item));
+        RETURN(NULL);
+}
+
 /* called with the loi list lock held */
 static void osc_check_rpcs(struct client_obd *cli)
 {
         struct lov_oinfo *loi;
-        int rc = 0, making_progress;
+        int rc = 0, race_counter = 0;
         ENTRY;
 
-        if (list_empty(&cli->cl_loi_ready_list)) {
-                CDEBUG(D_INODE, "no lois ready\n");
-                EXIT;
-                return;
-        }
+        while ((loi = osc_next_loi(cli)) != NULL) {
 
-        while (!list_empty(&cli->cl_loi_ready_list)) {
-                loi = list_entry(cli->cl_loi_ready_list.next, struct lov_oinfo,
-                                 loi_cli_item);
+                LOI_DEBUG(loi, "%d in flight", cli->cl_brw_in_flight);
 
                 if (cli->cl_brw_in_flight >= cli->cl_max_rpcs_in_flight)
                         break;
 
-                making_progress = 0;
-
-                /* hmm, it occurs to me that having rpc preparation fail
-                 * with num_pending == num_urgent means that there won't
-                 * be any more calls into here unless other traffic comes
-                 * in.  hmm. */
-
                 /* attempt some read/write balancing by alternating between
-                 * reads and writes in an object */
+                 * reads and writes in an object.  The makes_rpc checks here
+                 * would be redundant if we were getting read/write work items
+                 * instead of objects.  we don't want send_oap_rpc to drain a
+                 * partial read pending queue when we're given this object to
+                 * do io on writes while there are cache waiters */
                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
-                        rc = osc_send_oap_rpc(cli, OBD_BRW_WRITE,
+                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                               &loi->loi_write_lop);
                         if (rc < 0)
                                 break;
                         if (rc > 0)
-                                making_progress++;
+                                race_counter = 0;
+                        else
+                                race_counter++;
                 }
                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
-                        rc = osc_send_oap_rpc(cli, OBD_BRW_READ,
+                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                               &loi->loi_read_lop);
                         if (rc < 0)
                                 break;
                         if (rc > 0)
-                                making_progress++;
+                                race_counter = 0;
+                        else
+                                race_counter++;
                 }
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
                 if (!list_empty(&loi->loi_cli_item))
                         list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_write_item))
+                        list_del_init(&loi->loi_write_item);
 
-                loi_onto_ready_list(cli, loi);
-
-                LOI_DEBUG(loi, "mp %d\n", making_progress);
+                loi_list_maint(cli, loi);
 
-                /* could be smarter, !making_progress can happen in theory
-                 * if all the pages can not be locked in set_io_ready */
-                if (!making_progress)
+                /* send_oap_rpc fails with 0 when make_ready tells it to
+                 * back off.  llite's make_ready does this when it tries
+                 * to lock a page queued for write that is already locked.
+                 * we want to try sending rpcs from many objects, but we
+                 * don't want to spin failing with 0.  */
+                if (race_counter == 10)
                         break;
         }
         EXIT;
@@ -1534,7 +1570,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         /* make sure that there are write rpcs in flight to wait for. this
          * is a little silly as this object may not have any pending
          * but other objects sure might. this should probably be cleaned. */
-        loi_onto_ready_list(cli, loi);
+        loi_list_maint(cli, loi);
         osc_check_rpcs(cli);
         spin_unlock(&cli->cl_loi_list_lock);
 
@@ -1665,7 +1701,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(cli, lop, cmd, 1);
 
-        loi_onto_ready_list(cli, loi);
+        loi_list_maint(cli, loi);
 
         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                   cmd);
@@ -1720,7 +1756,7 @@ static int osc_set_async_flags(struct obd_export *exp,
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                 if (list_empty(&oap->oap_rpc_item)) {
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_onto_ready_list(cli, loi);
+                        loi_list_maint(cli, loi);
                 }
         }
 
@@ -1796,7 +1832,7 @@ static void osc_sync_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                 lop_update_pending(cli, lop, cmd, 1);
         }
-        loi_onto_ready_list(cli, loi);
+        loi_list_maint(cli, loi);
 }
 
 static int osc_trigger_sync_io(struct obd_export *exp,
@@ -1862,8 +1898,7 @@ static int osc_teardown_async_page(struct obd_export *exp,
                 list_del_init(&oap->oap_pending_item);
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
         }
-        if (!list_empty(&loi->loi_cli_item) && !loi_makes_rpc(cli, loi))
-                list_del_init(&loi->loi_cli_item);
+        loi_list_maint(cli, loi);
 
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out: