Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / utils / obdiolib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2003 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eeb@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include "obdiolib.h"
34
35 void
36 obdio_iocinit (struct obdio_conn *conn)
37 {
38         memset (&conn->oc_data, 0, sizeof (conn->oc_data));
39         conn->oc_data.ioc_version = OBD_IOCTL_VERSION;
40         conn->oc_data.ioc_addr = conn->oc_conn_addr;
41         conn->oc_data.ioc_cookie = conn->oc_conn_cookie;
42         conn->oc_data.ioc_len = sizeof (conn->oc_data);
43 }
44
45 int
46 obdio_ioctl (struct obdio_conn *conn, int cmd) 
47 {
48         char *buf = conn->oc_buffer;
49         int   rc;
50         int   rc2;
51         
52         rc = obd_ioctl_pack (&conn->oc_data, &buf, sizeof (conn->oc_buffer));
53         if (rc != 0) {
54                 fprintf (stderr, "obdio_ioctl: obd_ioctl_pack: %d (%s)\n", 
55                          rc, strerror (errno));
56                 abort ();
57         }
58         
59         rc = ioctl (conn->oc_fd, cmd, buf);
60         if (rc != 0)
61                 return (rc);
62         
63         rc2 = obd_ioctl_unpack (&conn->oc_data, buf, sizeof (conn->oc_buffer));
64         if (rc2 != 0) {
65                 fprintf (stderr, "obdio_ioctl: obd_ioctl_unpack: %d (%s)\n",
66                          rc2, strerror (errno));
67                 abort ();
68         }
69         
70         return (rc);
71 }
72
73 struct obdio_conn *
74 obdio_connect (int device)
75 {
76         struct obdio_conn  *conn;
77         int                 rc;
78
79         conn = malloc (sizeof (*conn));
80         if (conn == NULL) {
81                 fprintf (stderr, "obdio_connect: no memory\n");
82                 return (NULL);
83         }
84         memset (conn, 0, sizeof (*conn));
85         
86         conn->oc_fd = open ("/dev/obd", O_RDWR);
87         if (conn->oc_fd < 0) {
88                 fprintf (stderr, "obdio_connect: Can't open /dev/obd: %s\n",
89                          strerror (errno));
90                 goto failed;
91         }
92
93         obdio_iocinit (conn);
94         conn->oc_data.ioc_dev = device;
95         rc = obdio_ioctl (conn, OBD_IOC_DEVICE);
96         if (rc != 0) {
97                 fprintf (stderr, "obdio_connect: Can't set device %d: %s\n",
98                          device, strerror (errno));
99                 goto failed;
100         }
101         
102         obdio_iocinit (conn);
103         rc = obdio_ioctl (conn, OBD_IOC_CONNECT);
104         if (rc != 0) {
105                 fprintf (stderr, "obdio_connect: Can't connect to device %d: %s\n",
106                          device, strerror (errno));
107                 goto failed;
108         }
109         
110         conn->oc_conn_addr = conn->oc_data.ioc_addr;
111         conn->oc_conn_cookie = conn->oc_data.ioc_cookie;
112         return (conn);
113         
114  failed:
115         free (conn);
116         return (NULL);
117 }
118
119 void
120 obdio_disconnect (struct obdio_conn *conn) 
121 {
122         close (conn->oc_fd);
123         /* obdclass will automatically close on last ref */
124         free (conn);
125 }
126
127 int
128 obdio_open (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) 
129 {
130         int    rc;
131         
132         obdio_iocinit (conn);
133         
134         conn->oc_data.ioc_obdo1.o_id = oid;
135         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
136         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
137         
138         rc = obdio_ioctl (conn, OBD_IOC_OPEN);
139         
140         if (rc == 0)
141                 memcpy (fh, obdo_handle(&conn->oc_data.ioc_obdo1), sizeof (*fh));
142
143         return (rc);
144 }
145
146 int
147 obdio_close (struct obdio_conn *conn, uint64_t oid, struct lustre_handle *fh) 
148 {
149         obdio_iocinit (conn);
150         
151
152         conn->oc_data.ioc_obdo1.o_id = oid;
153         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
154         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), fh, sizeof (*fh));
155         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | 
156                                           OBD_MD_FLMODE | OBD_MD_FLHANDLE;
157         
158         return (obdio_ioctl (conn, OBD_IOC_CLOSE));
159 }
160
161 int
162 obdio_pread (struct obdio_conn *conn, uint64_t oid, 
163              char *buffer, uint32_t count, uint64_t offset) 
164 {
165         obdio_iocinit (conn);
166         
167         conn->oc_data.ioc_obdo1.o_id = oid;
168         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
169         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
170
171         conn->oc_data.ioc_pbuf2 = buffer;
172         conn->oc_data.ioc_plen2 = count;
173         conn->oc_data.ioc_count = count;
174         conn->oc_data.ioc_offset = offset;
175
176         return (obdio_ioctl (conn, OBD_IOC_BRW_READ));
177 }
178
179 int
180 obdio_pwrite (struct obdio_conn *conn, uint64_t oid, 
181               char *buffer, uint32_t count, uint64_t offset) 
182 {
183         obdio_iocinit (conn);
184         
185         conn->oc_data.ioc_obdo1.o_id = oid;
186         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
187         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
188
189         conn->oc_data.ioc_pbuf2 = buffer;
190         conn->oc_data.ioc_plen2 = count;
191         conn->oc_data.ioc_count = count;
192         conn->oc_data.ioc_offset = offset;
193
194         return (obdio_ioctl (conn, OBD_IOC_BRW_WRITE));
195 }
196
197 int
198 obdio_enqueue (struct obdio_conn *conn, uint64_t oid,
199                int mode, uint64_t offset, uint32_t count,
200                struct lustre_handle *lh)
201 {
202         int   rc;
203         
204         obdio_iocinit (conn);
205         
206         conn->oc_data.ioc_obdo1.o_id = oid;
207         conn->oc_data.ioc_obdo1.o_mode = S_IFREG;
208         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
209
210         conn->oc_data.ioc_conn1 = mode;
211         conn->oc_data.ioc_count = count;
212         conn->oc_data.ioc_offset = offset;
213         
214         rc = obdio_ioctl (conn, ECHO_IOC_ENQUEUE);
215         
216         if (rc == 0)
217                 memcpy (lh, obdo_handle (&conn->oc_data.ioc_obdo1), sizeof (*lh));
218         
219         return (rc);
220 }
221
222 int
223 obdio_cancel (struct obdio_conn *conn, struct lustre_handle *lh)
224 {
225         obdio_iocinit (conn);
226
227         memcpy (obdo_handle (&conn->oc_data.ioc_obdo1), lh, sizeof (*lh));
228         conn->oc_data.ioc_obdo1.o_valid = OBD_MD_FLHANDLE;
229         
230         return (obdio_ioctl (conn, ECHO_IOC_CANCEL));
231 }
232
233 void *
234 obdio_alloc_aligned_buffer (void **spacep, int size) 
235 {
236         int   pagesize = getpagesize();
237         void *space = malloc (size + pagesize - 1);
238         
239         *spacep = space;
240         if (space == NULL)
241                 return (NULL);
242         
243         return ((void *)(((unsigned long)space + pagesize - 1) & ~(pagesize - 1)));
244 }
245
246 struct obdio_barrier *
247 obdio_new_barrier (uint64_t oid, uint64_t id, int npeers) 
248 {
249         struct obdio_barrier *b;
250
251         b = (struct obdio_barrier *)malloc (sizeof (*b));
252         if (b == NULL) {
253                 fprintf (stderr, "obdio_new_barrier "LPX64": Can't allocate\n", oid);
254                 return (NULL);
255         }
256         
257         b->ob_id = id;
258         b->ob_oid = oid;
259         b->ob_npeers = npeers;
260         b->ob_ordinal = 0;
261         b->ob_count = 0;
262         return (b);
263 }
264
265 int
266 obdio_setup_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
267 {
268         struct lustre_handle    fh;
269         struct lustre_handle    lh;
270         int                     rc;
271         int                     rc2;
272         void                   *space;
273         struct obdio_barrier   *fileb;
274
275         if (b->ob_ordinal != 0 ||
276             b->ob_count != 0) {
277                 fprintf (stderr, "obdio_setup_barrier: invalid parameter\n");
278                 abort ();
279         }
280         
281         rc = obdio_open (conn, b->ob_oid, &fh);
282         if (rc != 0) {
283                 fprintf (stderr, "obdio_setup_barrier "LPX64": Failed to open object: %s\n",
284                          b->ob_oid, strerror (errno));
285                 return (rc);
286         }
287         
288         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
289         if (fileb == NULL) {
290                 fprintf (stderr, "obdio_setup_barrier "LPX64": Can't allocate page buffer\n",
291                          b->ob_oid);
292                 rc = -1;
293                 goto out_0;
294         }
295         
296         memset (fileb, 0, getpagesize ());
297         *fileb = *b;
298         
299         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
300         if (rc != 0) {
301                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on enqueue: %s\n",
302                          b->ob_oid, strerror (errno));
303                 goto out_1;
304         }
305         
306         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
307         if (rc != 0)
308                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on write: %s\n",
309                          b->ob_oid, strerror (errno));
310         
311         rc2 = obdio_cancel (conn, &lh);
312         if (rc == 0 && rc2 != 0) {
313                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on cancel: %s\n",
314                          b->ob_oid, strerror (errno));
315                 rc = rc2;
316         }
317  out_1:
318         free (space);
319  out_0:
320         rc2 = obdio_close (conn, b->ob_oid, &fh);
321         if (rc == 0 && rc2 != 0) {
322                 fprintf (stderr, "obdio_setup_barrier "LPX64": Error on close: %s\n",
323                          b->ob_oid, strerror (errno));
324                 rc = rc2;
325         }
326         
327         return (rc);
328 }
329
330 int
331 obdio_barrier (struct obdio_conn *conn, struct obdio_barrier *b)
332 {
333         struct lustre_handle   fh;
334         struct lustre_handle   lh;
335         int                    rc;
336         int                    rc2;
337         void                  *space;
338         struct obdio_barrier  *fileb;
339         char                  *mode;
340
341         rc = obdio_open (conn, b->ob_oid, &fh);
342         if (rc != 0) {
343                 fprintf (stderr, "obdio_barrier "LPX64": Error on open: %s\n",
344                          b->ob_oid, strerror (errno));
345                 return (rc);
346         }
347         
348         fileb = (struct obdio_barrier *) obdio_alloc_aligned_buffer (&space, getpagesize ());
349         if (fileb == NULL) {
350                 fprintf (stderr, "obdio_barrier "LPX64": Can't allocate page buffer\n",
351                          b->ob_oid);
352                 rc = -1;
353                 goto out_0;
354         }
355
356         rc = obdio_enqueue (conn, b->ob_oid, LCK_PW, 0, getpagesize (), &lh);
357         if (rc != 0) {
358                 fprintf (stderr, "obdio_barrier "LPX64": Error on PW enqueue: %s\n",
359                          b->ob_oid, strerror (errno));
360                 goto out_1;
361         }
362         
363         memset (fileb, 0xeb, getpagesize ());
364         rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
365         if (rc != 0) {
366                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial read: %s\n",
367                          b->ob_oid, strerror (errno));
368                 goto out_2;
369         }
370         
371         if (fileb->ob_id != b->ob_id ||
372             fileb->ob_oid != b->ob_oid ||
373             fileb->ob_npeers != b->ob_npeers ||
374             fileb->ob_count >= b->ob_npeers ||
375             fileb->ob_ordinal != b->ob_ordinal) {
376                 fprintf (stderr, "obdio_barrier "LPX64": corrupt on initial read\n", b->ob_id);
377                 fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
378                          fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, 
379                          fileb->ob_ordinal, fileb->ob_count);
380                 fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
381                          b->ob_id, b->ob_oid, b->ob_npeers, 
382                          b->ob_ordinal, b->ob_count);
383                 rc = -1;
384                 goto out_2;
385         }
386         
387         fileb->ob_count++;
388         if (fileb->ob_count == fileb->ob_npeers) { /* I'm the last joiner */
389                 fileb->ob_count = 0;            /* join count for next barrier */
390                 fileb->ob_ordinal++;            /* signal all joined */
391         }
392
393         rc = obdio_pwrite (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
394         if (rc != 0) {
395                 fprintf (stderr, "obdio_barrier "LPX64": Error on initial write: %s\n",
396                          b->ob_oid, strerror (errno));
397                 goto out_2;
398         }
399
400         mode = "PW";
401         b->ob_ordinal++;                        /* now I wait... */
402         while (fileb->ob_ordinal != b->ob_ordinal) {
403
404                 rc = obdio_cancel (conn, &lh);
405                 if (rc != 0) {
406                         fprintf (stderr, "obdio_barrier "LPX64": Error on %s cancel: %s\n",
407                                  b->ob_oid, mode, strerror (errno));
408                         goto out_1;
409                 }
410
411                 mode = "PR";
412                 rc = obdio_enqueue (conn, b->ob_oid, LCK_PR, 0, getpagesize (), &lh);
413                 if (rc != 0) {
414                         fprintf (stderr, "obdio_barrier "LPX64": Error on PR enqueue: %s\n",
415                                  b->ob_oid, strerror (errno));
416                         goto out_1;
417                 }
418                 
419                 memset (fileb, 0xeb, getpagesize ());
420                 rc = obdio_pread (conn, b->ob_oid, (void *)fileb, getpagesize (), 0);
421                 if (rc != 0) {
422                         fprintf (stderr, "obdio_barrier "LPX64": Error on read: %s\n",
423                                  b->ob_oid, strerror (errno));
424                         goto out_2;
425                 }
426                 
427                 if (fileb->ob_id != b->ob_id ||
428                     fileb->ob_oid != b->ob_oid ||
429                     fileb->ob_npeers != b->ob_npeers ||
430                     fileb->ob_count >= b->ob_npeers ||
431                     (fileb->ob_ordinal != b->ob_ordinal - 1 &&
432                      fileb->ob_ordinal != b->ob_ordinal)) {
433                         fprintf (stderr, "obdio_barrier "LPX64": corrupt\n", b->ob_id);
434                         fprintf (stderr, "  got ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
435                                  fileb->ob_id, fileb->ob_oid, fileb->ob_npeers, 
436                                  fileb->ob_ordinal, fileb->ob_count);
437                         fprintf (stderr, "  expected ["LPX64","LPX64","LPX64","LPX64","LPX64"]\n",
438                                  b->ob_id, b->ob_oid, b->ob_npeers, 
439                                  b->ob_ordinal, b->ob_count);
440                         rc = -1;
441                         goto out_2;
442                 }
443         }
444                         
445  out_2:
446         rc2 = obdio_cancel (conn, &lh);
447         if (rc == 0 && rc2 != 0) {
448                 fprintf (stderr, "obdio_barrier "LPX64": Error on cancel: %s\n",
449                          b->ob_oid, strerror (errno));
450                 rc = rc2;
451         }
452  out_1:
453         free (space);
454  out_0:
455         rc2 = obdio_close (conn, b->ob_oid, &fh);
456         if (rc == 0 && rc2 != 0) {
457                 fprintf (stderr, "obdio_barrier "LPX64": Error on close: %s\n",
458                          b->ob_oid, strerror (errno));
459                 rc = rc2;
460         }
461         
462         return (rc);
463 }
464
465