Whamcloud - gitweb
Fix problem with multiple connections to the same local device.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/obd_class.h>
23 #include <linux/obd_lov.h>
24
25 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
26
27 /* obd methods */
28
29 static int lov_getinfo(struct obd_device *obd, struct lov_desc *desc,
30                        uuid_t **uuids, struct ptlrpc_request **request)
31 {
32         struct ptlrpc_request *req;
33         struct mds_status_req *streq;
34         struct lov_obd *lov = &obd->u.lov;
35         struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
36         int rc, size[2] = {sizeof(*streq)};
37         ENTRY;
38
39 #warning error: need to set lov->mdc_connh somewhere!!!!
40         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &lov->mdc_connh,
41                                MDS_LOVINFO, 1, size, NULL);
42         if (!req)
43                 GOTO(out, rc = -ENOMEM);
44
45         *request = req;
46         streq = lustre_msg_buf(req->rq_reqmsg, 0);
47         streq->flags = HTON__u32(MDS_STATUS_LOV);
48         streq->repbuf = HTON__u32(8000);
49
50         /* prepare for reply */
51         req->rq_level = LUSTRE_CONN_CON;
52         size[0] = sizeof(*desc);
53         size[1] = 8000;
54         req->rq_replen = lustre_msg_size(2, size);
55
56         rc = ptlrpc_queue_wait(req);
57         rc = ptlrpc_check_status(req, rc);
58
59         if (!rc) {
60                 memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
61                 *uuids = lustre_msg_buf(req->rq_repmsg, 1);
62                 lov_unpackdesc(desc);
63         }
64         mdc->mdc_max_mdsize = sizeof(*desc) +
65                 desc->ld_tgt_count * sizeof(uuid_t);
66
67         EXIT;
68  out:
69         return rc;
70 }
71
72 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
73 {
74         int rc;
75         int i;
76         struct ptlrpc_request *req;
77         struct lov_obd *lov = &obd->u.lov;
78         uuid_t *uuidarray; 
79
80         MOD_INC_USE_COUNT;
81         rc = class_connect(conn, obd);
82         if (rc) { 
83                 MOD_DEC_USE_COUNT;
84                 RETURN(rc); 
85         }
86         
87         rc = lov_getinfo(obd, &lov->desc, &uuidarray, &req);
88         if (rc) { 
89                 CERROR("cannot get lov info %d\n", rc);
90                 GOTO(out, rc); 
91         }
92         
93         if (lov->desc.ld_tgt_count > 1000) { 
94                 CERROR("configuration error: target count > 1000 (%d)\n",
95                        lov->desc.ld_tgt_count);
96                 GOTO(out, rc = -EINVAL); 
97         }
98         
99         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) { 
100                 CERROR("lov uuid %s not on mds device (%s)\n", 
101                        obd->obd_uuid, lov->desc.ld_uuid);
102                 GOTO(out, rc = -EINVAL); 
103         }                
104             
105         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] < 
106             sizeof(uuid_t) * lov->desc.ld_tgt_count) { 
107                 CERROR("invalid uuid array returned\n");
108                 GOTO(out, rc = -EINVAL); 
109         }
110
111         lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count; 
112         OBD_ALLOC(lov->tgts, lov->bufsize); 
113         if (!lov->tgts) { 
114                 CERROR("Out of memory\n"); 
115                 GOTO(out, rc = -ENOMEM); 
116         }
117
118         uuidarray = lustre_msg_buf(req->rq_repmsg, 1); 
119         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
120                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t)); 
121         }
122
123         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
124                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
125                 if (!tgt) { 
126                         CERROR("Target %s not configured\n", uuidarray[i]); 
127                         GOTO(out_mem, rc = -EINVAL); 
128                 }
129                 rc = obd_connect(&lov->tgts[i].conn, tgt); 
130                 if (rc) { 
131                         CERROR("Target %s connect error %d\n", 
132                                uuidarray[i], rc); 
133                         GOTO(out_mem, rc);
134                 }
135         }
136
137  out_mem:
138         if (rc) { 
139                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
140                         int rc2;
141                         rc2 = obd_disconnect(&lov->tgts[i].conn);
142                         if (rc2)
143                                 CERROR("BAD: Target %s disconnect error %d\n", 
144                                        uuidarray[i], rc2); 
145                 }
146                 OBD_FREE(lov->tgts, lov->bufsize);
147         }
148  out:
149         if (rc) { 
150                 class_disconnect(conn);
151         }
152         if (req)
153                 ptlrpc_free_req(req); 
154         return rc;
155         
156 }
157
158 static int lov_disconnect(struct lustre_handle *conn)
159 {
160         struct obd_device *obd = class_conn2obd(conn);
161         struct lov_obd *lov = &obd->u.lov;
162         int rc;
163         int i; 
164
165         if (!lov->tgts)
166                 goto out_local;
167
168         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
169                 rc = obd_disconnect(&lov->tgts[i].conn);
170                 if (rc) { 
171                         CERROR("Target %s disconnect error %d\n", 
172                                lov->tgts[i].uuid, rc); 
173                         RETURN(rc);
174                 }
175         }
176         OBD_FREE(lov->tgts, lov->bufsize);
177         lov->bufsize = 0;
178         lov->tgts = NULL; 
179
180  out_local:
181
182         rc = class_disconnect(conn);
183         if (!rc)
184                 MOD_DEC_USE_COUNT;
185
186         return rc;
187 }
188
189 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
190 {
191         struct obd_ioctl_data* data = buf;
192         struct lov_obd *lov = &obd->u.lov;
193         int rc = 0;
194         ENTRY;
195
196         if (data->ioc_inllen1 < 1) {
197                 CERROR("osc setup requires an MDC UUID\n");
198                 RETURN(-EINVAL);
199         }
200
201         if (data->ioc_inllen1 > 37) {
202                 CERROR("mdc UUID must be less than 38 characters\n");
203                 RETURN(-EINVAL);
204         }
205
206         /* FIXME: we should make a connection instead perhaps to avoid
207            the mdc from walking away? The fs guarantees this. */ 
208         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
209         if (!lov->mdcobd) { 
210                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
211                        data->ioc_inlbuf1); 
212                 rc = -EINVAL;
213         }
214         RETURN(rc); 
215
216
217
218 static inline int lov_stripe_md_size(struct obd_device *obd)
219 {
220         struct lov_obd *lov = &obd->u.lov;
221         int size;
222
223         size = sizeof(struct lov_stripe_md) + 
224                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id); 
225         return size;
226 }
227
228 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
229 {
230         int rc, i;
231         struct obdo tmp;
232         struct obd_export *export = class_conn2export(conn);
233         struct lov_obd *lov;
234         struct lov_stripe_md *md;
235         ENTRY;
236
237         if (!ea) { 
238                 CERROR("lov_create needs EA for striping information\n"); 
239                 RETURN(-EINVAL); 
240         }
241
242         if (!export)
243                 RETURN(-EINVAL);
244         lov = &export->export_obd->u.lov;
245
246         oa->o_easize =  lov_stripe_md_size(export->export_obd); 
247         if (! *ea) { 
248                 OBD_ALLOC(*ea, oa->o_easize); 
249                 if (! *ea) 
250                         RETURN(-ENOMEM); 
251         }
252
253         md = *ea; 
254         md->lmd_size = oa->o_easize;
255         md->lmd_object_id = oa->o_id;
256         if (!md->lmd_stripe_count) { 
257                 md->lmd_stripe_count = lov->desc.ld_default_stripecount;
258         }
259
260         for (i = 0; i < md->lmd_stripe_count; i++) {
261                 struct lov_stripe_md obj_md; 
262                 struct lov_stripe_md *obj_mdp = &obj_md; 
263                 /* create data objects with "parent" OA */ 
264                 memcpy(&tmp, oa, sizeof(tmp));
265                 tmp.o_easize = sizeof(struct lov_stripe_md);
266                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
267                 if (rc) 
268                         GOTO(out_cleanup, rc); 
269                 md->lmd_objects[i].l_object_id = tmp.o_id;
270         }
271
272  out_cleanup: 
273         if (rc) { 
274                 int i2, rc2;
275                 for (i2 = 0 ; i2 < i ; i2++) { 
276                         /* destroy already created objects here */ 
277                         tmp.o_id = md->lmd_objects[i].l_object_id;
278                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
279                         if (rc2) { 
280                                 CERROR("Failed to remove object from target %d\n", 
281                                        i2); 
282                         }
283                 }
284         }
285         return rc;
286 }
287
288 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
289 struct lov_stripe_md *ea)
290 {
291         int rc, i;
292         struct obdo tmp;
293         struct obd_export *export = class_conn2export(conn);
294         struct lov_obd *lov;
295         struct lov_stripe_md *md;
296         ENTRY;
297
298         if (!ea) { 
299                 CERROR("LOV requires striping ea for desctruction\n"); 
300                 RETURN(-EINVAL); 
301         }
302
303         if (!export || !export->export_obd) 
304                 RETURN(-ENODEV); 
305
306         lov = &export->export_obd->u.lov;
307         md = ea;
308
309         for (i = 0; i < md->lmd_stripe_count; i++) {
310                 /* create data objects with "parent" OA */ 
311                 memcpy(&tmp, oa, sizeof(tmp));
312                 oa->o_id = md->lmd_objects[i].l_object_id; 
313                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
314                 if (!rc) { 
315                         CERROR("Error destroying object %Ld on %d\n",
316                                oa->o_id, i); 
317                 }
318         }
319         RETURN(rc);
320 }
321
322 #if 0
323 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
324 {
325         int rc;
326         ENTRY;
327
328         if (!class_conn2export(conn))
329                 RETURN(-EINVAL);
330
331         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
332         RETURN(rc);
333 }
334
335 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
336 {
337         int rc, retval, i;
338         ENTRY;
339
340         if (!class_conn2export(conn))
341                 RETURN(-EINVAL);
342
343         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
344                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
345                 if (i == 0)
346                         retval = rc;
347                 else if (retval != rc)
348                         CERROR("different results on multiple OBDs!\n");
349         }
350
351         RETURN(rc);
352 }
353
354 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
355 {
356         int rc, retval, i;
357         ENTRY;
358
359         if (!class_conn2export(conn))
360                 RETURN(-EINVAL);
361
362         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
363                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
364                 if (i == 0)
365                         retval = rc;
366                 else if (retval != rc)
367                         CERROR("different results on multiple OBDs!\n");
368         }
369
370         RETURN(rc);
371 }
372
373 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
374 {
375         int rc, retval, i;
376         ENTRY;
377
378         if (!class_conn2export(conn))
379                 RETURN(-EINVAL);
380
381         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
382                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
383                 if (i == 0)
384                         retval = rc;
385                 else if (retval != rc)
386                         CERROR("different results on multiple OBDs!\n");
387         }
388
389         RETURN(rc);
390 }
391
392
393
394 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
395  * we can send this 'punch' to just the authoritative node and the nodes
396  * that the punch will affect. */
397 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
398                      obd_size count, obd_off offset)
399 {
400         int rc, retval, i;
401         ENTRY;
402
403         if (!class_conn2export(conn))
404                 RETURN(-EINVAL);
405
406         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
407                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
408                                offset);
409                 if (i == 0)
410                         retval = rc;
411                 else if (retval != rc)
412                         CERROR("different results on multiple OBDs!\n");
413         }
414
415         RETURN(rc);
416 }
417
418 struct lov_callback_data {
419         atomic_t count;
420         wait_queue_head waitq;
421 };
422
423 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
424 {
425         struct lov_callback_data *cb_data = data;
426
427         if (atomic_dec_and_test(&cb_data->count))
428                 wake_up(&cb_data->waitq);
429 }
430
431 static int lov_read_check_status(struct lov_callback_data *cb_data)
432 {
433         ENTRY;
434         if (sigismember(&(current->pending.signal), SIGKILL) ||
435             sigismember(&(current->pending.signal), SIGTERM) ||
436             sigismember(&(current->pending.signal), SIGINT)) {
437                 cb_data->flags |= PTL_RPC_FL_INTR;
438                 RETURN(1);
439         }
440         if (atomic_read(&cb_data->count) == 0)
441                 RETURN(1);
442         RETURN(0);
443 }
444
445 /* buffer must lie in user memory here */
446 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
447                    struct obdo **oa,
448                    obd_count *oa_bufs, struct page **buf,
449                    obd_size *count, obd_off *offset, obd_flag *flags,
450                    bulk_callback_t callback, void *data)
451 {
452         int rc, i, page_array_offset = 0;
453         obd_off off = offset;
454         obd_size retval = 0;
455         struct lov_callback_data *cb_data;
456         ENTRY;
457
458         if (num_oa != 1)
459                 LBUG();
460
461         if (!class_conn2export(conn))
462                 RETURN(-EINVAL);
463
464         OBD_ALLOC(cb_data, sizeof(*cb_data));
465         if (cb_data == NULL) {
466                 LBUG();
467                 RETURN(-ENOMEM);
468         }
469         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
470         atomic_set(&cb_data->count, 0);
471
472         for (i = 0; i < oa_bufs[0]; i++) {
473                 struct page *current_page = buf[i];
474
475                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
476                 int bufcount = oa_bufs[i];
477                 // md->lmd_stripe_count
478
479                 for (k = page_array_offset; k < bufcount + page_array_offset;
480                      k++) {
481                         
482                 }
483                 page_array_offset += bufcount;
484
485
486         while (off < offset + count) {
487                 int stripe, conn;
488                 obd_size size, tmp;
489
490                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
491                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
492                 if (size > *count)
493                         size = *count;
494
495                 conn = stripe % conn->oc_dev->obd_multi_count;
496
497                 tmp = size;
498                 atomic_inc(&cb_data->count);
499                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
500                              num_oa, oa, buf,
501                               &size, off, lov_read_callback, cb_data);
502                 if (rc == 0)
503                         retval += size;
504                 else {
505                         CERROR("read(off=%Lu, count=%Lu): %d\n",
506                                (unsigned long long)off,
507                                (unsigned long long)size, rc);
508                         break;
509                 }
510
511                 buf += size;
512         }
513
514         wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
515         if (cb_data->flags & PTL_RPC_FL_INTR)
516                 rc = -EINTR;
517
518         /* FIXME: The error handling here sucks */
519         *count = retval;
520         OBD_FREE(cb_data, sizeof(*cb_data));
521         RETURN(rc);
522 }
523
524 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
525 {
526         
527 }
528
529 /* buffer must lie in user memory here */
530 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
531                          obd_size *count, obd_off offset)
532 {
533         int err;
534         struct file *file;
535         unsigned long retval;
536
537         ENTRY;
538         if (!class_conn2export(conn)) {
539                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
540                 EXIT;
541                 return -EINVAL;
542         }
543
544         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
545         if (!file || IS_ERR(file)) {
546                 EXIT;
547                 return -PTR_ERR(file);
548         }
549
550         /* count doubles as retval */
551         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
552         filp_close(file, 0);
553
554         if ( retval >= 0 ) {
555                 err = 0;
556                 *count = retval;
557                 EXIT;
558         } else {
559                 err = retval;
560                 *count = 0;
561                 EXIT;
562         }
563
564         return err;
565 }
566
567 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
568                        struct ldlm_handle *parent_lock, __u64 *res_id,
569                        __u32 type, struct ldlm_extent *extent, __u32 mode,
570                        int *flags, void *data, int datalen,
571                        struct ldlm_handle *lockh)
572 {
573         int rc;
574         ENTRY;
575
576         if (!class_conn2export(conn))
577                 RETURN(-EINVAL);
578
579         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
580                          res_id, type, extent, mode, flags, data, datalen,
581                          lockh);
582         RETURN(rc);
583 }
584
585 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
586                       struct ldlm_handle *lockh)
587 {
588         int rc;
589         ENTRY;
590
591         if (!class_conn2export(conn))
592                 RETURN(-EINVAL);
593
594         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
595         RETURN(rc);
596 }
597 #endif
598
599 struct obd_ops lov_obd_ops = {
600         o_setup:       lov_setup,
601         o_connect:     lov_connect,
602         o_disconnect:  lov_disconnect,
603         o_create:      lov_create,
604         o_destroy:     lov_destroy,
605 #if 0
606         o_getattr:     lov_getattr,
607         o_setattr:     lov_setattr,
608         o_open:        lov_open,
609         o_close:       lov_close,
610         o_brw:         lov_pgcache_brw,
611         o_punch:       lov_punch,
612         o_enqueue:     lov_enqueue,
613         o_cancel:      lov_cancel
614 #endif
615 };
616
617
618 #define LOV_VERSION "v0.1"
619
620 static int __init lov_init(void)
621 {
622         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
623                ", info@clusterfs.com\n");
624         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
625 }
626
627 static void __exit lov_exit(void)
628 {
629         class_unregister_type(OBD_LOV_DEVICENAME);
630 }
631
632 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
633 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
634 MODULE_LICENSE("GPL");
635
636 module_init(lov_init);
637 module_exit(lov_exit);