Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / utils / obdiolib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eeb@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include <liblustre.h>
34 #include "obdiolib.h"
35
36 void
37 obdio_iocinit (struct obdio_conn *conn)
38 {
39         memset (&conn->oc_data, 0, sizeof (conn->oc_data));
40         conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
41         conn->oc_data.ioc_addr = conn->oc_conn_addr;
42         conn->oc_data.ioc_cookie = conn->oc_conn_cookie;
43         conn->oc_data.ioc_len = sizeof (conn->oc_data);
44 }
45
46 int
47 obdio_ioctl (struct obdio_conn *conn, int cmd) 
48 {
49         char *buf = conn->oc_buffer;
50         int   rc;
51         int   rc2;
52         
53         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
54         if (rc != 0) {
55                 fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n", 
56                          rc, strerror (errno));
57                 abort ();
58         }
59         
60         rc = ioctl (conn->oc_fd, cmd, buf);
61         if (rc != 0)
62                 return (rc);
63         
64         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
65         if (rc2 != 0) {
66                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
67                          rc2, strerror (errno));
68                 abort ();
69         }
70         
71         return (rc);
72 }
73
74 struct obdio_conn *
75 obdio_connect (int device)
76 {
77         struct obdio_conn  *conn;
78         int                 rc;
79
80         conn = malloc (sizeof (*conn));
81         if (conn == NULL) {
82                 fprintf (stderr, "obdio_connect: no memory\n");
83                 return (NULL);
84         }
85         memset (conn, 0, sizeof (*conn));
86         
87         conn->oc_fd = open ("/dev/obd", O_RDWR);
88         if (conn->oc_fd < 0) {
89                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
90                          strerror (errno));
91                 goto failed;
92         }
93
94         obdio_iocinit (conn);
95         conn->oc_data.ioc_dev = device;
96         rc = obdio_ioctl (conn, OBD_IOC_DEVICE);
97         if (rc != 0) {
98                 fprintf (stderr, "obdio_connect: Can't set device %d: %s\n",
99                          device, strerror (errno));
100                 goto failed;
101         }
102         
103         obdio_iocinit (conn);
104         rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
105         if (rc != 0) {
106                 fprintf (stderr, "obdio_connect: Can't connect to device %d: %s\n",
107                          device, strerror (errno));
108                 goto failed;
109         }
110         
111         conn->oc_conn_addr = conn->oc_data.ioc_addr;
112         conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
113         return (conn);
114         
115  failed:
116         free (conn);
117         return (NULL);
118 }
119
120 void
121 obdio_disconnect (struct obdio_conn *conn) 
122 {
123         close (conn->oc_fd);
124         /* obdclass will automatically close on last ref */
125         free (conn);
126 }
127
128 int
129 obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) 
130 {
131         int    rc;
132         
133         obdio_iocinit (conn);
134         
135         conn->oc_data.ioc_obdo1.o_id = oid;
136         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
137         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
138         
139         rc = obdio_ioctl (conn, OBD_IOC_OPEN);
140         
141         if (rc == 0)
142                 memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
143
144         return (rc);
145 }
146
147 int
148 obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) 
149 {
150         obdio_iocinit (conn);
151         
152
153         conn->oc_data.ioc_obdo1.o_id = oid;
154         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
155         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
156         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | 
157                                           OBD_MD_FLMODE | OBD_MD_FLHANDLE;
158         
159         return (obdio_ioctl (conn, OBD_IOC_CLOSE));
160 }
161
162 int
163 obdio_pread (struct obdio_conn *conn, uint64_t oid, 
164              char *buffer, uint32_t count, uint64_t offset) 
165 {
166         obdio_iocinit (conn);
167         
168         conn->oc_data.ioc_obdo1.o_id = oid;
169         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
170         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
171
172         conn->oc_data.ioc_pbuf2 = buffer;
173         conn->oc_data.ioc_plen2 = count;
174         conn->oc_data.ioc_count = count;
175         conn->oc_data.ioc_offset = offset;
176
177         return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
178 }
179
180 int
181 obdio_pwrite (struct obdio_conn *conn, uint64_t oid, 
182               char *buffer, uint32_t count, uint64_t offset) 
183 {
184         obdio_iocinit (conn);
185         
186         conn->oc_data.ioc_obdo1.o_id = oid;
187         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
188         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
189
190         conn->oc_data.ioc_pbuf2 = buffer;
191         conn->oc_data.ioc_plen2 = count;
192         conn->oc_data.ioc_count = count;
193         conn->oc_data.ioc_offset = offset;
194
195         return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
196 }
197
198 int
199 obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
200                int mode, uint64_t offset, uint32_t count,
201                struct lustre_handle *lh)
202 {
203         int   rc;
204         
205         obdio_iocinit (conn);
206         
207         conn->oc_data.ioc_obdo1.o_id = oid;
208         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
209         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
210
211         conn->oc_data.ioc_conn1 = mode;
212         conn->oc_data.ioc_count = count;
213         conn->oc_data.ioc_offset = offset;
214         
215         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
216         
217         if (rc == 0)
218                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
219         
220         return (rc);
221 }
222
223 int
224 obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
225 {
226         obdio_iocinit (conn);
227
228         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
229         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
230         
231         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
232 }
233
234 void *
235 obdio_alloc_aligned_buffer (void **spacep, int size) 
236 {
237         int   pagesize = getpagesize();
238         void *space = malloc (size + pagesize - 1);
239         
240         *spacep = space;
241         if (space == NULL)
242                 return (NULL);
243         
244         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
245 }
246
247 struct obdio_barrier *
248 obdio_new_barrier (uint64_t oid, uint64_t id, int npeers) 
249 {
250         struct obdio_barrier *b;
251
252         b = (struct obdio_barrier *)malloc (sizeof (*b));
253         if (b == NULL) {
254                 fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
255                 return (NULL);
256         }
257         
258         b->ob_id = id;
259         b->ob_oid = oid;
260         b->ob_npeers = npeers;
261         b->ob_ordinal = 0;
262         b->ob_count = 0;
263         return (b);
264 }
265
266 int
267 obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
268 {
269         struct lustre_handle    fh;
270         struct lustre_handle    lh;
271         int                     rc;
272         int                     rc2;
273         void                   *space;
274         struct obdio_barrier   *fileb;
275
276         if (b->ob_ordinal != 0 ||
277             b->ob_count != 0) {
278                 fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
279                 abort ();
280         }
281         
282         rc = obdio_open (conn, b->ob_oid, &fh);
283         if (rc != 0) {
284                 fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n",
285                          b->ob_oid, strerror (errno));
286                 return (rc);
287         }
288         
289         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
290         if (fileb == NULL) {
291                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
292                          b->ob_oid);
293                 rc = -1;
294                 goto out_0;
295         }
296         
297         memset (fileb, 0, getpagesize ());
298         *fileb = *b;
299         
300         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
301         if (rc != 0) {
302                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
303                          b->ob_oid, strerror (errno));
304                 goto out_1;
305         }
306         
307         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
308         if (rc != 0)
309                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
310                          b->ob_oid, strerror (errno));
311         
312         rc2 = obdio_cancel (conn, &lh);
313         if (rc == 0 && rc2 != 0) {
314                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
315                          b->ob_oid, strerror (errno));
316                 rc = rc2;
317         }
318  out_1:
319         free (space);
320  out_0:
321         rc2 = obdio_close (conn, b->ob_oid, &fh);
322         if (rc == 0 && rc2 != 0) {
323                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
324                          b->ob_oid, strerror (errno));
325                 rc = rc2;
326         }
327         
328         return (rc);
329 }
330
331 int
332 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
333 {
334         struct lustre_handle   fh;
335         struct lustre_handle   lh;
336         int                    rc;
337         int                    rc2;
338         void                  *space;
339         struct obdio_barrier  *fileb;
340         char                  *mode;
341
342         rc = obdio_open (conn, b->ob_oid, &fh);
343         if (rc != 0) {
344                 fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
345                          b->ob_oid, strerror (errno));
346                 return (rc);
347         }
348         
349         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
350         if (fileb == NULL) {
351                 fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
352                          b->ob_oid);
353                 rc = -1;
354                 goto out_0;
355         }
356
357         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
358         if (rc != 0) {
359                 fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
360                          b->ob_oid, strerror (errno));
361                 goto out_1;
362         }
363         
364         memset (fileb, 0xeb, getpagesize ());
365         rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
366         if (rc != 0) {
367                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
368                          b->ob_oid, strerror (errno));
369                 goto out_2;
370         }
371         
372         if (fileb->ob_id != b->ob_id ||
373             fileb->ob_oid != b->ob_oid ||
374             fileb->ob_npeers != b->ob_npeers ||
375             fileb->ob_count >= b->ob_npeers ||
376             fileb->ob_ordinal != b->ob_ordinal) {
377                 fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
378                 fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
379                          fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, 
380                          fileb->ob_ordinal, fileb->ob_count);
381                 fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
382                          b->ob_id, b->ob_oid, b->ob_npeers, 
383                          b->ob_ordinal, b->ob_count);
384                 rc = -1;
385                 goto out_2;
386         }
387         
388         fileb->ob_count++;
389         if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
390                 fileb->ob_count = 0;            /* join count for next barrier */
391                 fileb->ob_ordinal++;            /* signal all joined */
392         }
393
394         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
395         if (rc != 0) {
396                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
397                          b->ob_oid, strerror (errno));
398                 goto out_2;
399         }
400
401         mode = "PW";
402         b->ob_ordinal++;                        /* now I wait... */
403         while (fileb->ob_ordinal != b->ob_ordinal) {
404
405                 rc = obdio_cancel (conn, &lh);
406                 if (rc != 0) {
407                         fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
408                                  b->ob_oid, mode, strerror (errno));
409                         goto out_1;
410                 }
411
412                 mode = "PR";
413                 rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
414                 if (rc != 0) {
415                         fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
416                                  b->ob_oid, strerror (errno));
417                         goto out_1;
418                 }
419                 
420                 memset (fileb, 0xeb, getpagesize ());
421                 rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
422                 if (rc != 0) {
423                         fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
424                                  b->ob_oid, strerror (errno));
425                         goto out_2;
426                 }
427                 
428                 if (fileb->ob_id != b->ob_id ||
429                     fileb->ob_oid != b->ob_oid ||
430                     fileb->ob_npeers != b->ob_npeers ||
431                     fileb->ob_count >= b->ob_npeers ||
432                     (fileb->ob_ordinal != b->ob_ordinal - 1 &&
433                      fileb->ob_ordinal != b->ob_ordinal)) {
434                         fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
435                         fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
436                                  fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, 
437                                  fileb->ob_ordinal, fileb->ob_count);
438                         fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
439                                  b->ob_id, b->ob_oid, b->ob_npeers, 
440                                  b->ob_ordinal, b->ob_count);
441                         rc = -1;
442                         goto out_2;
443                 }
444         }
445                         
446  out_2:
447         rc2 = obdio_cancel (conn, &lh);
448         if (rc == 0 && rc2 != 0) {
449                 fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
450                          b->ob_oid, strerror (errno));
451                 rc = rc2;
452         }
453  out_1:
454         free (space);
455  out_0:
456         rc2 = obdio_close (conn, b->ob_oid, &fh);
457         if (rc == 0 && rc2 != 0) {
458                 fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n",
459                          b->ob_oid, strerror (errno));
460                 rc = rc2;
461         }
462         
463         return (rc);
464 }
465
466