Whamcloud - gitweb
A mostly-fix for "mknod /mnt/lustre/foofo p". It doesn't fail outright
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/obd_class.h>
23 #include <linux/obd_lov.h>
24
25 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
26
27 /* obd methods */
28
29 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
30 {
31         struct ptlrpc_request *req;
32         struct lov_obd *lov = &obd->u.lov;
33         struct lustre_handle mdc_conn;
34         uuid_t *uuidarray;
35         int rc;
36         int i;
37
38         MOD_INC_USE_COUNT;
39         rc = class_connect(conn, obd);
40         if (rc) {
41                 MOD_DEC_USE_COUNT;
42                 RETURN(rc);
43         }
44
45         rc = obd_connect(&mdc_conn, lov->mdcobd);
46         if (rc) {
47                 CERROR("cannot connect to mdc: rc = %d\n", rc);
48                 GOTO(out, rc = -EINVAL);
49         }
50
51         rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
52         obd_disconnect(&mdc_conn);
53
54         if (rc) {
55                 CERROR("cannot get lov info %d\n", rc);
56                 GOTO(out, rc);
57         }
58
59
60         if (lov->desc.ld_tgt_count > 1000) {
61                 CERROR("configuration error: target count > 1000 (%d)\n",
62                        lov->desc.ld_tgt_count);
63                 GOTO(out, rc = -EINVAL);
64         }
65
66         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
67                 CERROR("lov uuid %s not on mds device (%s)\n",
68                        obd->obd_uuid, lov->desc.ld_uuid);
69                 GOTO(out, rc = -EINVAL);
70         }
71
72         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
73             sizeof(uuid_t) * lov->desc.ld_tgt_count) {
74                 CERROR("invalid uuid array returned\n");
75                 GOTO(out, rc = -EINVAL);
76         }
77
78         lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count;
79         OBD_ALLOC(lov->tgts, lov->bufsize);
80         if (!lov->tgts) {
81                 CERROR("Out of memory\n");
82                 GOTO(out, rc = -ENOMEM);
83         }
84
85         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
86         for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
87                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
88
89         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
90                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
91                 if (!tgt) {
92                         CERROR("Target %s not configured\n", uuidarray[i]);
93                         GOTO(out_mem, rc = -EINVAL);
94                 }
95                 rc = obd_connect(&lov->tgts[i].conn, tgt);
96                 if (rc) {
97                         CERROR("Target %s connect error %d\n",
98                                uuidarray[i], rc);
99                         GOTO(out_mem, rc);
100                 }
101         }
102
103  out_mem:
104         if (rc) {
105                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
106                         int rc2;
107                         rc2 = obd_disconnect(&lov->tgts[i].conn);
108                         if (rc2)
109                                 CERROR("BAD: Target %s disconnect error %d\n",
110                                        uuidarray[i], rc2);
111                 }
112                 OBD_FREE(lov->tgts, lov->bufsize);
113         }
114  out:
115         if (rc)
116                 class_disconnect(conn);
117
118         ptlrpc_free_req(req);
119         return rc;
120 }
121
122 static int lov_disconnect(struct lustre_handle *conn)
123 {
124         struct obd_device *obd = class_conn2obd(conn);
125         struct lov_obd *lov = &obd->u.lov;
126         int rc;
127         int i; 
128
129         if (!lov->tgts)
130                 goto out_local;
131
132         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
133                 rc = obd_disconnect(&lov->tgts[i].conn);
134                 if (rc) { 
135                         CERROR("Target %s disconnect error %d\n", 
136                                lov->tgts[i].uuid, rc); 
137                         RETURN(rc);
138                 }
139         }
140         OBD_FREE(lov->tgts, lov->bufsize);
141         lov->bufsize = 0;
142         lov->tgts = NULL; 
143
144  out_local:
145
146         rc = class_disconnect(conn);
147         if (!rc)
148                 MOD_DEC_USE_COUNT;
149
150         return rc;
151 }
152
153 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
154 {
155         struct obd_ioctl_data* data = buf;
156         struct lov_obd *lov = &obd->u.lov;
157         int rc = 0;
158         ENTRY;
159
160         if (data->ioc_inllen1 < 1) {
161                 CERROR("osc setup requires an MDC UUID\n");
162                 RETURN(-EINVAL);
163         }
164
165         if (data->ioc_inllen1 > 37) {
166                 CERROR("mdc UUID must be less than 38 characters\n");
167                 RETURN(-EINVAL);
168         }
169
170         /* FIXME: we should make a connection instead perhaps to avoid
171            the mdc from walking away? The fs guarantees this. */ 
172         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
173         if (!lov->mdcobd) { 
174                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
175                        data->ioc_inlbuf1); 
176                 rc = -EINVAL;
177         }
178         RETURN(rc); 
179
180
181
182 static inline int lov_stripe_md_size(struct obd_device *obd)
183 {
184         struct lov_obd *lov = &obd->u.lov;
185         int size;
186
187         size = sizeof(struct lov_stripe_md) + 
188                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id); 
189         return size;
190 }
191
192 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
193 {
194         int rc, i;
195         struct obdo tmp;
196         struct obd_export *export = class_conn2export(conn);
197         struct lov_obd *lov;
198         struct lov_stripe_md *md;
199         ENTRY;
200
201         if (!ea) { 
202                 CERROR("lov_create needs EA for striping information\n"); 
203                 RETURN(-EINVAL); 
204         }
205
206         if (!export)
207                 RETURN(-EINVAL);
208         lov = &export->exp_obd->u.lov;
209
210         oa->o_easize =  lov_stripe_md_size(export->exp_obd);
211         if (!*ea) {
212                 OBD_ALLOC(*ea, oa->o_easize);
213                 if (! *ea)
214                         RETURN(-ENOMEM);
215         }
216
217         md = *ea;
218         md->lmd_size = oa->o_easize;
219         md->lmd_object_id = oa->o_id;
220         if (!md->lmd_stripe_count) { 
221                 md->lmd_stripe_count = lov->desc.ld_default_stripecount;
222         }
223
224         for (i = 0; i < md->lmd_stripe_count; i++) {
225                 struct lov_stripe_md obj_md; 
226                 struct lov_stripe_md *obj_mdp = &obj_md; 
227                 /* create data objects with "parent" OA */ 
228                 memcpy(&tmp, oa, sizeof(tmp));
229                 tmp.o_easize = sizeof(struct lov_stripe_md);
230                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
231                 if (rc) 
232                         GOTO(out_cleanup, rc); 
233                 md->lmd_objects[i].l_object_id = tmp.o_id;
234         }
235
236  out_cleanup: 
237         if (rc) { 
238                 int i2, rc2;
239                 for (i2 = 0 ; i2 < i ; i2++) { 
240                         /* destroy already created objects here */ 
241                         tmp.o_id = md->lmd_objects[i].l_object_id;
242                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
243                         if (rc2) { 
244                                 CERROR("Failed to remove object from target %d\n", 
245                                        i2); 
246                         }
247                 }
248         }
249         return rc;
250 }
251
252 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
253 struct lov_stripe_md *ea)
254 {
255         int rc, i;
256         struct obdo tmp;
257         struct obd_export *export = class_conn2export(conn);
258         struct lov_obd *lov;
259         struct lov_stripe_md *md;
260         ENTRY;
261
262         if (!ea) { 
263                 CERROR("LOV requires striping ea for desctruction\n"); 
264                 RETURN(-EINVAL); 
265         }
266
267         if (!export || !export->exp_obd) 
268                 RETURN(-ENODEV); 
269
270         lov = &export->exp_obd->u.lov;
271         md = ea;
272
273         for (i = 0; i < md->lmd_stripe_count; i++) {
274                 /* create data objects with "parent" OA */ 
275                 memcpy(&tmp, oa, sizeof(tmp));
276                 oa->o_id = md->lmd_objects[i].l_object_id; 
277                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
278                 if (!rc) { 
279                         CERROR("Error destroying object %Ld on %d\n",
280                                oa->o_id, i); 
281                 }
282         }
283         RETURN(rc);
284 }
285
286 #if 0
287 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
288 {
289         int rc;
290         ENTRY;
291
292         if (!class_conn2export(conn))
293                 RETURN(-EINVAL);
294
295         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
296         RETURN(rc);
297 }
298
299 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
300 {
301         int rc, retval, i;
302         ENTRY;
303
304         if (!class_conn2export(conn))
305                 RETURN(-EINVAL);
306
307         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
308                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
309                 if (i == 0)
310                         retval = rc;
311                 else if (retval != rc)
312                         CERROR("different results on multiple OBDs!\n");
313         }
314
315         RETURN(rc);
316 }
317
318 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
319 {
320         int rc, retval, i;
321         ENTRY;
322
323         if (!class_conn2export(conn))
324                 RETURN(-EINVAL);
325
326         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
327                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
328                 if (i == 0)
329                         retval = rc;
330                 else if (retval != rc)
331                         CERROR("different results on multiple OBDs!\n");
332         }
333
334         RETURN(rc);
335 }
336
337 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
338 {
339         int rc, retval, i;
340         ENTRY;
341
342         if (!class_conn2export(conn))
343                 RETURN(-EINVAL);
344
345         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
346                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
347                 if (i == 0)
348                         retval = rc;
349                 else if (retval != rc)
350                         CERROR("different results on multiple OBDs!\n");
351         }
352
353         RETURN(rc);
354 }
355
356
357
358 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
359  * we can send this 'punch' to just the authoritative node and the nodes
360  * that the punch will affect. */
361 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
362                      obd_size count, obd_off offset)
363 {
364         int rc, retval, i;
365         ENTRY;
366
367         if (!class_conn2export(conn))
368                 RETURN(-EINVAL);
369
370         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
371                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
372                                offset);
373                 if (i == 0)
374                         retval = rc;
375                 else if (retval != rc)
376                         CERROR("different results on multiple OBDs!\n");
377         }
378
379         RETURN(rc);
380 }
381
382 struct lov_callback_data {
383         atomic_t count;
384         wait_queue_head waitq;
385 };
386
387 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
388 {
389         struct lov_callback_data *cb_data = data;
390
391         if (atomic_dec_and_test(&cb_data->count))
392                 wake_up(&cb_data->waitq);
393 }
394
395 static int lov_read_check_status(struct lov_callback_data *cb_data)
396 {
397         ENTRY;
398         if (sigismember(&(current->pending.signal), SIGKILL) ||
399             sigismember(&(current->pending.signal), SIGTERM) ||
400             sigismember(&(current->pending.signal), SIGINT)) {
401                 cb_data->flags |= PTL_RPC_FL_INTR;
402                 RETURN(1);
403         }
404         if (atomic_read(&cb_data->count) == 0)
405                 RETURN(1);
406         RETURN(0);
407 }
408
409 /* buffer must lie in user memory here */
410 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
411                    struct obdo **oa,
412                    obd_count *oa_bufs, struct page **buf,
413                    obd_size *count, obd_off *offset, obd_flag *flags,
414                    bulk_callback_t callback, void *data)
415 {
416         int rc, i, page_array_offset = 0;
417         obd_off off = offset;
418         obd_size retval = 0;
419         struct lov_callback_data *cb_data;
420         ENTRY;
421
422         if (num_oa != 1)
423                 LBUG();
424
425         if (!class_conn2export(conn))
426                 RETURN(-EINVAL);
427
428         OBD_ALLOC(cb_data, sizeof(*cb_data));
429         if (cb_data == NULL) {
430                 LBUG();
431                 RETURN(-ENOMEM);
432         }
433         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
434         atomic_set(&cb_data->count, 0);
435
436         for (i = 0; i < oa_bufs[0]; i++) {
437                 struct page *current_page = buf[i];
438
439                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
440                 int bufcount = oa_bufs[i];
441                 // md->lmd_stripe_count
442
443                 for (k = page_array_offset; k < bufcount + page_array_offset;
444                      k++) {
445                         
446                 }
447                 page_array_offset += bufcount;
448
449
450         while (off < offset + count) {
451                 int stripe, conn;
452                 obd_size size, tmp;
453
454                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
455                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
456                 if (size > *count)
457                         size = *count;
458
459                 conn = stripe % conn->oc_dev->obd_multi_count;
460
461                 tmp = size;
462                 atomic_inc(&cb_data->count);
463                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
464                              num_oa, oa, buf,
465                               &size, off, lov_read_callback, cb_data);
466                 if (rc == 0)
467                         retval += size;
468                 else {
469                         CERROR("read(off=%Lu, count=%Lu): %d\n",
470                                (unsigned long long)off,
471                                (unsigned long long)size, rc);
472                         break;
473                 }
474
475                 buf += size;
476         }
477
478         wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
479         if (cb_data->flags & PTL_RPC_FL_INTR)
480                 rc = -EINTR;
481
482         /* FIXME: The error handling here sucks */
483         *count = retval;
484         OBD_FREE(cb_data, sizeof(*cb_data));
485         RETURN(rc);
486 }
487
488 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
489 {
490         
491 }
492
493 /* buffer must lie in user memory here */
494 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
495                          obd_size *count, obd_off offset)
496 {
497         int err;
498         struct file *file;
499         unsigned long retval;
500
501         ENTRY;
502         if (!class_conn2export(conn)) {
503                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
504                 EXIT;
505                 return -EINVAL;
506         }
507
508         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
509         if (!file || IS_ERR(file)) {
510                 EXIT;
511                 return -PTR_ERR(file);
512         }
513
514         /* count doubles as retval */
515         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
516         filp_close(file, 0);
517
518         if ( retval >= 0 ) {
519                 err = 0;
520                 *count = retval;
521                 EXIT;
522         } else {
523                 err = retval;
524                 *count = 0;
525                 EXIT;
526         }
527
528         return err;
529 }
530
531 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
532                        struct ldlm_handle *parent_lock, __u64 *res_id,
533                        __u32 type, struct ldlm_extent *extent, __u32 mode,
534                        int *flags, void *data, int datalen,
535                        struct ldlm_handle *lockh)
536 {
537         int rc;
538         ENTRY;
539
540         if (!class_conn2export(conn))
541                 RETURN(-EINVAL);
542
543         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
544                          res_id, type, extent, mode, flags, data, datalen,
545                          lockh);
546         RETURN(rc);
547 }
548
549 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
550                       struct ldlm_handle *lockh)
551 {
552         int rc;
553         ENTRY;
554
555         if (!class_conn2export(conn))
556                 RETURN(-EINVAL);
557
558         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
559         RETURN(rc);
560 }
561 #endif
562
563 struct obd_ops lov_obd_ops = {
564         o_setup:       lov_setup,
565         o_connect:     lov_connect,
566         o_disconnect:  lov_disconnect,
567         o_create:      lov_create,
568         o_destroy:     lov_destroy,
569 #if 0
570         o_getattr:     lov_getattr,
571         o_setattr:     lov_setattr,
572         o_open:        lov_open,
573         o_close:       lov_close,
574         o_brw:         lov_pgcache_brw,
575         o_punch:       lov_punch,
576         o_enqueue:     lov_enqueue,
577         o_cancel:      lov_cancel
578 #endif
579 };
580
581
582 #define LOV_VERSION "v0.1"
583
584 static int __init lov_init(void)
585 {
586         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
587                ", info@clusterfs.com\n");
588         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
589 }
590
591 static void __exit lov_exit(void)
592 {
593         class_unregister_type(OBD_LOV_DEVICENAME);
594 }
595
596 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
597 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
598 MODULE_LICENSE("GPL");
599
600 module_init(lov_init);
601 module_exit(lov_exit);