#include <linux/lustre_debug.h>
#include <linux/smp_lock.h>
#include <linux/lprocfs_status.h>
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
struct semaphore obd_conf_sem; /* serialize configuration commands */
struct obd_device obd_dev[MAX_OBD_DEVICES];
void (*class_signal_connection_failure)(struct ptlrpc_connection *);
#ifdef CONFIG_HIGHMEM
-#warning "using highmem accounting for deadlock avoidance"
-/* Allow at most 3/4 of the highmem mappings to be consumed by vector I/O
+#warning "using kmap accounting for deadlock avoidance"
+/* Allow at most 3/4 of the kmap mappings to be consumed by vector I/O
* requests. This avoids deadlocks on servers which have a lot of clients
* doing vector I/O. We don't need to do this for non-vector I/O requests
* because singleton requests will just block on the kmap itself and never
* deadlock waiting for additional kmaps to complete.
+ *
+ * If we are a "server" task, we can have at most a single reservation
+ * in excess of the maximum. This avoids a deadlock when multiple client
+ * threads are on the same machine as the server threads, and the clients
+ * have consumed all of the available mappings. As long as a single server
+ * thread can make progress, we are guaranteed to avoid deadlock.
*/
-#define OBD_HIGHMEM_MAX (LAST_PKMAP * 3 / 4)
-static atomic_t obd_highmem_count = ATOMIC_INIT(OBD_HIGHMEM_MAX);
-static DECLARE_WAIT_QUEUE_HEAD(obd_highmem_waitq);
+#define OBD_KMAP_MAX (LAST_PKMAP * 3 / 4)
+static atomic_t obd_kmap_count = ATOMIC_INIT(OBD_KMAP_MAX);
+static DECLARE_WAIT_QUEUE_HEAD(obd_kmap_waitq);
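+/* Worked example: on i386 without PAE, LAST_PKMAP is 1024, so
+ * OBD_KMAP_MAX reserves 768 slots for vector I/O and leaves the
+ * remaining quarter for singleton kmap() users. A server thread may
+ * additionally drive obd_kmap_count down to -PTL_MD_MAX_IOV, i.e. one
+ * maximum-sized vector request past the limit. */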
-void obd_highmem_get(int count)
+void obd_kmap_get(int count, int server)
{
//CERROR("getting %d kmap counts (%d/%d)\n", count,
- // atomic_read(&obd_highmem_count), OBD_HIGHMEM_MAX);
+ // atomic_read(&obd_kmap_count), OBD_KMAP_MAX);
if (count == 1)
- atomic_dec(&obd_highmem_count);
- else while (atomic_add_negative(-count, &obd_highmem_count)) {
+ atomic_dec(&obd_kmap_count);
+ else while (atomic_add_negative(-count, &obd_kmap_count)) {
static long next_show = 0;
static int skipped = 0;
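+ /* A server thread may overdraw the reservation by up to one
+ * maximum-sized request (PTL_MD_MAX_IOV pages) instead of sleeping,
+ * so at least one server thread can always make progress. */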
+ if (server && atomic_read(&obd_kmap_count) >= -PTL_MD_MAX_IOV)
+ break;
+
CDEBUG(D_OTHER, "negative kmap reserved count: %d\n",
- atomic_read(&obd_highmem_count));
- atomic_add(count, &obd_highmem_count);
+ atomic_read(&obd_kmap_count));
+ atomic_add(count, &obd_kmap_count);
if (time_after(jiffies, next_show)) {
CERROR("blocking %s (and %d others) for kmaps\n",
skipped = 0;
} else
skipped++;
- wait_event(obd_highmem_waitq,
- atomic_read(&obd_highmem_count) >= count);
+ wait_event(obd_kmap_waitq,
+ atomic_read(&obd_kmap_count) >= count);
}
}
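/* A minimal usage sketch for a hypothetical client caller ("pages",
 * "addrs" and "npages" are illustrative names, not from this file):
 *
 *      obd_kmap_get(npages, 0);        // may block until kmaps free
 *      for (i = 0; i < npages; i++)
 *              addrs[i] = kmap(pages[i]);
 *      ... perform the vector I/O ...
 *      for (i = 0; i < npages; i++)
 *              kunmap(pages[i]);
 *      obd_kmap_put(npages);           // wake any blocked reservers
 */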
-void obd_highmem_put(int count)
+void obd_kmap_put(int count)
{
- atomic_add(count, &obd_highmem_count);
+ atomic_add(count, &obd_kmap_count);
/* Wake up sleepers. Sadly, this wakes up all of the tasks at once.
- * We should have something smarter here like:
- while (atomic_read(&obd_highmem_count) > 0)
- wake_up_nr(obd_highmem_waitq, 1);
+ * We could have something smarter here like:
+ while (atomic_read(&obd_kmap_count) > 0)
+ wake_up_nr(obd_kmap_waitq, 1);
although we would need to set this somewhere (probably obd_class_init):
- obd_highmem_waitq.flags |= WQ_EXCLUSIVE;
- for now the wait_event() condition will handle this OK I believe.
+ obd_kmap_waitq.flags |= WQ_FLAG_EXCLUSIVE;
+ For now the wait_event() condition will handle this OK I believe.
*/
- wake_up(&obd_highmem_waitq);
+ if (atomic_read(&obd_kmap_count) > 0)
+ wake_up(&obd_kmap_waitq);
}
-EXPORT_SYMBOL(obd_highmem_get);
-EXPORT_SYMBOL(obd_highmem_put);
+EXPORT_SYMBOL(obd_kmap_get);
+EXPORT_SYMBOL(obd_kmap_put);
#endif
EXPORT_SYMBOL(obd_dev);
* and Andreas Dilger <adilger@clusterfs.com>
*/
-static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.45 2002/11/08 18:25:53 adilger Exp $";
-#define OBDECHO_VERSION "$Revision: 1.45 $"
+static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.46 2002/11/08 19:02:41 adilger Exp $";
+#define OBDECHO_VERSION "$Revision: 1.46 $"
#define EXPORT_SYMTAB
*desc_private = (void *)DESC_PRIV;
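/* The echo OBD's preprw path runs in a server thread servicing client
 * requests, so it reserves kmaps with server == 1 and may overdraw
 * rather than block. */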
- obd_highmem_get(niocount);
+ obd_kmap_get(niocount, 1);
for (i = 0; i < objcount; i++, obj++) {
int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
__free_pages(r->page, 0);
atomic_dec(&obd->u.echo.eo_prep);
}
- obd_highmem_put(niocount);
+ obd_kmap_put(niocount);
memset(res, 0, sizeof(*res) * niocount);
return rc;
r->offset, obj->ioo_id);
kunmap(page);
- obd_highmem_put(1);
+ obd_kmap_put(1);
__free_pages(page, 0);
atomic_dec(&obd->u.echo.eo_prep);
}
struct page *page = r->page;
kunmap(page);
- obd_highmem_put(1);
+ obd_kmap_put(1);
__free_pages(page, 0);
atomic_dec(&obd->u.echo.eo_prep);
}
bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
kunmap(bulk->bp_page);
- obd_highmem_put(1);
+ obd_kmap_put(1);
}
ptlrpc_bulk_decref(desc);
xid = ++connection->c_xid_out; /* single xid for all pages */
spin_unlock(&connection->c_lock);
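/* Client-side bulk setup: pass server == 0 so the reservation blocks
 * here, instead of overdrawing, until enough kmaps are available. */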
- obd_highmem_get(page_count);
+ obd_kmap_get(page_count, 0);
for (mapped = 0; mapped < page_count; mapped++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
out_unmap:
while (mapped-- > 0)
kunmap(pga[mapped].pg);
- obd_highmem_put(page_count);
+ obd_kmap_put(page_count);
OBD_FREE(cb_data, sizeof(*cb_data));
out_desc:
ptlrpc_bulk_decref(desc);
cb_data->obd_data = local;
cb_data->obd_size = page_count * sizeof(*local);
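/* Same client-side pattern: reserve before the kmap loop below; the
 * error path unmaps the pages and returns the reservation. */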
- obd_highmem_get(page_count);
+ obd_kmap_get(page_count, 0);
for (mapped = 0; mapped < page_count; mapped++) {
local[mapped].addr = kmap(pga[mapped].pg);
while (mapped-- > 0)
kunmap(pga[mapped].pg);
- obd_highmem_put(page_count);
+ obd_kmap_put(page_count);
OBD_FREE(local, page_count * sizeof(*local));
out_cb: