Whamcloud - gitweb
2cd84ab6997bee299c974d112f686f48099861cc
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/lustre_mds.h>
23 #include <linux/obd_class.h>
24 #include <linux/obd_lov.h>
25 #include <linux/init.h>
26
27 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
28
29 /* obd methods */
30
31 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
32 {
33         struct ptlrpc_request *req;
34         struct lov_obd *lov = &obd->u.lov;
35         struct lustre_handle mdc_conn;
36         uuid_t *uuidarray;
37         int rc;
38         int i;
39
40         MOD_INC_USE_COUNT;
41         rc = class_connect(conn, obd);
42         if (rc) {
43                 MOD_DEC_USE_COUNT;
44                 RETURN(rc);
45         }
46
47         rc = obd_connect(&mdc_conn, lov->mdcobd);
48         if (rc) {
49                 CERROR("cannot connect to mdc: rc = %d\n", rc);
50                 GOTO(out, rc = -EINVAL);
51         }
52
53         rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
54         obd_disconnect(&mdc_conn);
55
56         if (rc) {
57                 CERROR("cannot get lov info %d\n", rc);
58                 GOTO(out, rc);
59         }
60
61
62         if (lov->desc.ld_tgt_count > 1000) {
63                 CERROR("configuration error: target count > 1000 (%d)\n",
64                        lov->desc.ld_tgt_count);
65                 GOTO(out, rc = -EINVAL);
66         }
67
68         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
69                 CERROR("lov uuid %s not on mds device (%s)\n",
70                        obd->obd_uuid, lov->desc.ld_uuid);
71                 GOTO(out, rc = -EINVAL);
72         }
73
74         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
75             sizeof(uuid_t) * lov->desc.ld_tgt_count) {
76                 CERROR("invalid uuid array returned\n");
77                 GOTO(out, rc = -EINVAL);
78         }
79
80         lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count;
81         OBD_ALLOC(lov->tgts, lov->bufsize);
82         if (!lov->tgts) {
83                 CERROR("Out of memory\n");
84                 GOTO(out, rc = -ENOMEM);
85         }
86
87         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
88         for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
89                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
90
91         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
92                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
93                 if (!tgt) {
94                         CERROR("Target %s not configured\n", uuidarray[i]);
95                         GOTO(out_mem, rc = -EINVAL);
96                 }
97                 rc = obd_connect(&lov->tgts[i].conn, tgt);
98                 if (rc) {
99                         CERROR("Target %s connect error %d\n",
100                                uuidarray[i], rc);
101                         GOTO(out_mem, rc);
102                 }
103         }
104
105  out_mem:
106         if (rc) {
107                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
108                         int rc2;
109                         rc2 = obd_disconnect(&lov->tgts[i].conn);
110                         if (rc2)
111                                 CERROR("BAD: Target %s disconnect error %d\n",
112                                        uuidarray[i], rc2);
113                 }
114                 OBD_FREE(lov->tgts, lov->bufsize);
115         }
116  out:
117         if (rc)
118                 class_disconnect(conn);
119
120         ptlrpc_free_req(req);
121         return rc;
122 }
123
124 static int lov_disconnect(struct lustre_handle *conn)
125 {
126         struct obd_device *obd = class_conn2obd(conn);
127         struct lov_obd *lov = &obd->u.lov;
128         int rc;
129         int i; 
130
131         if (!lov->tgts)
132                 goto out_local;
133
134         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
135                 rc = obd_disconnect(&lov->tgts[i].conn);
136                 if (rc) { 
137                         CERROR("Target %s disconnect error %d\n", 
138                                lov->tgts[i].uuid, rc); 
139                         RETURN(rc);
140                 }
141         }
142         OBD_FREE(lov->tgts, lov->bufsize);
143         lov->bufsize = 0;
144         lov->tgts = NULL; 
145
146  out_local:
147
148         rc = class_disconnect(conn);
149         if (!rc)
150                 MOD_DEC_USE_COUNT;
151
152         return rc;
153 }
154
155 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
156 {
157         struct obd_ioctl_data* data = buf;
158         struct lov_obd *lov = &obd->u.lov;
159         int rc = 0;
160         ENTRY;
161
162         if (data->ioc_inllen1 < 1) {
163                 CERROR("osc setup requires an MDC UUID\n");
164                 RETURN(-EINVAL);
165         }
166
167         if (data->ioc_inllen1 > 37) {
168                 CERROR("mdc UUID must be less than 38 characters\n");
169                 RETURN(-EINVAL);
170         }
171
172         /* FIXME: we should make a connection instead perhaps to avoid
173            the mdc from walking away? The fs guarantees this. */ 
174         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
175         if (!lov->mdcobd) { 
176                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
177                        data->ioc_inlbuf1); 
178                 rc = -EINVAL;
179         }
180         RETURN(rc); 
181
182
183
184 static inline int lov_stripe_md_size(struct obd_device *obd)
185 {
186         struct lov_obd *lov = &obd->u.lov;
187         int size;
188
189         size = sizeof(struct lov_stripe_md) + 
190                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id); 
191         return size;
192 }
193
194 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
195 {
196         int rc = 0, i;
197         struct obdo tmp;
198         struct obd_export *export = class_conn2export(conn);
199         struct lov_obd *lov;
200         struct lov_stripe_md *md;
201         ENTRY;
202
203         if (!ea) { 
204                 CERROR("lov_create needs EA for striping information\n"); 
205                 RETURN(-EINVAL); 
206         }
207
208         if (!export)
209                 RETURN(-EINVAL);
210         lov = &export->exp_obd->u.lov;
211
212         oa->o_easize =  lov_stripe_md_size(export->exp_obd);
213         if (!*ea) {
214                 OBD_ALLOC(*ea, oa->o_easize);
215                 if (! *ea)
216                         RETURN(-ENOMEM);
217         }
218
219         md = *ea;
220         md->lmd_size = oa->o_easize;
221         md->lmd_object_id = oa->o_id;
222         if (!md->lmd_stripe_count) { 
223                 md->lmd_stripe_count = lov->desc.ld_default_stripecount;
224         }
225
226         for (i = 0; i < md->lmd_stripe_count; i++) {
227                 struct lov_stripe_md obj_md; 
228                 struct lov_stripe_md *obj_mdp = &obj_md; 
229                 /* create data objects with "parent" OA */ 
230                 memcpy(&tmp, oa, sizeof(tmp));
231                 tmp.o_easize = sizeof(struct lov_stripe_md);
232                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
233                 if (rc) 
234                         GOTO(out_cleanup, rc); 
235                 md->lmd_objects[i].l_object_id = tmp.o_id;
236         }
237
238  out_cleanup: 
239         if (rc) { 
240                 int i2, rc2;
241                 for (i2 = 0 ; i2 < i ; i2++) { 
242                         /* destroy already created objects here */ 
243                         tmp.o_id = md->lmd_objects[i].l_object_id;
244                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
245                         if (rc2) { 
246                                 CERROR("Failed to remove object from target %d\n", 
247                                        i2); 
248                         }
249                 }
250         }
251         return rc;
252 }
253
254 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
255 struct lov_stripe_md *ea)
256 {
257         int rc = 0, i;
258         struct obdo tmp;
259         struct obd_export *export = class_conn2export(conn);
260         struct lov_obd *lov;
261         struct lov_stripe_md *md;
262         ENTRY;
263
264         if (!ea) { 
265                 CERROR("LOV requires striping ea for desctruction\n"); 
266                 RETURN(-EINVAL); 
267         }
268
269         if (!export || !export->exp_obd) 
270                 RETURN(-ENODEV); 
271
272         lov = &export->exp_obd->u.lov;
273         md = ea;
274
275         for (i = 0; i < md->lmd_stripe_count; i++) {
276                 /* create data objects with "parent" OA */ 
277                 memcpy(&tmp, oa, sizeof(tmp));
278                 oa->o_id = md->lmd_objects[i].l_object_id; 
279                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
280                 if (!rc) { 
281                         CERROR("Error destroying object %Ld on %d\n",
282                                oa->o_id, i); 
283                 }
284         }
285         RETURN(rc);
286 }
287
288 #if 0
289 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
290 {
291         int rc;
292         ENTRY;
293
294         if (!class_conn2export(conn))
295                 RETURN(-EINVAL);
296
297         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
298         RETURN(rc);
299 }
300
301 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
302 {
303         int rc, retval, i;
304         ENTRY;
305
306         if (!class_conn2export(conn))
307                 RETURN(-EINVAL);
308
309         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
310                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
311                 if (i == 0)
312                         retval = rc;
313                 else if (retval != rc)
314                         CERROR("different results on multiple OBDs!\n");
315         }
316
317         RETURN(rc);
318 }
319
320 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
321 {
322         int rc, retval, i;
323         ENTRY;
324
325         if (!class_conn2export(conn))
326                 RETURN(-EINVAL);
327
328         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
329                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
330                 if (i == 0)
331                         retval = rc;
332                 else if (retval != rc)
333                         CERROR("different results on multiple OBDs!\n");
334         }
335
336         RETURN(rc);
337 }
338
339 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
340 {
341         int rc, retval, i;
342         ENTRY;
343
344         if (!class_conn2export(conn))
345                 RETURN(-EINVAL);
346
347         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
348                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
349                 if (i == 0)
350                         retval = rc;
351                 else if (retval != rc)
352                         CERROR("different results on multiple OBDs!\n");
353         }
354
355         RETURN(rc);
356 }
357
358
359
360 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
361  * we can send this 'punch' to just the authoritative node and the nodes
362  * that the punch will affect. */
363 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
364                      obd_size count, obd_off offset)
365 {
366         int rc, retval, i;
367         ENTRY;
368
369         if (!class_conn2export(conn))
370                 RETURN(-EINVAL);
371
372         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
373                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
374                                offset);
375                 if (i == 0)
376                         retval = rc;
377                 else if (retval != rc)
378                         CERROR("different results on multiple OBDs!\n");
379         }
380
381         RETURN(rc);
382 }
383
384 struct lov_callback_data {
385         atomic_t count;
386         wait_queue_head waitq;
387 };
388
389 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
390 {
391         struct lov_callback_data *cb_data = data;
392
393         if (atomic_dec_and_test(&cb_data->count))
394                 wake_up(&cb_data->waitq);
395 }
396
397 static int lov_read_check_status(struct lov_callback_data *cb_data)
398 {
399         ENTRY;
400         if (sigismember(&(current->pending.signal), SIGKILL) ||
401             sigismember(&(current->pending.signal), SIGTERM) ||
402             sigismember(&(current->pending.signal), SIGINT)) {
403                 cb_data->flags |= PTL_RPC_FL_INTR;
404                 RETURN(1);
405         }
406         if (atomic_read(&cb_data->count) == 0)
407                 RETURN(1);
408         RETURN(0);
409 }
410
411 /* buffer must lie in user memory here */
412 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
413                    struct obdo **oa,
414                    obd_count *oa_bufs, struct page **buf,
415                    obd_size *count, obd_off *offset, obd_flag *flags,
416                    bulk_callback_t callback, void *data)
417 {
418         int rc, i, page_array_offset = 0;
419         obd_off off = offset;
420         obd_size retval = 0;
421         struct lov_callback_data *cb_data;
422         ENTRY;
423
424         if (num_oa != 1)
425                 LBUG();
426
427         if (!class_conn2export(conn))
428                 RETURN(-EINVAL);
429
430         OBD_ALLOC(cb_data, sizeof(*cb_data));
431         if (cb_data == NULL) {
432                 LBUG();
433                 RETURN(-ENOMEM);
434         }
435         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
436         atomic_set(&cb_data->count, 0);
437
438         for (i = 0; i < oa_bufs[0]; i++) {
439                 struct page *current_page = buf[i];
440
441                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
442                 int bufcount = oa_bufs[i];
443                 // md->lmd_stripe_count
444
445                 for (k = page_array_offset; k < bufcount + page_array_offset;
446                      k++) {
447                         
448                 }
449                 page_array_offset += bufcount;
450
451
452         while (off < offset + count) {
453                 int stripe, conn;
454                 obd_size size, tmp;
455
456                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
457                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
458                 if (size > *count)
459                         size = *count;
460
461                 conn = stripe % conn->oc_dev->obd_multi_count;
462
463                 tmp = size;
464                 atomic_inc(&cb_data->count);
465                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
466                              num_oa, oa, buf,
467                               &size, off, lov_read_callback, cb_data);
468                 if (rc == 0)
469                         retval += size;
470                 else {
471                         CERROR("read(off=%Lu, count=%Lu): %d\n",
472                                (unsigned long long)off,
473                                (unsigned long long)size, rc);
474                         break;
475                 }
476
477                 buf += size;
478         }
479
480         wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
481         if (cb_data->flags & PTL_RPC_FL_INTR)
482                 rc = -EINTR;
483
484         /* FIXME: The error handling here sucks */
485         *count = retval;
486         OBD_FREE(cb_data, sizeof(*cb_data));
487         RETURN(rc);
488 }
489
490 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
491 {
492         
493 }
494
495 /* buffer must lie in user memory here */
496 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
497                          obd_size *count, obd_off offset)
498 {
499         int err;
500         struct file *file;
501         unsigned long retval;
502
503         ENTRY;
504         if (!class_conn2export(conn)) {
505                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
506                 EXIT;
507                 return -EINVAL;
508         }
509
510         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
511         if (!file || IS_ERR(file)) {
512                 EXIT;
513                 return -PTR_ERR(file);
514         }
515
516         /* count doubles as retval */
517         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
518         filp_close(file, 0);
519
520         if ( retval >= 0 ) {
521                 err = 0;
522                 *count = retval;
523                 EXIT;
524         } else {
525                 err = retval;
526                 *count = 0;
527                 EXIT;
528         }
529
530         return err;
531 }
532
533 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
534                        struct ldlm_handle *parent_lock, __u64 *res_id,
535                        __u32 type, struct ldlm_extent *extent, __u32 mode,
536                        int *flags, void *data, int datalen,
537                        struct ldlm_handle *lockh)
538 {
539         int rc;
540         ENTRY;
541
542         if (!class_conn2export(conn))
543                 RETURN(-EINVAL);
544
545         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
546                          res_id, type, extent, mode, flags, data, datalen,
547                          lockh);
548         RETURN(rc);
549 }
550
551 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
552                       struct ldlm_handle *lockh)
553 {
554         int rc;
555         ENTRY;
556
557         if (!class_conn2export(conn))
558                 RETURN(-EINVAL);
559
560         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
561         RETURN(rc);
562 }
563 #endif
564
565 struct obd_ops lov_obd_ops = {
566         o_setup:       lov_setup,
567         o_connect:     lov_connect,
568         o_disconnect:  lov_disconnect,
569         o_create:      lov_create,
570         o_destroy:     lov_destroy,
571 #if 0
572         o_getattr:     lov_getattr,
573         o_setattr:     lov_setattr,
574         o_open:        lov_open,
575         o_close:       lov_close,
576         o_brw:         lov_pgcache_brw,
577         o_punch:       lov_punch,
578         o_enqueue:     lov_enqueue,
579         o_cancel:      lov_cancel
580 #endif
581 };
582
583
584 #define LOV_VERSION "v0.1"
585
586 static int __init lov_init(void)
587 {
588         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
589                ", info@clusterfs.com\n");
590         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
591 }
592
593 static void __exit lov_exit(void)
594 {
595         class_unregister_type(OBD_LOV_DEVICENAME);
596 }
597
598 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
599 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
600 MODULE_LICENSE("GPL");
601
602 module_init(lov_init);
603 module_exit(lov_exit);