Whamcloud - gitweb
file jbd-2.4.19-pre1-jcberr.patch was initially added on branch b_devel.
[fs/lustre-release.git] / lustre / utils / obdiolib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eeb@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include <liblustre.h>
34 #include "obdiolib.h"
35
36 void
37 obdio_iocinit (struct obdio_conn *conn)
38 {
39         memset (&conn->oc_data, 0, sizeof (conn->oc_data));
40         conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
41         conn->oc_data.ioc_cookie = conn->oc_conn_cookie;
42         conn->oc_data.ioc_len = sizeof (conn->oc_data);
43 }
44
45 int
46 obdio_ioctl (struct obdio_conn *conn, int cmd)
47 {
48         char *buf = conn->oc_buffer;
49         int   rc;
50         int   rc2;
51
52         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
53         if (rc != 0) {
54                 fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
55                          rc, strerror (errno));
56                 abort ();
57         }
58
59         rc = ioctl (conn->oc_fd, cmd, buf);
60         if (rc != 0)
61                 return (rc);
62
63         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
64         if (rc2 != 0) {
65                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
66                          rc2, strerror (errno));
67                 abort ();
68         }
69
70         return (rc);
71 }
72
73 struct obdio_conn *
74 obdio_connect (int device)
75 {
76         struct obdio_conn  *conn;
77         int                 rc;
78
79         conn = malloc (sizeof (*conn));
80         if (conn == NULL) {
81                 fprintf (stderr, "obdio_connect: no memory\n");
82                 return (NULL);
83         }
84         memset (conn, 0, sizeof (*conn));
85
86         conn->oc_fd = open ("/dev/obd", O_RDWR);
87         if (conn->oc_fd < 0) {
88                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
89                          strerror (errno));
90                 goto failed;
91         }
92
93         obdio_iocinit (conn);
94         conn->oc_data.ioc_dev = device;
95         rc = obdio_ioctl (conn, OBD_IOC_DEVICE);
96         if (rc != 0) {
97                 fprintf (stderr, "obdio_connect: Can't set device %d: %s\n",
98                          device, strerror (errno));
99                 goto failed;
100         }
101
102         obdio_iocinit (conn);
103         rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
104         if (rc != 0) {
105                 fprintf(stderr, "obdio_connect: Can't connect to device "
106                         "%d: %s\n", device, strerror (errno));
107                 goto failed;
108         }
109
110         conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
111         return (conn);
112
113  failed:
114         free (conn);
115         return (NULL);
116 }
117
118 void
119 obdio_disconnect (struct obdio_conn *conn, int flags)
120 {
121         close (conn->oc_fd);
122         /* obdclass will automatically close on last ref */
123         free (conn);
124 }
125
126 int
127 obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
128 {
129         int    rc;
130
131         obdio_iocinit (conn);
132
133         conn->oc_data.ioc_obdo1.o_id = oid;
134         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
135         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
136
137         rc = obdio_ioctl (conn, OBD_IOC_OPEN);
138
139         if (rc == 0)
140                 memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
141
142         return (rc);
143 }
144
145 int
146 obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh)
147 {
148         obdio_iocinit (conn);
149
150
151         conn->oc_data.ioc_obdo1.o_id = oid;
152         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
153         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
154         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
155                                           OBD_MD_FLMODE | OBD_MD_FLHANDLE;
156
157         return (obdio_ioctl (conn, OBD_IOC_CLOSE));
158 }
159
160 int
161 obdio_pread (struct obdio_conn *conn, uint64_t oid,
162              char *buffer, uint32_t count, uint64_t offset)
163 {
164         obdio_iocinit (conn);
165
166         conn->oc_data.ioc_obdo1.o_id = oid;
167         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
168         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
169
170         conn->oc_data.ioc_pbuf2 = buffer;
171         conn->oc_data.ioc_plen2 = count;
172         conn->oc_data.ioc_count = count;
173         conn->oc_data.ioc_offset = offset;
174
175         return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
176 }
177
178 int
179 obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
180               char *buffer, uint32_t count, uint64_t offset)
181 {
182         obdio_iocinit (conn);
183
184         conn->oc_data.ioc_obdo1.o_id = oid;
185         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
186         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
187
188         conn->oc_data.ioc_pbuf2 = buffer;
189         conn->oc_data.ioc_plen2 = count;
190         conn->oc_data.ioc_count = count;
191         conn->oc_data.ioc_offset = offset;
192
193         return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
194 }
195
196 int
197 obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
198                int mode, uint64_t offset, uint32_t count,
199                struct lustre_handle *lh)
200 {
201         int   rc;
202
203         obdio_iocinit (conn);
204
205         conn->oc_data.ioc_obdo1.o_id = oid;
206         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
207         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
208
209         conn->oc_data.ioc_conn1 = mode;
210         conn->oc_data.ioc_count = count;
211         conn->oc_data.ioc_offset = offset;
212
213         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
214
215         if (rc == 0)
216                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
217
218         return (rc);
219 }
220
221 int
222 obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
223 {
224         obdio_iocinit (conn);
225
226         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
227         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
228
229         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
230 }
231
232 void *
233 obdio_alloc_aligned_buffer (void **spacep, int size)
234 {
235         int   pagesize = getpagesize();
236         void *space = malloc (size + pagesize - 1);
237
238         *spacep = space;
239         if (space == NULL)
240                 return (NULL);
241
242         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
243 }
244
245 struct obdio_barrier *
246 obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
247 {
248         struct obdio_barrier *b;
249
250         b = (struct obdio_barrier *)malloc (sizeof (*b));
251         if (b == NULL) {
252                 fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
253                 return (NULL);
254         }
255
256         b->ob_id = id;
257         b->ob_oid = oid;
258         b->ob_npeers = npeers;
259         b->ob_ordinal = 0;
260         b->ob_count = 0;
261         return (b);
262 }
263
264 int
265 obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
266 {
267         struct lustre_handle    fh;
268         struct lustre_handle    lh;
269         int                     rc;
270         int                     rc2;
271         void                   *space;
272         struct obdio_barrier   *fileb;
273
274         if (b->ob_ordinal != 0 ||
275             b->ob_count != 0) {
276                 fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
277                 abort ();
278         }
279
280         rc = obdio_open (conn, b->ob_oid, &fh);
281         if (rc != 0) {
282                 fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n",
283                          b->ob_oid, strerror (errno));
284                 return (rc);
285         }
286
287         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
288         if (fileb == NULL) {
289                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
290                          b->ob_oid);
291                 rc = -1;
292                 goto out_0;
293         }
294
295         memset (fileb, 0, getpagesize ());
296         *fileb = *b;
297
298         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
299         if (rc != 0) {
300                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
301                          b->ob_oid, strerror (errno));
302                 goto out_1;
303         }
304
305         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
306         if (rc != 0)
307                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
308                          b->ob_oid, strerror (errno));
309
310         rc2 = obdio_cancel (conn, &lh);
311         if (rc == 0 && rc2 != 0) {
312                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
313                          b->ob_oid, strerror (errno));
314                 rc = rc2;
315         }
316  out_1:
317         free (space);
318  out_0:
319         rc2 = obdio_close (conn, b->ob_oid, &fh);
320         if (rc == 0 && rc2 != 0) {
321                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
322                          b->ob_oid, strerror (errno));
323                 rc = rc2;
324         }
325
326         return (rc);
327 }
328
329 int
330 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
331 {
332         struct lustre_handle   fh;
333         struct lustre_handle   lh;
334         int                    rc;
335         int                    rc2;
336         void                  *space;
337         struct obdio_barrier  *fileb;
338         char                  *mode;
339
340         rc = obdio_open (conn, b->ob_oid, &fh);
341         if (rc != 0) {
342                 fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
343                          b->ob_oid, strerror (errno));
344                 return (rc);
345         }
346
347         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
348         if (fileb == NULL) {
349                 fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
350                          b->ob_oid);
351                 rc = -1;
352                 goto out_0;
353         }
354
355         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
356         if (rc != 0) {
357                 fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
358                          b->ob_oid, strerror (errno));
359                 goto out_1;
360         }
361
362         memset (fileb, 0xeb, getpagesize ());
363         rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
364         if (rc != 0) {
365                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
366                          b->ob_oid, strerror (errno));
367                 goto out_2;
368         }
369
370         if (fileb->ob_id != b->ob_id ||
371             fileb->ob_oid != b->ob_oid ||
372             fileb->ob_npeers != b->ob_npeers ||
373             fileb->ob_count >= b->ob_npeers ||
374             fileb->ob_ordinal != b->ob_ordinal) {
375                 fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
376                 fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
377                          fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
378                          fileb->ob_ordinal, fileb->ob_count);
379                 fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
380                          b->ob_id, b->ob_oid, b->ob_npeers,
381                          b->ob_ordinal, b->ob_count);
382                 rc = -1;
383                 goto out_2;
384         }
385
386         fileb->ob_count++;
387         if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
388                 fileb->ob_count = 0;       /* join count for next barrier */
389                 fileb->ob_ordinal++;                 /* signal all joined */
390         }
391
392         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
393         if (rc != 0) {
394                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
395                          b->ob_oid, strerror (errno));
396                 goto out_2;
397         }
398
399         mode = "PW";
400         b->ob_ordinal++;           /* now I wait... */
401         while (fileb->ob_ordinal != b->ob_ordinal) {
402
403                 rc = obdio_cancel (conn, &lh);
404                 if (rc != 0) {
405                         fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
406                                  b->ob_oid, mode, strerror (errno));
407                         goto out_1;
408                 }
409
410                 mode = "PR";
411                 rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
412                 if (rc != 0) {
413                         fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
414                                  b->ob_oid, strerror (errno));
415                         goto out_1;
416                 }
417
418                 memset (fileb, 0xeb, getpagesize ());
419                 rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
420                 if (rc != 0) {
421                         fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
422                                  b->ob_oid, strerror (errno));
423                         goto out_2;
424                 }
425
426                 if (fileb->ob_id != b->ob_id ||
427                     fileb->ob_oid != b->ob_oid ||
428                     fileb->ob_npeers != b->ob_npeers ||
429                     fileb->ob_count >= b->ob_npeers ||
430                     (fileb->ob_ordinal != b->ob_ordinal - 1 &&
431                      fileb->ob_ordinal != b->ob_ordinal)) {
432                         fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
433                         fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
434                                  fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
435                                  fileb->ob_ordinal, fileb->ob_count);
436                         fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
437                                  b->ob_id, b->ob_oid, b->ob_npeers,
438                                  b->ob_ordinal, b->ob_count);
439                         rc = -1;
440                         goto out_2;
441                 }
442         }
443
444  out_2:
445         rc2 = obdio_cancel (conn, &lh);
446         if (rc == 0 && rc2 != 0) {
447                 fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
448                          b->ob_oid, strerror (errno));
449                 rc = rc2;
450         }
451  out_1:
452         free (space);
453  out_0:
454         rc2 = obdio_close (conn, b->ob_oid, &fh);
455         if (rc == 0 && rc2 != 0) {
456                 fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n",
457                          b->ob_oid, strerror (errno));
458                 rc = rc2;
459         }
460
461         return (rc);
462 }
463
464