Whamcloud - gitweb
bba10b8128f351c9cdb004d7f97c0e0f1147899e
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *
10  * This code is issued under the GNU General Public License.
11  * See the file COPYING in this distribution
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LOV
16
17 #include <linux/slab.h>
18 #include <linux/module.h>
19 #include <linux/obd_support.h>
20 #include <linux/lustre_lib.h>
21 #include <linux/lustre_net.h>
22 #include <linux/lustre_idl.h>
23 #include <linux/lustre_mds.h>
24 #include <linux/obd_class.h>
25 #include <linux/obd_lov.h>
26 #include <linux/init.h>
27
28 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
29
30 /* obd methods */
31 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
32                        char *cluuid)
33 {
34         struct ptlrpc_request *req;
35         struct lov_obd *lov = &obd->u.lov;
36         struct client_obd *mdc = &lov->mdcobd->u.cli;
37         struct lov_desc *desc = &lov->desc;
38         struct lustre_handle mdc_conn;
39         uuid_t *uuidarray;
40         int rc, rc2;
41         int i;
42
43         MOD_INC_USE_COUNT;
44         rc = class_connect(conn, obd, cluuid);
45         if (rc) {
46                 MOD_DEC_USE_COUNT;
47                 RETURN(rc);
48         }
49
50         /* retrieve LOV metadata from MDS */
51         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL);
52         if (rc) {
53                 CERROR("cannot connect to mdc: rc = %d\n", rc);
54                 GOTO(out, rc = -EINVAL);
55         }
56
57         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
58         rc2 = obd_disconnect(&mdc_conn);
59         if (rc || rc2) {
60                 CERROR("cannot get lov info or disconnect %d/%d\n", rc, rc2);
61                 GOTO(out, (rc) ? rc : rc2 );
62         }
63
64         /* sanity... */
65         if (req->rq_repmsg->bufcount < 2 ||
66             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
67                 CERROR("invalid descriptor returned\n");
68                 GOTO(out, rc = -EINVAL);
69         }
70
71         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
72         lov_unpackdesc(desc);
73
74         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
75                 CERROR("invalid uuid array returned\n");
76                 GOTO(out, rc = -EINVAL);
77         }
78
79         mdc->cl_max_mdsize = sizeof(struct lov_mds_md) +
80                 desc->ld_tgt_count * sizeof(struct lov_object_id);
81
82         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
83                 CERROR("lov uuid %s not on mds device (%s)\n",
84                        obd->obd_uuid, desc->ld_uuid);
85                 GOTO(out, rc = -EINVAL);
86         }
87
88         if (desc->ld_tgt_count > 1000) {
89                 CERROR("configuration error: target count > 1000 (%d)\n",
90                        desc->ld_tgt_count);
91                 GOTO(out, rc = -EINVAL);
92         }
93
94         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
95         OBD_ALLOC(lov->tgts, lov->bufsize);
96         if (!lov->tgts) {
97                 CERROR("Out of memory\n");
98                 GOTO(out, rc = -ENOMEM);
99         }
100
101         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
102         for (i = 0 ; i < desc->ld_tgt_count; i++)
103                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
104
105         for (i = 0 ; i < desc->ld_tgt_count; i++) {
106                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
107                 if (!tgt) {
108                         CERROR("Target %s not attached\n", uuidarray[i]);
109                         GOTO(out_mem, rc = -EINVAL);
110                 }
111                 if (!(tgt->obd_flags & OBD_SET_UP)) {
112                         CERROR("Target %s not set up\n", uuidarray[i]);
113                         GOTO(out_mem, rc = -EINVAL);
114                 }
115                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL);
116                 if (rc) {
117                         CERROR("Target %s connect error %d\n",
118                                uuidarray[i], rc);
119                         GOTO(out_mem, rc);
120                 }
121         }
122
123  out_mem:
124         if (rc) {
125                 for (i = 0 ; i < desc->ld_tgt_count; i++) {
126                         rc2 = obd_disconnect(&lov->tgts[i].conn);
127                         if (rc2)
128                                 CERROR("BAD: Target %s disconnect error %d\n",
129                                        uuidarray[i], rc2);
130                 }
131                 OBD_FREE(lov->tgts, lov->bufsize);
132         }
133  out:
134         if (rc)
135                 class_disconnect(conn);
136         ptlrpc_free_req(req);
137         return rc;
138 }
139
140 static int lov_disconnect(struct lustre_handle *conn)
141 {
142         struct obd_device *obd = class_conn2obd(conn);
143         struct lov_obd *lov = &obd->u.lov;
144         int rc;
145         int i;
146
147         if (!lov->tgts)
148                 goto out_local;
149
150         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
151                 rc = obd_disconnect(&lov->tgts[i].conn);
152                 if (rc) {
153                         CERROR("Target %s disconnect error %d\n",
154                                lov->tgts[i].uuid, rc);
155                         RETURN(rc);
156                 }
157         }
158         OBD_FREE(lov->tgts, lov->bufsize);
159         lov->bufsize = 0;
160         lov->tgts = NULL;
161
162  out_local:
163         rc = class_disconnect(conn);
164         if (!rc)
165                 MOD_DEC_USE_COUNT;
166         return rc;
167 }
168
169 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
170 {
171         struct obd_ioctl_data* data = buf;
172         struct lov_obd *lov = &obd->u.lov;
173         int rc = 0;
174         ENTRY;
175
176         if (data->ioc_inllen1 < 1) {
177                 CERROR("osc setup requires an MDC UUID\n");
178                 RETURN(-EINVAL);
179         }
180
181         if (data->ioc_inllen1 > 37) {
182                 CERROR("mdc UUID must be less than 38 characters\n");
183                 RETURN(-EINVAL);
184         }
185
186         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
187         if (!lov->mdcobd) {
188                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
189                        data->ioc_inlbuf1);
190                 rc = -EINVAL;
191         }
192         RETURN(rc);
193 }
194
195
196 static inline int lov_stripe_md_size(struct obd_device *obd)
197 {
198         struct lov_obd *lov = &obd->u.lov;
199         int size;
200
201         size = sizeof(struct lov_stripe_md) +
202                 lov->desc.ld_tgt_count * sizeof(struct lov_oinfo);
203         return size;
204 }
205
206 static inline int lov_mds_md_size(struct obd_device *obd)
207 {
208         struct lov_obd *lov = &obd->u.lov;
209         int size;
210
211         size = sizeof(struct lov_mds_md) +
212                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id);
213         return size;
214 }
215
216 /* the LOV counts on oa->o_id to be set as the LOV object id */
217 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
218                       struct lov_stripe_md **ea)
219 {
220         int rc = 0, i;
221         struct obdo tmp;
222         struct obd_export *export = class_conn2export(conn);
223         struct lov_obd *lov;
224         struct lov_stripe_md *md;
225         ENTRY;
226
227         if (!ea) {
228                 CERROR("lov_create needs EA for striping information\n");
229                 RETURN(-EINVAL);
230         }
231         if (!export)
232                 RETURN(-EINVAL);
233         lov = &export->exp_obd->u.lov;
234
235         oa->o_easize = lov_stripe_md_size(export->exp_obd);
236         if (!*ea) {
237                 OBD_ALLOC(*ea, oa->o_easize);
238                 if (! *ea)
239                         RETURN(-ENOMEM);
240         }
241
242         md = *ea;
243         md->lmd_easize = lov_mds_md_size(export->exp_obd);
244         md->lmd_object_id = oa->o_id;
245         if (!md->lmd_stripe_count)
246                 md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
247
248         if (!md->lmd_stripe_size)
249                 md->lmd_stripe_size = lov->desc.ld_default_stripe_size;
250
251
252
253         for (i = 0; i < md->lmd_stripe_count; i++) {
254                 struct lov_stripe_md obj_md;
255                 struct lov_stripe_md *obj_mdp = &obj_md;
256                 /* create data objects with "parent" OA */
257                 memcpy(&tmp, oa, sizeof(tmp));
258                 tmp.o_easize = sizeof(struct lov_stripe_md);
259                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
260                 if (rc)
261                         GOTO(out_cleanup, rc);
262                 md->lmd_oinfo[i].loi_id = tmp.o_id;
263                 md->lmd_oinfo[i].loi_size = tmp.o_size;
264         }
265
266  out_cleanup:
267         if (rc) {
268                 int i2, rc2;
269                 for (i2 = 0; i2 < i; i2++) {
270                         /* destroy already created objects here */
271                         tmp.o_id = md->lmd_oinfo[i].loi_id;
272                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
273                         if (rc2)
274                                 CERROR("Failed to remove object from target "
275                                        "%d\n", i2);
276                 }
277         }
278         return rc;
279 }
280
281 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
282                        struct lov_stripe_md *md)
283 {
284         int rc = 0, i;
285         struct obdo tmp;
286         struct obd_export *export = class_conn2export(conn);
287         struct lov_obd *lov;
288         ENTRY;
289
290         if (!md) {
291                 CERROR("LOV requires striping ea for destruction\n");
292                 RETURN(-EINVAL);
293         }
294
295         if (!export || !export->exp_obd)
296                 RETURN(-ENODEV);
297
298         lov = &export->exp_obd->u.lov;
299         for (i = 0; i < md->lmd_stripe_count; i++) {
300                 /* create data objects with "parent" OA */
301                 memcpy(&tmp, oa, sizeof(tmp));
302                 tmp.o_id = md->lmd_oinfo[i].loi_id;
303                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
304                 if (rc)
305                         CERROR("Error destroying object %Ld on %d\n",
306                                md->lmd_oinfo[i].loi_id, i);
307         }
308         RETURN(rc);
309 }
310
311 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
312                        struct lov_stripe_md *md)
313 {
314         int rc = 0, i;
315         struct obdo tmp;
316         struct obd_export *export = class_conn2export(conn);
317         struct lov_obd *lov;
318         int set = 0;
319         ENTRY;
320
321         if (!md) {
322                 CERROR("LOV requires striping ea\n");
323                 RETURN(-EINVAL);
324         }
325
326         if (!export || !export->exp_obd)
327                 RETURN(-ENODEV);
328
329         lov = &export->exp_obd->u.lov;
330         oa->o_size = 0;
331         oa->o_blocks = 0;
332         for (i = 0; i < md->lmd_stripe_count; i++) {
333                 int err;
334
335                 if (md->lmd_oinfo[i].loi_id == 0)
336                         continue;
337
338                 /* create data objects with "parent" OA */
339                 memcpy(&tmp, oa, sizeof(tmp));
340                 tmp.o_id = md->lmd_oinfo[i].loi_id;
341
342                 err = obd_getattr(&lov->tgts[i].conn, &tmp, NULL);
343                 if (err) {
344                         CERROR("Error getattr object %Ld on %d: err = %d\n",
345                                md->lmd_oinfo[i].loi_id, i, err);
346                         if (!rc)
347                                 rc = err;
348                         continue; /* XXX or break? */
349                 }
350                 if (!set) {
351                         obdo_cpy_md(oa, &tmp, tmp.o_valid);
352                         set = 1;
353                 } else {
354                         if (tmp.o_valid & OBD_MD_FLSIZE)
355                                 oa->o_size += tmp.o_size;
356                         if (tmp.o_valid & OBD_MD_FLBLOCKS)
357                                 oa->o_blocks += tmp.o_blocks;
358                         if (tmp.o_valid & OBD_MD_FLCTIME &&
359                             oa->o_ctime < tmp.o_ctime)
360                                 oa->o_ctime = tmp.o_ctime;
361                         if (tmp.o_valid & OBD_MD_FLMTIME &&
362                             oa->o_mtime < tmp.o_mtime)
363                                 oa->o_mtime = tmp.o_mtime;
364                 }
365         }
366         RETURN(rc);
367 }
368
369 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
370                        struct lov_stripe_md *md)
371 {
372         int rc = 0, i;
373         struct obdo tmp;
374         struct obd_export *export = class_conn2export(conn);
375         struct lov_obd *lov;
376         ENTRY;
377
378         if (!md) {
379                 CERROR("LOV requires striping ea\n");
380                 RETURN(-EINVAL);
381         }
382
383         if (!export || !export->exp_obd)
384                 RETURN(-ENODEV);
385
386         lov = &export->exp_obd->u.lov;
387         for (i = 0; i < md->lmd_stripe_count; i++) {
388                 /* create data objects with "parent" OA */
389                 memcpy(&tmp, oa, sizeof(tmp));
390                 tmp.o_id = md->lmd_oinfo[i].loi_id;
391
392                 rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL);
393                 if (rc)
394                         CERROR("Error setattr object %Ld on %d\n",
395                                tmp.o_id, i);
396         }
397         RETURN(rc);
398 }
399
400 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
401                     struct lov_stripe_md *md)
402 {
403         int rc = 0, rc2 = 0, i;
404         struct obdo tmp;
405         struct obd_export *export = class_conn2export(conn);
406         struct lov_obd *lov;
407         ENTRY;
408
409         if (!md) {
410                 CERROR("LOV requires striping ea for opening\n");
411                 RETURN(-EINVAL);
412         }
413
414         if (!export || !export->exp_obd)
415                 RETURN(-ENODEV);
416
417         lov = &export->exp_obd->u.lov;
418         for (i = 0; i < md->lmd_stripe_count; i++) {
419                 /* create data objects with "parent" OA */
420                 memcpy(&tmp, oa, sizeof(tmp));
421                 tmp.o_id = md->lmd_oinfo[i].loi_id;
422
423                 rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
424                 if (rc) {
425                         rc2 = rc;
426                         CERROR("Error open object %Ld on %d\n",
427                                md->lmd_oinfo[i].loi_id, i);
428                 }
429         }
430         RETURN(rc2);
431 }
432
433 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
434                      struct lov_stripe_md *md)
435 {
436         int rc = 0, i;
437         struct obdo tmp;
438         struct obd_export *export = class_conn2export(conn);
439         struct lov_obd *lov;
440         ENTRY;
441
442         if (!md) {
443                 CERROR("LOV requires striping ea\n");
444                 RETURN(-EINVAL);
445         }
446
447         if (!export || !export->exp_obd)
448                 RETURN(-ENODEV);
449
450         lov = &export->exp_obd->u.lov;
451         for (i = 0; i < md->lmd_stripe_count; i++) {
452                 /* create data objects with "parent" OA */
453                 memcpy(&tmp, oa, sizeof(tmp));
454                 tmp.o_id = md->lmd_oinfo[i].loi_id;
455
456                 rc = obd_close(&lov->tgts[i].conn, &tmp, NULL);
457                 if (rc)
458                         CERROR("Error close object %Ld on %d\n",
459                                md->lmd_oinfo[i].loi_id, i);
460         }
461         RETURN(rc);
462 }
463
464 #ifndef log2
465 #define log2(n) ffz(~(n))
466 #endif
467
468 /* compute offset in stripe i corresponding to offset "in" */
469 __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
470 {
471         __u32 ssz = md->lmd_stripe_size;
472         /* full stripes across all * stripe size */
473         __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz;
474         __u32 off = (__u32)in % (md->lmd_stripe_count * ssz);
475
476         if ( in == 0xffffffffffffffff ) {
477                 return 0xffffffffffffffff;
478         }
479
480         if ( (i+1) * ssz <= off )
481                 out += (i+1) * ssz;
482         else if ( i * ssz > off )
483                 out += 0;
484         else
485                 out += (off - (i * ssz)) % ssz;
486
487         return (__u64) out;
488 }
489
490 /* compute offset in stripe i corresponding to offset "in" */
491 __u64 lov_stripe(struct lov_stripe_md *md, __u64 in, int *j)
492 {
493         __u32 ssz = md->lmd_stripe_size;
494         __u32 off, out;
495         /* full stripes across all * stripe size */
496         *j = (((__u32) in)/ssz) % md->lmd_stripe_count;
497         off =  (__u32)in % (md->lmd_stripe_count * ssz);
498         out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz +
499                 (off - ((*j) * ssz)) % ssz;;
500
501         return (__u64) out;
502 }
503
504 int lov_stripe_which(struct lov_stripe_md *md, __u64 in)
505 {
506         __u32 ssz = md->lmd_stripe_size;
507         int j;
508         j = (((__u32) in) / ssz) % md->lmd_stripe_count;
509         return j;
510 }
511
512
513 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
514  * we can send this 'punch' to just the authoritative node and the nodes
515  * that the punch will affect. */
516 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
517                      struct lov_stripe_md *md,
518                      obd_off start, obd_off end)
519 {
520         int rc = 0, i;
521         struct obdo tmp;
522         struct obd_export *export = class_conn2export(conn);
523         struct lov_obd *lov;
524         ENTRY;
525
526         if (!md) {
527                 CERROR("LOV requires striping ea for desctruction\n");
528                 RETURN(-EINVAL);
529         }
530
531         if (!export || !export->exp_obd)
532                 RETURN(-ENODEV);
533
534         lov = &export->exp_obd->u.lov;
535         for (i = 0; i < md->lmd_stripe_count; i++) {
536                 __u64 starti = lov_offset(md, start, i);
537                 __u64 endi = lov_offset(md, end, i);
538
539                 if (starti == endi)
540                         continue;
541                 /* create data objects with "parent" OA */
542                 memcpy(&tmp, oa, sizeof(tmp));
543                 tmp.o_id = md->lmd_oinfo[i].loi_id;
544
545                 rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL,
546                                starti, endi);
547                 if (rc)
548                         CERROR("Error punch object %Ld on %d\n",
549                                md->lmd_oinfo[i].loi_id, i);
550         }
551         RETURN(rc);
552 }
553
554 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
555 {
556         int ret = 0;
557         ENTRY;
558
559         if (phase == CB_PHASE_START)
560                 RETURN(0);
561
562         if (phase == CB_PHASE_FINISH) {
563                 if (err)
564                         cbd->err = err;
565                 if (atomic_dec_and_test(&cbd->refcount))
566                         ret = cbd->cb(cbd->data, cbd->err, phase);
567                 RETURN(ret);
568         }
569
570         LBUG();
571         return 0;
572 }
573
574 static inline int lov_brw(int cmd, struct lustre_handle *conn,
575                           struct lov_stripe_md *md,
576                           obd_count oa_bufs,
577                           struct brw_page *pga,
578                           brw_callback_t callback, struct io_cb_data *cbd)
579 {
580         int stripe_count = md->lmd_stripe_count;
581         struct obd_export *export = class_conn2export(conn);
582         struct lov_obd *lov;
583         struct {
584                 int bufct;
585                 int index;
586                 int subcount;
587                 struct lov_stripe_md md;
588         } *stripeinfo;
589         struct brw_page *ioarr;
590         int rc, i;
591         struct io_cb_data *our_cb;
592         ENTRY;
593
594         lov = &export->exp_obd->u.lov;
595
596         our_cb = ll_init_cb();
597         if (!our_cb)
598                 RETURN(-ENOMEM);
599
600         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
601         if (!stripeinfo)
602                 GOTO(out_cbdata, rc = -ENOMEM);
603
604         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
605         if (!ioarr)
606                 GOTO(out_sinfo, rc = -ENOMEM);
607
608         for (i = 0; i < oa_bufs; i++) {
609                 int which;
610                 which = lov_stripe_which(md, pga[i].off);
611                 stripeinfo[which].bufct++;
612         }
613
614         for (i = 0; i < stripe_count; i++) {
615                 if (i > 0)
616                         stripeinfo[i].index = stripeinfo[i - 1].index +
617                                 stripeinfo[i - 1].bufct;
618                 stripeinfo[i].md.lmd_object_id = md->lmd_oinfo[i].loi_id;
619         }
620
621         for (i = 0; i < oa_bufs; i++) {
622                 int which, shift;
623                 which = lov_stripe_which(md, pga[i].off);
624
625                 shift = stripeinfo[which].index;
626                 LASSERT(shift + stripeinfo[which].subcount < oa_bufs);
627                 ioarr[shift + stripeinfo[which].subcount] = pga[i];
628                 ioarr[shift + stripeinfo[which].subcount].off =
629                         lov_offset(md, pga[i].off, which);
630                 stripeinfo[which].subcount++;
631         }
632
633         our_cb->cb = callback;
634         our_cb->data = cbd;
635
636         /* This is the only race-free way I can think of to get the refcount
637          * correct. -phil */
638         atomic_set(&our_cb->refcount, 0);
639         for (i = 0; i < stripe_count; i++)
640                 if (stripeinfo[i].bufct)
641                         atomic_inc(&our_cb->refcount);
642
643         for (i = 0; i < stripe_count; i++) {
644                 int shift = stripeinfo[i].index;
645                 if (stripeinfo[i].bufct) {
646                         LASSERT(shift < oa_bufs);
647                         obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md,
648                                 stripeinfo[i].bufct, &ioarr[shift],
649                                 lov_osc_brw_callback, our_cb);
650                 }
651         }
652
653         rc = callback(cbd, 0, CB_PHASE_START);
654
655         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
656  out_sinfo:
657         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
658  out_cbdata:
659         OBD_FREE(our_cb, sizeof(*our_cb));
660         RETURN(rc);
661 }
662
663 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md,
664                        struct lustre_handle *parent_lock,
665                        __u32 type, void *cookie, int cookielen, __u32 mode,
666                        int *flags, void *cb, void *data, int datalen,
667                        struct lustre_handle *lockhs)
668 {
669         int rc = 0, i;
670         struct obd_export *export = class_conn2export(conn);
671         struct lov_obd *lov;
672         struct lov_stripe_md submd;
673         ENTRY;
674
675         if (!md) {
676                 CERROR("LOV requires striping ea for desctruction\n");
677                 RETURN(-EINVAL);
678         }
679
680         if (!export || !export->exp_obd)
681                 RETURN(-ENODEV);
682
683         lov = &export->exp_obd->u.lov;
684         for (i = 0; i < md->lmd_stripe_count; i++) {
685                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
686                 struct ldlm_extent sub_ext;
687
688                 sub_ext.start = lov_offset(md, extent->start, i);
689                 sub_ext.end = lov_offset(md, extent->end, i);
690                 if ( sub_ext.start == sub_ext.end )
691                         continue;
692
693                 submd.lmd_object_id = md->lmd_oinfo[i].loi_id;
694                 submd.lmd_easize = sizeof(struct lov_mds_md);
695                 submd.lmd_stripe_count = md->lmd_stripe_count;
696                 /* XXX submd is not fully initialized here */
697                 rc = obd_enqueue(&(lov->tgts[i].conn), &submd, parent_lock,
698                                  type, &sub_ext, sizeof(sub_ext), mode,
699                                  flags, cb, data, datalen, &(lockhs[i]));
700                 // XXX add a lock debug statement here
701                 if (rc)
702                         CERROR("Error obd_enqueue object %Ld subobj %Ld\n",
703                                md->lmd_object_id, md->lmd_oinfo[i].loi_id);
704         }
705         RETURN(rc);
706 }
707
708 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md,
709                       __u32 mode, struct lustre_handle *lockhs)
710 {
711         int rc = 0, i;
712         struct obd_export *export = class_conn2export(conn);
713         struct lov_obd *lov;
714         ENTRY;
715
716         if (!md) {
717                 CERROR("LOV requires striping ea for lock cancellation\n");
718                 RETURN(-EINVAL);
719         }
720
721         if (!export || !export->exp_obd)
722                 RETURN(-ENODEV);
723
724         lov = &export->exp_obd->u.lov;
725         for (i = 0; i < md->lmd_stripe_count; i++) {
726                 struct lov_stripe_md submd;
727
728                 if ( lockhs[i].addr == 0 )
729                         continue;
730
731                 submd.lmd_object_id = md->lmd_oinfo[i].loi_id;
732                 submd.lmd_easize = sizeof(struct lov_mds_md);
733                 rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]);
734                 if (rc)
735                         CERROR("Error cancel object %Ld subobj %Ld\n",
736                                md->lmd_object_id, md->lmd_oinfo[i].loi_id);
737         }
738         RETURN(rc);
739 }
740
741 static int lov_statfs(struct lustre_handle *conn, struct statfs *sfs)
742 {
743         struct obd_export *export = class_conn2export(conn);
744         struct lov_obd *lov;
745         struct statfs lov_sfs;
746         int set = 0;
747         int rc = 0;
748         int i;
749         ENTRY;
750
751         if (!export || !export->exp_obd)
752                 RETURN(-ENODEV);
753
754         lov = &export->exp_obd->u.lov;
755
756         /* We only get block data from the OBD */
757         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
758                 int err;
759
760                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
761                 if (err) {
762                         CERROR("Error statfs OSC %s on %d: err = %d\n",
763                                lov->tgts[i].uuid, i, err);
764                         if (!rc)
765                                 rc = err;
766                         continue; /* XXX or break? - probably OK to continue */
767                 }
768                 if (!set) {
769                         memcpy(sfs, &lov_sfs, sizeof(lov_sfs));
770                         set = 1;
771                 } else {
772                         sfs->f_bfree += lov_sfs.f_bfree;
773                         sfs->f_bavail += lov_sfs.f_bavail;
774                         sfs->f_blocks += lov_sfs.f_blocks;
775                         /* XXX not sure about this one - depends on policy.
776                          *   - could be minimum if we always stripe on all OBDs
777                          *     (but that would be wrong for any other policy,
778                          *     if one of the OBDs has no more objects left)
779                          *   - could be sum if we stripe whole objects
780                          *   - could be average, just to give a nice number
781                          *   - we just pick first OST and hope it is enough
782                         sfs->f_ffree += lov_sfs.f_ffree;
783                          */
784                 }
785         }
786         RETURN(rc);
787 }
788
789
790 struct obd_ops lov_obd_ops = {
791         o_setup:       lov_setup,
792         o_connect:     lov_connect,
793         o_disconnect:  lov_disconnect,
794         o_create:      lov_create,
795         o_destroy:     lov_destroy,
796         o_getattr:     lov_getattr,
797         o_setattr:     lov_setattr,
798         o_statfs:      lov_statfs,
799         o_open:        lov_open,
800         o_close:       lov_close,
801         o_brw:         lov_brw,
802         o_punch:       lov_punch,
803         o_enqueue:     lov_enqueue,
804         o_cancel:      lov_cancel
805 };
806
807
808 #define LOV_VERSION "v0.1"
809
810 static int __init lov_init(void)
811 {
812         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
813                ", info@clusterfs.com\n");
814         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
815 }
816
817 static void __exit lov_exit(void)
818 {
819         class_unregister_type(OBD_LOV_DEVICENAME);
820 }
821
822 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
823 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
824 MODULE_LICENSE("GPL");
825
826 module_init(lov_init);
827 module_exit(lov_exit);