Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / utils / obdiolib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eeb@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include <liblustre.h>
34 #include "obdiolib.h"
35
36 void
37 obdio_iocinit (struct obdio_conn *conn)
38 {
39         memset (&conn->oc_data, 0, sizeof (conn->oc_data));
40         conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
41         conn->oc_data.ioc_cookie = conn->oc_conn_cookie;
42         conn->oc_data.ioc_len = sizeof (conn->oc_data);
43 }
44
45 int
46 obdio_ioctl (struct obdio_conn *conn, int cmd)
47 {
48         char *buf = conn->oc_buffer;
49         int   rc;
50         int   rc2;
51
52         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
53         if (rc != 0) {
54                 fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n",
55                          rc, strerror (errno));
56                 abort ();
57         }
58
59         rc = ioctl (conn->oc_fd, cmd, buf);
60         if (rc != 0)
61                 return (rc);
62
63         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
64         if (rc2 != 0) {
65                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
66                          rc2, strerror (errno));
67                 abort ();
68         }
69
70         return (rc);
71 }
72
73 struct obdio_conn *
74 obdio_connect (int device)
75 {
76         struct obdio_conn  *conn;
77         int                 rc;
78
79         conn = malloc (sizeof (*conn));
80         if (conn == NULL) {
81                 fprintf (stderr, "obdio_connect: no memory\n");
82                 return (NULL);
83         }
84         memset (conn, 0, sizeof (*conn));
85
86         conn->oc_fd = open ("/dev/obd", O_RDWR);
87         if (conn->oc_fd < 0) {
88                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
89                          strerror (errno));
90                 goto failed;
91         }
92
93         obdio_iocinit (conn);
94         conn->oc_data.ioc_dev = device;
95         rc = obdio_ioctl (conn, OBD_IOC_DEVICE);
96         if (rc != 0) {
97                 fprintf (stderr, "obdio_connect: Can't set device %d: %s\n",
98                          device, strerror (errno));
99                 goto failed;
100         }
101
102         obdio_iocinit (conn);
103         rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
104         if (rc != 0) {
105                 fprintf(stderr, "obdio_connect: Can't connect to device "
106                         "%d: %s\n", device, strerror (errno));
107                 goto failed;
108         }
109
110         conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
111         return (conn);
112
113  failed:
114         free (conn);
115         return (NULL);
116 }
117
118 void
119 obdio_disconnect (struct obdio_conn *conn, int flags)
120 {
121         close (conn->oc_fd);
122         /* obdclass will automatically close on last ref */
123         free (conn);
124 }
125
126 int
127 obdio_pread (struct obdio_conn *conn, uint64_t oid,
128              char *buffer, uint32_t count, uint64_t offset)
129 {
130         obdio_iocinit (conn);
131
132         conn->oc_data.ioc_obdo1.o_id = oid;
133         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
134         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
135
136         conn->oc_data.ioc_pbuf2 = buffer;
137         conn->oc_data.ioc_plen2 = count;
138         conn->oc_data.ioc_count = count;
139         conn->oc_data.ioc_offset = offset;
140
141         return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
142 }
143
144 int
145 obdio_pwrite (struct obdio_conn *conn, uint64_t oid,
146               char *buffer, uint32_t count, uint64_t offset)
147 {
148         obdio_iocinit (conn);
149
150         conn->oc_data.ioc_obdo1.o_id = oid;
151         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
152         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
153
154         conn->oc_data.ioc_pbuf2 = buffer;
155         conn->oc_data.ioc_plen2 = count;
156         conn->oc_data.ioc_count = count;
157         conn->oc_data.ioc_offset = offset;
158
159         return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
160 }
161
162 int
163 obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
164                int mode, uint64_t offset, uint32_t count,
165                struct lustre_handle *lh)
166 {
167         int   rc;
168
169         obdio_iocinit (conn);
170
171         conn->oc_data.ioc_obdo1.o_id = oid;
172         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
173         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
174
175         conn->oc_data.ioc_conn1 = mode;
176         conn->oc_data.ioc_count = count;
177         conn->oc_data.ioc_offset = offset;
178
179         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
180
181         if (rc == 0)
182                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
183
184         return (rc);
185 }
186
187 int
188 obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
189 {
190         obdio_iocinit (conn);
191
192         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
193         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
194
195         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
196 }
197
198 void *
199 obdio_alloc_aligned_buffer (void **spacep, int size)
200 {
201         int   pagesize = getpagesize();
202         void *space = malloc (size + pagesize - 1);
203
204         *spacep = space;
205         if (space == NULL)
206                 return (NULL);
207
208         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
209 }
210
211 struct obdio_barrier *
212 obdio_new_barrier (uint64_t oid, uint64_t id, int npeers)
213 {
214         struct obdio_barrier *b;
215
216         b = (struct obdio_barrier *)malloc (sizeof (*b));
217         if (b == NULL) {
218                 fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
219                 return (NULL);
220         }
221
222         b->ob_id = id;
223         b->ob_oid = oid;
224         b->ob_npeers = npeers;
225         b->ob_ordinal = 0;
226         b->ob_count = 0;
227         return (b);
228 }
229
230 int
231 obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
232 {
233         struct lustre_handle    lh;
234         int                     rc;
235         int                     rc2;
236         void                   *space;
237         struct obdio_barrier   *fileb;
238
239         if (b->ob_ordinal != 0 ||
240             b->ob_count != 0) {
241                 fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
242                 abort ();
243         }
244
245         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
246         if (fileb == NULL) {
247                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
248                          b->ob_oid);
249                 return (-1);
250         }
251
252         memset (fileb, 0, getpagesize ());
253         *fileb = *b;
254
255         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
256         if (rc != 0) {
257                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
258                          b->ob_oid, strerror (errno));
259                 goto out;
260         }
261
262         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
263         if (rc != 0)
264                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
265                          b->ob_oid, strerror (errno));
266
267         rc2 = obdio_cancel (conn, &lh);
268         if (rc == 0 && rc2 != 0) {
269                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
270                          b->ob_oid, strerror (errno));
271                 rc = rc2;
272         }
273  out:
274         free (space);
275         return (rc);
276 }
277
278 int
279 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
280 {
281         struct lustre_handle   lh;
282         int                    rc;
283         int                    rc2;
284         void                  *space;
285         struct obdio_barrier  *fileb;
286         char                  *mode;
287
288         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
289         if (fileb == NULL) {
290                 fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
291                          b->ob_oid);
292                 return (-1);
293         }
294
295         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
296         if (rc != 0) {
297                 fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
298                          b->ob_oid, strerror (errno));
299                 goto out_1;
300         }
301
302         memset (fileb, 0xeb, getpagesize ());
303         rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
304         if (rc != 0) {
305                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
306                          b->ob_oid, strerror (errno));
307                 goto out_2;
308         }
309
310         if (fileb->ob_id != b->ob_id ||
311             fileb->ob_oid != b->ob_oid ||
312             fileb->ob_npeers != b->ob_npeers ||
313             fileb->ob_count >= b->ob_npeers ||
314             fileb->ob_ordinal != b->ob_ordinal) {
315                 fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
316                 fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
317                          fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
318                          fileb->ob_ordinal, fileb->ob_count);
319                 fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
320                          b->ob_id, b->ob_oid, b->ob_npeers,
321                          b->ob_ordinal, b->ob_count);
322                 rc = -1;
323                 goto out_2;
324         }
325
326         fileb->ob_count++;
327         if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
328                 fileb->ob_count = 0;       /* join count for next barrier */
329                 fileb->ob_ordinal++;                 /* signal all joined */
330         }
331
332         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
333         if (rc != 0) {
334                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
335                          b->ob_oid, strerror (errno));
336                 goto out_2;
337         }
338
339         mode = "PW";
340         b->ob_ordinal++;           /* now I wait... */
341         while (fileb->ob_ordinal != b->ob_ordinal) {
342
343                 rc = obdio_cancel (conn, &lh);
344                 if (rc != 0) {
345                         fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
346                                  b->ob_oid, mode, strerror (errno));
347                         goto out_1;
348                 }
349
350                 mode = "PR";
351                 rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
352                 if (rc != 0) {
353                         fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
354                                  b->ob_oid, strerror (errno));
355                         goto out_1;
356                 }
357
358                 memset (fileb, 0xeb, getpagesize ());
359                 rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
360                 if (rc != 0) {
361                         fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
362                                  b->ob_oid, strerror (errno));
363                         goto out_2;
364                 }
365
366                 if (fileb->ob_id != b->ob_id ||
367                     fileb->ob_oid != b->ob_oid ||
368                     fileb->ob_npeers != b->ob_npeers ||
369                     fileb->ob_count >= b->ob_npeers ||
370                     (fileb->ob_ordinal != b->ob_ordinal - 1 &&
371                      fileb->ob_ordinal != b->ob_ordinal)) {
372                         fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
373                         fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
374                                  fileb->ob_id, fileb->ob_oid, fileb->ob_npeers,
375                                  fileb->ob_ordinal, fileb->ob_count);
376                         fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
377                                  b->ob_id, b->ob_oid, b->ob_npeers,
378                                  b->ob_ordinal, b->ob_count);
379                         rc = -1;
380                         goto out_2;
381                 }
382         }
383
384  out_2:
385         rc2 = obdio_cancel (conn, &lh);
386         if (rc == 0 && rc2 != 0) {
387                 fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
388                          b->ob_oid, strerror (errno));
389                 rc = rc2;
390         }
391  out_1:
392         free (space);
393         return (rc);
394 }
395
396