Whamcloud - gitweb
- except for fixing the segfault in the documentation build, this is
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/module.h>
17 #include <linux/obd_class.h>
18 #include <linux/obd_lov.h>
19
20 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
21
22 /* obd methods */
23 static int lov_connect(struct lustre_handle *conn)
24 {
25         int rc;
26
27         MOD_INC_USE_COUNT;
28         rc = class_connect(conn);
29
30         if (rc)
31                 MOD_DEC_USE_COUNT;
32
33         return rc;
34 }
35
36 static int lov_disconnect(struct lustre_handle *conn)
37 {
38         int rc;
39
40         rc = class_disconnect(conn);
41         if (!rc)
42                 MOD_DEC_USE_COUNT;
43
44         /* XXX cleanup preallocated inodes */
45         return rc;
46 }
47
48 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
49 {
50         int rc;
51         ENTRY;
52
53         if (!class_conn2export(conn))
54                 RETURN(-EINVAL);
55
56         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
57         RETURN(rc);
58 }
59
60 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
61 {
62         int rc, retval, i;
63         ENTRY;
64
65         if (!class_conn2export(conn))
66                 RETURN(-EINVAL);
67
68         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
69                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
70                 if (i == 0)
71                         retval = rc;
72                 else if (retval != rc)
73                         CERROR("different results on multiple OBDs!\n");
74         }
75
76         RETURN(rc);
77 }
78
79 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
80 {
81         int rc, retval, i;
82         ENTRY;
83
84         if (!class_conn2export(conn))
85                 RETURN(-EINVAL);
86
87         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
88                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
89                 if (i == 0)
90                         retval = rc;
91                 else if (retval != rc)
92                         CERROR("different results on multiple OBDs!\n");
93         }
94
95         RETURN(rc);
96 }
97
98 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
99 {
100         int rc, retval, i;
101         ENTRY;
102
103         if (!class_conn2export(conn))
104                 RETURN(-EINVAL);
105
106         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
107                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
108                 if (i == 0)
109                         retval = rc;
110                 else if (retval != rc)
111                         CERROR("different results on multiple OBDs!\n");
112         }
113
114         RETURN(rc);
115 }
116
117 static int lov_create(struct lustre_handle *conn, struct obdo *oa)
118 {
119         int rc, retval, i, offset;
120         struct obdo tmp;
121         struct lov_md md;
122         ENTRY;
123
124         if (!class_conn2export(conn))
125                 RETURN(-EINVAL);
126
127         md.lmd_object_id = oa->o_id;
128         md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
129
130         memset(oa->o_inline, 0, sizeof(oa->o_inline));
131         offset = sizeof(md);
132         for (i = 0; i < md.lmd_stripe_count; i++) {
133                 struct lov_object_id lov_id;
134                 rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
135                 if (i == 0) {
136                         memcpy(oa, &tmp, sizeof(tmp));
137                         retval = rc;
138                 } else if (retval != rc)
139                         CERROR("return codes didn't match (%d, %d)\n",
140                                retval, rc);
141                 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
142                 lov_id->l_device_id = i;
143                 lov_id->l_object_id = tmp.o_id;
144                 offset += sizeof(*lov_id);
145         }
146         memcpy(oa->o_inline, &md, sizeof(md));
147
148         return rc;
149 }
150
151 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa)
152 {
153         int rc, retval, i, offset;
154         struct obdo tmp;
155         struct lov_md *md;
156         struct lov_object_id lov_id;
157         ENTRY;
158
159         if (!class_conn2export(conn))
160                 RETURN(-EINVAL);
161
162         md = (struct lov_md *)oa->o_inline;
163
164         memcpy(&tmp, oa, sizeof(tmp));
165
166         offset = sizeof(md);
167         for (i = 0; i < md->lmd_stripe_count; i++) {
168                 struct lov_object_id *lov_id;
169                 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
170
171                 tmp.o_id = lov_id->l_object_id;
172
173                 rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
174                 if (i == 0)
175                         retval = rc;
176                 else if (retval != rc)
177                         CERROR("return codes didn't match (%d, %d)\n",
178                                retval, rc);
179                 offset += sizeof(*lov_id);
180         }
181
182         return rc;
183 }
184
185 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
186  * we can send this 'punch' to just the authoritative node and the nodes
187  * that the punch will affect. */
188 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
189                      obd_size count, obd_off offset)
190 {
191         int rc, retval, i;
192         ENTRY;
193
194         if (!class_conn2export(conn))
195                 RETURN(-EINVAL);
196
197         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
198                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
199                                offset);
200                 if (i == 0)
201                         retval = rc;
202                 else if (retval != rc)
203                         CERROR("different results on multiple OBDs!\n");
204         }
205
206         RETURN(rc);
207 }
208
209 struct lov_callback_data {
210         atomic_t count;
211         wait_queue_head waitq;
212 };
213
214 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
215 {
216         struct lov_callback_data *cb_data = data;
217
218         if (atomic_dec_and_test(&cb_data->count))
219                 wake_up(&cb_data->waitq);
220 }
221
222 static int lov_read_check_status(struct lov_callback_data *cb_data)
223 {
224         ENTRY;
225         if (sigismember(&(current->pending.signal), SIGKILL) ||
226             sigismember(&(current->pending.signal), SIGTERM) ||
227             sigismember(&(current->pending.signal), SIGINT)) {
228                 cb_data->flags |= PTL_RPC_FL_INTR;
229                 RETURN(1);
230         }
231         if (atomic_read(&cb_data->count) == 0)
232                 RETURN(1);
233         RETURN(0);
234 }
235
236 /* buffer must lie in user memory here */
237 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
238                    struct obdo **oa,
239                    obd_count *oa_bufs, struct page **buf,
240                    obd_size *count, obd_off *offset, obd_flag *flags,
241                    bulk_callback_t callback, void *data)
242 {
243         int rc, i, page_array_offset = 0;
244         obd_off off = offset;
245         obd_size retval = 0;
246         struct lov_callback_data *cb_data;
247         ENTRY;
248
249         if (num_oa != 1)
250                 LBUG();
251
252         if (!class_conn2export(conn))
253                 RETURN(-EINVAL);
254
255         OBD_ALLOC(cb_data, sizeof(*cb_data));
256         if (cb_data == NULL) {
257                 LBUG();
258                 RETURN(-ENOMEM);
259         }
260         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
261         atomic_set(&cb_data->count, 0);
262
263         for (i = 0; i < oa_bufs[0]; i++) {
264                 struct page *current_page = buf[i];
265
266                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
267                 int bufcount = oa_bufs[i];
268                 // md->lmd_stripe_count
269
270                 for (k = page_array_offset; k < bufcount + page_array_offset;
271                      k++) {
272                         
273                 }
274                 page_array_offset += bufcount;
275
276
277         while (off < offset + count) {
278                 int stripe, conn;
279                 obd_size size, tmp;
280
281                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
282                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
283                 if (size > *count)
284                         size = *count;
285
286                 conn = stripe % conn->oc_dev->obd_multi_count;
287
288                 tmp = size;
289                 atomic_inc(&cb_data->count);
290                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
291                              num_oa, oa, buf,
292                               &size, off, lov_read_callback, cb_data);
293                 if (rc == 0)
294                         retval += size;
295                 else {
296                         CERROR("read(off=%Lu, count=%Lu): %d\n",
297                                (unsigned long long)off,
298                                (unsigned long long)size, rc);
299                         break;
300                 }
301
302                 buf += size;
303         }
304
305         wait_event_interruptible(&cb_data->waitq,
306                                  lov_read_check_status(cb_data));
307         if (cb_data->flags & PTL_RPC_FL_INTR)
308                 rc = -EINTR;
309
310         /* FIXME: The error handling here sucks */
311         *count = retval;
312         OBD_FREE(cb_data, sizeof(*cb_data));
313         RETURN(rc);
314 }
315
316 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
317 {
318         
319 }
320
321 /* buffer must lie in user memory here */
322 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
323                          obd_size *count, obd_off offset)
324 {
325         int err;
326         struct file *file;
327         unsigned long retval;
328
329         ENTRY;
330         if (!class_conn2export(conn)) {
331                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
332                 EXIT;
333                 return -EINVAL;
334         }
335
336         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
337         if (!file || IS_ERR(file)) {
338                 EXIT;
339                 return -PTR_ERR(file);
340         }
341
342         /* count doubles as retval */
343         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
344         filp_close(file, 0);
345
346         if ( retval >= 0 ) {
347                 err = 0;
348                 *count = retval;
349                 EXIT;
350         } else {
351                 err = retval;
352                 *count = 0;
353                 EXIT;
354         }
355
356         return err;
357 }
358
359 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
360                        struct ldlm_handle *parent_lock, __u64 *res_id,
361                        __u32 type, struct ldlm_extent *extent, __u32 mode,
362                        int *flags, void *data, int datalen,
363                        struct ldlm_handle *lockh)
364 {
365         int rc;
366         ENTRY;
367
368         if (!class_conn2export(conn))
369                 RETURN(-EINVAL);
370
371         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
372                          res_id, type, extent, mode, flags, data, datalen,
373                          lockh);
374         RETURN(rc);
375 }
376
377 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
378                       struct ldlm_handle *lockh)
379 {
380         int rc;
381         ENTRY;
382
383         if (!class_conn2export(conn))
384                 RETURN(-EINVAL);
385
386         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
387         RETURN(rc);
388 }
389 #endif
390
391 struct obd_ops lov_obd_ops = {
392         o_setup:       class_multi_setup,
393         o_cleanup:     class_multi_cleanup,
394         o_create:      lov_create,
395         o_connect:     lov_connect,
396         o_disconnect:  lov_disconnect,
397         o_getattr:     lov_getattr,
398         o_setattr:     lov_setattr,
399         o_open:        lov_open,
400         o_close:       lov_close,
401 #if 0
402         o_destroy:     lov_destroy,
403         o_brw:         lov_pgcache_brw,
404         o_punch:       lov_punch,
405         o_enqueue:     lov_enqueue,
406         o_cancel:      lov_cancel
407 #endif
408 };
409
410
411 #define LOV_VERSION "v0.1"
412
413 static int __init lov_init(void)
414 {
415         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
416                ", phil@clusterfs.com\n");
417         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
418 }
419
420 static void __exit lov_exit(void)
421 {
422         class_unregister_type(OBD_LOV_DEVICENAME);
423 }
424
425 MODULE_AUTHOR("Phil Schwan <phil@clusterfs.com>");
426 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
427 MODULE_LICENSE("GPL");
428
429 module_init(lov_init);
430 module_exit(lov_exit);