--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ * Author: Eric Barton <eeb@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+
+#include "obdiolib.h"
+
+int
+parse_kmg (uint64_t *valp, char *str)
+{
+ uint64_t val;
+ char mod[32];
+
+ switch (sscanf (str, LPU64"%1[gGmMkK]", &val, mod))
+ {
+ default:
+ return (-1);
+
+ case 1:
+ *valp = val;
+ return (0);
+
+ case 2:
+ switch (*mod)
+ {
+ case 'g':
+ case 'G':
+ *valp = val << 30;
+ return (0);
+
+ case 'm':
+ case 'M':
+ *valp = val << 20;
+ return (0);
+
+ case 'k':
+ case 'K':
+ *valp = val << 10;
+ return (0);
+
+ default:
+ *valp = val;
+ return (0);
+ }
+ }
+}
+
+void
+usage (char *cmdname, int help)
+{
+ char *name = strrchr (cmdname, '/');
+
+ if (name == NULL)
+ name = cmdname;
+
+ fprintf (help ? stdout : stderr,
+ "usage: %s -d device -s size -o offset [-i id][-n reps][-l] oid\n",
+ name);
+}
+
+int
+exponential_modulus (int i, int base)
+{
+ int top = base;
+ int mod = 1;
+
+ for (;;) {
+ if (i < top)
+ return (i%mod == 0);
+
+ mod = top;
+ top *= base;
+ }
+}
+
+int
+main (int argc, char **argv)
+{
+ uint64_t bid = (((uint64_t)gethostid()) << 32) | getpid ();
+ int set_bid = 0;
+ uint64_t oid;
+ int setup = 0;
+ int device = -1;
+ int npeers = 0;
+ int reps = 1;
+ char hostname[128];
+ struct obdio_conn *conn;
+ struct obdio_barrier *b;
+ char *end;
+ uint64_t val;
+ int rc;
+ int c;
+
+ setvbuf (stdout, NULL, _IOLBF, 0);
+ memset (hostname, 0, sizeof (hostname));
+ gethostname (hostname, sizeof (hostname));
+ hostname[sizeof(hostname) - 1] = 0;
+
+ while ((c = getopt (argc, argv, "hsi:d:n:p:")) != -1)
+ switch (c) {
+ case 'h':
+ usage (argv[0], 1);
+ return (0);
+
+ case 'i':
+ bid = strtoll (optarg, &end, 0);
+ if (end == optarg || *end != 0) {
+ fprintf (stderr, "Can't parse id %s\n",
+ optarg);
+ return (1);
+ }
+ set_bid = 1;
+ break;
+
+ case 's':
+ setup = 1;
+ break;
+
+ case 'd':
+ device = strtol (optarg, &end, 0);
+ if (end == optarg || *end != 0 || device < 0) {
+ fprintf (stderr, "Can't parse device %s\n",
+ optarg);
+ return (1);
+ }
+ break;
+
+ case 'n':
+ if (parse_kmg (&val, optarg) != 0) {
+ fprintf (stderr, "Can't parse reps %s\n",
+ optarg);
+ return (1);
+ }
+ reps = (int)val;
+ break;
+
+ case 'p':
+ npeers = strtol (optarg, &end, 0);
+ if (end == optarg || *end != 0 || npeers <= 0) {
+ fprintf (stderr, "Can't parse npeers %s\n",
+ optarg);
+ return (1);
+ }
+ break;
+
+ default:
+ usage (argv[0], 0);
+ return (1);
+ }
+
+ if ((!setup && !set_bid) ||
+ npeers <= 0 ||
+ device < 0 ||
+ optind == argc) {
+ fprintf (stderr, "%s not specified\n",
+ (!setup && !set_bid) ? "id" :
+ npeers <= 0 ? "npeers" :
+ device < 0 ? "device" : "object id");
+ return (1);
+ }
+
+ oid = strtoull (argv[optind], &end, 0);
+ if (end == argv[optind] || *end != 0) {
+ fprintf (stderr, "Can't parse object id %s\n",
+ argv[optind]);
+ return (1);
+ }
+
+ conn = obdio_connect (device);
+ if (conn == NULL)
+ return (1);
+
+ b = obdio_new_barrier (oid, bid, npeers);
+ if (b == NULL)
+ return (1);
+
+ rc = 0;
+ if (setup) {
+ rc = obdio_setup_barrier (conn, b);
+ if (rc == 0)
+ printf ("Setup barrier: -d %d -i "LPX64" -p %d -n1 "LPX64"\n",
+ device, bid, npeers, oid);
+ } else {
+ for (c = 0; c < reps; c++) {
+ rc = obdio_barrier (conn, b);
+ if (rc != 0)
+ break;
+ if (exponential_modulus (c, 10))
+ printf ("%s: Barrier %d\n", hostname, c);
+ }
+ }
+
+ free (b);
+
+ obdio_disconnect (conn);
+
+ return (rc == 0 ? 0 : 1);
+}
+
+
*
*/
-#include <stdlib.h>
-#include <sys/ioctl.h>
-#include <fcntl.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/stat.h>
#include <stdio.h>
-#include <stdarg.h>
-#include <signal.h>
-
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
-#include <linux/lustre_dlm.h>
-#include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
-#include <linux/obd.h> /* for struct lov_stripe_md */
-#include <linux/obd_class.h>
-#include <linux/lustre_build_version.h>
-
-#include <unistd.h>
-#include <sys/un.h>
-#include <time.h>
-#include <sys/time.h>
-#include <netinet/in.h>
+#include <stdlib.h>
#include <errno.h>
#include <string.h>
-#include <asm/page.h> /* needed for PAGE_SIZE - rread */
-
-#define __KERNEL__
-#include <linux/list.h>
-#undef __KERNEL__
-
-#include "obdctl.h"
-
-struct obdio_conn {
- int oc_fd;
- uint64_t oc_conn_addr;
- uint64_t oc_conn_cookie;
- struct obd_ioctl_data oc_data;
- char oc_buffer[8192];
-};
-
-char *
-obdio_alloc_aligned_buffer (char **spacep, int size)
-{
- int pagesize = getpagesize();
- char *space = malloc (size + pagesize - 1);
-
- *spacep = space;
- if (space == NULL)
- return (NULL);
-
- return ((char *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
-}
-
-void
-obdio_iocinit (struct obdio_conn *conn)
-{
- memset (&conn->oc_data, 0, sizeof (conn->oc_data));
- conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
- conn->oc_data.ioc_addr = conn->oc_conn_addr;
- conn->oc_data.ioc_cookie = conn->oc_conn_cookie;
- conn->oc_data.ioc_len = sizeof (conn->oc_data);
-}
-
-int
-obdio_ioctl (struct obdio_conn *conn, int cmd)
-{
- char *buf = conn->oc_buffer;
- int rc;
- int rc2;
-
- rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
- if (rc != 0) {
- fprintf (stderr, "obd_ioctl_pack: %d (%s)\n",
- rc, strerror (errno));
- abort ();
- }
-
- rc = ioctl (conn->oc_fd, cmd, buf);
- if (rc != 0)
- return (rc);
-
- rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
- if (rc2 != 0) {
- fprintf (stderr, "obd_ioctl_unpack: %d (%s)\n",
- rc2, strerror (errno));
- abort ();
- }
-
- return (rc);
-}
-
-struct obdio_conn *
-obdio_connect (int device)
-{
- struct obdio_conn *conn;
- int rc;
-
- conn = malloc (sizeof (*conn));
- if (conn == NULL) {
- fprintf (stderr, "obdio_connect: no memory\n");
- return (NULL);
- }
- memset (conn, 0, sizeof (*conn));
-
- conn->oc_fd = open ("/dev/obd", O_RDWR);
- if (conn->oc_fd < 0) {
- fprintf (stderr, "Can't open /dev/obd: %s\n",
- strerror (errno));
- goto failed;
- }
-
- obdio_iocinit (conn);
- conn->oc_data.ioc_dev = device;
- rc = obdio_ioctl (conn, OBD_IOC_DEVICE);
- if (rc != 0) {
- fprintf (stderr, "Can't set device %d: %s\n",
- device, strerror (errno));
- goto failed;
- }
-
- obdio_iocinit (conn);
- rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
- if (rc != 0) {
- fprintf (stderr, "Can't connect to device %d: %s\n",
- device, strerror (errno));
- goto failed;
- }
-
- conn->oc_conn_addr = conn->oc_data.ioc_addr;
- conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
- return (conn);
-
- failed:
- free (conn);
- return (NULL);
-}
-
-void
-obdio_disconnect (struct obdio_conn *conn)
-{
- close (conn->oc_fd);
- /* obdclass will automatically close on last ref */
- free (conn);
-}
-
-int
-obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
-{
- int rc;
-
- obdio_iocinit (conn);
-
- conn->oc_data.ioc_obdo1.o_id = oid;
- conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
- conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-
- rc = obdio_ioctl (conn, OBD_IOC_OPEN);
-
- if (rc == 0)
- memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
-
- return (rc);
-}
-
-int
-obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
-{
- obdio_iocinit (conn);
-
-
- conn->oc_data.ioc_obdo1.o_id = oid;
- conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
- memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
- conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
- OBD_MD_FLMODE | OBD_MD_FLHANDLE;
-
- return (obdio_ioctl (conn, OBD_IOC_CLOSE));
-}
-
-int
-obdio_pread (struct obdio_conn *conn, uint64_t oid,
- char *buffer, uint32_t count, uint64_t offset)
-{
- obdio_iocinit (conn);
-
- conn->oc_data.ioc_obdo1.o_id = oid;
- conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
- conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-
- conn->oc_data.ioc_pbuf2 = buffer;
- conn->oc_data.ioc_plen2 = count;
- conn->oc_data.ioc_count = count;
- conn->oc_data.ioc_offset = offset;
-
- return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
-}
-
-int
-obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
- char *buffer, uint32_t count, uint64_t offset)
-{
- obdio_iocinit (conn);
-
- conn->oc_data.ioc_obdo1.o_id = oid;
- conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
- conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-
- conn->oc_data.ioc_pbuf2 = buffer;
- conn->oc_data.ioc_plen2 = count;
- conn->oc_data.ioc_count = count;
- conn->oc_data.ioc_offset = offset;
-
- return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
-}
-
-int
-obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
- int mode, uint64_t offset, uint32_t count,
- struct lustre_handle *lh)
-{
- int rc;
-
- obdio_iocinit (conn);
-
- conn->oc_data.ioc_obdo1.o_id = oid;
- conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
- conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-
- conn->oc_data.ioc_conn1 = mode;
- conn->oc_data.ioc_count = count;
- conn->oc_data.ioc_offset = offset;
-
- rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
-
- if (rc == 0)
- memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
-
- return (rc);
-}
-
-int
-obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
-{
- obdio_iocinit (conn);
-
- memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
-
- return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
-}
+#include "obdiolib.h"
int
obdio_test_fixed_extent (struct obdio_conn *conn,
{
struct lustre_handle fh;
struct lustre_handle lh;
- char *space;
- char *buffer;
+ void *space;
+ void *buffer;
uint32_t *ibuf;
int i;
int j;
uint64_t base_offset = 0;
uint32_t size = 0;
int set_size = 0;
- int device = 0;
- int set_device = 0;
+ int device = -1;
int reps = 1;
int locked = 0;
char *end;
case 'd':
device = strtol (optarg, &end, 0);
- if (end == optarg || *end != 0) {
+ if (end == optarg || *end != 0 || device < 0) {
fprintf (stderr, "Can't parse device %s\n",
optarg);
return (1);
}
- set_device++;
break;
case 'n':
if (parse_kmg (&val, optarg) != 0) {
}
if (!set_size ||
- !set_device ||
+ device < 0 ||
optind == argc) {
fprintf (stderr, "No %s specified\n",
!set_size ? "size" :
- !set_device ? "device" : "object id");
+ device < 0 ? "device" : "object id");
return (1);
}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ * Author: Eric Barton <eeb@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "obdiolib.h"
+
+void
+obdio_iocinit (struct obdio_conn *conn)
+{
+ memset (&conn->oc_data, 0, sizeof (conn->oc_data));
+ conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
+ conn->oc_data.ioc_addr = conn->oc_conn_addr;
+ conn->oc_data.ioc_cookie = conn->oc_conn_cookie;
+ conn->oc_data.ioc_len = sizeof (conn->oc_data);
+}
+
+int
+obdio_ioctl (struct obdio_conn *conn, int cmd)
+{
+ char *buf = conn->oc_buffer;
+ int rc;
+ int rc2;
+
+ rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
+ if (rc != 0) {
+ fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
+ rc, strerror (errno));
+ abort ();
+ }
+
+ rc = ioctl (conn->oc_fd, cmd, buf);
+ if (rc != 0)
+ return (rc);
+
+ rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
+ if (rc2 != 0) {
+ fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
+ rc2, strerror (errno));
+ abort ();
+ }
+
+ return (rc);
+}
+
+struct obdio_conn *
+obdio_connect (int device)
+{
+ struct obdio_conn *conn;
+ int rc;
+
+ conn = malloc (sizeof (*conn));
+ if (conn == NULL) {
+ fprintf (stderr, "obdio_connect: no memory\n");
+ return (NULL);
+ }
+ memset (conn, 0, sizeof (*conn));
+
+ conn->oc_fd = open ("/dev/obd", O_RDWR);
+ if (conn->oc_fd < 0) {
+ fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
+ strerror (errno));
+ goto failed;
+ }
+
+ obdio_iocinit (conn);
+ conn->oc_data.ioc_dev = device;
+ rc = obdio_ioctl (conn, OBD_IOC_DEVICE);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_connect: Can't set device %d: %s\n",
+ device, strerror (errno));
+ goto failed;
+ }
+
+ obdio_iocinit (conn);
+ rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_connect: Can't connect to device %d: %s\n",
+ device, strerror (errno));
+ goto failed;
+ }
+
+ conn->oc_conn_addr = conn->oc_data.ioc_addr;
+ conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
+ return (conn);
+
+ failed:
+ free (conn);
+ return (NULL);
+}
+
+void
+obdio_disconnect (struct obdio_conn *conn)
+{
+ close (conn->oc_fd);
+ /* obdclass will automatically close on last ref */
+ free (conn);
+}
+
+int
+obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
+{
+ int rc;
+
+ obdio_iocinit (conn);
+
+ conn->oc_data.ioc_obdo1.o_id = oid;
+ conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
+
+ rc = obdio_ioctl (conn, OBD_IOC_OPEN);
+
+ if (rc == 0)
+ memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
+
+ return (rc);
+}
+
+int
+obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
+{
+ obdio_iocinit (conn);
+
+
+ conn->oc_data.ioc_obdo1.o_id = oid;
+ conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
+ memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
+ OBD_MD_FLMODE | OBD_MD_FLHANDLE;
+
+ return (obdio_ioctl (conn, OBD_IOC_CLOSE));
+}
+
+int
+obdio_pread (struct obdio_conn *conn, uint64_t oid,
+ char *buffer, uint32_t count, uint64_t offset)
+{
+ obdio_iocinit (conn);
+
+ conn->oc_data.ioc_obdo1.o_id = oid;
+ conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
+
+ conn->oc_data.ioc_pbuf2 = buffer;
+ conn->oc_data.ioc_plen2 = count;
+ conn->oc_data.ioc_count = count;
+ conn->oc_data.ioc_offset = offset;
+
+ return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
+}
+
+int
+obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
+ char *buffer, uint32_t count, uint64_t offset)
+{
+ obdio_iocinit (conn);
+
+ conn->oc_data.ioc_obdo1.o_id = oid;
+ conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
+
+ conn->oc_data.ioc_pbuf2 = buffer;
+ conn->oc_data.ioc_plen2 = count;
+ conn->oc_data.ioc_count = count;
+ conn->oc_data.ioc_offset = offset;
+
+ return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
+}
+
+int
+obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
+ int mode, uint64_t offset, uint32_t count,
+ struct lustre_handle *lh)
+{
+ int rc;
+
+ obdio_iocinit (conn);
+
+ conn->oc_data.ioc_obdo1.o_id = oid;
+ conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
+
+ conn->oc_data.ioc_conn1 = mode;
+ conn->oc_data.ioc_count = count;
+ conn->oc_data.ioc_offset = offset;
+
+ rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
+
+ if (rc == 0)
+ memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
+
+ return (rc);
+}
+
+int
+obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
+{
+ obdio_iocinit (conn);
+
+ memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
+ conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
+
+ return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
+}
+
+void *
+obdio_alloc_aligned_buffer (void **spacep, int size)
+{
+ int pagesize = getpagesize();
+ void *space = malloc (size + pagesize - 1);
+
+ *spacep = space;
+ if (space == NULL)
+ return (NULL);
+
+ return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
+}
+
+struct obdio_barrier *
+obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
+{
+ struct obdio_barrier *b;
+
+ b = (struct obdio_barrier *)malloc (sizeof (*b));
+ if (b == NULL) {
+ fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
+ return (NULL);
+ }
+
+ b->ob_id = id;
+ b->ob_oid = oid;
+ b->ob_npeers = npeers;
+ b->ob_ordinal = 0;
+ b->ob_count = 0;
+ return (b);
+}
+
+int
+obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
+{
+ struct lustre_handle fh;
+ struct lustre_handle lh;
+ int rc;
+ int rc2;
+ void *space;
+ struct obdio_barrier *fileb;
+
+ if (b->ob_ordinal != 0 ||
+ b->ob_count != 0) {
+ fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
+ abort ();
+ }
+
+ rc = obdio_open (conn, b->ob_oid, &fh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n",
+ b->ob_oid, strerror (errno));
+ return (rc);
+ }
+
+ fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
+ if (fileb == NULL) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
+ b->ob_oid);
+ rc = -1;
+ goto out_0;
+ }
+
+ memset (fileb, 0, getpagesize ());
+ *fileb = *b;
+
+ rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_1;
+ }
+
+ rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0)
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
+ b->ob_oid, strerror (errno));
+
+ rc2 = obdio_cancel (conn, &lh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
+ out_1:
+ free (space);
+ out_0:
+ rc2 = obdio_close (conn, b->ob_oid, &fh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
+
+ return (rc);
+}
+
+int
+obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
+{
+ struct lustre_handle fh;
+ struct lustre_handle lh;
+ int rc;
+ int rc2;
+ void *space;
+ struct obdio_barrier *fileb;
+ char *mode;
+
+ rc = obdio_open (conn, b->ob_oid, &fh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
+ b->ob_oid, strerror (errno));
+ return (rc);
+ }
+
+ fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
+ if (fileb == NULL) {
+ fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
+ b->ob_oid);
+ rc = -1;
+ goto out_0;
+ }
+
+ rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_1;
+ }
+
+ memset (fileb, 0xeb, getpagesize ());
+ rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_2;
+ }
+
+ if (fileb->ob_id != b->ob_id ||
+ fileb->ob_oid != b->ob_oid ||
+ fileb->ob_npeers != b->ob_npeers ||
+ fileb->ob_count >= b->ob_npeers ||
+ fileb->ob_ordinal != b->ob_ordinal) {
+ fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
+ fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
+ fileb->ob_ordinal, fileb->ob_count);
+ fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ b->ob_id, b->ob_oid, b->ob_npeers,
+ b->ob_ordinal, b->ob_count);
+ rc = -1;
+ goto out_2;
+ }
+
+ fileb->ob_count++;
+ if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
+ fileb->ob_count = 0; /* join count for next barrier */
+ fileb->ob_ordinal++; /* signal all joined */
+ }
+
+ rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_2;
+ }
+
+ mode = "PW";
+ b->ob_ordinal++; /* now I wait... */
+ while (fileb->ob_ordinal != b->ob_ordinal) {
+
+ rc = obdio_cancel (conn, &lh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
+ b->ob_oid, mode, strerror (errno));
+ goto out_1;
+ }
+
+ mode = "PR";
+ rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_1;
+ }
+
+ memset (fileb, 0xeb, getpagesize ());
+ rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
+ if (rc != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
+ b->ob_oid, strerror (errno));
+ goto out_2;
+ }
+
+ if (fileb->ob_id != b->ob_id ||
+ fileb->ob_oid != b->ob_oid ||
+ fileb->ob_npeers != b->ob_npeers ||
+ fileb->ob_count >= b->ob_npeers ||
+ (fileb->ob_ordinal != b->ob_ordinal - 1 &&
+ fileb->ob_ordinal != b->ob_ordinal)) {
+ fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
+ fprintf (stderr, " got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
+ fileb->ob_ordinal, fileb->ob_count);
+ fprintf (stderr, " expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
+ b->ob_id, b->ob_oid, b->ob_npeers,
+ b->ob_ordinal, b->ob_count);
+ rc = -1;
+ goto out_2;
+ }
+ }
+
+ out_2:
+ rc2 = obdio_cancel (conn, &lh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
+ out_1:
+ free (space);
+ out_0:
+ rc2 = obdio_close (conn, b->ob_oid, &fh);
+ if (rc == 0 && rc2 != 0) {
+ fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n",
+ b->ob_oid, strerror (errno));
+ rc = rc2;
+ }
+
+ return (rc);
+}
+
+