Whamcloud - gitweb
LU-56 libcfs: more common APIs in libcfs
authorLiang Zhen <liang@whamcloud.com>
Mon, 16 Apr 2012 17:38:17 +0000 (01:38 +0800)
committerOleg Drokin <green@whamcloud.com>
Wed, 30 May 2012 20:03:21 +0000 (16:03 -0400)
Implementation of some common APIs:
- per-cpu-partition (percpt) data allocators
- implementation of per-cpu-partition lock
- a few other functions

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: Ib303b79ee9be87cc306da7e2feb20c8c296b8ac6
Reviewed-on: http://review.whamcloud.com/2558
Tested-by: Hudson
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/include/libcfs/libcfs_private.h
libcfs/include/libcfs/user-prim.h
libcfs/libcfs/Makefile.in
libcfs/libcfs/autoMakefile.am
libcfs/libcfs/libcfs_lock.c [new file with mode: 0644]
libcfs/libcfs/libcfs_mem.c [new file with mode: 0644]

index 7946ebc..233de97 100644 (file)
@@ -328,6 +328,32 @@ int libcfs_debug_cleanup(void);
 /* !__KERNEL__ */
 #endif
 
+/*
+ * allocate per-cpu-partition data, returned value is an array of pointers,
+ * variable can be indexed by CPU ID.
+ *     cptable != NULL: size of array is number of CPU partitions
+ *     cptable == NULL: size of array is number of HW cores
+ */
+void *cfs_percpt_alloc(struct cfs_cpt_table *cptab, unsigned int size);
+/*
+ * destory per-cpu-partition variable
+ */
+void  cfs_percpt_free(void *vars);
+int   cfs_percpt_number(void *vars);
+void *cfs_percpt_current(void *vars);
+void *cfs_percpt_index(void *vars, int idx);
+
+#define cfs_percpt_for_each(var, i, vars)              \
+       for (i = 0; i < cfs_percpt_number(vars) &&      \
+                   ((var) = (vars)[i]) != NULL; i++)
+
+/*
+ * allocate a variable array, returned value is an array of pointers.
+ * Caller can specify length of array by count.
+ */
+void *cfs_array_alloc(int count, unsigned int size);
+void  cfs_array_free(void *vars);
+
 #define LASSERT_ATOMIC_ENABLED          (1)
 
 #if LASSERT_ATOMIC_ENABLED
