Whamcloud - gitweb
Poisoning of all cookies at free time (to work around slab cache bug, and
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *
10  * This code is issued under the GNU General Public License.
11  * See the file COPYING in this distribution
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LOV
16
17 #include <linux/slab.h>
18 #include <linux/module.h>
19 #include <linux/obd_support.h>
20 #include <linux/lustre_lib.h>
21 #include <linux/lustre_net.h>
22 #include <linux/lustre_idl.h>
23 #include <linux/lustre_mds.h>
24 #include <linux/obd_class.h>
25 #include <linux/obd_lov.h>
26 #include <linux/init.h>
27 #include <asm/div64.h>
28
29 /* obd methods */
30 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
31                        obd_uuid_t cluuid)
32 {
33         struct ptlrpc_request *req = NULL;
34         struct lov_obd *lov = &obd->u.lov;
35         struct client_obd *mdc = &lov->mdcobd->u.cli;
36         struct lov_desc *desc = &lov->desc;
37         struct lustre_handle mdc_conn;
38         obd_uuid_t *uuidarray;
39         int rc, rc2;
40         int i;
41
42         MOD_INC_USE_COUNT;
43         rc = class_connect(conn, obd, cluuid);
44         if (rc) {
45                 MOD_DEC_USE_COUNT;
46                 RETURN(rc);
47         }
48
49         /* retrieve LOV metadata from MDS */
50         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL);
51         if (rc) {
52                 CERROR("cannot connect to mdc: rc = %d\n", rc);
53                 GOTO(out_conn, rc);
54         }
55
56         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
57         rc2 = obd_disconnect(&mdc_conn);
58         if (rc) {
59                 CERROR("cannot get lov info %d\n", rc);
60                 GOTO(out_conn, rc);
61         }
62
63         if (rc2) {
64                 CERROR("error disconnecting from MDS %d\n", rc2);
65                 GOTO(out_conn, rc = rc2);
66         }
67
68         /* sanity... */
69         if (req->rq_repmsg->bufcount < 2 ||
70             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
71                 CERROR("LOV desc: invalid descriptor returned\n");
72                 GOTO(out_conn, rc = -EINVAL);
73         }
74
75         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
76         lov_unpackdesc(desc);
77
78         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
79                 CERROR("LOV desc: invalid uuid array returned\n");
80                 GOTO(out_conn, rc = -EINVAL);
81         }
82
83         mdc->cl_max_mds_easize = lov_mds_md_size(desc->ld_tgt_count);
84         mdc->cl_max_ost_easize = lov_stripe_md_size(desc->ld_tgt_count);
85
86         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
87                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
88                        obd->obd_uuid, desc->ld_uuid);
89                 GOTO(out_conn, rc = -EINVAL);
90         }
91
92         if (desc->ld_tgt_count > 1000) {
93                 CERROR("LOV desc: target count > 1000 (%d)\n",
94                        desc->ld_tgt_count);
95                 GOTO(out_conn, rc = -EINVAL);
96         }
97
98         if (desc->ld_default_stripe_count == 0)
99                 desc->ld_default_stripe_count = desc->ld_tgt_count;
100
101         /* Because of 64-bit divide/mod operations only work with a 32-bit
102          * divisor in a 32-bit kernel, we cannot support a stripe width
103          * of 4GB or larger.
104          */
105         if (desc->ld_default_stripe_size * desc->ld_tgt_count > ~0UL) {
106                 CERROR("LOV desc: stripe width > %lu on 32-bit system\n",
107                        ~0UL);
108                 GOTO(out_conn, rc = -EINVAL);
109         }
110
111         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
112         OBD_ALLOC(lov->tgts, lov->bufsize);
113         if (!lov->tgts) {
114                 CERROR("Out of memory\n");
115                 GOTO(out_conn, rc = -ENOMEM);
116         }
117
118         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
119         for (i = 0 ; i < desc->ld_tgt_count; i++)
120                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
121
122         for (i = 0 ; i < desc->ld_tgt_count; i++) {
123                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
124                 if (!tgt) {
125                         CERROR("Target %s not attached\n", uuidarray[i]);
126                         GOTO(out_disc, rc = -EINVAL);
127                 }
128                 if (!(tgt->obd_flags & OBD_SET_UP)) {
129                         CERROR("Target %s not set up\n", uuidarray[i]);
130                         GOTO(out_disc, rc = -EINVAL);
131                 }
132                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL);
133                 if (rc) {
134                         CERROR("Target %s connect error %d\n",
135                                uuidarray[i], rc);
136                         GOTO(out_disc, rc);
137                 }
138         }
139
140  out:
141         ptlrpc_free_req(req);
142         return rc;
143
144  out_disc:
145         while (i-- > 0) {
146                 rc2 = obd_disconnect(&lov->tgts[i].conn);
147                 if (rc2)
148                         CERROR("LOV Target %s disconnect error: rc = %d\n",
149                                 uuidarray[i], rc2);
150         }
151         OBD_FREE(lov->tgts, lov->bufsize);
152  out_conn:
153         class_disconnect(conn);
154         goto out;
155 }
156
157 static int lov_disconnect(struct lustre_handle *conn)
158 {
159         struct obd_device *obd = class_conn2obd(conn);
160         struct lov_obd *lov = &obd->u.lov;
161         int rc;
162         int i;
163
164         if (!lov->tgts)
165                 goto out_local;
166
167         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
168                 rc = obd_disconnect(&lov->tgts[i].conn);
169                 if (rc) {
170                         CERROR("Target %s disconnect error %d\n",
171                                lov->tgts[i].uuid, rc);
172                         RETURN(rc);
173                 }
174         }
175         OBD_FREE(lov->tgts, lov->bufsize);
176         lov->bufsize = 0;
177         lov->tgts = NULL;
178
179  out_local:
180         rc = class_disconnect(conn);
181         if (!rc)
182                 MOD_DEC_USE_COUNT;
183         return rc;
184 }
185
186 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
187 {
188         struct obd_ioctl_data* data = buf;
189         struct lov_obd *lov = &obd->u.lov;
190         int rc = 0;
191         ENTRY;
192
193         if (data->ioc_inllen1 < 1) {
194                 CERROR("osc setup requires an MDC UUID\n");
195                 RETURN(-EINVAL);
196         }
197
198         if (data->ioc_inllen1 > 37) {
199                 CERROR("mdc UUID must be 36 characters or less\n");
200                 RETURN(-EINVAL);
201         }
202
203         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
204         if (!lov->mdcobd) {
205                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
206                        data->ioc_inlbuf1);
207                 rc = -EINVAL;
208         }
209         RETURN(rc);
210 }
211
212
213 /* the LOV expects oa->o_id to be set to the LOV object id */
214 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
215                       struct lov_stripe_md **ea)
216 {
217         struct obd_export *export = class_conn2export(conn);
218         struct lov_obd *lov;
219         struct lov_stripe_md *lsm;
220         struct lov_oinfo *loi;
221         int sub_offset, stripe_offset;
222         int ost_count;
223         int rc = 0, i;
224         ENTRY;
225
226         if (!ea) {
227                 CERROR("lov_create needs ea\n");
228                 RETURN(-EINVAL);
229         }
230
231         if (!export)
232                 RETURN(-EINVAL);
233
234         lov = &export->exp_obd->u.lov;
235         ost_count = lov->desc.ld_tgt_count;
236         oa->o_easize = lov_stripe_md_size(ost_count);
237         if (!*ea) {
238                 OBD_ALLOC(*ea, oa->o_easize);
239                 if (!*ea)
240                         RETURN(-ENOMEM);
241         }
242
243         lsm = *ea;
244         LASSERT(oa->o_valid & OBD_MD_FLID);
245         lsm->lsm_magic = LOV_MAGIC;
246         lsm->lsm_mds_easize = lov_mds_md_size(ost_count);
247         lsm->lsm_object_id = oa->o_id;
248         if (!lsm->lsm_stripe_count)
249                 lsm->lsm_stripe_count = lov->desc.ld_default_stripe_count;
250
251         if (!lsm->lsm_stripe_size)
252                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
253
254         lsm->lsm_ost_count = ost_count;
255         stripe_offset = (((int)lsm->lsm_object_id * lsm->lsm_stripe_count) %
256                          ost_count);
257         sub_offset = ((int)lsm->lsm_object_id*lsm->lsm_stripe_count/ost_count)%
258                         lsm->lsm_stripe_count;
259         lsm->lsm_stripe_offset = stripe_offset + sub_offset;
260
261         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
262                lsm->lsm_stripe_count,lsm->lsm_object_id,lsm->lsm_stripe_offset);
263
264         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
265                 struct lov_stripe_md obj_md;
266                 struct lov_stripe_md *obj_mdp = &obj_md;
267                 struct obdo tmp;
268                 int ost_idx = (((sub_offset + i) % lsm->lsm_stripe_count) +
269                                stripe_offset) % ost_count;
270
271                 /* create data objects with "parent" OA */
272                 memcpy(&tmp, oa, sizeof(tmp));
273                 tmp.o_easize = sizeof(struct lov_stripe_md);
274                 rc = obd_create(&lov->tgts[ost_idx].conn, &tmp, &obj_mdp);
275                 if (rc) {
276                         CERROR("error creating objid "LPX64" sub-object on "
277                                "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
278                         GOTO(out_cleanup, rc);
279                 }
280                 loi->loi_id = tmp.o_id;
281                 loi->loi_size = tmp.o_size;
282                 loi->loi_ost_idx = ost_idx;
283                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
284                        lsm->lsm_object_id, loi->loi_id, ost_idx);
285         }
286
287  out_cleanup:
288         if (rc) {
289                 while (i-- > 0) {
290                         struct obdo tmp;
291                         int err;
292
293                         --loi;
294                         /* destroy already created objects here */
295                         memcpy(&tmp, oa, sizeof(tmp));
296                         tmp.o_id = loi->loi_id;
297                         err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn,
298                                           &tmp, NULL);
299                         if (err)
300                                 CERROR("Failed to remove objid "LPX64" subobj "
301                                        LPX64" on OST idx %d: rc = %d\n",
302                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
303                                        err);
304                 }
305         }
306         return rc;
307 }
308
309 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
310                        struct lov_stripe_md *lsm)
311 {
312         struct obdo tmp;
313         struct obd_export *export = class_conn2export(conn);
314         struct lov_obd *lov;
315         struct lov_oinfo *loi;
316         int rc = 0, i;
317         ENTRY;
318
319         if (!lsm) {
320                 CERROR("LOV requires striping ea for destruction\n");
321                 RETURN(-EINVAL);
322         }
323
324         if (lsm->lsm_magic != LOV_MAGIC) {
325                 CERROR("LOV striping magic bad %#lx != %#lx\n",
326                        lsm->lsm_magic, LOV_MAGIC);
327                 RETURN(-EINVAL);
328         }
329
330         if (!export || !export->exp_obd)
331                 RETURN(-ENODEV);
332
333         lov = &export->exp_obd->u.lov;
334         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
335                 /* create data objects with "parent" OA */
336                 memcpy(&tmp, oa, sizeof(tmp));
337                 tmp.o_id = loi->loi_id;
338                 rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
339                 if (rc)
340                         CERROR("Error destroying objid "LPX64" subobj "LPX64
341                                " on OST idx %d\n: rc = %d",
342                                oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
343         }
344         RETURN(rc);
345 }
346
347 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
348                        struct lov_stripe_md *lsm)
349 {
350         struct obdo tmp;
351         struct obd_export *export = class_conn2export(conn);
352         struct lov_obd *lov;
353         struct lov_oinfo *loi;
354         int rc = 0, i;
355         int set = 0;
356         ENTRY;
357
358         if (!lsm) {
359                 CERROR("LOV requires striping ea\n");
360                 RETURN(-EINVAL);
361         }
362
363         if (lsm->lsm_magic != LOV_MAGIC) {
364                 CERROR("LOV striping magic bad %#lx != %#lx\n",
365                        lsm->lsm_magic, LOV_MAGIC);
366                 RETURN(-EINVAL);
367         }
368
369         if (!export || !export->exp_obd)
370                 RETURN(-ENODEV);
371
372         lov = &export->exp_obd->u.lov;
373         oa->o_size = 0;
374         oa->o_blocks = 0;
375         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
376                 int err;
377
378                 if (loi->loi_id == 0)
379                         continue;
380
381                 /* create data objects with "parent" OA */
382                 memcpy(&tmp, oa, sizeof(tmp));
383                 tmp.o_id = loi->loi_id;
384
385                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
386                 if (err) {
387                         CERROR("Error getattr objid "LPX64" subobj "LPX64
388                                " on OST idx %d: rc = %d\n",
389                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
390                         if (!rc)
391                                 rc = err;
392                         continue; /* XXX or break? */
393                 }
394                 if (!set) {
395                         obdo_cpy_md(oa, &tmp, tmp.o_valid);
396                         set = 1;
397                 } else {
398 #warning FIXME: the size needs to be fixed for sparse files
399                         if (tmp.o_valid & OBD_MD_FLSIZE)
400                                 oa->o_size += tmp.o_size;
401                         if (tmp.o_valid & OBD_MD_FLBLOCKS)
402                                 oa->o_blocks += tmp.o_blocks;
403                         if (tmp.o_valid & OBD_MD_FLCTIME &&
404                             oa->o_ctime < tmp.o_ctime)
405                                 oa->o_ctime = tmp.o_ctime;
406                         if (tmp.o_valid & OBD_MD_FLMTIME &&
407                             oa->o_mtime < tmp.o_mtime)
408                                 oa->o_mtime = tmp.o_mtime;
409                 }
410         }
411         RETURN(rc);
412 }
413
414 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
415                        struct lov_stripe_md *lsm)
416 {
417         int rc = 0, i;
418         struct obdo tmp;
419         struct obd_export *export = class_conn2export(conn);
420         struct lov_obd *lov;
421         struct lov_oinfo *loi;
422         ENTRY;
423
424         if (!lsm) {
425                 CERROR("LOV requires striping ea\n");
426                 RETURN(-EINVAL);
427         }
428
429         if (lsm->lsm_magic != LOV_MAGIC) {
430                 CERROR("LOV striping magic bad %#lx != %#lx\n",
431                        lsm->lsm_magic, LOV_MAGIC);
432                 RETURN(-EINVAL);
433         }
434
435         if (!export || !export->exp_obd)
436                 RETURN(-ENODEV);
437
438         if (oa->o_valid & OBD_MD_FLSIZE)
439                 CERROR("setting size on an LOV object is totally broken\n");
440
441         lov = &export->exp_obd->u.lov;
442         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
443                 int err;
444
445                 /* create data objects with "parent" OA */
446                 memcpy(&tmp, oa, sizeof(tmp));
447                 tmp.o_id = loi->loi_id;
448
449                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
450                 if (err) {
451                         CERROR("Error setattr objid "LPX64" subobj "LPX64
452                                " on OST idx %d: rc = %d\n",
453                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
454                         if (!rc)
455                                 rc = err;
456                 }
457         }
458         RETURN(rc);
459 }
460
461 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
462                     struct lov_stripe_md *lsm)
463 {
464         struct obdo tmp;
465         struct obd_export *export = class_conn2export(conn);
466         struct lov_obd *lov;
467         struct lov_oinfo *loi;
468         int rc = 0, i;
469         ENTRY;
470
471         if (!lsm) {
472                 CERROR("LOV requires striping ea for opening\n");
473                 RETURN(-EINVAL);
474         }
475
476         if (lsm->lsm_magic != LOV_MAGIC) {
477                 CERROR("LOV striping magic bad %#lx != %#lx\n",
478                        lsm->lsm_magic, LOV_MAGIC);
479                 RETURN(-EINVAL);
480         }
481
482         if (!export || !export->exp_obd)
483                 RETURN(-ENODEV);
484
485         lov = &export->exp_obd->u.lov;
486         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
487                 int err;
488
489                 /* create data objects with "parent" OA */
490                 memcpy(&tmp, oa, sizeof(tmp));
491                 tmp.o_id = loi->loi_id;
492
493                 err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
494                 if (err) {
495                         CERROR("Error open objid "LPX64" subobj "LPX64
496                                " on OST idx %d: rc = %d\n",
497                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
498                                loi->loi_ost_idx, rc);
499                         if (!rc)
500                                 rc = err;
501                 }
502         }
503         /* FIXME: returning an error, but having opened some objects is a bad
504          *        idea, since they will likely never be closed.  We either
505          *        need to not return an error if _some_ objects could be
506          *        opened, and leave it to read/write to return -EIO (with
507          *        hopefully partial error status) or close all opened objects
508          *        and return an error.
509          */
510         RETURN(rc);
511 }
512
513 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
514                      struct lov_stripe_md *lsm)
515 {
516         struct obdo tmp;
517         struct obd_export *export = class_conn2export(conn);
518         struct lov_obd *lov;
519         struct lov_oinfo *loi;
520         int rc = 0, i;
521         ENTRY;
522
523         if (!lsm) {
524                 CERROR("LOV requires striping ea\n");
525                 RETURN(-EINVAL);
526         }
527
528         if (lsm->lsm_magic != LOV_MAGIC) {
529                 CERROR("LOV striping magic bad %#lx != %#lx\n",
530                        lsm->lsm_magic, LOV_MAGIC);
531                 RETURN(-EINVAL);
532         }
533
534         if (!export || !export->exp_obd)
535                 RETURN(-ENODEV);
536
537         lov = &export->exp_obd->u.lov;
538         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
539                 int err;
540
541                 /* create data objects with "parent" OA */
542                 memcpy(&tmp, oa, sizeof(tmp));
543                 tmp.o_id = loi->loi_id;
544
545                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
546                 if (err) {
547                         CERROR("Error close objid "LPX64" subobj "LPX64
548                                " on OST idx %d: rc = %d\n",
549                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
550                         if (!rc)
551                                 rc = err;
552                 }
553         }
554         RETURN(rc);
555 }
556
557 #ifndef log2
558 #define log2(n) ffz(~(n))
559 #endif
560
561 #warning FIXME: merge these two functions now that they are nearly the same
562
563 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
564 static __u64 lov_offset(struct lov_stripe_md *lsm, __u64 lov_off, int stripeno)
565 {
566         unsigned long ssize  = lsm->lsm_stripe_size;
567         unsigned long swidth = ssize * lsm->lsm_stripe_count;
568         unsigned long stripe_off;
569
570         if (lov_off == OBD_OBJECT_EOF)
571                 return OBD_OBJECT_EOF;
572
573         /* do_div(a, b) returns a % b, and a = a / b */
574         stripe_off = do_div(lov_off, swidth);
575
576         if (stripe_off < stripeno * ssize)
577                 stripe_off = 0;
578         else
579                 stripe_off -= stripeno * ssize;
580
581         return lov_off * ssize + stripe_off;
582 }
583
584 /* compute which stripe offset "lov_off" will be written into */
585 static int lov_stripe_which(struct lov_stripe_md *lsm, __u64 lov_off)
586 {
587         unsigned long ssize  = lsm->lsm_stripe_size;
588         unsigned long swidth = ssize * lsm->lsm_stripe_count;
589         unsigned long stripe_off;
590
591         stripe_off = do_div(lov_off, swidth);
592
593         return stripe_off / ssize;
594 }
595
596
597 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
598  * we can send this 'punch' to just the authoritative node and the nodes
599  * that the punch will affect. */
600 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
601                      struct lov_stripe_md *lsm,
602                      obd_off start, obd_off end)
603 {
604         struct obdo tmp;
605         struct obd_export *export = class_conn2export(conn);
606         struct lov_obd *lov;
607         struct lov_oinfo *loi;
608         int rc = 0, i;
609         ENTRY;
610
611         if (!lsm) {
612                 CERROR("LOV requires striping ea\n");
613                 RETURN(-EINVAL);
614         }
615
616         if (lsm->lsm_magic != LOV_MAGIC) {
617                 CERROR("LOV striping magic bad %#lx != %#lx\n",
618                        lsm->lsm_magic, LOV_MAGIC);
619                 RETURN(-EINVAL);
620         }
621
622         if (!export || !export->exp_obd)
623                 RETURN(-ENODEV);
624
625         lov = &export->exp_obd->u.lov;
626         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
627                 __u64 starti = lov_offset(lsm, start, i);
628                 __u64 endi = lov_offset(lsm, end, i);
629                 int err;
630
631                 if (starti == endi)
632                         continue;
633                 /* create data objects with "parent" OA */
634                 memcpy(&tmp, oa, sizeof(tmp));
635                 tmp.o_id = loi->loi_id;
636
637                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
638                                starti, endi);
639                 if (err) {
640                         CERROR("Error punch objid "LPX64" subobj "LPX64
641                                " on OST idx %d: rc = %d\n",
642                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
643                         if (!rc)
644                                 rc = err;
645                 }
646         }
647         RETURN(rc);
648 }
649
650 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
651 {
652         int ret = 0;
653         ENTRY;
654
655         if (phase == CB_PHASE_START)
656                 RETURN(0);
657
658         if (phase == CB_PHASE_FINISH) {
659                 if (err)
660                         cbd->err = err;
661                 if (atomic_dec_and_test(&cbd->refcount))
662                         ret = cbd->cb(cbd->data, cbd->err, phase);
663                 RETURN(ret);
664         }
665
666         LBUG();
667         return 0;
668 }
669
670 static inline int lov_brw(int cmd, struct lustre_handle *conn,
671                           struct lov_stripe_md *lsm, obd_count oa_bufs,
672                           struct brw_page *pga,
673                           brw_callback_t callback, struct io_cb_data *cbd)
674 {
675         int stripe_count = lsm->lsm_stripe_count;
676         struct obd_export *export = class_conn2export(conn);
677         struct lov_obd *lov;
678         struct {
679                 int bufct;
680                 int index;
681                 int subcount;
682                 struct lov_stripe_md lsm;
683                 int ost_idx;
684         } *stripeinfo, *si, *si_last;
685         struct brw_page *ioarr;
686         int rc, i;
687         struct io_cb_data *our_cb;
688         struct lov_oinfo *loi;
689         int *where;
690         ENTRY;
691
692         if (!lsm) {
693                 CERROR("LOV requires striping ea\n");
694                 RETURN(-EINVAL);
695         }
696
697         if (lsm->lsm_magic != LOV_MAGIC) {
698                 CERROR("LOV striping magic bad %#lx != %#lx\n",
699                        lsm->lsm_magic, LOV_MAGIC);
700                 RETURN(-EINVAL);
701         }
702
703         lov = &export->exp_obd->u.lov;
704
705         our_cb = ll_init_cb();
706         if (!our_cb)
707                 RETURN(-ENOMEM);
708
709         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
710         if (!stripeinfo)
711                 GOTO(out_cbdata, rc = -ENOMEM);
712
713         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
714         if (!where)
715                 GOTO(out_sinfo, rc = -ENOMEM);
716
717         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
718         if (!ioarr)
719                 GOTO(out_where, rc = -ENOMEM);
720
721         /* This is the only race-free way I can think of to get the refcount
722          * correct. -phil */
723         atomic_set(&our_cb->refcount, 0);
724         our_cb->cb = callback;
725         our_cb->data = cbd;
726
727         for (i = 0; i < oa_bufs; i++) {
728                 where[i] = lov_stripe_which(lsm, pga[i].off);
729                 if (stripeinfo[where[i]].bufct++ == 0)
730                         atomic_inc(&our_cb->refcount);
731         }
732
733         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
734              i < stripe_count; i++, loi++, si_last = si, si++) {
735                 if (i > 0)
736                         si->index = si_last->index + si_last->bufct;
737                 si->lsm.lsm_object_id = loi->loi_id;
738                 si->ost_idx = loi->loi_ost_idx;
739         }
740
741         for (i = 0; i < oa_bufs; i++) {
742                 int which = where[i];
743                 int shift;
744
745                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
746                 LASSERT(shift < oa_bufs);
747                 ioarr[shift] = pga[i];
748                 ioarr[shift].off = lov_offset(lsm, pga[i].off, which);
749                 stripeinfo[which].subcount++;
750         }
751
752         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
753                 int shift = si->index;
754
755                 if (si->bufct) {
756                         LASSERT(shift < oa_bufs);
757                         /* XXX handle error returns here */
758                         obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
759                                 &si->lsm, si->bufct, &ioarr[shift],
760                                 lov_osc_brw_callback, our_cb);
761                 }
762         }
763
764         rc = callback(cbd, 0, CB_PHASE_START);
765
766         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
767  out_where:
768         OBD_FREE(where, sizeof(*where) * oa_bufs);
769  out_sinfo:
770         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
771  out_cbdata:
772         OBD_FREE(our_cb, sizeof(*our_cb));
773         RETURN(rc);
774 }
775
776 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
777                        struct lustre_handle *parent_lock,
778                        __u32 type, void *cookie, int cookielen, __u32 mode,
779                        int *flags, void *cb, void *data, int datalen,
780                        struct lustre_handle *lockhs)
781 {
782         struct obd_export *export = class_conn2export(conn);
783         struct lov_obd *lov;
784         struct lov_oinfo *loi;
785         int rc = 0, i;
786         ENTRY;
787
788         if (!lsm) {
789                 CERROR("LOV requires striping ea\n");
790                 RETURN(-EINVAL);
791         }
792
793         if (lsm->lsm_magic != LOV_MAGIC) {
794                 CERROR("LOV striping magic bad %#lx != %#lx\n",
795                        lsm->lsm_magic, LOV_MAGIC);
796                 RETURN(-EINVAL);
797         }
798
799         if (!export || !export->exp_obd)
800                 RETURN(-ENODEV);
801
802         lov = &export->exp_obd->u.lov;
803         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
804                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
805                 struct ldlm_extent sub_ext;
806                 struct lov_stripe_md submd;
807
808                 sub_ext.start = lov_offset(lsm, extent->start, i);
809                 sub_ext.end = lov_offset(lsm, extent->end, i);
810                 if (sub_ext.start == sub_ext.end)
811                         continue;
812
813                 submd.lsm_object_id = loi->loi_id;
814                 /* XXX submd lsm_mds_easize should be that from the subobj,
815                  *     and the subobj should get it opaquely from the LOV.
816                  */
817                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
818                 submd.lsm_stripe_count = 0;
819                 /* XXX submd is not fully initialized here */
820                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
821                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
822                                  mode, flags, cb, data, datalen, &(lockhs[i]));
823                 // XXX add a lock debug statement here
824                 if (rc)
825                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
826                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
827                                loi->loi_id, loi->loi_ost_idx, rc);
828         }
829         RETURN(rc);
830 }
831
832 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
833                       __u32 mode, struct lustre_handle *lockhs)
834 {
835         struct obd_export *export = class_conn2export(conn);
836         struct lov_obd *lov;
837         struct lov_oinfo *loi;
838         int rc = 0, i;
839         ENTRY;
840
841         if (!lsm) {
842                 CERROR("LOV requires striping ea\n");
843                 RETURN(-EINVAL);
844         }
845
846         if (lsm->lsm_magic != LOV_MAGIC) {
847                 CERROR("LOV striping magic bad %#lx != %#lx\n",
848                        lsm->lsm_magic, LOV_MAGIC);
849                 RETURN(-EINVAL);
850         }
851
852         if (!export || !export->exp_obd)
853                 RETURN(-ENODEV);
854
855         lov = &export->exp_obd->u.lov;
856         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
857                 struct lov_stripe_md submd;
858
859                 if (lockhs[i].addr == 0)
860                         continue;
861
862                 submd.lsm_object_id = loi->loi_id;
863                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
864                 submd.lsm_stripe_count = 0;
865                 rc = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
866                                 mode, &lockhs[i]);
867                 if (rc)
868                         CERROR("Error cancel objid "LPX64" subobj "LPX64
869                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
870                                loi->loi_id, loi->loi_ost_idx, rc);
871         }
872         RETURN(rc);
873 }
874
875 static int lov_cancel_unused(struct lustre_handle *conn,
876                              struct lov_stripe_md *lsm, int local_only)
877 {
878         struct obd_export *export = class_conn2export(conn);
879         struct lov_obd *lov;
880         struct lov_oinfo *loi;
881         int rc = 0, i;
882         ENTRY;
883
884         if (!lsm) {
885                 CERROR("LOV requires striping ea for lock cancellation\n");
886                 RETURN(-EINVAL);
887         }
888
889         if (!export || !export->exp_obd)
890                 RETURN(-ENODEV);
891
892         lov = &export->exp_obd->u.lov;
893         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
894                 struct lov_stripe_md submd;
895
896                 submd.lsm_object_id = loi->loi_id;
897                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
898                 submd.lsm_stripe_count = 0;
899                 rc = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
900                                        &submd, local_only);
901                 if (rc)
902                         CERROR("Error cancel unused objid "LPX64" subobj "LPX64
903                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
904                                loi->loi_id, loi->loi_ost_idx, rc);
905         }
906         RETURN(rc);
907 }
908
909 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
910 {
911         struct obd_export *export = class_conn2export(conn);
912         struct lov_obd *lov;
913         struct obd_statfs lov_sfs;
914         int set = 0;
915         int rc = 0;
916         int i;
917         ENTRY;
918
919         if (!export || !export->exp_obd)
920                 RETURN(-ENODEV);
921
922         lov = &export->exp_obd->u.lov;
923
924         /* We only get block data from the OBD */
925         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
926                 int err;
927
928                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
929                 if (err) {
930                         CERROR("Error statfs OSC %s idx %d: err = %d\n",
931                                lov->tgts[i].uuid, i, err);
932                         if (!rc)
933                                 rc = err;
934                         continue; /* XXX or break? - probably OK to continue */
935                 }
936                 if (!set) {
937                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
938                         set = 1;
939                 } else {
940                         osfs->os_bfree += lov_sfs.os_bfree;
941                         osfs->os_bavail += lov_sfs.os_bavail;
942                         osfs->os_blocks += lov_sfs.os_blocks;
943                         /* XXX not sure about this one - depends on policy.
944                          *   - could be minimum if we always stripe on all OBDs
945                          *     (but that would be wrong for any other policy,
946                          *     if one of the OBDs has no more objects left)
947                          *   - could be sum if we stripe whole objects
948                          *   - could be average, just to give a nice number
949                          *   - we just pick first OST and hope it is enough
950                         sfs->f_ffree += lov_sfs.f_ffree;
951                          */
952                 }
953         }
954         RETURN(rc);
955 }
956
957
958 struct obd_ops lov_obd_ops = {
959         o_setup:       lov_setup,
960         o_connect:     lov_connect,
961         o_disconnect:  lov_disconnect,
962         o_create:      lov_create,
963         o_destroy:     lov_destroy,
964         o_getattr:     lov_getattr,
965         o_setattr:     lov_setattr,
966         o_statfs:      lov_statfs,
967         o_open:        lov_open,
968         o_close:       lov_close,
969         o_brw:         lov_brw,
970         o_punch:       lov_punch,
971         o_enqueue:     lov_enqueue,
972         o_cancel:      lov_cancel,
973         o_cancel_unused: lov_cancel_unused
974 };
975
976
977 #define LOV_VERSION "v0.1"
978
979 static int __init lov_init(void)
980 {
981         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
982                ", info@clusterfs.com\n");
983         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
984 }
985
986 static void __exit lov_exit(void)
987 {
988         class_unregister_type(OBD_LOV_DEVICENAME);
989 }
990
991 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
992 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
993 MODULE_LICENSE("GPL");
994
995 module_init(lov_init);
996 module_exit(lov_exit);