Whamcloud - gitweb
Fix for many-clients-and-server-on-same-node deadlock.
author adilger <adilger>
Fri, 8 Nov 2002 19:02:41 +0000 (19:02 +0000)
committer adilger <adilger>
Fri, 8 Nov 2002 19:02:41 +0000 (19:02 +0000)
One server task can go "in the hole" for at most one maximum-sized request, to avoid
deadlocking when the clients have consumed all of the kmapping reservations.

The server can then complete the request and both the server and client will
release their reservations, allowing further progress to be made.

lustre/include/linux/obd_support.h
lustre/obdclass/class_obd.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c

index 5d927d6..a6eb4c2 100644 (file)
@@ -177,10 +177,10 @@ do {                                                                    \
 } while (0)
 
 #ifdef CONFIG_HIGHMEM
-extern void obd_highmem_get(int count);
-extern void obd_highmem_put(int count);
+extern void obd_kmap_get(int count, int server);
+extern void obd_kmap_put(int count);
 #else
-#define obd_highmem_get(count) do {} while (0)
-#define obd_highmem_put(count) do {} while (0)
+#define obd_kmap_get(count, server) do {} while (0)
+#define obd_kmap_put(count) do {} while (0)
 #endif
 #endif
index ae29ee0..d8b0849 100644 (file)
@@ -45,6 +45,7 @@
 #include <linux/lustre_debug.h>
 #include <linux/smp_lock.h>
 #include <linux/lprocfs_status.h>
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
 
 struct semaphore obd_conf_sem;   /* serialize configuration commands */
 struct obd_device obd_dev[MAX_OBD_DEVICES];
@@ -665,30 +666,39 @@ static struct miscdevice obd_psdev = {
 void (*class_signal_connection_failure)(struct ptlrpc_connection *);
 
 #ifdef CONFIG_HIGHMEM
-#warning "using highmem accounting for deadlock avoidance"
-/* Allow at most 3/4 of the highmem mappings to be consumed by vector I/O
+#warning "using kmap accounting for deadlock avoidance"
+/* Allow at most 3/4 of the kmap mappings to be consumed by vector I/O
  * requests.  This avoids deadlocks on servers which have a lot of clients
  * doing vector I/O.  We don't need to do this for non-vector I/O requests
  * because singleton requests will just block on the kmap itself and never
  * deadlock waiting for additional kmaps to complete.
+ *
+ * If we are a "server" task, we can have at most a single reservation
+ * in excess of the maximum.  This avoids a deadlock when multiple client
+ * threads are on the same machine as the server threads, and the clients
+ * have consumed all of the available mappings.  As long as a single server
+ * thread can make progress, we are guaranteed to avoid deadlock.
  */
-#define OBD_HIGHMEM_MAX (LAST_PKMAP * 3 / 4)
-static atomic_t obd_highmem_count = ATOMIC_INIT(OBD_HIGHMEM_MAX);
-static DECLARE_WAIT_QUEUE_HEAD(obd_highmem_waitq);
+#define OBD_KMAP_MAX (LAST_PKMAP * 3 / 4)
+static atomic_t obd_kmap_count = ATOMIC_INIT(OBD_KMAP_MAX);
+static DECLARE_WAIT_QUEUE_HEAD(obd_kmap_waitq);
 
-void obd_highmem_get(int count)
+void obd_kmap_get(int count, int server)
 {
         //CERROR("getting %d kmap counts (%d/%d)\n", count,
-        //       atomic_read(&obd_highmem_count), OBD_HIGHMEM_MAX);
+        //       atomic_read(&obd_kmap_count), OBD_KMAP_MAX);
         if (count == 1)
-                atomic_dec(&obd_highmem_count);
-        else while (atomic_add_negative(-count, &obd_highmem_count)) {
+                atomic_dec(&obd_kmap_count);
+        else while (atomic_add_negative(-count, &obd_kmap_count)) {
                 static long next_show = 0;
                 static int skipped = 0;
 
+                if (server && atomic_read(&obd_kmap_count) >= -PTL_MD_MAX_IOV)
+                        break;
+
                 CDEBUG(D_OTHER, "negative kmap reserved count: %d\n",
-                       atomic_read(&obd_highmem_count));
-                atomic_add(count, &obd_highmem_count);
+                       atomic_read(&obd_kmap_count));
+                atomic_add(count, &obd_kmap_count);
 
                 if (time_after(jiffies, next_show)) {
                         CERROR("blocking %s (and %d others) for kmaps\n",
@@ -697,27 +707,28 @@ void obd_highmem_get(int count)
                         skipped = 0;
                 } else
                         skipped++;
-                wait_event(obd_highmem_waitq,
-                           atomic_read(&obd_highmem_count) >= count);
+                wait_event(obd_kmap_waitq,
+                           atomic_read(&obd_kmap_count) >= count);
         }
 }
 
-void obd_highmem_put(int count)
+void obd_kmap_put(int count)
 {
-        atomic_add(count, &obd_highmem_count);
+        atomic_add(count, &obd_kmap_count);
         /* Wake up sleepers.  Sadly, this wakes up all of the tasks at once.
-         * We should have something smarter here like:
-        while (atomic_read(&obd_highmem_count) > 0)
-                wake_up_nr(obd_highmem_waitq, 1);
+         * We could have something smarter here like:
+        while (atomic_read(&obd_kmap_count) > 0)
+                wake_up_nr(obd_kmap_waitq, 1);
         although we would need to set somewhere (probably obd_class_init):
-        obd_highmem_waitq.flags |= WQ_EXCLUSIVE;
-        for now the wait_event() condition will handle this OK I believe.
+        obd_kmap_waitq.flags |= WQ_FLAG_EXCLUSIVE;
+        For now the wait_event() condition will handle this OK I believe.
          */
-        wake_up(&obd_highmem_waitq);
+        if (atomic_read(&obd_kmap_count) > 0)
+                wake_up(&obd_kmap_waitq);
 }
 
-EXPORT_SYMBOL(obd_highmem_get);
-EXPORT_SYMBOL(obd_highmem_put);
+EXPORT_SYMBOL(obd_kmap_get);
+EXPORT_SYMBOL(obd_kmap_put);
 #endif
 
 EXPORT_SYMBOL(obd_dev);
index e0e74dd..8c16744 100644 (file)
@@ -12,8 +12,8 @@
  * and Andreas Dilger <adilger@clusterfs.com>
  */
 
-static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.45 2002/11/08 18:25:53 adilger Exp $";
-#define OBDECHO_VERSION "$Revision: 1.45 $"
+static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.46 2002/11/08 19:02:41 adilger Exp $";
+#define OBDECHO_VERSION "$Revision: 1.46 $"
 
 #define EXPORT_SYMTAB
 
@@ -272,7 +272,7 @@ int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
 
         *desc_private = (void *)DESC_PRIV;
 
-        obd_highmem_get(niocount);
+        obd_kmap_get(niocount, 1);
 
         for (i = 0; i < objcount; i++, obj++) {
                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
@@ -321,7 +321,7 @@ preprw_cleanup:
                 __free_pages(r->page, 0);
                 atomic_dec(&obd->u.echo.eo_prep);
         }
-        obd_highmem_put(niocount);
+        obd_kmap_put(niocount);
         memset(res, 0, sizeof(*res) * niocount);
 
         return rc;
@@ -384,7 +384,7 @@ int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
                                                  r->offset, obj->ioo_id);
 
                         kunmap(page);
-                        obd_highmem_put(1);
+                        obd_kmap_put(1);
                         __free_pages(page, 0);
                         atomic_dec(&obd->u.echo.eo_prep);
                 }
