Whamcloud - gitweb
Fix some simple errors and warnings
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/lustre_mds.h>
23 #include <linux/obd_class.h>
24 #include <linux/obd_lov.h>
25
26 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
27
28 /* obd methods */
29
30 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
31 {
32         struct ptlrpc_request *req;
33         struct lov_obd *lov = &obd->u.lov;
34         struct lustre_handle mdc_conn;
35         uuid_t *uuidarray;
36         int rc;
37         int i;
38
39         MOD_INC_USE_COUNT;
40         rc = class_connect(conn, obd);
41         if (rc) {
42                 MOD_DEC_USE_COUNT;
43                 RETURN(rc);
44         }
45
46         rc = obd_connect(&mdc_conn, lov->mdcobd);
47         if (rc) {
48                 CERROR("cannot connect to mdc: rc = %d\n", rc);
49                 GOTO(out, rc = -EINVAL);
50         }
51
52         rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
53         obd_disconnect(&mdc_conn);
54
55         if (rc) {
56                 CERROR("cannot get lov info %d\n", rc);
57                 GOTO(out, rc);
58         }
59
60
61         if (lov->desc.ld_tgt_count > 1000) {
62                 CERROR("configuration error: target count > 1000 (%d)\n",
63                        lov->desc.ld_tgt_count);
64                 GOTO(out, rc = -EINVAL);
65         }
66
67         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
68                 CERROR("lov uuid %s not on mds device (%s)\n",
69                        obd->obd_uuid, lov->desc.ld_uuid);
70                 GOTO(out, rc = -EINVAL);
71         }
72
73         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
74             sizeof(uuid_t) * lov->desc.ld_tgt_count) {
75                 CERROR("invalid uuid array returned\n");
76                 GOTO(out, rc = -EINVAL);
77         }
78
79         lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count;
80         OBD_ALLOC(lov->tgts, lov->bufsize);
81         if (!lov->tgts) {
82                 CERROR("Out of memory\n");
83                 GOTO(out, rc = -ENOMEM);
84         }
85
86         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
87         for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
88                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
89
90         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
91                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
92                 if (!tgt) {
93                         CERROR("Target %s not configured\n", uuidarray[i]);
94                         GOTO(out_mem, rc = -EINVAL);
95                 }
96                 rc = obd_connect(&lov->tgts[i].conn, tgt);
97                 if (rc) {
98                         CERROR("Target %s connect error %d\n",
99                                uuidarray[i], rc);
100                         GOTO(out_mem, rc);
101                 }
102         }
103
104  out_mem:
105         if (rc) {
106                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
107                         int rc2;
108                         rc2 = obd_disconnect(&lov->tgts[i].conn);
109                         if (rc2)
110                                 CERROR("BAD: Target %s disconnect error %d\n",
111                                        uuidarray[i], rc2);
112                 }
113                 OBD_FREE(lov->tgts, lov->bufsize);
114         }
115  out:
116         if (rc)
117                 class_disconnect(conn);
118
119         ptlrpc_free_req(req);
120         return rc;
121 }
122
123 static int lov_disconnect(struct lustre_handle *conn)
124 {
125         struct obd_device *obd = class_conn2obd(conn);
126         struct lov_obd *lov = &obd->u.lov;
127         int rc;
128         int i; 
129
130         if (!lov->tgts)
131                 goto out_local;
132
133         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
134                 rc = obd_disconnect(&lov->tgts[i].conn);
135                 if (rc) { 
136                         CERROR("Target %s disconnect error %d\n", 
137                                lov->tgts[i].uuid, rc); 
138                         RETURN(rc);
139                 }
140         }
141         OBD_FREE(lov->tgts, lov->bufsize);
142         lov->bufsize = 0;
143         lov->tgts = NULL; 
144
145  out_local:
146
147         rc = class_disconnect(conn);
148         if (!rc)
149                 MOD_DEC_USE_COUNT;
150
151         return rc;
152 }
153
154 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
155 {
156         struct obd_ioctl_data* data = buf;
157         struct lov_obd *lov = &obd->u.lov;
158         int rc = 0;
159         ENTRY;
160
161         if (data->ioc_inllen1 < 1) {
162                 CERROR("osc setup requires an MDC UUID\n");
163                 RETURN(-EINVAL);
164         }
165
166         if (data->ioc_inllen1 > 37) {
167                 CERROR("mdc UUID must be less than 38 characters\n");
168                 RETURN(-EINVAL);
169         }
170
171         /* FIXME: we should make a connection instead perhaps to avoid
172            the mdc from walking away? The fs guarantees this. */ 
173         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
174         if (!lov->mdcobd) { 
175                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
176                        data->ioc_inlbuf1); 
177                 rc = -EINVAL;
178         }
179         RETURN(rc); 
180
181
182
183 static inline int lov_stripe_md_size(struct obd_device *obd)
184 {
185         struct lov_obd *lov = &obd->u.lov;
186         int size;
187
188         size = sizeof(struct lov_stripe_md) + 
189                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id); 
190         return size;
191 }
192
193 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
194 {
195         int rc = 0, i;
196         struct obdo tmp;
197         struct obd_export *export = class_conn2export(conn);
198         struct lov_obd *lov;
199         struct lov_stripe_md *md;
200         ENTRY;
201
202         if (!ea) { 
203                 CERROR("lov_create needs EA for striping information\n"); 
204                 RETURN(-EINVAL); 
205         }
206
207         if (!export)
208                 RETURN(-EINVAL);
209         lov = &export->exp_obd->u.lov;
210
211         oa->o_easize =  lov_stripe_md_size(export->exp_obd);
212         if (!*ea) {
213                 OBD_ALLOC(*ea, oa->o_easize);
214                 if (! *ea)
215                         RETURN(-ENOMEM);
216         }
217
218         md = *ea;
219         md->lmd_size = oa->o_easize;
220         md->lmd_object_id = oa->o_id;
221         if (!md->lmd_stripe_count) { 
222                 md->lmd_stripe_count = lov->desc.ld_default_stripecount;
223         }
224
225         for (i = 0; i < md->lmd_stripe_count; i++) {
226                 struct lov_stripe_md obj_md; 
227                 struct lov_stripe_md *obj_mdp = &obj_md; 
228                 /* create data objects with "parent" OA */ 
229                 memcpy(&tmp, oa, sizeof(tmp));
230                 tmp.o_easize = sizeof(struct lov_stripe_md);
231                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
232                 if (rc) 
233                         GOTO(out_cleanup, rc); 
234                 md->lmd_objects[i].l_object_id = tmp.o_id;
235         }
236
237  out_cleanup: 
238         if (rc) { 
239                 int i2, rc2;
240                 for (i2 = 0 ; i2 < i ; i2++) { 
241                         /* destroy already created objects here */ 
242                         tmp.o_id = md->lmd_objects[i].l_object_id;
243                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
244                         if (rc2) { 
245                                 CERROR("Failed to remove object from target %d\n", 
246                                        i2); 
247                         }
248                 }
249         }
250         return rc;
251 }
252
253 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
254 struct lov_stripe_md *ea)
255 {
256         int rc = 0, i;
257         struct obdo tmp;
258         struct obd_export *export = class_conn2export(conn);
259         struct lov_obd *lov;
260         struct lov_stripe_md *md;
261         ENTRY;
262
263         if (!ea) { 
264                 CERROR("LOV requires striping ea for desctruction\n"); 
265                 RETURN(-EINVAL); 
266         }
267
268         if (!export || !export->exp_obd) 
269                 RETURN(-ENODEV); 
270
271         lov = &export->exp_obd->u.lov;
272         md = ea;
273
274         for (i = 0; i < md->lmd_stripe_count; i++) {
275                 /* create data objects with "parent" OA */ 
276                 memcpy(&tmp, oa, sizeof(tmp));
277                 oa->o_id = md->lmd_objects[i].l_object_id; 
278                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
279                 if (!rc) { 
280                         CERROR("Error destroying object %Ld on %d\n",
281                                oa->o_id, i); 
282                 }
283         }
284         RETURN(rc);
285 }
286
287 #if 0
288 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
289 {
290         int rc;
291         ENTRY;
292
293         if (!class_conn2export(conn))
294                 RETURN(-EINVAL);
295
296         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
297         RETURN(rc);
298 }
299
300 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
301 {
302         int rc, retval, i;
303         ENTRY;
304
305         if (!class_conn2export(conn))
306                 RETURN(-EINVAL);
307
308         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
309                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
310                 if (i == 0)
311                         retval = rc;
312                 else if (retval != rc)
313                         CERROR("different results on multiple OBDs!\n");
314         }
315
316         RETURN(rc);
317 }
318
319 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
320 {
321         int rc, retval, i;
322         ENTRY;
323
324         if (!class_conn2export(conn))
325                 RETURN(-EINVAL);
326
327         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
328                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
329                 if (i == 0)
330                         retval = rc;
331                 else if (retval != rc)
332                         CERROR("different results on multiple OBDs!\n");
333         }
334
335         RETURN(rc);
336 }
337
338 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
339 {
340         int rc, retval, i;
341         ENTRY;
342
343         if (!class_conn2export(conn))
344                 RETURN(-EINVAL);
345
346         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
347                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
348                 if (i == 0)
349                         retval = rc;
350                 else if (retval != rc)
351                         CERROR("different results on multiple OBDs!\n");
352         }
353
354         RETURN(rc);
355 }
356
357
358
359 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
360  * we can send this 'punch' to just the authoritative node and the nodes
361  * that the punch will affect. */
362 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
363                      obd_size count, obd_off offset)
364 {
365         int rc, retval, i;
366         ENTRY;
367
368         if (!class_conn2export(conn))
369                 RETURN(-EINVAL);
370
371         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
372                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
373                                offset);
374                 if (i == 0)
375                         retval = rc;
376                 else if (retval != rc)
377                         CERROR("different results on multiple OBDs!\n");
378         }
379
380         RETURN(rc);
381 }
382
383 struct lov_callback_data {
384         atomic_t count;
385         wait_queue_head waitq;
386 };
387
388 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
389 {
390         struct lov_callback_data *cb_data = data;
391
392         if (atomic_dec_and_test(&cb_data->count))
393                 wake_up(&cb_data->waitq);
394 }
395
396 static int lov_read_check_status(struct lov_callback_data *cb_data)
397 {
398         ENTRY;
399         if (sigismember(&(current->pending.signal), SIGKILL) ||
400             sigismember(&(current->pending.signal), SIGTERM) ||
401             sigismember(&(current->pending.signal), SIGINT)) {
402                 cb_data->flags |= PTL_RPC_FL_INTR;
403                 RETURN(1);
404         }
405         if (atomic_read(&cb_data->count) == 0)
406                 RETURN(1);
407         RETURN(0);
408 }
409
410 /* buffer must lie in user memory here */
411 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
412                    struct obdo **oa,
413                    obd_count *oa_bufs, struct page **buf,
414                    obd_size *count, obd_off *offset, obd_flag *flags,
415                    bulk_callback_t callback, void *data)
416 {
417         int rc, i, page_array_offset = 0;
418         obd_off off = offset;
419         obd_size retval = 0;
420         struct lov_callback_data *cb_data;
421         ENTRY;
422
423         if (num_oa != 1)
424                 LBUG();
425
426         if (!class_conn2export(conn))
427                 RETURN(-EINVAL);
428
429         OBD_ALLOC(cb_data, sizeof(*cb_data));
430         if (cb_data == NULL) {
431                 LBUG();
432                 RETURN(-ENOMEM);
433         }
434         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
435         atomic_set(&cb_data->count, 0);
436
437         for (i = 0; i < oa_bufs[0]; i++) {
438                 struct page *current_page = buf[i];
439
440                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
441                 int bufcount = oa_bufs[i];
442                 // md->lmd_stripe_count
443
444                 for (k = page_array_offset; k < bufcount + page_array_offset;
445                      k++) {
446                         
447                 }
448                 page_array_offset += bufcount;
449
450
451         while (off < offset + count) {
452                 int stripe, conn;
453                 obd_size size, tmp;
454
455                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
456                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
457                 if (size > *count)
458                         size = *count;
459
460                 conn = stripe % conn->oc_dev->obd_multi_count;
461
462                 tmp = size;
463                 atomic_inc(&cb_data->count);
464                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
465                              num_oa, oa, buf,
466                               &size, off, lov_read_callback, cb_data);
467                 if (rc == 0)
468                         retval += size;
469                 else {
470                         CERROR("read(off=%Lu, count=%Lu): %d\n",
471                                (unsigned long long)off,
472                                (unsigned long long)size, rc);
473                         break;
474                 }
475
476                 buf += size;
477         }
478
479         wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
480         if (cb_data->flags & PTL_RPC_FL_INTR)
481                 rc = -EINTR;
482
483         /* FIXME: The error handling here sucks */
484         *count = retval;
485         OBD_FREE(cb_data, sizeof(*cb_data));
486         RETURN(rc);
487 }
488
489 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
490 {
491         
492 }
493
494 /* buffer must lie in user memory here */
495 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
496                          obd_size *count, obd_off offset)
497 {
498         int err;
499         struct file *file;
500         unsigned long retval;
501
502         ENTRY;
503         if (!class_conn2export(conn)) {
504                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
505                 EXIT;
506                 return -EINVAL;
507         }
508
509         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
510         if (!file || IS_ERR(file)) {
511                 EXIT;
512                 return -PTR_ERR(file);
513         }
514
515         /* count doubles as retval */
516         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
517         filp_close(file, 0);
518
519         if ( retval >= 0 ) {
520                 err = 0;
521                 *count = retval;
522                 EXIT;
523         } else {
524                 err = retval;
525                 *count = 0;
526                 EXIT;
527         }
528
529         return err;
530 }
531
532 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
533                        struct ldlm_handle *parent_lock, __u64 *res_id,
534                        __u32 type, struct ldlm_extent *extent, __u32 mode,
535                        int *flags, void *data, int datalen,
536                        struct ldlm_handle *lockh)
537 {
538         int rc;
539         ENTRY;
540
541         if (!class_conn2export(conn))
542                 RETURN(-EINVAL);
543
544         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
545                          res_id, type, extent, mode, flags, data, datalen,
546                          lockh);
547         RETURN(rc);
548 }
549
550 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
551                       struct ldlm_handle *lockh)
552 {
553         int rc;
554         ENTRY;
555
556         if (!class_conn2export(conn))
557                 RETURN(-EINVAL);
558
559         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
560         RETURN(rc);
561 }
562 #endif
563
564 struct obd_ops lov_obd_ops = {
565         o_setup:       lov_setup,
566         o_connect:     lov_connect,
567         o_disconnect:  lov_disconnect,
568         o_create:      lov_create,
569         o_destroy:     lov_destroy,
570 #if 0
571         o_getattr:     lov_getattr,
572         o_setattr:     lov_setattr,
573         o_open:        lov_open,
574         o_close:       lov_close,
575         o_brw:         lov_pgcache_brw,
576         o_punch:       lov_punch,
577         o_enqueue:     lov_enqueue,
578         o_cancel:      lov_cancel
579 #endif
580 };
581
582
583 #define LOV_VERSION "v0.1"
584
585 static int __init lov_init(void)
586 {
587         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
588                ", info@clusterfs.com\n");
589         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
590 }
591
592 static void __exit lov_exit(void)
593 {
594         class_unregister_type(OBD_LOV_DEVICENAME);
595 }
596
597 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
598 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
599 MODULE_LICENSE("GPL");
600
601 module_init(lov_init);
602 module_exit(lov_exit);