Whamcloud - gitweb
file rhel-2.4.21 was initially added on branch b1_4.
[fs/lustre-release.git] / lustre / utils / obdiolib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eeb@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include <liblustre.h>
34 #include "obdiolib.h"
35
36 void
37 obdio_iocinit (struct obdio_conn *conn)
38 {
39         memset (&conn->oc_data, 0, sizeof (conn->oc_data));
40         conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
41         conn->oc_data.ioc_dev = conn->oc_device;
42         conn->oc_data.ioc_len = sizeof (conn->oc_data);
43 }
44
45 int
46 obdio_ioctl (struct obdio_conn *conn, int cmd)
47 {
48         char *buf = conn->oc_buffer;
49         int   rc;
50         int   rc2;
51
52         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
53         if (rc != 0) {
54                 fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
55                          rc, strerror (errno));
56                 abort ();
57         }
58
59         rc = ioctl (conn->oc_fd, cmd, buf);
60         if (rc != 0)
61                 return (rc);
62
63         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
64         if (rc2 != 0) {
65                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
66                          rc2, strerror (errno));
67                 abort ();
68         }
69
70         return (rc);
71 }
72
73 struct obdio_conn *
74 obdio_connect (int device)
75 {
76         struct obdio_conn  *conn;
77
78         conn = malloc (sizeof (*conn));
79         if (conn == NULL) {
80                 fprintf (stderr, "obdio_connect: no memory\n");
81                 return (NULL);
82         }
83         memset (conn, 0, sizeof (*conn));
84
85         conn->oc_fd = open ("/dev/obd", O_RDWR);
86         if (conn->oc_fd < 0) {
87                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
88                          strerror (errno));
89                 goto failed;
90         }
91
92         conn->oc_device = device;
93         return (conn);
94
95  failed:
96         free (conn);
97         return (NULL);
98 }
99
100 void
101 obdio_disconnect (struct obdio_conn *conn, int flags)
102 {
103         close (conn->oc_fd);
104         /* obdclass will automatically close on last ref */
105         free (conn);
106 }
107
108 int
109 obdio_pread (struct obdio_conn *conn, uint64_t oid,
110              char *buffer, uint32_t count, uint64_t offset)
111 {
112         obdio_iocinit (conn);
113
114         conn->oc_data.ioc_obdo1.o_id = oid;
115         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
116         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
117
118         conn->oc_data.ioc_pbuf2 = buffer;
119         conn->oc_data.ioc_plen2 = count;
120         conn->oc_data.ioc_count = count;
121         conn->oc_data.ioc_offset = offset;
122
123         return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
124 }
125
126 int
127 obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
128               char *buffer, uint32_t count, uint64_t offset)
129 {
130         obdio_iocinit (conn);
131
132         conn->oc_data.ioc_obdo1.o_id = oid;
133         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
134         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
135
136         conn->oc_data.ioc_pbuf2 = buffer;
137         conn->oc_data.ioc_plen2 = count;
138         conn->oc_data.ioc_count = count;
139         conn->oc_data.ioc_offset = offset;
140
141         return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
142 }
143
144 int
145 obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
146                int mode, uint64_t offset, uint32_t count,
147                struct lustre_handle *lh)
148 {
149         int   rc;
150
151         obdio_iocinit (conn);
152
153         conn->oc_data.ioc_obdo1.o_id = oid;
154         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
155         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
156
157         conn->oc_data.ioc_conn1 = mode;
158         conn->oc_data.ioc_count = count;
159         conn->oc_data.ioc_offset = offset;
160
161         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
162
163         if (rc == 0)
164                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
165
166         return (rc);
167 }
168
169 int
170 obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
171 {
172         obdio_iocinit (conn);
173
174         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
175         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
176
177         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
178 }
179
180 void *
181 obdio_alloc_aligned_buffer (void **spacep, int size)
182 {
183         int   pagesize = getpagesize();
184         void *space = malloc (size + pagesize - 1);
185
186         *spacep = space;
187         if (space == NULL)
188                 return (NULL);
189
190         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
191 }
192
193 struct obdio_barrier *
194 obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
195 {
196         struct obdio_barrier *b;
197
198         b = (struct obdio_barrier *)malloc (sizeof (*b));
199         if (b == NULL) {
200                 fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
201                 return (NULL);
202         }
203
204         b->ob_id = id;
205         b->ob_oid = oid;
206         b->ob_npeers = npeers;
207         b->ob_ordinal = 0;
208         b->ob_count = 0;
209         return (b);
210 }
211
212 int
213 obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
214 {
215         struct lustre_handle    lh;
216         int                     rc;
217         int                     rc2;
218         void                   *space;
219         struct obdio_barrier   *fileb;
220
221         if (b->ob_ordinal != 0 ||
222             b->ob_count != 0) {
223                 fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
224                 abort ();
225         }
226
227         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
228         if (fileb == NULL) {
229                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
230                          b->ob_oid);
231                 return (-1);
232         }
233
234         memset (fileb, 0, getpagesize ());
235         *fileb = *b;
236
237         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
238         if (rc != 0) {
239                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
240                          b->ob_oid, strerror (errno));
241                 goto out;
242         }
243
244         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
245         if (rc != 0)
246                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
247                          b->ob_oid, strerror (errno));
248
249         rc2 = obdio_cancel (conn, &lh);
250         if (rc == 0 && rc2 != 0) {
251                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
252                          b->ob_oid, strerror (errno));
253                 rc = rc2;
254         }
255  out:
256         free (space);
257         return (rc);
258 }
259
260 int
261 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
262 {
263         struct lustre_handle   lh;
264         int                    rc;
265         int                    rc2;
266         void                  *space;
267         struct obdio_barrier  *fileb;
268         char                  *mode;
269
270         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
271         if (fileb == NULL) {
272                 fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
273                          b->ob_oid);
274                 return (-1);
275         }
276
277         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
278         if (rc != 0) {
279                 fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
280                          b->ob_oid, strerror (errno));
281                 goto out_1;
282         }
283
284         memset (fileb, 0xeb, getpagesize ());
285         rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
286         if (rc != 0) {
287                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
288                          b->ob_oid, strerror (errno));
289                 goto out_2;
290         }
291
292         if (fileb->ob_id != b->ob_id ||
293             fileb->ob_oid != b->ob_oid ||
294             fileb->ob_npeers != b->ob_npeers ||
295             fileb->ob_count >= b->ob_npeers ||
296             fileb->ob_ordinal != b->ob_ordinal) {
297                 fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
298                 fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
299                          fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
300                          fileb->ob_ordinal, fileb->ob_count);
301                 fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
302                          b->ob_id, b->ob_oid, b->ob_npeers,
303                          b->ob_ordinal, b->ob_count);
304                 rc = -1;
305                 goto out_2;
306         }
307
308         fileb->ob_count++;
309         if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
310                 fileb->ob_count = 0;       /* join count for next barrier */
311                 fileb->ob_ordinal++;                 /* signal all joined */
312         }
313
314         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
315         if (rc != 0) {
316                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
317                          b->ob_oid, strerror (errno));
318                 goto out_2;
319         }
320
321         mode = "PW";
322         b->ob_ordinal++;           /* now I wait... */
323         while (fileb->ob_ordinal != b->ob_ordinal) {
324
325                 rc = obdio_cancel (conn, &lh);
326                 if (rc != 0) {
327                         fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
328                                  b->ob_oid, mode, strerror (errno));
329                         goto out_1;
330                 }
331
332                 mode = "PR";
333                 rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
334                 if (rc != 0) {
335                         fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
336                                  b->ob_oid, strerror (errno));
337                         goto out_1;
338                 }
339
340                 memset (fileb, 0xeb, getpagesize ());
341                 rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
342                 if (rc != 0) {
343                         fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
344                                  b->ob_oid, strerror (errno));
345                         goto out_2;
346                 }
347
348                 if (fileb->ob_id != b->ob_id ||
349                     fileb->ob_oid != b->ob_oid ||
350                     fileb->ob_npeers != b->ob_npeers ||
351                     fileb->ob_count >= b->ob_npeers ||
352                     (fileb->ob_ordinal != b->ob_ordinal - 1 &&
353                      fileb->ob_ordinal != b->ob_ordinal)) {
354                         fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
355                         fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
356                                  fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
357                                  fileb->ob_ordinal, fileb->ob_count);
358                         fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
359                                  b->ob_id, b->ob_oid, b->ob_npeers,
360                                  b->ob_ordinal, b->ob_count);
361                         rc = -1;
362                         goto out_2;
363                 }
364         }
365
366  out_2:
367         rc2 = obdio_cancel (conn, &lh);
368         if (rc == 0 && rc2 != 0) {
369                 fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
370                          b->ob_oid, strerror (errno));
371                 rc = rc2;
372         }
373  out_1:
374         free (space);
375         return (rc);
376 }
377
378