Whamcloud - gitweb
b=3031
[fs/lustre-release.git] / lustre / utils / obdiolib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eeb@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include <liblustre.h>
34 #include "obdiolib.h"
35
36 void
37 obdio_iocinit (struct obdio_conn *conn)
38 {
39         memset (&conn->oc_data, 0, sizeof (conn->oc_data));
40         conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
41         conn->oc_data.ioc_dev = conn->oc_device;
42         conn->oc_data.ioc_len = sizeof (conn->oc_data);
43 }
44
45 int
46 obdio_ioctl (struct obdio_conn *conn, int cmd)
47 {
48         char *buf = conn->oc_buffer;
49         int   rc;
50         int   rc2;
51
52         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
53         if (rc != 0) {
54                 fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
55                          rc, strerror (errno));
56                 abort ();
57         }
58
59         rc = ioctl (conn->oc_fd, cmd, buf);
60         if (rc != 0)
61                 return (rc);
62
63         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
64         if (rc2 != 0) {
65                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
66                          rc2, strerror (errno));
67                 abort ();
68         }
69
70         return (rc);
71 }
72
73 struct obdio_conn *
74 obdio_connect (int device)
75 {
76         struct obdio_conn  *conn;
77
78         conn = malloc (sizeof (*conn));
79         if (conn == NULL) {
80                 fprintf (stderr, "obdio_connect: no memory\n");
81                 return (NULL);
82         }
83         memset (conn, 0, sizeof (*conn));
84
85         conn->oc_fd = open ("/dev/obd", O_RDWR);
86         if (conn->oc_fd < 0) {
87                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
88                          strerror (errno));
89                 goto failed;
90         }
91
92         conn->oc_device = device;
93         return (conn);
94
95  failed:
96         free (conn);
97         return (NULL);
98 }
99
100 void
101 obdio_disconnect (struct obdio_conn *conn, int flags)
102 {
103         close (conn->oc_fd);
104         /* obdclass will automatically close on last ref */
105         free (conn);
106 }
107
108 int
109 obdio_pread (struct obdio_conn *conn, uint64_t oid,
110              char *buffer, uint32_t count, uint64_t offset)
111 {
112         obdio_iocinit (conn);
113
114         conn->oc_data.ioc_obdo1.o_id = oid;
115         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
116         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
117
118         conn->oc_data.ioc_pbuf1 = (void *)1;
119         conn->oc_data.ioc_plen1 = 1;
120
121         conn->oc_data.ioc_pbuf2 = buffer;
122         conn->oc_data.ioc_plen2 = count;
123         conn->oc_data.ioc_count = count;
124         conn->oc_data.ioc_offset = offset;
125
126         return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
127 }
128
129 int
130 obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
131               char *buffer, uint32_t count, uint64_t offset)
132 {
133         obdio_iocinit (conn);
134
135         conn->oc_data.ioc_obdo1.o_id = oid;
136         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
137         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
138
139         conn->oc_data.ioc_pbuf1 = (void *)1;
140         conn->oc_data.ioc_plen1 = 1;
141
142         conn->oc_data.ioc_pbuf2 = buffer;
143         conn->oc_data.ioc_plen2 = count;
144         conn->oc_data.ioc_count = count;
145         conn->oc_data.ioc_offset = offset;
146
147         return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
148 }
149
150 int
151 obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
152                int mode, uint64_t offset, uint32_t count,
153                struct lustre_handle *lh)
154 {
155         int   rc;
156
157         obdio_iocinit (conn);
158
159         conn->oc_data.ioc_obdo1.o_id = oid;
160         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
161         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
162
163         conn->oc_data.ioc_conn1 = mode;
164         conn->oc_data.ioc_count = count;
165         conn->oc_data.ioc_offset = offset;
166
167         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
168
169         if (rc == 0)
170                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
171
172         return (rc);
173 }
174
175 int
176 obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
177 {
178         obdio_iocinit (conn);
179
180         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
181         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
182
183         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
184 }
185
186 void *
187 obdio_alloc_aligned_buffer (void **spacep, int size)
188 {
189         int   pagesize = getpagesize();
190         void *space = malloc (size + pagesize - 1);
191
192         *spacep = space;
193         if (space == NULL)
194                 return (NULL);
195
196         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
197 }
198
199 struct obdio_barrier *
200 obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
201 {
202         struct obdio_barrier *b;
203
204         b = (struct obdio_barrier *)malloc (sizeof (*b));
205         if (b == NULL) {
206                 fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
207                 return (NULL);
208         }
209
210         b->ob_id = id;
211         b->ob_oid = oid;
212         b->ob_npeers = npeers;
213         b->ob_ordinal = 0;
214         b->ob_count = 0;
215         return (b);
216 }
217
218 int
219 obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
220 {
221         struct lustre_handle    lh;
222         int                     rc;
223         int                     rc2;
224         void                   *space;
225         struct obdio_barrier   *fileb;
226
227         if (b->ob_ordinal != 0 ||
228             b->ob_count != 0) {
229                 fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
230                 abort ();
231         }
232
233         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
234         if (fileb == NULL) {
235                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
236                          b->ob_oid);
237                 return (-1);
238         }
239
240         memset (fileb, 0, getpagesize ());
241         *fileb = *b;
242
243         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
244         if (rc != 0) {
245                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
246                          b->ob_oid, strerror (errno));
247                 goto out;
248         }
249
250         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
251         if (rc != 0)
252                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
253                          b->ob_oid, strerror (errno));
254
255         rc2 = obdio_cancel (conn, &lh);
256         if (rc == 0 && rc2 != 0) {
257                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
258                          b->ob_oid, strerror (errno));
259                 rc = rc2;
260         }
261  out:
262         free (space);
263         return (rc);
264 }
265
266 int
267 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
268 {
269         struct lustre_handle   lh;
270         int                    rc;
271         int                    rc2;
272         void                  *space;
273         struct obdio_barrier  *fileb;
274         char                  *mode;
275
276         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
277         if (fileb == NULL) {
278                 fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
279                          b->ob_oid);
280                 return (-1);
281         }
282
283         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
284         if (rc != 0) {
285                 fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
286                          b->ob_oid, strerror (errno));
287                 goto out_1;
288         }
289
290         memset (fileb, 0xeb, getpagesize ());
291         rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
292         if (rc != 0) {
293                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
294                          b->ob_oid, strerror (errno));
295                 goto out_2;
296         }
297
298         if (fileb->ob_id != b->ob_id ||
299             fileb->ob_oid != b->ob_oid ||
300             fileb->ob_npeers != b->ob_npeers ||
301             fileb->ob_count >= b->ob_npeers ||
302             fileb->ob_ordinal != b->ob_ordinal) {
303                 fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
304                 fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
305                          fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
306                          fileb->ob_ordinal, fileb->ob_count);
307                 fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
308                          b->ob_id, b->ob_oid, b->ob_npeers,
309                          b->ob_ordinal, b->ob_count);
310                 rc = -1;
311                 goto out_2;
312         }
313
314         fileb->ob_count++;
315         if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
316                 fileb->ob_count = 0;       /* join count for next barrier */
317                 fileb->ob_ordinal++;                 /* signal all joined */
318         }
319
320         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
321         if (rc != 0) {
322                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
323                          b->ob_oid, strerror (errno));
324                 goto out_2;
325         }
326
327         mode = "PW";
328         b->ob_ordinal++;           /* now I wait... */
329         while (fileb->ob_ordinal != b->ob_ordinal) {
330
331                 rc = obdio_cancel (conn, &lh);
332                 if (rc != 0) {
333                         fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
334                                  b->ob_oid, mode, strerror (errno));
335                         goto out_1;
336                 }
337
338                 mode = "PR";
339                 rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
340                 if (rc != 0) {
341                         fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
342                                  b->ob_oid, strerror (errno));
343                         goto out_1;
344                 }
345
346                 memset (fileb, 0xeb, getpagesize ());
347                 rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
348                 if (rc != 0) {
349                         fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
350                                  b->ob_oid, strerror (errno));
351                         goto out_2;
352                 }
353
354                 if (fileb->ob_id != b->ob_id ||
355                     fileb->ob_oid != b->ob_oid ||
356                     fileb->ob_npeers != b->ob_npeers ||
357                     fileb->ob_count >= b->ob_npeers ||
358                     (fileb->ob_ordinal != b->ob_ordinal - 1 &&
359                      fileb->ob_ordinal != b->ob_ordinal)) {
360                         fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
361                         fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
362                                  fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
363                                  fileb->ob_ordinal, fileb->ob_count);
364                         fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
365                                  b->ob_id, b->ob_oid, b->ob_npeers,
366                                  b->ob_ordinal, b->ob_count);
367                         rc = -1;
368                         goto out_2;
369                 }
370         }
371
372  out_2:
373         rc2 = obdio_cancel (conn, &lh);
374         if (rc == 0 && rc2 != 0) {
375                 fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
376                          b->ob_oid, strerror (errno));
377                 rc = rc2;
378         }
379  out_1:
380         free (space);
381         return (rc);
382 }
383
384