Whamcloud - gitweb
- change field names for clarity.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *
10  * This code is issued under the GNU General Public License.
11  * See the file COPYING in this distribution
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LOV
16
17 #include <linux/slab.h>
18 #include <linux/module.h>
19 #include <linux/obd_support.h>
20 #include <linux/lustre_lib.h>
21 #include <linux/lustre_net.h>
22 #include <linux/lustre_idl.h>
23 #include <linux/lustre_mds.h>
24 #include <linux/obd_class.h>
25 #include <linux/obd_lov.h>
26 #include <linux/init.h>
27
28 /* obd methods */
29 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
30                        char *cluuid)
31 {
32         struct ptlrpc_request *req;
33         struct lov_obd *lov = &obd->u.lov;
34         struct client_obd *mdc = &lov->mdcobd->u.cli;
35         struct lov_desc *desc = &lov->desc;
36         struct lustre_handle mdc_conn;
37         uuid_t *uuidarray;
38         int rc, rc2;
39         int i;
40
41         MOD_INC_USE_COUNT;
42         rc = class_connect(conn, obd, cluuid);
43         if (rc) {
44                 MOD_DEC_USE_COUNT;
45                 RETURN(rc);
46         }
47
48         /* retrieve LOV metadata from MDS */
49         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL);
50         if (rc) {
51                 CERROR("cannot connect to mdc: rc = %d\n", rc);
52                 GOTO(out, rc = -EINVAL);
53         }
54
55         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
56         rc2 = obd_disconnect(&mdc_conn);
57         if (rc || rc2) {
58                 CERROR("cannot get lov info or disconnect %d/%d\n", rc, rc2);
59                 GOTO(out, (rc) ? rc : rc2 );
60         }
61
62         /* sanity... */
63         if (req->rq_repmsg->bufcount < 2 ||
64             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
65                 CERROR("invalid descriptor returned\n");
66                 GOTO(out, rc = -EINVAL);
67         }
68
69         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
70         lov_unpackdesc(desc);
71
72         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
73                 CERROR("invalid uuid array returned\n");
74                 GOTO(out, rc = -EINVAL);
75         }
76
77         mdc->cl_max_mds_easize = sizeof(struct lov_mds_md) +
78                 desc->ld_tgt_count * sizeof(struct lov_object_id);
79
80         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
81                 CERROR("lov uuid %s not on mds device (%s)\n",
82                        obd->obd_uuid, desc->ld_uuid);
83                 GOTO(out, rc = -EINVAL);
84         }
85
86         if (desc->ld_tgt_count > 1000) {
87                 CERROR("configuration error: target count > 1000 (%d)\n",
88                        desc->ld_tgt_count);
89                 GOTO(out, rc = -EINVAL);
90         }
91
92         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
93         OBD_ALLOC(lov->tgts, lov->bufsize);
94         if (!lov->tgts) {
95                 CERROR("Out of memory\n");
96                 GOTO(out, rc = -ENOMEM);
97         }
98
99         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
100         for (i = 0 ; i < desc->ld_tgt_count; i++)
101                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
102
103         for (i = 0 ; i < desc->ld_tgt_count; i++) {
104                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
105                 if (!tgt) {
106                         CERROR("Target %s not attached\n", uuidarray[i]);
107                         GOTO(out_mem, rc = -EINVAL);
108                 }
109                 if (!(tgt->obd_flags & OBD_SET_UP)) {
110                         CERROR("Target %s not set up\n", uuidarray[i]);
111                         GOTO(out_mem, rc = -EINVAL);
112                 }
113                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL);
114                 if (rc) {
115                         CERROR("Target %s connect error %d\n",
116                                uuidarray[i], rc);
117                         GOTO(out_mem, rc);
118                 }
119         }
120
121  out_mem:
122         if (rc) {
123                 for (i = 0 ; i < desc->ld_tgt_count; i++) {
124                         rc2 = obd_disconnect(&lov->tgts[i].conn);
125                         if (rc2)
126                                 CERROR("BAD: Target %s disconnect error %d\n",
127                                        uuidarray[i], rc2);
128                 }
129                 OBD_FREE(lov->tgts, lov->bufsize);
130         }
131  out:
132         if (rc)
133                 class_disconnect(conn);
134         ptlrpc_free_req(req);
135         return rc;
136 }
137
138 static int lov_disconnect(struct lustre_handle *conn)
139 {
140         struct obd_device *obd = class_conn2obd(conn);
141         struct lov_obd *lov = &obd->u.lov;
142         int rc;
143         int i;
144
145         if (!lov->tgts)
146                 goto out_local;
147
148         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
149                 rc = obd_disconnect(&lov->tgts[i].conn);
150                 if (rc) {
151                         CERROR("Target %s disconnect error %d\n",
152                                lov->tgts[i].uuid, rc);
153                         RETURN(rc);
154                 }
155         }
156         OBD_FREE(lov->tgts, lov->bufsize);
157         lov->bufsize = 0;
158         lov->tgts = NULL;
159
160  out_local:
161         rc = class_disconnect(conn);
162         if (!rc)
163                 MOD_DEC_USE_COUNT;
164         return rc;
165 }
166
167 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
168 {
169         struct obd_ioctl_data* data = buf;
170         struct lov_obd *lov = &obd->u.lov;
171         int rc = 0;
172         ENTRY;
173
174         if (data->ioc_inllen1 < 1) {
175                 CERROR("osc setup requires an MDC UUID\n");
176                 RETURN(-EINVAL);
177         }
178
179         if (data->ioc_inllen1 > 37) {
180                 CERROR("mdc UUID must be less than 38 characters\n");
181                 RETURN(-EINVAL);
182         }
183
184         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
185         if (!lov->mdcobd) {
186                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
187                        data->ioc_inlbuf1);
188                 rc = -EINVAL;
189         }
190         RETURN(rc);
191 }
192
193
194 static inline int lov_stripe_md_size(struct obd_device *obd)
195 {
196         struct lov_obd *lov = &obd->u.lov;
197         int size;
198
199         size = sizeof(struct lov_stripe_md) +
200                 lov->desc.ld_tgt_count * sizeof(struct lov_oinfo);
201         return size;
202 }
203
204 static inline int lov_mds_md_size(struct obd_device *obd)
205 {
206         struct lov_obd *lov = &obd->u.lov;
207         int size;
208
209         size = sizeof(struct lov_mds_md) +
210                 lov->desc.ld_tgt_count * sizeof(struct lov_object_id);
211         return size;
212 }
213
214 /* the LOV counts on oa->o_id to be set as the LOV object id */
215 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
216                       struct lov_stripe_md **ea)
217 {
218         int rc = 0, i;
219         struct obdo tmp;
220         struct obd_export *export = class_conn2export(conn);
221         struct lov_obd *lov;
222         struct lov_stripe_md *md;
223         ENTRY;
224
225         if (!ea) {
226                 CERROR("lov_create needs EA for striping information\n");
227                 RETURN(-EINVAL);
228         }
229         if (!export)
230                 RETURN(-EINVAL);
231         lov = &export->exp_obd->u.lov;
232
233         oa->o_easize = lov_stripe_md_size(export->exp_obd);
234         if (!*ea) {
235                 OBD_ALLOC(*ea, oa->o_easize);
236                 if (! *ea)
237                         RETURN(-ENOMEM);
238         }
239
240         md = *ea;
241         md->lmd_mds_easize = lov_mds_md_size(export->exp_obd);
242         md->lmd_object_id = oa->o_id;
243         if (!md->lmd_stripe_count)
244                 md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
245
246         if (!md->lmd_stripe_size)
247                 md->lmd_stripe_size = lov->desc.ld_default_stripe_size;
248
249
250
251         for (i = 0; i < md->lmd_stripe_count; i++) {
252                 struct lov_stripe_md obj_md;
253                 struct lov_stripe_md *obj_mdp = &obj_md;
254                 /* create data objects with "parent" OA */
255                 memcpy(&tmp, oa, sizeof(tmp));
256                 tmp.o_easize = sizeof(struct lov_stripe_md);
257                 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
258                 if (rc)
259                         GOTO(out_cleanup, rc);
260                 md->lmd_oinfo[i].loi_id = tmp.o_id;
261                 md->lmd_oinfo[i].loi_size = tmp.o_size;
262         }
263
264  out_cleanup:
265         if (rc) {
266                 int i2, rc2;
267                 for (i2 = 0; i2 < i; i2++) {
268                         /* destroy already created objects here */
269                         tmp.o_id = md->lmd_oinfo[i].loi_id;
270                         rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
271                         if (rc2)
272                                 CERROR("Failed to remove object from target "
273                                        "%d\n", i2);
274                 }
275         }
276         return rc;
277 }
278
279 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
280                        struct lov_stripe_md *md)
281 {
282         int rc = 0, i;
283         struct obdo tmp;
284         struct obd_export *export = class_conn2export(conn);
285         struct lov_obd *lov;
286         ENTRY;
287
288         if (!md) {
289                 CERROR("LOV requires striping ea for destruction\n");
290                 RETURN(-EINVAL);
291         }
292
293         if (!export || !export->exp_obd)
294                 RETURN(-ENODEV);
295
296         lov = &export->exp_obd->u.lov;
297         for (i = 0; i < md->lmd_stripe_count; i++) {
298                 /* create data objects with "parent" OA */
299                 memcpy(&tmp, oa, sizeof(tmp));
300                 tmp.o_id = md->lmd_oinfo[i].loi_id;
301                 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
302                 if (rc)
303                         CERROR("Error destroying object "LPD64"on %d\n",
304                                md->lmd_oinfo[i].loi_id, i);
305         }
306         RETURN(rc);
307 }
308
309 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
310                        struct lov_stripe_md *md)
311 {
312         int rc = 0, i;
313         struct obdo tmp;
314         struct obd_export *export = class_conn2export(conn);
315         struct lov_obd *lov;
316         int set = 0;
317         ENTRY;
318
319         if (!md) {
320                 CERROR("LOV requires striping ea\n");
321                 RETURN(-EINVAL);
322         }
323
324         if (!export || !export->exp_obd)
325                 RETURN(-ENODEV);
326
327         lov = &export->exp_obd->u.lov;
328         oa->o_size = 0;
329         oa->o_blocks = 0;
330         for (i = 0; i < md->lmd_stripe_count; i++) {
331                 int err;
332
333                 if (md->lmd_oinfo[i].loi_id == 0)
334                         continue;
335
336                 /* create data objects with "parent" OA */
337                 memcpy(&tmp, oa, sizeof(tmp));
338                 tmp.o_id = md->lmd_oinfo[i].loi_id;
339
340                 err = obd_getattr(&lov->tgts[i].conn, &tmp, NULL);
341                 if (err) {
342                         CERROR("Error getattr object "LPD64" on %d: err = %d\n",
343                                md->lmd_oinfo[i].loi_id, i, err);
344                         if (!rc)
345                                 rc = err;
346                         continue; /* XXX or break? */
347                 }
348                 if (!set) {
349                         obdo_cpy_md(oa, &tmp, tmp.o_valid);
350                         set = 1;
351                 } else {
352                         if (tmp.o_valid & OBD_MD_FLSIZE)
353                                 oa->o_size += tmp.o_size;
354                         if (tmp.o_valid & OBD_MD_FLBLOCKS)
355                                 oa->o_blocks += tmp.o_blocks;
356                         if (tmp.o_valid & OBD_MD_FLCTIME &&
357                             oa->o_ctime < tmp.o_ctime)
358                                 oa->o_ctime = tmp.o_ctime;
359                         if (tmp.o_valid & OBD_MD_FLMTIME &&
360                             oa->o_mtime < tmp.o_mtime)
361                                 oa->o_mtime = tmp.o_mtime;
362                 }
363         }
364         RETURN(rc);
365 }
366
367 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
368                        struct lov_stripe_md *md)
369 {
370         int rc = 0, i;
371         struct obdo tmp;
372         struct obd_export *export = class_conn2export(conn);
373         struct lov_obd *lov;
374         ENTRY;
375
376         if (!md) {
377                 CERROR("LOV requires striping ea\n");
378                 RETURN(-EINVAL);
379         }
380
381         if (!export || !export->exp_obd)
382                 RETURN(-ENODEV);
383
384         lov = &export->exp_obd->u.lov;
385         for (i = 0; i < md->lmd_stripe_count; i++) {
386                 /* create data objects with "parent" OA */
387                 memcpy(&tmp, oa, sizeof(tmp));
388                 tmp.o_id = md->lmd_oinfo[i].loi_id;
389
390                 rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL);
391                 if (rc)
392                         CERROR("Error setattr object "LPD64" on %d\n",
393                                tmp.o_id, i);
394         }
395         RETURN(rc);
396 }
397
398 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
399                     struct lov_stripe_md *md)
400 {
401         int rc = 0, rc2 = 0, i;
402         struct obdo tmp;
403         struct obd_export *export = class_conn2export(conn);
404         struct lov_obd *lov;
405         ENTRY;
406
407         if (!md) {
408                 CERROR("LOV requires striping ea for opening\n");
409                 RETURN(-EINVAL);
410         }
411
412         if (!export || !export->exp_obd)
413                 RETURN(-ENODEV);
414
415         lov = &export->exp_obd->u.lov;
416         for (i = 0; i < md->lmd_stripe_count; i++) {
417                 /* create data objects with "parent" OA */
418                 memcpy(&tmp, oa, sizeof(tmp));
419                 tmp.o_id = md->lmd_oinfo[i].loi_id;
420
421                 rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
422                 if (rc) {
423                         rc2 = rc;
424                         CERROR("Error open object "LPD64" on %d\n",
425                                md->lmd_oinfo[i].loi_id, i);
426                 }
427         }
428         RETURN(rc2);
429 }
430
431 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
432                      struct lov_stripe_md *md)
433 {
434         int rc = 0, i;
435         struct obdo tmp;
436         struct obd_export *export = class_conn2export(conn);
437         struct lov_obd *lov;
438         ENTRY;
439
440         if (!md) {
441                 CERROR("LOV requires striping ea\n");
442                 RETURN(-EINVAL);
443         }
444
445         if (!export || !export->exp_obd)
446                 RETURN(-ENODEV);
447
448         lov = &export->exp_obd->u.lov;
449         for (i = 0; i < md->lmd_stripe_count; i++) {
450                 /* create data objects with "parent" OA */
451                 memcpy(&tmp, oa, sizeof(tmp));
452                 tmp.o_id = md->lmd_oinfo[i].loi_id;
453
454                 rc = obd_close(&lov->tgts[i].conn, &tmp, NULL);
455                 if (rc)
456                         CERROR("Error close object "LPD64" on %d\n",
457                                md->lmd_oinfo[i].loi_id, i);
458         }
459         RETURN(rc);
460 }
461
462 #ifndef log2
463 #define log2(n) ffz(~(n))
464 #endif
465
466 /* compute offset in stripe i corresponding to offset "in" */
467 static __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
468 {
469         __u32 ssz = md->lmd_stripe_size;
470         /* full stripes across all * stripe size */
471         __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz;
472         __u32 off = (__u32)in % (md->lmd_stripe_count * ssz);
473
474         if ( in == 0xffffffffffffffff ) {
475                 return 0xffffffffffffffff;
476         }
477
478         if ( (i+1) * ssz <= off )
479                 out += (i+1) * ssz;
480         else if ( i * ssz > off )
481                 out += 0;
482         else
483                 out += (off - (i * ssz)) % ssz;
484
485         return (__u64) out;
486 }
487
488 static int lov_stripe_which(struct lov_stripe_md *md, __u64 in)
489 {
490         __u32 ssz = md->lmd_stripe_size;
491         int j;
492         j = (((__u32) in) / ssz) % md->lmd_stripe_count;
493         return j;
494 }
495
496
497 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
498  * we can send this 'punch' to just the authoritative node and the nodes
499  * that the punch will affect. */
500 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
501                      struct lov_stripe_md *md,
502                      obd_off start, obd_off end)
503 {
504         int rc = 0, i;
505         struct obdo tmp;
506         struct obd_export *export = class_conn2export(conn);
507         struct lov_obd *lov;
508         ENTRY;
509
510         if (!md) {
511                 CERROR("LOV requires striping ea for desctruction\n");
512                 RETURN(-EINVAL);
513         }
514
515         if (!export || !export->exp_obd)
516                 RETURN(-ENODEV);
517
518         lov = &export->exp_obd->u.lov;
519         for (i = 0; i < md->lmd_stripe_count; i++) {
520                 __u64 starti = lov_offset(md, start, i);
521                 __u64 endi = lov_offset(md, end, i);
522
523                 if (starti == endi)
524                         continue;
525                 /* create data objects with "parent" OA */
526                 memcpy(&tmp, oa, sizeof(tmp));
527                 tmp.o_id = md->lmd_oinfo[i].loi_id;
528
529                 rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL,
530                                starti, endi);
531                 if (rc)
532                         CERROR("Error punch object "LPD64" on %d\n",
533                                md->lmd_oinfo[i].loi_id, i);
534         }
535         RETURN(rc);
536 }
537
538 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
539 {
540         int ret = 0;
541         ENTRY;
542
543         if (phase == CB_PHASE_START)
544                 RETURN(0);
545
546         if (phase == CB_PHASE_FINISH) {
547                 if (err)
548                         cbd->err = err;
549                 if (atomic_dec_and_test(&cbd->refcount))
550                         ret = cbd->cb(cbd->data, cbd->err, phase);
551                 RETURN(ret);
552         }
553
554         LBUG();
555         return 0;
556 }
557
558 static inline int lov_brw(int cmd, struct lustre_handle *conn,
559                           struct lov_stripe_md *md, obd_count oa_bufs,
560                           struct brw_page *pga,
561                           brw_callback_t callback, struct io_cb_data *cbd)
562 {
563         int stripe_count = md->lmd_stripe_count;
564         struct obd_export *export = class_conn2export(conn);
565         struct lov_obd *lov;
566         struct {
567                 int bufct;
568                 int index;
569                 int subcount;
570                 struct lov_stripe_md md;
571         } *stripeinfo;
572         struct brw_page *ioarr;
573         int rc, i;
574         struct io_cb_data *our_cb;
575         ENTRY;
576
577         lov = &export->exp_obd->u.lov;
578
579         our_cb = ll_init_cb();
580         if (!our_cb)
581                 RETURN(-ENOMEM);
582
583         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
584         if (!stripeinfo)
585                 GOTO(out_cbdata, rc = -ENOMEM);
586
587         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
588         if (!ioarr)
589                 GOTO(out_sinfo, rc = -ENOMEM);
590
591         for (i = 0; i < oa_bufs; i++) {
592                 int which;
593                 which = lov_stripe_which(md, pga[i].off);
594                 stripeinfo[which].bufct++;
595         }
596
597         for (i = 0; i < stripe_count; i++) {
598                 if (i > 0)
599                         stripeinfo[i].index = stripeinfo[i - 1].index +
600                                 stripeinfo[i - 1].bufct;
601                 stripeinfo[i].md.lmd_object_id = md->lmd_oinfo[i].loi_id;
602         }
603
604         for (i = 0; i < oa_bufs; i++) {
605                 int which, shift;
606                 which = lov_stripe_which(md, pga[i].off);
607
608                 shift = stripeinfo[which].index;
609                 LASSERT(shift + stripeinfo[which].subcount < oa_bufs);
610                 ioarr[shift + stripeinfo[which].subcount] = pga[i];
611                 ioarr[shift + stripeinfo[which].subcount].off =
612                         lov_offset(md, pga[i].off, which);
613                 stripeinfo[which].subcount++;
614         }
615
616         our_cb->cb = callback;
617         our_cb->data = cbd;
618
619         /* This is the only race-free way I can think of to get the refcount
620          * correct. -phil */
621         atomic_set(&our_cb->refcount, 0);
622         for (i = 0; i < stripe_count; i++)
623                 if (stripeinfo[i].bufct)
624                         atomic_inc(&our_cb->refcount);
625
626         for (i = 0; i < stripe_count; i++) {
627                 int shift = stripeinfo[i].index;
628                 if (stripeinfo[i].bufct) {
629                         LASSERT(shift < oa_bufs);
630                         obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md,
631                                 stripeinfo[i].bufct, &ioarr[shift],
632                                 lov_osc_brw_callback, our_cb);
633                 }
634         }
635
636         rc = callback(cbd, 0, CB_PHASE_START);
637
638         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
639  out_sinfo:
640         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
641  out_cbdata:
642         OBD_FREE(our_cb, sizeof(*our_cb));
643         RETURN(rc);
644 }
645
646 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md,
647                        struct lustre_handle *parent_lock,
648                        __u32 type, void *cookie, int cookielen, __u32 mode,
649                        int *flags, void *cb, void *data, int datalen,
650                        struct lustre_handle *lockhs)
651 {
652         int rc = 0, i;
653         struct obd_export *export = class_conn2export(conn);
654         struct lov_obd *lov;
655         struct lov_stripe_md submd;
656         ENTRY;
657
658         if (!md) {
659                 CERROR("LOV requires striping ea for desctruction\n");
660                 RETURN(-EINVAL);
661         }
662
663         if (!export || !export->exp_obd)
664                 RETURN(-ENODEV);
665
666         lov = &export->exp_obd->u.lov;
667         for (i = 0; i < md->lmd_stripe_count; i++) {
668                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
669                 struct ldlm_extent sub_ext;
670
671                 sub_ext.start = lov_offset(md, extent->start, i);
672                 sub_ext.end = lov_offset(md, extent->end, i);
673                 if ( sub_ext.start == sub_ext.end )
674                         continue;
675
676                 submd.lmd_object_id = md->lmd_oinfo[i].loi_id;
677                 submd.lmd_mds_easize = sizeof(struct lov_mds_md);
678                 submd.lmd_stripe_count = 0;
679                 /* XXX submd is not fully initialized here */
680                 rc = obd_enqueue(&(lov->tgts[i].conn), &submd, parent_lock,
681                                  type, &sub_ext, sizeof(sub_ext), mode,
682                                  flags, cb, data, datalen, &(lockhs[i]));
683                 // XXX add a lock debug statement here
684                 if (rc)
685                         CERROR("Error obd_enqueue object "LPD64
686                                " subobj "LPD64"\n",
687                                md->lmd_object_id, md->lmd_oinfo[i].loi_id);
688         }
689         RETURN(rc);
690 }
691
692 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md,
693                       __u32 mode, struct lustre_handle *lockhs)
694 {
695         int rc = 0, i;
696         struct obd_export *export = class_conn2export(conn);
697         struct lov_obd *lov;
698         ENTRY;
699
700         if (!md) {
701                 CERROR("LOV requires striping ea for lock cancellation\n");
702                 RETURN(-EINVAL);
703         }
704
705         if (!export || !export->exp_obd)
706                 RETURN(-ENODEV);
707
708         lov = &export->exp_obd->u.lov;
709         for (i = 0; i < md->lmd_stripe_count; i++) {
710                 struct lov_stripe_md submd;
711
712                 if ( lockhs[i].addr == 0 )
713                         continue;
714
715                 submd.lmd_object_id = md->lmd_oinfo[i].loi_id;
716                 submd.lmd_mds_easize = sizeof(struct lov_mds_md);
717                 submd.lmd_stripe_count = 0;
718                 rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]);
719                 if (rc)
720                         CERROR("Error cancel object "LPD64" subobj "LPD64"\n",
721                                md->lmd_object_id, md->lmd_oinfo[i].loi_id);
722         }
723         RETURN(rc);
724 }
725
726 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
727 {
728         struct obd_export *export = class_conn2export(conn);
729         struct lov_obd *lov;
730         struct obd_statfs lov_sfs;
731         int set = 0;
732         int rc = 0;
733         int i;
734         ENTRY;
735
736         if (!export || !export->exp_obd)
737                 RETURN(-ENODEV);
738
739         lov = &export->exp_obd->u.lov;
740
741         /* We only get block data from the OBD */
742         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
743                 int err;
744
745                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
746                 if (err) {
747                         CERROR("Error statfs OSC %s on %d: err = %d\n",
748                                lov->tgts[i].uuid, i, err);
749                         if (!rc)
750                                 rc = err;
751                         continue; /* XXX or break? - probably OK to continue */
752                 }
753                 if (!set) {
754                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
755                         set = 1;
756                 } else {
757                         osfs->os_bfree += lov_sfs.os_bfree;
758                         osfs->os_bavail += lov_sfs.os_bavail;
759                         osfs->os_blocks += lov_sfs.os_blocks;
760                         /* XXX not sure about this one - depends on policy.
761                          *   - could be minimum if we always stripe on all OBDs
762                          *     (but that would be wrong for any other policy,
763                          *     if one of the OBDs has no more objects left)
764                          *   - could be sum if we stripe whole objects
765                          *   - could be average, just to give a nice number
766                          *   - we just pick first OST and hope it is enough
767                         sfs->f_ffree += lov_sfs.f_ffree;
768                          */
769                 }
770         }
771         RETURN(rc);
772 }
773
774
775 struct obd_ops lov_obd_ops = {
776         o_setup:       lov_setup,
777         o_connect:     lov_connect,
778         o_disconnect:  lov_disconnect,
779         o_create:      lov_create,
780         o_destroy:     lov_destroy,
781         o_getattr:     lov_getattr,
782         o_setattr:     lov_setattr,
783         o_statfs:      lov_statfs,
784         o_open:        lov_open,
785         o_close:       lov_close,
786         o_brw:         lov_brw,
787         o_punch:       lov_punch,
788         o_enqueue:     lov_enqueue,
789         o_cancel:      lov_cancel
790 };
791
792
793 #define LOV_VERSION "v0.1"
794
795 static int __init lov_init(void)
796 {
797         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
798                ", info@clusterfs.com\n");
799         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
800 }
801
802 static void __exit lov_exit(void)
803 {
804         class_unregister_type(OBD_LOV_DEVICENAME);
805 }
806
807 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
808 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
809 MODULE_LICENSE("GPL");
810
811 module_init(lov_init);
812 module_exit(lov_exit);