Whamcloud - gitweb
d22151b6f25277577cf255c0a3da7cc349fe537d
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/obd_class.h>
23 #include <linux/obd_lov.h>
24
25 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
26
27 /* obd methods */
28
29 static int lov_getinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
30                        struct lov_desc *desc, uuid_t **uuids,
31                        struct ptlrpc_request **request)
32 {
33         struct ptlrpc_request *req;
34         struct mds_status_req *streq;
35         struct lov_obd *lov = &obd->u.lov;
36         struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
37         int rc, size[2] = {sizeof(*streq)};
38         ENTRY;
39
40         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, mdc_connh,
41                                MDS_LOVINFO, 1, size, NULL);
42         if (!req)
43                 GOTO(out, rc = -ENOMEM);
44
45         *request = req;
46         streq = lustre_msg_buf(req->rq_reqmsg, 0);
47         streq->flags = HTON__u32(MDS_STATUS_LOV);
48         streq->repbuf = HTON__u32(8000);
49
50         /* prepare for reply */
51         req->rq_level = LUSTRE_CONN_CON;
52         size[0] = sizeof(*desc);
53         size[1] = 8000;
54         req->rq_replen = lustre_msg_size(2, size);
55
56         rc = ptlrpc_queue_wait(req);
57         rc = ptlrpc_check_status(req, rc);
58
59         if (!rc) {
60                 memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
61                 *uuids = lustre_msg_buf(req->rq_repmsg, 1);
62                 lov_unpackdesc(desc);
63         }
64         mdc->mdc_max_mdsize = sizeof(*desc) +
65                 desc->ld_tgt_count * sizeof(uuid_t);
66
67         EXIT;
68  out:
69         return rc;
70 }
71
72 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
73 {
74         struct ptlrpc_request *req;
75         struct lustre_handle mdc_conn;
76         struct lov_obd *lov = &obd->u.lov;
77         uuid_t *uuidarray;
78         int rc;
79         int i;
80
81         MOD_INC_USE_COUNT;
82         memcpy(&mdc_conn, conn, sizeof(mdc_conn));
83         conn->addr = -1;
84         conn->cookie = 0;
85         rc = class_connect(conn, obd);
86         if (rc) {
87                 MOD_DEC_USE_COUNT;
88                 RETURN(rc);
89         }
90
91         rc = lov_getinfo(obd, &mdc_conn, &lov->desc, &uuidarray, &req);
92         if (rc) {
93                 CERROR("cannot get lov info %d\n", rc);
94                 GOTO(out, rc);
95         }
96
97         if (lov->desc.ld_tgt_count > 1000) {
98                 CERROR("configuration error: target count > 1000 (%d)\n",
99                        lov->desc.ld_tgt_count);
100                 GOTO(out, rc = -EINVAL);
101         }
102
103         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
104                 CERROR("lov uuid %s not on mds device (%s)\n",
105                        obd->obd_uuid, lov->desc.ld_uuid);
106                 GOTO(out, rc = -EINVAL);
107         }
108
109         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
110             sizeof(uuid_t) * lov->desc.ld_tgt_count) {
111                 CERROR("invalid uuid array returned\n");
112                 GOTO(out, rc = -EINVAL);
113         }
114
115         lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count;
116         OBD_ALLOC(lov->tgts, lov->bufsize);
117         if (!lov->tgts) {
118                 CERROR("Out of memory\n");
119                 GOTO(out, rc = -ENOMEM);
120         }
121
122         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
123         for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
124                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
125
126         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
127                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
128                 if (!tgt) {
129                         CERROR("Target %s not configured\n", uuidarray[i]);
130                         GOTO(out_mem, rc = -EINVAL);
131                 }
132                 rc = obd_connect(&lov->tgts[i].conn, tgt);
133                 if (rc) {
134                         CERROR("Target %s connect error %d\n",
135                                uuidarray[i], rc);
136                         GOTO(out_mem, rc);
137                 }
138         }
139
140  out_mem:
141         if (rc) {
142                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
143                         int rc2;
144                         rc2 = obd_disconnect(&lov->tgts[i].conn);
145                         if (rc2)
146                                 CERROR("BAD: Target %s disconnect error %d\n",
147                                        uuidarray[i], rc2);
148                 }
149                 OBD_FREE(lov->tgts, lov->bufsize);
150         }
151  out:
152         if (rc)
153                 class_disconnect(conn);
154
155         ptlrpc_free_req(req);
156         return rc;
157 }
158
159 static int lov_disconnect(struct lustre_handle *conn)
160 {
161         struct obd_device *obd = class_conn2obd(conn);
162         struct lov_obd *lov = &obd->u.lov;
163         int rc;
164         int i; 
165
166         if (!lov->tgts)
167                 goto out_local;
168
169         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
170                 rc = obd_disconnect(&lov->tgts[i].conn);
171                 if (rc) { 
172                         CERROR("Target %s disconnect error %d\n", 
173                                lov->tgts[i].uuid, rc); 
174                         RETURN(rc);
175                 }
176         }
177         OBD_FREE(lov->tgts, lov->bufsize);
178         lov->bufsize = 0;
179         lov->tgts = NULL; 
180
181  out_local:
182
183         rc = class_disconnect(conn);
184         if (!rc)
185                 MOD_DEC_USE_COUNT;
186
187         return rc;
188 }
189
190 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
191 {
192         struct obd_ioctl_data* data = buf;
193         struct lov_obd *lov = &obd->u.lov;
194         int rc = 0;
195         ENTRY;
196
197         if (data->ioc_inllen1 < 1) {
198                 CERROR("osc setup requires an MDC UUID\n");
199                 RETURN(-EINVAL);
200         }
201
202         if (data->ioc_inllen1 > 37) {
203                 CERROR("mdc UUID must be less than 38 characters\n");
204                 RETURN(-EINVAL);
205         }
206
207         /* FIXME: we should make a connection instead perhaps to avoid
208            the mdc from walking away? The fs guarantees this. */ 
209         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
210         if (!lov->mdcobd) { 
211                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
212                        data->ioc_inlbuf1); 
213                 rc = -EINVAL;
214         }
215         RETURN(rc); 
216
217
218
219 static inline int lov_stripe_md_size(struct obd_device *obd)
220 {
221         struct lov_obd *lov = &obd->u.lov;
222         int size;
223
224         size = sizeof(struct lov_stripe_md) + 
225                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id); 
226         return size;
227 }
228
229 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
230 {
231         int rc, i;
232         struct obdo tmp;
233         struct obd_export *export = class_conn2export(conn);
234         struct lov_obd *lov;
235         struct lov_stripe_md *md;
236         ENTRY;
237
238         if (!ea) { 
239                 CERROR("lov_create needs EA for striping information\n"); 
240                 RETURN(-EINVAL); 
241         }
242
243         if (!export)
244                 RETURN(-EINVAL);
245         lov = &export->exp_obd->u.lov;
246
247         oa->o_easize =  lov_stripe_md_size(export->exp_obd);
248         if (!*ea) {
249                 OBD_ALLOC(*ea, oa->o_easize);
250                 if (! *ea)
251                         RETURN(-ENOMEM);
252         }
253
254         md = *ea;
255         md->lmd_size = oa->o_easize;
256         md->lmd_object_id = oa->o_id;
257         if (!md->lmd_stripe_count) { 
258                 md->lmd_stripe_count = lov->desc.ld_default_stripecount;
259         }
260
261         for (i = 0; i < md->lmd_stripe_count; i++) {
262                 struct lov_stripe_md obj_md; 
263                 struct lov_stripe_md *obj_mdp = &obj_md; 
264                 /* create data objects with "parent" OA */ 
265                 memcpy(&tmp, oa, sizeof(tmp));
266                 tmp.o_easize = sizeof(struct lov_stripe_md);
267                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
268                 if (rc) 
269                         GOTO(out_cleanup, rc); 
270                 md->lmd_objects[i].l_object_id = tmp.o_id;
271         }
272
273  out_cleanup: 
274         if (rc) { 
275                 int i2, rc2;
276                 for (i2 = 0 ; i2 < i ; i2++) { 
277                         /* destroy already created objects here */ 
278                         tmp.o_id = md->lmd_objects[i].l_object_id;
279                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
280                         if (rc2) { 
281                                 CERROR("Failed to remove object from target %d\n", 
282                                        i2); 
283                         }
284                 }
285         }
286         return rc;
287 }
288
289 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
290 struct lov_stripe_md *ea)
291 {
292         int rc, i;
293         struct obdo tmp;
294         struct obd_export *export = class_conn2export(conn);
295         struct lov_obd *lov;
296         struct lov_stripe_md *md;
297         ENTRY;
298
299         if (!ea) { 
300                 CERROR("LOV requires striping ea for desctruction\n"); 
301                 RETURN(-EINVAL); 
302         }
303
304         if (!export || !export->exp_obd) 
305                 RETURN(-ENODEV); 
306
307         lov = &export->exp_obd->u.lov;
308         md = ea;
309
310         for (i = 0; i < md->lmd_stripe_count; i++) {
311                 /* create data objects with "parent" OA */ 
312                 memcpy(&tmp, oa, sizeof(tmp));
313                 oa->o_id = md->lmd_objects[i].l_object_id; 
314                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
315                 if (!rc) { 
316                         CERROR("Error destroying object %Ld on %d\n",
317                                oa->o_id, i); 
318                 }
319         }
320         RETURN(rc);
321 }
322
323 #if 0
324 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
325 {
326         int rc;
327         ENTRY;
328
329         if (!class_conn2export(conn))
330                 RETURN(-EINVAL);
331
332         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
333         RETURN(rc);
334 }
335
336 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
337 {
338         int rc, retval, i;
339         ENTRY;
340
341         if (!class_conn2export(conn))
342                 RETURN(-EINVAL);
343
344         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
345                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
346                 if (i == 0)
347                         retval = rc;
348                 else if (retval != rc)
349                         CERROR("different results on multiple OBDs!\n");
350         }
351
352         RETURN(rc);
353 }
354
355 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
356 {
357         int rc, retval, i;
358         ENTRY;
359
360         if (!class_conn2export(conn))
361                 RETURN(-EINVAL);
362
363         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
364                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
365                 if (i == 0)
366                         retval = rc;
367                 else if (retval != rc)
368                         CERROR("different results on multiple OBDs!\n");
369         }
370
371         RETURN(rc);
372 }
373
374 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
375 {
376         int rc, retval, i;
377         ENTRY;
378
379         if (!class_conn2export(conn))
380                 RETURN(-EINVAL);
381
382         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
383                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
384                 if (i == 0)
385                         retval = rc;
386                 else if (retval != rc)
387                         CERROR("different results on multiple OBDs!\n");
388         }
389
390         RETURN(rc);
391 }
392
393
394
395 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
396  * we can send this 'punch' to just the authoritative node and the nodes
397  * that the punch will affect. */
398 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
399                      obd_size count, obd_off offset)
400 {
401         int rc, retval, i;
402         ENTRY;
403
404         if (!class_conn2export(conn))
405                 RETURN(-EINVAL);
406
407         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
408                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
409                                offset);
410                 if (i == 0)
411                         retval = rc;
412                 else if (retval != rc)
413                         CERROR("different results on multiple OBDs!\n");
414         }
415
416         RETURN(rc);
417 }
418
419 struct lov_callback_data {
420         atomic_t count;
421         wait_queue_head waitq;
422 };
423
424 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
425 {
426         struct lov_callback_data *cb_data = data;
427
428         if (atomic_dec_and_test(&cb_data->count))
429                 wake_up(&cb_data->waitq);
430 }
431
432 static int lov_read_check_status(struct lov_callback_data *cb_data)
433 {
434         ENTRY;
435         if (sigismember(&(current->pending.signal), SIGKILL) ||
436             sigismember(&(current->pending.signal), SIGTERM) ||
437             sigismember(&(current->pending.signal), SIGINT)) {
438                 cb_data->flags |= PTL_RPC_FL_INTR;
439                 RETURN(1);
440         }
441         if (atomic_read(&cb_data->count) == 0)
442                 RETURN(1);
443         RETURN(0);
444 }
445
446 /* buffer must lie in user memory here */
447 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
448                    struct obdo **oa,
449                    obd_count *oa_bufs, struct page **buf,
450                    obd_size *count, obd_off *offset, obd_flag *flags,
451                    bulk_callback_t callback, void *data)
452 {
453         int rc, i, page_array_offset = 0;
454         obd_off off = offset;
455         obd_size retval = 0;
456         struct lov_callback_data *cb_data;
457         ENTRY;
458
459         if (num_oa != 1)
460                 LBUG();
461
462         if (!class_conn2export(conn))
463                 RETURN(-EINVAL);
464
465         OBD_ALLOC(cb_data, sizeof(*cb_data));
466         if (cb_data == NULL) {
467                 LBUG();
468                 RETURN(-ENOMEM);
469         }
470         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
471         atomic_set(&cb_data->count, 0);
472
473         for (i = 0; i < oa_bufs[0]; i++) {
474                 struct page *current_page = buf[i];
475
476                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
477                 int bufcount = oa_bufs[i];
478                 // md->lmd_stripe_count
479
480                 for (k = page_array_offset; k < bufcount + page_array_offset;
481                      k++) {
482                         
483                 }
484                 page_array_offset += bufcount;
485
486
487         while (off < offset + count) {
488                 int stripe, conn;
489                 obd_size size, tmp;
490
491                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
492                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
493                 if (size > *count)
494                         size = *count;
495
496                 conn = stripe % conn->oc_dev->obd_multi_count;
497
498                 tmp = size;
499                 atomic_inc(&cb_data->count);
500                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
501                              num_oa, oa, buf,
502                               &size, off, lov_read_callback, cb_data);
503                 if (rc == 0)
504                         retval += size;
505                 else {
506                         CERROR("read(off=%Lu, count=%Lu): %d\n",
507                                (unsigned long long)off,
508                                (unsigned long long)size, rc);
509                         break;
510                 }
511
512                 buf += size;
513         }
514
515         wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
516         if (cb_data->flags & PTL_RPC_FL_INTR)
517                 rc = -EINTR;
518
519         /* FIXME: The error handling here sucks */
520         *count = retval;
521         OBD_FREE(cb_data, sizeof(*cb_data));
522         RETURN(rc);
523 }
524
525 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
526 {
527         
528 }
529
530 /* buffer must lie in user memory here */
531 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
532                          obd_size *count, obd_off offset)
533 {
534         int err;
535         struct file *file;
536         unsigned long retval;
537
538         ENTRY;
539         if (!class_conn2export(conn)) {
540                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
541                 EXIT;
542                 return -EINVAL;
543         }
544
545         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
546         if (!file || IS_ERR(file)) {
547                 EXIT;
548                 return -PTR_ERR(file);
549         }
550
551         /* count doubles as retval */
552         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
553         filp_close(file, 0);
554
555         if ( retval >= 0 ) {
556                 err = 0;
557                 *count = retval;
558                 EXIT;
559         } else {
560                 err = retval;
561                 *count = 0;
562                 EXIT;
563         }
564
565         return err;
566 }
567
568 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
569                        struct ldlm_handle *parent_lock, __u64 *res_id,
570                        __u32 type, struct ldlm_extent *extent, __u32 mode,
571                        int *flags, void *data, int datalen,
572                        struct ldlm_handle *lockh)
573 {
574         int rc;
575         ENTRY;
576
577         if (!class_conn2export(conn))
578                 RETURN(-EINVAL);
579
580         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
581                          res_id, type, extent, mode, flags, data, datalen,
582                          lockh);
583         RETURN(rc);
584 }
585
586 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
587                       struct ldlm_handle *lockh)
588 {
589         int rc;
590         ENTRY;
591
592         if (!class_conn2export(conn))
593                 RETURN(-EINVAL);
594
595         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
596         RETURN(rc);
597 }
598 #endif
599
600 struct obd_ops lov_obd_ops = {
601         o_setup:       lov_setup,
602         o_connect:     lov_connect,
603         o_disconnect:  lov_disconnect,
604         o_create:      lov_create,
605         o_destroy:     lov_destroy,
606 #if 0
607         o_getattr:     lov_getattr,
608         o_setattr:     lov_setattr,
609         o_open:        lov_open,
610         o_close:       lov_close,
611         o_brw:         lov_pgcache_brw,
612         o_punch:       lov_punch,
613         o_enqueue:     lov_enqueue,
614         o_cancel:      lov_cancel
615 #endif
616 };
617
618
619 #define LOV_VERSION "v0.1"
620
621 static int __init lov_init(void)
622 {
623         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
624                ", info@clusterfs.com\n");
625         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
626 }
627
628 static void __exit lov_exit(void)
629 {
630         class_unregister_type(OBD_LOV_DEVICENAME);
631 }
632
633 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
634 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
635 MODULE_LICENSE("GPL");
636
637 module_init(lov_init);
638 module_exit(lov_exit);