@@ -423,6 +449,83 @@ do {                                                            \
 #define CFS_ALLOC_PTR(ptr)      LIBCFS_ALLOC(ptr, sizeof (*(ptr)));
 #define CFS_FREE_PTR(ptr)       LIBCFS_FREE(ptr, sizeof (*(ptr)));
 
+/*
+ * percpu partition lock
+ *
+ * There are some use-cases like this in Lustre:
+ * . each CPU partition has it's own private data which is frequently changed,
+ *   and mostly by the local CPU partition.
+ * . all CPU partitions share some global data, these data are rarely changed.
+ *
+ * LNet is typical example.
+ * CPU partition lock is designed for this kind of use-cases:
+ * . each CPU partition has it's own private lock
+ * . change on private data just needs to take the private lock
+ * . read on shared data just needs to take _any_ of private locks
+ * . change on shared data needs to take _all_ private locks,
+ *   which is slow and should be really rare.
+ */
+
+enum {
+       CFS_PERCPT_LOCK_EX      = -1, /* negative */
+};
+
+#ifdef __KERNEL__
+
+struct cfs_percpt_lock {
+       /* cpu-partition-table for this lock */
+       struct cfs_cpt_table    *pcl_cptab;
+       /* exclusively locked */
+       unsigned int            pcl_locked;
+       /* private lock table */
+       cfs_spinlock_t          **pcl_locks;
+};
+
+/* return number of private locks */
+static inline int
+cfs_percpt_lock_num(struct cfs_percpt_lock *pcl)
+{
+       return cfs_cpt_number(pcl->pcl_cptab);
+}
+
+#else /* !__KERNEL__ */
+
+# ifdef HAVE_LIBPTHREAD
+
+struct cfs_percpt_lock {
+       pthread_mutex_t         pcl_mutex;
+};
+
+# else /* !HAVE_LIBPTHREAD */
+#define CFS_PERCPT_LOCK_MAGIC          0xbabecafe;
+
+struct cfs_percpt_lock {
+       int                     pcl_lock;
+};
+# endif /* HAVE_LIBPTHREAD */
+# define cfs_percpt_lock_num(pcl)        1
+#endif /* __KERNEL__ */
+
+/*
+ * create a cpu-partition lock based on CPU partition table \a cptab,
+ * each private lock has extra \a psize bytes padding data
+ */
+struct cfs_percpt_lock *cfs_percpt_lock_alloc(struct cfs_cpt_table *cptab);
+/* destroy a cpu-partition lock */
+void cfs_percpt_lock_free(struct cfs_percpt_lock *pcl);
+
+/* lock private lock \a index of \a pcl */
+void cfs_percpt_lock(struct cfs_percpt_lock *pcl, int index);
+/* unlock private lock \a index of \a pcl */
+void cfs_percpt_unlock(struct cfs_percpt_lock *pcl, int index);
+/* create percpt (atomic) refcount based on @cptab */
+cfs_atomic_t **cfs_percpt_atomic_alloc(struct cfs_cpt_table *cptab, int val);
+/* destroy percpt refcount */
+void cfs_percpt_atomic_free(cfs_atomic_t **refs);
+/* return sum of all percpu refs */
+int cfs_percpt_atomic_summary(cfs_atomic_t **refs);
+
+
 /** Compile-time assertion.
 
  * Check an invariant described by a constant expression at compile time by
@@ -538,6 +641,19 @@ static inline size_t cfs_round_strlen(char *fset)
         return (size_t)cfs_size_round((int)strlen(fset) + 1);
 }
 
+/* roundup \a val to power2 */
+static inline unsigned int cfs_power2_roundup(unsigned int val)
+{
+       if (val != LOWEST_BIT_SET(val)) { /* not a power of 2 already */
+               do {
+                       val &= ~LOWEST_BIT_SET(val);
+               } while (val != LOWEST_BIT_SET(val));
+               /* ...and round up */
+               val <<= 1;
+       }
+       return val;
+}
+
 #define LOGL(var,len,ptr)                                       \
 do {                                                            \
         if (var)                                                \
index 655e209..b65148a 100644 (file)
@@ -143,6 +143,8 @@ static inline int cfs_psdev_deregister(cfs_psdev_t *foo)
 
 #define CFS_DAEMON_FLAGS                0
 
+#define CFS_L1_CACHE_ALIGN(x)          (x)
+
 #ifdef HAVE_LIBPTHREAD
 typedef int (*cfs_thread_t)(void *);
 int cfs_create_thread(cfs_thread_t func, void *arg, unsigned long flags);
index b8273b8..026dcab 100644 (file)
@@ -24,7 +24,8 @@ endif
 
 libcfs-all-objs := debug.o fail.o nidstrings.o lwt.o module.o tracefile.o \
                   watchdog.o libcfs_string.o hash.o kernel_user_comm.o \
-                  prng.o workitem.o upcall_cache.o libcfs_cpu.o
+                  prng.o workitem.o upcall_cache.o libcfs_cpu.o \
+                  libcfs_mem.o libcfs_lock.o
 
 libcfs-objs := $(libcfs-linux-objs) $(libcfs-all-objs)
 
index 2b41949..7ce8133 100644 (file)
@@ -43,8 +43,8 @@ DIST_SUBDIRS := linux util posix darwin
 if LIBLUSTRE
 noinst_LIBRARIES= libcfs.a
 libcfs_a_SOURCES= posix/posix-debug.c user-prim.c user-lock.c user-tcpip.c \
-                  prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c \
-                 workitem.c fail.c libcfs_cpu.c
+                 prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c \
+                 workitem.c fail.c libcfs_cpu.c libcfs_mem.c libcfs_lock.c
 libcfs_a_CPPFLAGS = $(LLCPPFLAGS)
 libcfs_a_CFLAGS = $(LLCFLAGS)
 endif
@@ -90,4 +90,4 @@ MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ linux-*.c linux/*.o darwin/*.o libcfs
 EXTRA_DIST := $(libcfs-all-objs:%.o=%.c) Info.plist tracefile.h prng.c \
              user-lock.c user-tcpip.c user-bitops.c user-prim.c workitem.c \
              user-mem.c kernel_user_comm.c fail.c libcfs_cpu.c \
-             linux/linux-tracefile.h
+             libcfs_mem.c libcfs_lock.c linux/linux-tracefile.h
diff --git a/libcfs/libcfs/libcfs_lock.c b/libcfs/libcfs/libcfs_lock.c
new file mode 100644 (file)
index 0000000..b9e2f25
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/* Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * Author: liang@whamcloud.com
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LNET
+
+#include <libcfs/libcfs.h>
+
+#ifdef __KERNEL__
+
+/** destroy cpu-partition lock, see libcfs_private.h for more detail */
+void
+cfs_percpt_lock_free(struct cfs_percpt_lock *pcl)
+{
+       LASSERT(pcl->pcl_locks != NULL);
+       LASSERT(!pcl->pcl_locked);
+
+       cfs_percpt_free(pcl->pcl_locks);
+       LIBCFS_FREE(pcl, sizeof(*pcl));
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_lock_free);
+
+/**
+ * create cpu-partition lock, see libcfs_private.h for more detail.
+ *
+ * cpu-partition lock is designed for large-scale SMP system, so we need to
+ * reduce cacheline conflict as possible as we can, that's the
+ * reason we always allocate cacheline-aligned memory block.
+ */
+struct cfs_percpt_lock *
+cfs_percpt_lock_alloc(struct cfs_cpt_table *cptab)
+{
+       struct cfs_percpt_lock  *pcl;
+       cfs_spinlock_t          *lock;
+       int                     i;
+
+       /* NB: cptab can be NULL, pcl will be for HW CPUs on that case */
+       LIBCFS_ALLOC(pcl, sizeof(*pcl));
+       if (pcl == NULL)
+               return NULL;
+
+       pcl->pcl_cptab = cptab;
+       pcl->pcl_locks = cfs_percpt_alloc(cptab, sizeof(*lock));
+       if (pcl->pcl_locks == NULL) {
+               LIBCFS_FREE(pcl, sizeof(*pcl));
+               return NULL;
+       }
+
+       cfs_percpt_for_each(lock, i, pcl->pcl_locks)
+               cfs_spin_lock_init(lock);
+
+       return pcl;
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_lock_alloc);
+
+/**
+ * lock a CPU partition
+ *
+ * \a index != CFS_PERCPT_LOCK_EX
+ *     hold private lock indexed by \a index
+ *
+ * \a index == CFS_PERCPT_LOCK_EX
+ *     exclusively lock @pcl and nobody can take private lock
+ */
+void
+cfs_percpt_lock(struct cfs_percpt_lock *pcl, int index)
+{
+       int     ncpt = cfs_cpt_number(pcl->pcl_cptab);
+       int     i;
+
+       LASSERT(index >= CFS_PERCPT_LOCK_EX && index < ncpt);
+
+       if (ncpt == 1) {
+               index = 0;
+       } else { /* serialize with exclusive lock */
+               while (pcl->pcl_locked)
+                       cpu_relax();
+       }
+
+       if (likely(index != CFS_PERCPT_LOCK_EX)) {
+               cfs_spin_lock(pcl->pcl_locks[index]);
+               return;
+       }
+
+       /* exclusive lock request */
+       for (i = 0; i < ncpt; i++) {
+               cfs_spin_lock(pcl->pcl_locks[i]);
+               if (i == 0) {
+                       LASSERT(!pcl->pcl_locked);
+                       /* nobody should take private lock after this
+                        * so I wouldn't starve for too long time */
+                       pcl->pcl_locked = 1;
+               }
+       }
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_lock);
+
+/** unlock a CPU partition */
+void
+cfs_percpt_unlock(struct cfs_percpt_lock *pcl, int index)
+{
+       int     ncpt = cfs_cpt_number(pcl->pcl_cptab);
+       int     i;
+
+       index = ncpt == 1 ? 0 : index;
+
+       if (likely(index != CFS_PERCPT_LOCK_EX)) {
+               cfs_spin_unlock(pcl->pcl_locks[index]);
+               return;
+       }
+
+       for (i = ncpt - 1; i >= 0; i--) {
+               if (i == 0) {
+                       LASSERT(pcl->pcl_locked);
+                       pcl->pcl_locked = 0;
+               }
+               cfs_spin_unlock(pcl->pcl_locks[i]);
+       }
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_unlock);
+
+#else /* !__KERNEL__ */
+# ifdef HAVE_LIBPTHREAD
+
+struct cfs_percpt_lock *
+cfs_percpt_lock_alloc(struct cfs_cpt_table *cptab)
+{
+       struct cfs_percpt_lock *pcl;
+
+       CFS_ALLOC_PTR(pcl);
+       if (pcl != NULL)
+               pthread_mutex_init(&pcl->pcl_mutex, NULL);
+
+       return pcl;
+}
+
+void
+cfs_percpt_lock_free(struct cfs_percpt_lock *pcl)
+{
+       pthread_mutex_destroy(&pcl->pcl_mutex);
+       CFS_FREE_PTR(pcl);
+}
+
+void
+cfs_percpt_lock(struct cfs_percpt_lock *pcl, int lock)
+{
+       pthread_mutex_lock(&(pcl)->pcl_mutex);
+}
+
+void
+cfs_percpt_unlock(struct cfs_percpt_lock *pcl, int lock)
+{
+       pthread_mutex_unlock(&(pcl)->pcl_mutex);
+}
+
+# else /* !HAVE_LIBPTHREAD */
+
+struct cfs_percpt_lock *
+cfs_percpt_lock_alloc(struct cfs_cpt_table *cptab)
+{
+       return (struct cfs_percpt_lock *)CFS_PERCPT_LOCK_MAGIC;
+}
+
+void
+cfs_percpt_lock_free(struct cfs_percpt_lock *pcl)
+{
+       LASSERT(pcl == (struct cfs_percpt_lock *)CFS_PERCPT_LOCK_MAGIC);
+}
+
+void
+cfs_percpt_lock(struct cfs_percpt_lock *pcl, int index)
+{
+       LASSERT(pcl == (struct cfs_percpt_lock *)CFS_PERCPT_LOCK_MAGIC);
+}
+
+void
+cfs_percpt_unlock(struct cfs_percpt_lock *pcl, int index)
+{
+       LASSERT(pcl == (struct cfs_percpt_lock *)CFS_PERCPT_LOCK_MAGIC);
+}
+
+# endif /* HAVE_LIBPTHREAD */
+#endif /* __KERNEL__ */
+
+/** free cpu-partition refcount */
+void
+cfs_percpt_atomic_free(cfs_atomic_t **refs)
+{
+       cfs_percpt_free(refs);
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_atomic_free);
+
+/** allocate cpu-partition refcount with initial value @init_val */
+cfs_atomic_t **
+cfs_percpt_atomic_alloc(struct cfs_cpt_table *cptab, int init_val)
+{
+       cfs_atomic_t    **refs;
+       cfs_atomic_t    *ref;
+       int             i;
+
+       refs = cfs_percpt_alloc(cptab, sizeof(*ref));
+       if (refs == NULL)
+               return NULL;
+
+       cfs_percpt_for_each(ref, i, refs)
+               cfs_atomic_set(ref, init_val);
+       return refs;
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_atomic_alloc);
+
+/** return sum of cpu-partition refs */
+int
+cfs_percpt_atomic_summary(cfs_atomic_t **refs)
+{
+       cfs_atomic_t    *ref;
+       int             i;
+       int             val = 0;
+
+       cfs_percpt_for_each(ref, i, refs)
+               val += cfs_atomic_read(ref);
+
+       return val;
+}
+CFS_EXPORT_SYMBOL(cfs_percpt_atomic_summary);
diff --git a/libcfs/libcfs/libcfs_mem.c b/libcfs/libcfs/libcfs_mem.c
new file mode 100644 (file)
index 0000000..95df0d7
--- /dev/null
@@ -0,0 +1,205 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * Author: liang@whamcloud.com
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LNET
+
+#include <libcfs/libcfs.h>
+
+struct cfs_var_array {
+       unsigned int            va_count;       /* # of buffers */
+       unsigned int            va_size;        /* size of each var */
+       struct cfs_cpt_table    *va_cptab;      /* cpu partition table */
+       void                    *va_ptrs[0];    /* buffer addresses */
+};
+
+/*
+ * free per-cpu data, see more detail in cfs_percpt_free
+ */
+void
+cfs_percpt_free(void *vars)
+{
+       struct  cfs_var_array *arr;
+       int     i;
+
+       arr = container_of(vars, struct cfs_var_array, va_ptrs[0]);
+
+       for (i = 0; i < arr->va_count; i++) {
+               if (arr->va_ptrs[i] != NULL)
+                       LIBCFS_FREE(arr->va_ptrs[i], arr->va_size);
+       }
+
+       LIBCFS_FREE(arr, offsetof(struct cfs_var_array,
+                                 va_ptrs[arr->va_count]));
+}
+EXPORT_SYMBOL(cfs_percpt_free);
+
+/*
+ * allocate per cpu-partition variables, returned value is an array of pointers,
+ * variable can be indexed by CPU partition ID, i.e:
+ *
+ *     arr = cfs_percpt_alloc(cfs_cpu_pt, size);
+ *     then caller can access memory block for CPU 0 by arr[0],
+ *     memory block for CPU 1 by arr[1]...
+ *     memory block for CPU N by arr[N]...
+ *
+ * cacheline aligned.
+ */
+void *
+cfs_percpt_alloc(struct cfs_cpt_table *cptab, unsigned int size)
+{
+       struct cfs_var_array    *arr;
+       int                     count;
+       int                     i;
+
+       count = cfs_cpt_number(cptab);
+
+       LIBCFS_ALLOC(arr, offsetof(struct cfs_var_array, va_ptrs[count]));
+       if (arr == NULL)
+               return NULL;
+
+       arr->va_size    = size = CFS_L1_CACHE_ALIGN(size);
+       arr->va_count   = count;
+       arr->va_cptab   = cptab;
+
+       for (i = 0; i < count; i++) {
+               LIBCFS_CPT_ALLOC(arr->va_ptrs[i], cptab, i, size);
+               if (arr->va_ptrs[i] == NULL) {
+                       cfs_percpt_free((void *)&arr->va_ptrs[0]);
+                       return NULL;
+               }
+       }
+
+       return (void *)&arr->va_ptrs[0];
+}
+EXPORT_SYMBOL(cfs_percpt_alloc);
+
+/*
+ * return number of CPUs (or number of elements in per-cpu data)
+ * according to cptab of @vars
+ */
+int
+cfs_percpt_number(void *vars)
+{
+       struct cfs_var_array *arr;
+
+       arr = container_of(vars, struct cfs_var_array, va_ptrs[0]);
+
+       return arr->va_count;
+}
+EXPORT_SYMBOL(cfs_percpt_number);
+
+/*
+ * return memory block shadowed from current CPU
+ */
+void *
+cfs_percpt_current(void *vars)
+{
+       struct cfs_var_array *arr;
+       int    cpt;
+
+       arr = container_of(vars, struct cfs_var_array, va_ptrs[0]);
+       cpt = cfs_cpt_current(arr->va_cptab, 0);
+       if (cpt < 0)
+               return NULL;
+
+       return arr->va_ptrs[cpt];
+}
+EXPORT_SYMBOL(cfs_percpt_current);
+
+void *
+cfs_percpt_index(void *vars, int idx)
+{
+       struct cfs_var_array *arr;
+
+       arr = container_of(vars, struct cfs_var_array, va_ptrs[0]);
+
+       LASSERT(idx >= 0 && idx < arr->va_count);
+       return arr->va_ptrs[idx];
+}
+EXPORT_SYMBOL(cfs_percpt_index);
+
+/*
+ * free variable array, see more detail in cfs_array_alloc
+ */
+void
+cfs_array_free(void *vars)
+{
+       struct cfs_var_array    *arr;
+       int                     i;
+
+       arr = container_of(vars, struct cfs_var_array, va_ptrs[0]);
+
+       for (i = 0; i < arr->va_count; i++) {
+               if (arr->va_ptrs[i] == NULL)
+                       continue;
+
+               LIBCFS_FREE(arr->va_ptrs[i], arr->va_size);
+       }
+       LIBCFS_FREE(arr, offsetof(struct cfs_var_array,
+                                 va_ptrs[arr->va_count]));
+}
+EXPORT_SYMBOL(cfs_array_free);
+
+/*
+ * allocate a variable array, returned value is an array of pointers.
+ * Caller can specify length of array by @count, @size is size of each
+ * memory block in array.
+ */
+void *
+cfs_array_alloc(int count, unsigned int size)
+{
+       struct cfs_var_array    *arr;
+       int                     i;
+
+       LIBCFS_ALLOC(arr, offsetof(struct cfs_var_array, va_ptrs[count]));
+       if (arr == NULL)
+               return NULL;
+
+       arr->va_count   = count;
+       arr->va_size    = size;
+
+       for (i = 0; i < count; i++) {
+               LIBCFS_ALLOC(arr->va_ptrs[i], size);
+
+               if (arr->va_ptrs[i] == NULL) {
+                       cfs_array_free((void *)&arr->va_ptrs[0]);
+                       return NULL;
+               }
+       }
+
+       return (void *)&arr->va_ptrs[0];
+}
+EXPORT_SYMBOL(cfs_array_alloc);