@@ -400,7 +400,7 @@ commitrw_cleanup:
                 struct page *page = r->page;
 
                 kunmap(page);
-                obd_highmem_put(1);
+                obd_kmap_put(1);
                 __free_pages(page, 0);
                 atomic_dec(&obd->u.echo.eo_prep);
         }
index a628b89..53d9efb 100644 (file)
@@ -1440,7 +1440,7 @@ static int filter_preprw(int cmd, struct lustre_handle *conn,
                         GOTO(out_ctxt, rc = PTR_ERR(*desc_private));
         }
 
-        obd_highmem_get(niocount);
+        obd_kmap_get(niocount, 1);
 
         for (i = 0; i < objcount; i++, o++) {
                 struct dentry *dentry;
@@ -1504,7 +1504,7 @@ out_clean:
                 else
                         lustre_put_page(lnb->page);
         }
-        obd_highmem_put(niocount);
+        obd_kmap_put(niocount);
         goto out_stop;
 }
 
@@ -1591,7 +1591,7 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
                         } else
                                 lustre_put_page(page);
 
-                        obd_highmem_put(1);
+                        obd_kmap_put(1);
                         f_dput(r->dentry);
                 }
         }
@@ -1610,7 +1610,7 @@ static int filter_commitrw(int cmd, struct lustre_handle *conn,
                                 continue;
 
                         err = filter_write_locked_page(r);
-                        obd_highmem_put(1);
+                        obd_kmap_put(1);
                         if (!rc)
                                 rc = err;
                         f_dput(r->dentry);
index b79e612..508e328 100644 (file)
@@ -326,7 +326,7 @@ static void unmap_and_decref_bulk_desc(void *data)
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
 
                 kunmap(bulk->bp_page);
-                obd_highmem_put(1);
+                obd_kmap_put(1);
         }
 
         ptlrpc_bulk_decref(desc);
@@ -413,7 +413,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         xid = ++connection->c_xid_out;       /* single xid for all pages */
         spin_unlock(&connection->c_lock);
 
-        obd_highmem_get(page_count);
+        obd_kmap_get(page_count, 0);
 
         for (mapped = 0; mapped < page_count; mapped++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
@@ -477,7 +477,7 @@ out_req:
 out_unmap:
         while (mapped-- > 0)
                 kunmap(pga[mapped].pg);
-        obd_highmem_put(page_count);
+        obd_kmap_put(page_count);
         OBD_FREE(cb_data, sizeof(*cb_data));
 out_desc:
         ptlrpc_bulk_decref(desc);
@@ -538,7 +538,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
         cb_data->obd_data = local;
         cb_data->obd_size = page_count * sizeof(*local);
 
-        obd_highmem_get(page_count);
+        obd_kmap_get(page_count, 0);
 
         for (mapped = 0; mapped < page_count; mapped++) {
                 local[mapped].addr = kmap(pga[mapped].pg);
@@ -612,7 +612,7 @@ out_unmap:
         while (mapped-- > 0)
                 kunmap(pga[mapped].pg);
 
-        obd_highmem_put(page_count);
+        obd_kmap_put(page_count);
 
         OBD_FREE(local, page_count * sizeof(*local));
 out_cb: