Whamcloud - gitweb
Prevent C-c and C-z from locking us up, and make most of our waits
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *
9  * This code is issued under the GNU General Public License.
10  * See the file COPYING in this distribution
11  */
12
13 #define EXPORT_SYMTAB
14 #define DEBUG_SUBSYSTEM S_LOV
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/obd_class.h>
23 #include <linux/obd_lov.h>
24
25 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
26
27 /* obd methods */
28
29 static int lov_getinfo(struct obd_device *obd, 
30                        struct lov_desc *desc, 
31                        uuid_t **uuids, 
32                        struct ptlrpc_request **request)
33 {
34         struct ptlrpc_request *req;
35         struct mds_status_req *streq;
36         struct lov_obd *lov = &obd->u.lov; 
37         struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
38         int rc, size[2] = {sizeof(*streq)};
39         ENTRY;
40
41         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
42                                MDS_LOVINFO, 1, size, NULL);
43         if (!req)
44                 GOTO(out, rc = -ENOMEM);
45         
46         *request = req;
47         streq = lustre_msg_buf(req->rq_reqmsg, 0);
48         streq->flags = HTON__u32(MDS_STATUS_LOV);
49         streq->repbuf = HTON__u32(8000);
50         
51         /* prepare for reply */ 
52         req->rq_level = LUSTRE_CONN_CON;
53         size[0] = sizeof(*desc); 
54         size[1] = 8000; 
55         req->rq_replen = lustre_msg_size(2, size);
56         
57         rc = ptlrpc_queue_wait(req);
58         rc = ptlrpc_check_status(req, rc);
59
60         if (!rc) {
61                 memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
62                 *uuids = lustre_msg_buf(req->rq_repmsg, 1);
63                 lov_unpackdesc(desc); 
64         }
65         EXIT;
66  out:
67         return rc;
68 }
69
70 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
71 {
72         int rc;
73         int i;
74         struct ptlrpc_request *req;
75         struct lov_obd *lov = &obd->u.lov;
76         uuid_t *uuidarray; 
77
78         MOD_INC_USE_COUNT;
79         rc = class_connect(conn, obd);
80         if (rc) { 
81                 MOD_DEC_USE_COUNT;
82                 RETURN(rc); 
83         }
84         
85         rc = lov_getinfo(obd, &lov->desc, &uuidarray, &req);
86         if (rc) { 
87                 CERROR("cannot get lov info %d\n", rc);
88                 GOTO(out, rc); 
89         }
90         
91         if (lov->desc.ld_tgt_count > 1000) { 
92                 CERROR("configuration error: target count > 1000 (%d)\n",
93                        lov->desc.ld_tgt_count);
94                 GOTO(out, rc = -EINVAL); 
95         }
96         
97         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) { 
98                 CERROR("lov uuid %s not on mds device (%s)\n", 
99                        obd->obd_uuid, lov->desc.ld_uuid);
100                 GOTO(out, rc = -EINVAL); 
101         }                
102             
103         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] < 
104             sizeof(uuid_t) * lov->desc.ld_tgt_count) { 
105                 CERROR("invalid uuid array returned\n");
106                 GOTO(out, rc = -EINVAL); 
107         }
108
109         lov->bufsize = sizeof(struct lov_tgt_desc) *  lov->desc.ld_tgt_count; 
110         OBD_ALLOC(lov->tgts, lov->bufsize); 
111         if (!lov->tgts) { 
112                 CERROR("Out of memory\n"); 
113                 GOTO(out, rc = -ENOMEM); 
114         }
115
116         uuidarray = lustre_msg_buf(req->rq_repmsg, 1); 
117         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
118                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t)); 
119         }
120
121         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
122                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
123                 if (!tgt) { 
124                         CERROR("Target %s not configured\n", uuidarray[i]); 
125                         GOTO(out_mem, rc = -EINVAL); 
126                 }
127                 rc = obd_connect(&lov->tgts[i].conn, tgt); 
128                 if (rc) { 
129                         CERROR("Target %s connect error %d\n", 
130                                uuidarray[i], rc); 
131                         GOTO(out_mem, rc);
132                 }
133         }
134
135  out_mem:
136         if (rc) { 
137                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
138                         rc = obd_disconnect(&lov->tgts[i].conn);
139                         if (rc)
140                                 CERROR("Target %s disconnect error %d\n", 
141                                        uuidarray[i], rc); 
142                 }
143                 OBD_FREE(lov->tgts, lov->bufsize);
144         }
145  out:
146         if (rc) { 
147                 class_disconnect(conn);
148         }
149         if (req)
150                 ptlrpc_free_req(req); 
151         return rc;
152         
153 }
154
155 static int lov_disconnect(struct lustre_handle *conn)
156 {
157         struct obd_device *obd = class_conn2obd(conn);
158         struct lov_obd *lov = &obd->u.lov;
159         int rc;
160         int i; 
161
162         if (!lov->tgts)
163                 goto out_local;
164
165         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) { 
166                 rc = obd_disconnect(&lov->tgts[i].conn);
167                 if (rc)
168                         CERROR("Target %s disconnect error %d\n", 
169                                lov->tgts[i].uuid, rc); 
170         }
171         OBD_FREE(lov->tgts, lov->bufsize);
172         lov->bufsize = 0;
173         lov->tgts = NULL; 
174
175  out_local:
176
177         rc = class_disconnect(conn);
178         if (!rc)
179                 MOD_DEC_USE_COUNT;
180
181         return rc;
182 }
183
184 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
185 {
186         struct obd_ioctl_data* data = buf;
187         struct lov_obd *lov = &obd->u.lov;
188         int rc = 0;
189         ENTRY;
190
191         if (data->ioc_inllen1 < 1) {
192                 CERROR("osc setup requires an MDC UUID\n");
193                 RETURN(-EINVAL);
194         }
195
196         if (data->ioc_inllen1 > 37) {
197                 CERROR("mdc UUID must be less than 38 characters\n");
198                 RETURN(-EINVAL);
199         }
200
201         /* FIXME: we should make a connection instead perhaps to avoid
202            the mdc from walking away? The fs guarantees this. */ 
203         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
204         if (!lov->mdcobd) { 
205                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
206                        data->ioc_inlbuf1); 
207                 rc = -EINVAL;
208         }
209         RETURN(rc); 
210
211
212 #if 0
213 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
214 {
215         int rc;
216         ENTRY;
217
218         if (!class_conn2export(conn))
219                 RETURN(-EINVAL);
220
221         rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
222         RETURN(rc);
223 }
224
225 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
226 {
227         int rc, retval, i;
228         ENTRY;
229
230         if (!class_conn2export(conn))
231                 RETURN(-EINVAL);
232
233         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
234                 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
235                 if (i == 0)
236                         retval = rc;
237                 else if (retval != rc)
238                         CERROR("different results on multiple OBDs!\n");
239         }
240
241         RETURN(rc);
242 }
243
244 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
245 {
246         int rc, retval, i;
247         ENTRY;
248
249         if (!class_conn2export(conn))
250                 RETURN(-EINVAL);
251
252         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
253                 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
254                 if (i == 0)
255                         retval = rc;
256                 else if (retval != rc)
257                         CERROR("different results on multiple OBDs!\n");
258         }
259
260         RETURN(rc);
261 }
262
263 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
264 {
265         int rc, retval, i;
266         ENTRY;
267
268         if (!class_conn2export(conn))
269                 RETURN(-EINVAL);
270
271         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
272                 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
273                 if (i == 0)
274                         retval = rc;
275                 else if (retval != rc)
276                         CERROR("different results on multiple OBDs!\n");
277         }
278
279         RETURN(rc);
280 }
281
282 static int lov_create(struct lustre_handle *conn, struct obdo *oa)
283 {
284         int rc, retval, i, offset;
285         struct obdo tmp;
286         struct lov_md md;
287         ENTRY;
288
289         if (!class_conn2export(conn))
290                 RETURN(-EINVAL);
291
292         md.lmd_object_id = oa->o_id;
293         md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
294
295         memset(oa->o_inline, 0, sizeof(oa->o_inline));
296         offset = sizeof(md);
297         for (i = 0; i < md.lmd_stripe_count; i++) {
298                 struct lov_object_id lov_id;
299                 rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
300                 if (i == 0) {
301                         memcpy(oa, &tmp, sizeof(tmp));
302                         retval = rc;
303                 } else if (retval != rc)
304                         CERROR("return codes didn't match (%d, %d)\n",
305                                retval, rc);
306                 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
307                 lov_id->l_device_id = i;
308                 lov_id->l_object_id = tmp.o_id;
309                 offset += sizeof(*lov_id);
310         }
311         memcpy(oa->o_inline, &md, sizeof(md));
312
313         return rc;
314 }
315
316 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa)
317 {
318         int rc, retval, i, offset;
319         struct obdo tmp;
320         struct lov_md *md;
321         struct lov_object_id lov_id;
322         ENTRY;
323
324         if (!class_conn2export(conn))
325                 RETURN(-EINVAL);
326
327         md = (struct lov_md *)oa->o_inline;
328
329         memcpy(&tmp, oa, sizeof(tmp));
330
331         offset = sizeof(md);
332         for (i = 0; i < md->lmd_stripe_count; i++) {
333                 struct lov_object_id *lov_id;
334                 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
335
336                 tmp.o_id = lov_id->l_object_id;
337
338                 rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
339                 if (i == 0)
340                         retval = rc;
341                 else if (retval != rc)
342                         CERROR("return codes didn't match (%d, %d)\n",
343                                retval, rc);
344                 offset += sizeof(*lov_id);
345         }
346
347         return rc;
348 }
349
350 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
351  * we can send this 'punch' to just the authoritative node and the nodes
352  * that the punch will affect. */
353 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
354                      obd_size count, obd_off offset)
355 {
356         int rc, retval, i;
357         ENTRY;
358
359         if (!class_conn2export(conn))
360                 RETURN(-EINVAL);
361
362         for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
363                 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
364                                offset);
365                 if (i == 0)
366                         retval = rc;
367                 else if (retval != rc)
368                         CERROR("different results on multiple OBDs!\n");
369         }
370
371         RETURN(rc);
372 }
373
374 struct lov_callback_data {
375         atomic_t count;
376         wait_queue_head waitq;
377 };
378
379 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
380 {
381         struct lov_callback_data *cb_data = data;
382
383         if (atomic_dec_and_test(&cb_data->count))
384                 wake_up(&cb_data->waitq);
385 }
386
387 static int lov_read_check_status(struct lov_callback_data *cb_data)
388 {
389         ENTRY;
390         if (sigismember(&(current->pending.signal), SIGKILL) ||
391             sigismember(&(current->pending.signal), SIGTERM) ||
392             sigismember(&(current->pending.signal), SIGINT)) {
393                 cb_data->flags |= PTL_RPC_FL_INTR;
394                 RETURN(1);
395         }
396         if (atomic_read(&cb_data->count) == 0)
397                 RETURN(1);
398         RETURN(0);
399 }
400
401 /* buffer must lie in user memory here */
402 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
403                    struct obdo **oa,
404                    obd_count *oa_bufs, struct page **buf,
405                    obd_size *count, obd_off *offset, obd_flag *flags,
406                    bulk_callback_t callback, void *data)
407 {
408         int rc, i, page_array_offset = 0;
409         obd_off off = offset;
410         obd_size retval = 0;
411         struct lov_callback_data *cb_data;
412         ENTRY;
413
414         if (num_oa != 1)
415                 LBUG();
416
417         if (!class_conn2export(conn))
418                 RETURN(-EINVAL);
419
420         OBD_ALLOC(cb_data, sizeof(*cb_data));
421         if (cb_data == NULL) {
422                 LBUG();
423                 RETURN(-ENOMEM);
424         }
425         INIT_WAITQUEUE_HEAD(&cb_data->waitq);
426         atomic_set(&cb_data->count, 0);
427
428         for (i = 0; i < oa_bufs[0]; i++) {
429                 struct page *current_page = buf[i];
430
431                 struct lov_md *md = (struct lov_md *)oa[i]->inline;
432                 int bufcount = oa_bufs[i];
433                 // md->lmd_stripe_count
434
435                 for (k = page_array_offset; k < bufcount + page_array_offset;
436                      k++) {
437                         
438                 }
439                 page_array_offset += bufcount;
440
441
442         while (off < offset + count) {
443                 int stripe, conn;
444                 obd_size size, tmp;
445
446                 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
447                 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
448                 if (size > *count)
449                         size = *count;
450
451                 conn = stripe % conn->oc_dev->obd_multi_count;
452
453                 tmp = size;
454                 atomic_inc(&cb_data->count);
455                 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
456                              num_oa, oa, buf,
457                               &size, off, lov_read_callback, cb_data);
458                 if (rc == 0)
459                         retval += size;
460                 else {
461                         CERROR("read(off=%Lu, count=%Lu): %d\n",
462                                (unsigned long long)off,
463                                (unsigned long long)size, rc);
464                         break;
465                 }
466
467                 buf += size;
468         }
469
470         wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
471         if (cb_data->flags & PTL_RPC_FL_INTR)
472                 rc = -EINTR;
473
474         /* FIXME: The error handling here sucks */
475         *count = retval;
476         OBD_FREE(cb_data, sizeof(*cb_data));
477         RETURN(rc);
478 }
479
480 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
481 {
482         
483 }
484
485 /* buffer must lie in user memory here */
486 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
487                          obd_size *count, obd_off offset)
488 {
489         int err;
490         struct file *file;
491         unsigned long retval;
492
493         ENTRY;
494         if (!class_conn2export(conn)) {
495                 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
496                 EXIT;
497                 return -EINVAL;
498         }
499
500         file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
501         if (!file || IS_ERR(file)) {
502                 EXIT;
503                 return -PTR_ERR(file);
504         }
505
506         /* count doubles as retval */
507         retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
508         filp_close(file, 0);
509
510         if ( retval >= 0 ) {
511                 err = 0;
512                 *count = retval;
513                 EXIT;
514         } else {
515                 err = retval;
516                 *count = 0;
517                 EXIT;
518         }
519
520         return err;
521 }
522
523 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
524                        struct ldlm_handle *parent_lock, __u64 *res_id,
525                        __u32 type, struct ldlm_extent *extent, __u32 mode,
526                        int *flags, void *data, int datalen,
527                        struct ldlm_handle *lockh)
528 {
529         int rc;
530         ENTRY;
531
532         if (!class_conn2export(conn))
533                 RETURN(-EINVAL);
534
535         rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
536                          res_id, type, extent, mode, flags, data, datalen,
537                          lockh);
538         RETURN(rc);
539 }
540
541 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
542                       struct ldlm_handle *lockh)
543 {
544         int rc;
545         ENTRY;
546
547         if (!class_conn2export(conn))
548                 RETURN(-EINVAL);
549
550         rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
551         RETURN(rc);
552 }
553 #endif
554
555 struct obd_ops lov_obd_ops = {
556         o_setup:       lov_setup,
557         o_connect:     lov_connect,
558         o_disconnect:  lov_disconnect,
559 #if 0
560         o_create:      lov_create,
561         o_getattr:     lov_getattr,
562         o_setattr:     lov_setattr,
563         o_open:        lov_open,
564         o_close:       lov_close,
565         o_destroy:     lov_destroy,
566         o_brw:         lov_pgcache_brw,
567         o_punch:       lov_punch,
568         o_enqueue:     lov_enqueue,
569         o_cancel:      lov_cancel
570 #endif
571 };
572
573
574 #define LOV_VERSION "v0.1"
575
576 static int __init lov_init(void)
577 {
578         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
579                ", info@clusterfs.com\n");
580         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
581 }
582
583 static void __exit lov_exit(void)
584 {
585         class_unregister_type(OBD_LOV_DEVICENAME);
586 }
587
588 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
589 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
590 MODULE_LICENSE("GPL");
591
592 module_init(lov_init);
593 module_exit(lov_exit);