Whamcloud - gitweb
0ebde7994f76465cc36edacda5998e513f8f3942
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *
10  * This code is issued under the GNU General Public License.
11  * See the file COPYING in this distribution
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LOV
16
17 #include <linux/slab.h>
18 #include <linux/module.h>
19 #include <linux/obd_support.h>
20 #include <linux/lustre_lib.h>
21 #include <linux/lustre_net.h>
22 #include <linux/lustre_idl.h>
23 #include <linux/lustre_mds.h>
24 #include <linux/obd_class.h>
25 #include <linux/obd_lov.h>
26 #include <linux/init.h>
27 #include <linux/random.h>
28 #include <linux/slab.h>
29 #include <asm/div64.h>
30 #include <linux/lprocfs_status.h>
31
32 extern struct lprocfs_vars status_var_nm_1[];
33 extern struct lprocfs_vars status_class_var[];
34
35 static kmem_cache_t *lov_file_cache;
36
37 struct lov_file_handles {
38         struct list_head lfh_list;
39         __u64 lfh_cookie;
40         int lfh_count;
41         struct lustre_handle *lfh_handles;
42 };
43
44 /* obd methods */
45 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
46                        obd_uuid_t cluuid, struct recovd_obd *recovd,
47                        ptlrpc_recovery_cb_t recover)
48 {
49         struct ptlrpc_request *req = NULL;
50         struct lov_obd *lov = &obd->u.lov;
51         struct client_obd *mdc = &lov->mdcobd->u.cli;
52         struct lov_desc *desc = &lov->desc;
53         struct obd_export *exp;
54         struct lustre_handle mdc_conn;
55         obd_uuid_t *uuidarray;
56         int rc, rc2, i;
57         ENTRY;
58
59         MOD_INC_USE_COUNT;
60         rc = class_connect(conn, obd, cluuid);
61         if (rc) {
62                 MOD_DEC_USE_COUNT;
63                 RETURN(rc);
64         }
65
66         /* We don't want to actually do the underlying connections more than
67          * once, so keep track. */
68         lov->refcount++;
69         if (lov->refcount > 1)
70                 RETURN(0);
71
72         exp = class_conn2export(conn);
73         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
74
75         /* retrieve LOV metadata from MDS */
76         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
77         if (rc) {
78                 CERROR("cannot connect to mdc: rc = %d\n", rc);
79                 GOTO(out_conn, rc);
80         }
81
82         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
83         rc2 = obd_disconnect(&mdc_conn);
84         if (rc) {
85                 CERROR("cannot get lov info %d\n", rc);
86                 GOTO(out_conn, rc);
87         }
88
89         if (rc2) {
90                 CERROR("error disconnecting from MDS %d\n", rc2);
91                 GOTO(out_conn, rc = rc2);
92         }
93
94         /* sanity... */
95         if (req->rq_repmsg->bufcount < 2 ||
96             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
97                 CERROR("LOV desc: invalid descriptor returned\n");
98                 GOTO(out_conn, rc = -EINVAL);
99         }
100
101         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
102         lov_unpackdesc(desc);
103
104         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
105                 CERROR("LOV desc: invalid uuid array returned\n");
106                 GOTO(out_conn, rc = -EINVAL);
107         }
108
109         mdc->cl_max_mds_easize = lov_mds_md_size(desc->ld_tgt_count);
110         mdc->cl_max_ost_easize = lov_stripe_md_size(desc->ld_tgt_count);
111
112         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
113                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
114                        obd->obd_uuid, desc->ld_uuid);
115                 GOTO(out_conn, rc = -EINVAL);
116         }
117
118         if (desc->ld_tgt_count > 1000) {
119                 CERROR("LOV desc: target count > 1000 (%d)\n",
120                        desc->ld_tgt_count);
121                 GOTO(out_conn, rc = -EINVAL);
122         }
123
124         /* Because of 64-bit divide/mod operations only work with a 32-bit
125          * divisor in a 32-bit kernel, we cannot support a stripe width
126          * of 4GB or larger on 32-bit CPUs.
127          */
128         if ((desc->ld_default_stripe_count ?
129              desc->ld_default_stripe_count : desc->ld_tgt_count) *
130              desc->ld_default_stripe_size > ~0UL) {
131                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
132                        desc->ld_default_stripe_size,
133                        desc->ld_default_stripe_count ?
134                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
135                 GOTO(out_conn, rc = -EINVAL);
136         }
137
138         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
139         OBD_ALLOC(lov->tgts, lov->bufsize);
140         if (!lov->tgts) {
141                 CERROR("Out of memory\n");
142                 GOTO(out_conn, rc = -ENOMEM);
143         }
144
145         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
146         for (i = 0; i < desc->ld_tgt_count; i++)
147                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
148
149         for (i = 0; i < desc->ld_tgt_count; i++) {
150                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
151
152                 if (!tgt) {
153                         CERROR("Target %s not attached\n", uuidarray[i]);
154                         GOTO(out_disc, rc = -EINVAL);
155                 }
156
157                 if (!(tgt->obd_flags & OBD_SET_UP)) {
158                         CERROR("Target %s not set up\n", uuidarray[i]);
159                         GOTO(out_disc, rc = -EINVAL);
160                 }
161
162                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
163                                  recover);
164                 if (rc) {
165                         CERROR("Target %s connect error %d\n",
166                                uuidarray[i], rc);
167                         GOTO(out_disc, rc);
168                 }
169                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
170                                    sizeof(struct obd_device *), obd, NULL);
171                 if (rc) {
172                         CERROR("Target %s REGISTER_LOV error %d\n",
173                                uuidarray[i], rc);
174                         GOTO(out_disc, rc);
175                 }
176                 desc->ld_active_tgt_count++;
177                 lov->tgts[i].active = 1;
178         }
179
180  out:
181         ptlrpc_req_finished(req);
182         RETURN(rc);
183
184  out_disc:
185         while (i-- > 0) {
186                 desc->ld_active_tgt_count--;
187                 lov->tgts[i].active = 0;
188                 rc2 = obd_disconnect(&lov->tgts[i].conn);
189                 if (rc2)
190                         CERROR("LOV Target %s disconnect error: rc = %d\n",
191                                 uuidarray[i], rc2);
192         }
193         OBD_FREE(lov->tgts, lov->bufsize);
194  out_conn:
195         class_disconnect(conn);
196         goto out;
197 }
198
199 static int lov_disconnect(struct lustre_handle *conn)
200 {
201         struct obd_device *obd = class_conn2obd(conn);
202         struct lov_obd *lov = &obd->u.lov;
203         struct obd_export *exp;
204         struct list_head *p, *n;
205         int rc, i;
206
207         if (!lov->tgts)
208                 goto out_local;
209
210         /* Only disconnect the underlying layers on the final disconnect. */
211         lov->refcount--;
212         if (lov->refcount != 0)
213                 goto out_local;
214
215         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
216                 if (!lov->tgts[i].active) {
217                         CERROR("Skipping disconnect for inactive OSC %s\n",
218                                lov->tgts[i].uuid);
219                         continue;
220                 }
221
222                 lov->desc.ld_active_tgt_count--;
223                 lov->tgts[i].active = 0;
224                 rc = obd_disconnect(&lov->tgts[i].conn);
225                 if (rc) {
226                         CERROR("Target %s disconnect error %d\n",
227                                lov->tgts[i].uuid, rc);
228                         RETURN(rc);
229                 }
230         }
231         OBD_FREE(lov->tgts, lov->bufsize);
232         lov->bufsize = 0;
233         lov->tgts = NULL;
234
235         exp = class_conn2export(conn);
236         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
237                 /* XXX close these, instead of just discarding them? */
238                 struct lov_file_handles *lfh;
239                 lfh = list_entry(p, typeof(*lfh), lfh_list);
240                 CERROR("discarding open LOV handle %p:"LPX64"\n",
241                        lfh, lfh->lfh_cookie);
242                 list_del(&lfh->lfh_list);
243                 OBD_FREE(lfh->lfh_handles,
244                          lfh->lfh_count * sizeof(*lfh->lfh_handles));
245                 kmem_cache_free(lov_file_cache, lfh);
246         }
247
248  out_local:
249         rc = class_disconnect(conn);
250         if (!rc)
251                 MOD_DEC_USE_COUNT;
252         return rc;
253 }
254
255 /* Error codes:
256  *
257  *  -EINVAL  : UUID can't be found in the LOV's target list
258  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
259  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
260  *  -EALREADY: The OSC is already marked (in)active
261  */
262 static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
263                               int activate)
264 {
265         struct obd_device *obd;
266         int i, rc = 0;
267         ENTRY;
268
269         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
270                lov, uuid, activate);
271
272         spin_lock(&lov->lov_lock);
273         for (i = 0; i < lov->desc.ld_tgt_count; i++)
274                 if (strncmp(uuid, lov->tgts[i].uuid,
275                             sizeof(lov->tgts[i].uuid)) == 0)
276                         break;
277
278         if (i == lov->desc.ld_tgt_count)
279                 GOTO(out, rc = -EINVAL);
280
281         obd = class_conn2obd(&lov->tgts[i].conn);
282         if (obd == NULL) {
283                 LBUG();
284                 GOTO(out, rc = -ENOTCONN);
285         }
286
287         CDEBUG(D_INFO, "Found OBD %p type %s\n", obd, obd->obd_type->typ_name);
288         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
289                 LBUG();
290                 GOTO(out, rc = -EBADF);
291         }
292
293         if (lov->tgts[i].active == activate) {
294                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
295                        activate ? "" : "in");
296                 GOTO(out, rc = -EALREADY);
297         }
298
299         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
300
301         lov->tgts[i].active = activate;
302         if (activate)
303                 lov->desc.ld_active_tgt_count++;
304         else
305                 lov->desc.ld_active_tgt_count--;
306
307         EXIT;
308  out:
309         spin_unlock(&lov->lov_lock);
310         return rc;
311 }
312
313 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
314 {
315         struct obd_ioctl_data* data = buf;
316         struct lov_obd *lov = &obd->u.lov;
317         int rc = 0;
318         ENTRY;
319
320         if (data->ioc_inllen1 < 1) {
321                 CERROR("osc setup requires an MDC UUID\n");
322                 RETURN(-EINVAL);
323         }
324
325         if (data->ioc_inllen1 > 37) {
326                 CERROR("mdc UUID must be 36 characters or less\n");
327                 RETURN(-EINVAL);
328         }
329
330         spin_lock_init(&lov->lov_lock);
331         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
332         if (!lov->mdcobd) {
333                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
334                        data->ioc_inlbuf1);
335                 rc = -EINVAL;
336         }
337         RETURN(rc);
338 }
339
340 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
341 {
342         struct lov_file_handles *lfh = NULL;
343
344         if (!handle || !handle->addr)
345                 RETURN(NULL);
346
347         lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
348         if (!kmem_cache_validate(lov_file_cache, lfh))
349                 RETURN(NULL);
350
351         if (lfh->lfh_cookie != handle->cookie)
352                 RETURN(NULL);
353
354         return lfh;
355 }
356
357 /* the LOV expects oa->o_id to be set to the LOV object id */
358 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
359                       struct lov_stripe_md **ea)
360 {
361         struct obd_export *export = class_conn2export(conn);
362         struct lov_obd *lov;
363         struct lov_stripe_md *lsm;
364         struct lov_oinfo *loi;
365         struct obdo *tmp;
366         int ost_count, ost_idx = 1, i, rc = 0;
367         ENTRY;
368
369         LASSERT(ea);
370
371         if (!export)
372                 RETURN(-EINVAL);
373
374         tmp = obdo_alloc();
375         if (!tmp)
376                 RETURN(-ENOMEM);
377
378         lov = &export->exp_obd->u.lov;
379
380         spin_lock(&lov->lov_lock);
381         ost_count = lov->desc.ld_tgt_count;
382         oa->o_easize = lov_stripe_md_size(ost_count);
383
384         lsm = *ea;
385         if (!lsm) {
386                 OBD_ALLOC(lsm, oa->o_easize);
387                 if (!lsm) {
388                         spin_unlock(&lov->lov_lock);
389                         GOTO(out_tmp, rc = -ENOMEM);
390                 }
391                 lsm->lsm_magic = LOV_MAGIC;
392                 lsm->lsm_mds_easize = lov_mds_md_size(ost_count);
393                 ost_idx = 0; /* if lsm->lsm_stripe_offset is set yet */
394         }
395
396         LASSERT(oa->o_valid & OBD_MD_FLID);
397         lsm->lsm_object_id = oa->o_id;
398         if (!lsm->lsm_stripe_count)
399                 lsm->lsm_stripe_count = lov->desc.ld_default_stripe_count;
400         if (!lsm->lsm_stripe_count)
401                 lsm->lsm_stripe_count = lov->desc.ld_active_tgt_count;
402         else if (lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)
403                 lsm->lsm_stripe_count = lov->desc.ld_active_tgt_count;
404
405         if (!lsm->lsm_stripe_size)
406                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
407
408         /* Because of 64-bit divide/mod operations only work with a 32-bit
409          * divisor in a 32-bit kernel, we cannot support a stripe width
410          * of 4GB or larger on 32-bit CPUs.
411          */
412         if (lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL) {
413                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
414                        lsm->lsm_stripe_size, lsm->lsm_stripe_count, ~0UL);
415                 spin_unlock(&lov->lov_lock);
416                 GOTO(out_free, rc = -EINVAL);
417         }
418
419         lsm->lsm_ost_count = ost_count;
420         if (!ost_idx || lsm->lsm_stripe_offset >= ost_count) {
421                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
422                 int stripe_offset = mult % ost_count;
423                 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
424
425                 lsm->lsm_stripe_offset = stripe_offset + sub_offset;
426         }
427
428         while (!lov->tgts[lsm->lsm_stripe_offset].active)
429                 lsm->lsm_stripe_offset = (lsm->lsm_stripe_offset+1) % ost_count;
430
431         /* Pick the OSTs before we release the lock */
432         ost_idx = lsm->lsm_stripe_offset;
433         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
434                 CDEBUG(D_INODE, "objid "LPX64"[%d] is ost_idx %d (uuid %s)\n",
435                        lsm->lsm_object_id, i, ost_idx, lov->tgts[ost_idx].uuid);
436                 loi->loi_ost_idx = ost_idx;
437                 do {
438                         ost_idx = (ost_idx + 1) % ost_count;
439                 } while (!lov->tgts[ost_idx].active);
440         }
441
442         spin_unlock(&lov->lov_lock);
443
444         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
445                lsm->lsm_stripe_count,lsm->lsm_object_id,lsm->lsm_stripe_offset);
446
447         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
448                 struct lov_stripe_md obj_md;
449                 struct lov_stripe_md *obj_mdp = &obj_md;
450
451                 ost_idx = loi->loi_ost_idx;
452
453                 /* create data objects with "parent" OA */
454                 memcpy(tmp, oa, sizeof(*tmp));
455                 tmp->o_easize = sizeof(struct lov_stripe_md);
456                 rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
457                 if (rc) {
458                         CERROR("error creating objid "LPX64" sub-object on "
459                                "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
460                         GOTO(out_cleanup, rc);
461                 }
462                 loi->loi_id = tmp->o_id;
463                 loi->loi_size = tmp->o_size;
464                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
465                        lsm->lsm_object_id, loi->loi_id, ost_idx);
466         }
467
468         *ea = lsm;
469
470  out_tmp:
471         obdo_free(tmp);
472         return rc;
473
474  out_cleanup:
475         while (i-- > 0) {
476                 int err;
477
478                 --loi;
479                 /* destroy already created objects here */
480                 memcpy(tmp, oa, sizeof(*tmp));
481                 tmp->o_id = loi->loi_id;
482                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
483                 if (err)
484                         CERROR("Failed to uncreate objid "LPX64" subobj "
485                                LPX64" on OST idx %d: rc = %d\n",
486                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
487                                err);
488         }
489  out_free:
490         OBD_FREE(lsm, oa->o_easize);
491         goto out_tmp;
492 }
493
494 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
495                        struct lov_stripe_md *lsm)
496 {
497         struct obdo tmp;
498         struct obd_export *export = class_conn2export(conn);
499         struct lov_obd *lov;
500         struct lov_oinfo *loi;
501         struct lov_file_handles *lfh = NULL;
502         int rc = 0, i;
503         ENTRY;
504
505         if (!lsm) {
506                 CERROR("LOV requires striping ea for destruction\n");
507                 RETURN(-EINVAL);
508         }
509
510         if (lsm->lsm_magic != LOV_MAGIC) {
511                 CERROR("LOV striping magic bad %#lx != %#lx\n",
512                        lsm->lsm_magic, LOV_MAGIC);
513                 RETURN(-EINVAL);
514         }
515
516         if (!export || !export->exp_obd)
517                 RETURN(-ENODEV);
518
519         if (oa->o_valid & OBD_MD_FLHANDLE)
520                 lfh = lov_handle2lfh(obdo_handle(oa));
521
522         lov = &export->exp_obd->u.lov;
523         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
524                 memcpy(&tmp, oa, sizeof(tmp));
525                 tmp.o_id = loi->loi_id;
526                 if (lfh)
527                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
528                                sizeof(lfh->lfh_handles[i]));
529                 else
530                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
531                 rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
532                 if (rc)
533                         CERROR("Error destroying objid "LPX64" subobj "LPX64
534                                " on OST idx %d\n: rc = %d",
535                                oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
536         }
537         RETURN(rc);
538 }
539
540 /* compute object size given "stripeno" and the ost size */
541 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
542                                 int stripeno)
543 {
544         unsigned long ssize  = lsm->lsm_stripe_size;
545         unsigned long swidth = ssize * lsm->lsm_stripe_count;
546         unsigned long stripe_size;
547         obd_size lov_size;
548
549         if (ost_size == 0)
550                 return 0;
551
552         /* do_div(a, b) returns a % b, and a = a / b */
553         stripe_size = do_div(ost_size, ssize);
554
555         if (stripe_size)
556                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
557         else
558                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
559
560         return lov_size;
561 }
562
563 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
564                             struct lov_stripe_md *lsm, int stripeno, int *new)
565 {
566         if (*new) {
567                 obdo_cpy_md(tgt, src, valid);
568                 if (valid & OBD_MD_FLSIZE)
569                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
570                 *new = 0;
571         } else {
572                 if (valid & OBD_MD_FLSIZE) {
573                         /* this handles sparse files properly */
574                         obd_size lov_size;
575
576                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
577                         if (lov_size > tgt->o_size)
578                                 tgt->o_size = lov_size;
579                 }
580                 if (valid & OBD_MD_FLBLOCKS)
581                         tgt->o_blocks += src->o_blocks;
582                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
583                         tgt->o_ctime = src->o_ctime;
584                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
585                         tgt->o_mtime = src->o_mtime;
586         }
587 }
588
589 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
590                        struct lov_stripe_md *lsm)
591 {
592         struct obdo tmp;
593         struct obd_export *export = class_conn2export(conn);
594         struct lov_obd *lov;
595         struct lov_oinfo *loi;
596         struct lov_file_handles *lfh = NULL;
597         int rc = 0, i;
598         int new = 1;
599         ENTRY;
600
601         if (!lsm) {
602                 CERROR("LOV requires striping ea\n");
603                 RETURN(-EINVAL);
604         }
605
606         if (lsm->lsm_magic != LOV_MAGIC) {
607                 CERROR("LOV striping magic bad %#lx != %#lx\n",
608                        lsm->lsm_magic, LOV_MAGIC);
609                 RETURN(-EINVAL);
610         }
611
612         if (!export || !export->exp_obd)
613                 RETURN(-ENODEV);
614
615         lov = &export->exp_obd->u.lov;
616
617         if (oa->o_valid & OBD_MD_FLHANDLE)
618                 lfh = lov_handle2lfh(obdo_handle(oa));
619
620         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
621                 int err;
622
623                 if (loi->loi_id == 0)
624                         continue;
625
626                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
627                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
628                 /* create data objects with "parent" OA */
629                 memcpy(&tmp, oa, sizeof(tmp));
630                 tmp.o_id = loi->loi_id;
631                 if (lfh)
632                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
633                                sizeof(lfh->lfh_handles[i]));
634                 else
635                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
636
637                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
638                 if (err) {
639                         CERROR("Error getattr objid "LPX64" subobj "LPX64
640                                " on OST idx %d: rc = %d\n",
641                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
642                         if (!rc)
643                                 rc = err;
644                         continue; /* XXX or break? */
645                 }
646                 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
647         }
648         RETURN(rc);
649 }
650
651 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
652                        struct lov_stripe_md *lsm)
653 {
654         struct obdo *tmp;
655         struct obd_export *export = class_conn2export(conn);
656         struct lov_obd *lov;
657         struct lov_oinfo *loi;
658         struct lov_file_handles *lfh = NULL;
659         int rc = 0, i;
660         ENTRY;
661
662         /* Note that this code is currently unused, hence LBUG(), just
663          * to know when/if it is ever revived that it needs cleanups.
664          */
665         LBUG();
666
667         if (!lsm) {
668                 CERROR("LOV requires striping ea\n");
669                 RETURN(-EINVAL);
670         }
671
672         if (lsm->lsm_magic != LOV_MAGIC) {
673                 CERROR("LOV striping magic bad %#lx != %#lx\n",
674                        lsm->lsm_magic, LOV_MAGIC);
675                 RETURN(-EINVAL);
676         }
677
678         if (!export || !export->exp_obd)
679                 RETURN(-ENODEV);
680
681         /* size changes should go through punch and not setattr */
682         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
683
684         tmp = obdo_alloc();
685         if (!tmp)
686                 RETURN(-ENOMEM);
687
688         if (oa->o_valid & OBD_MD_FLHANDLE)
689                 lfh = lov_handle2lfh(obdo_handle(oa));
690
691         lov = &export->exp_obd->u.lov;
692         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
693                 int err;
694
695                 obdo_cpy_md(tmp, oa, oa->o_valid);
696
697                 if (lfh)
698                         memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
699                                 sizeof(lfh->lfh_handles[i]));
700                 else
701                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
702
703                 tmp->o_id = loi->loi_id;
704
705                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
706                 if (err) {
707                         CERROR("Error setattr objid "LPX64" subobj "LPX64
708                                " on OST idx %d: rc = %d\n",
709                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
710                         if (!rc)
711                                 rc = err;
712                 }
713         }
714         obdo_free(tmp);
715         RETURN(rc);
716 }
717
718 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
719                     struct lov_stripe_md *lsm)
720 {
721         struct obdo *tmp;
722         struct obd_export *export = class_conn2export(conn);
723         struct lov_obd *lov;
724         struct lov_oinfo *loi;
725         struct lov_file_handles *lfh = NULL;
726         int new = 1;
727         int rc = 0, i;
728         ENTRY;
729
730         if (!lsm) {
731                 CERROR("LOV requires striping ea for opening\n");
732                 RETURN(-EINVAL);
733         }
734
735         if (lsm->lsm_magic != LOV_MAGIC) {
736                 CERROR("LOV striping magic bad %#lx != %#lx\n",
737                        lsm->lsm_magic, LOV_MAGIC);
738                 RETURN(-EINVAL);
739         }
740
741         if (!export || !export->exp_obd)
742                 RETURN(-ENODEV);
743
744         tmp = obdo_alloc();
745         if (!tmp)
746                 RETURN(-ENOMEM);
747
748         lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
749         if (!lfh)
750                 GOTO(out_tmp, rc = -ENOMEM);
751         OBD_ALLOC(lfh->lfh_handles,
752                   lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
753         if (!lfh->lfh_handles)
754                 GOTO(out_lfh, rc = -ENOMEM);
755
756         lov = &export->exp_obd->u.lov;
757         oa->o_size = 0;
758         oa->o_blocks = 0;
759         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
760                 int err;
761
762                 /* create data objects with "parent" OA */
763                 memcpy(tmp, oa, sizeof(*tmp));
764                 tmp->o_id = loi->loi_id;
765
766                 err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
767                 if (err) {
768                         CERROR("Error open objid "LPX64" subobj "LPX64
769                                " on OST idx %d: rc = %d\n",
770                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
771                                loi->loi_ost_idx, rc);
772                         if (!rc)
773                                 rc = err;
774                 }
775
776                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
777
778                 if (tmp->o_valid & OBD_MD_FLHANDLE)
779                         memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
780                                sizeof(lfh->lfh_handles[i]));
781         }
782
783         if (tmp->o_valid & OBD_MD_FLHANDLE) {
784                 struct lustre_handle *handle = obdo_handle(oa);
785
786                 lfh->lfh_count = lsm->lsm_stripe_count;
787                 get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
788
789                 handle->addr = (__u64)(unsigned long)lfh;
790                 handle->cookie = lfh->lfh_cookie;
791                 oa->o_valid |= OBD_MD_FLHANDLE;
792                 list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
793         } else
794                 goto out_handles;
795
796         /* FIXME: returning an error, but having opened some objects is a bad
797          *        idea, since they will likely never be closed.  We either
798          *        need to not return an error if _some_ objects could be
799          *        opened, and leave it to read/write to return -EIO (with
800          *        hopefully partial error status) or close all opened objects
801          *        and return an error.  I think the former is preferred.
802          */
803 out_tmp:
804         obdo_free(tmp);
805         RETURN(rc);
806
807 out_handles:
808         OBD_FREE(lfh->lfh_handles,
809                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
810 out_lfh:
811         lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
812         kmem_cache_free(lov_file_cache, lfh);
813         goto out_tmp;
814 }
815
816 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
817                      struct lov_stripe_md *lsm)
818 {
819         struct obdo tmp;
820         struct obd_export *export = class_conn2export(conn);
821         struct lov_obd *lov;
822         struct lov_oinfo *loi;
823         struct lov_file_handles *lfh = NULL;
824         int rc = 0, i;
825         ENTRY;
826
827         if (!lsm) {
828                 CERROR("LOV requires striping ea\n");
829                 RETURN(-EINVAL);
830         }
831
832         if (lsm->lsm_magic != LOV_MAGIC) {
833                 CERROR("LOV striping magic bad %#lx != %#lx\n",
834                        lsm->lsm_magic, LOV_MAGIC);
835                 RETURN(-EINVAL);
836         }
837
838         if (!export || !export->exp_obd)
839                 RETURN(-ENODEV);
840
841         if (oa->o_valid & OBD_MD_FLHANDLE)
842                 lfh = lov_handle2lfh(obdo_handle(oa));
843
844         lov = &export->exp_obd->u.lov;
845         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
846                 int err;
847
848                 /* create data objects with "parent" OA */
849                 memcpy(&tmp, oa, sizeof(tmp));
850                 tmp.o_id = loi->loi_id;
851                 if (lfh)
852                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
853                                sizeof(lfh->lfh_handles[i]));
854                 else
855                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
856
857                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
858                 if (err) {
859                         CERROR("Error close objid "LPX64" subobj "LPX64
860                                " on OST idx %d: rc = %d\n",
861                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
862                         if (!rc)
863                                 rc = err;
864                 }
865         }
866         if (lfh) {
867                 list_del(&lfh->lfh_list);
868                 OBD_FREE(lfh->lfh_handles,
869                          lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
870                 lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
871                 kmem_cache_free(lov_file_cache, lfh);
872         }
873
874         RETURN(rc);
875 }
876
/*
 * Fallback log2() for builds that do not already provide one.
 * NOTE(review): built on ffz(~n), so presumably this is the index of the
 * lowest set bit of n and matches log2(n) only for powers of two - confirm.
 */
#if !defined(log2)
#define log2(n) ffz(~(n))
#endif
880
881 #warning FIXME: merge these two functions now that they are nearly the same
882
883 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
884 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
885                                  int stripeno)
886 {
887         unsigned long ssize  = lsm->lsm_stripe_size;
888         unsigned long swidth = ssize * lsm->lsm_stripe_count;
889         unsigned long stripe_off, this_stripe;
890
891         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
892                 return lov_off;
893
894         /* do_div(a, b) returns a % b, and a = a / b */
895         stripe_off = do_div(lov_off, swidth);
896
897         this_stripe = stripeno * ssize;
898         if (stripe_off <= this_stripe)
899                 stripe_off = 0;
900         else {
901                 stripe_off -= this_stripe;
902
903                 if (stripe_off > ssize)
904                         stripe_off = ssize;
905         }
906
907
908         return lov_off * ssize + stripe_off;
909 }
910
911 /* compute which stripe number "lov_off" will be written into */
912 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
913 {
914         unsigned long ssize  = lsm->lsm_stripe_size;
915         unsigned long swidth = ssize * lsm->lsm_stripe_count;
916         unsigned long stripe_off;
917
918         stripe_off = do_div(lov_off, swidth);
919
920         return stripe_off / ssize;
921 }
922
923
924 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
925  * we can send this 'punch' to just the authoritative node and the nodes
926  * that the punch will affect. */
927 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
928                      struct lov_stripe_md *lsm,
929                      obd_off start, obd_off end)
930 {
931         struct obdo tmp;
932         struct obd_export *export = class_conn2export(conn);
933         struct lov_obd *lov;
934         struct lov_oinfo *loi;
935         struct lov_file_handles *lfh = NULL;
936         int rc = 0, i;
937         ENTRY;
938
939         if (!lsm) {
940                 CERROR("LOV requires striping ea\n");
941                 RETURN(-EINVAL);
942         }
943
944         if (lsm->lsm_magic != LOV_MAGIC) {
945                 CERROR("LOV striping magic bad %#lx != %#lx\n",
946                        lsm->lsm_magic, LOV_MAGIC);
947                 RETURN(-EINVAL);
948         }
949
950         if (!export || !export->exp_obd)
951                 RETURN(-ENODEV);
952
953         if (oa->o_valid & OBD_MD_FLHANDLE)
954                 lfh = lov_handle2lfh(obdo_handle(oa));
955
956         lov = &export->exp_obd->u.lov;
957         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
958                 obd_off starti = lov_stripe_offset(lsm, start, i);
959                 obd_off endi = lov_stripe_offset(lsm, end, i);
960                 int err;
961
962                 if (starti == endi)
963                         continue;
964                 /* create data objects with "parent" OA */
965                 memcpy(&tmp, oa, sizeof(tmp));
966                 tmp.o_id = loi->loi_id;
967                 if (lfh)
968                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
969                                sizeof(lfh->lfh_handles[i]));
970                 else
971                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
972
973                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
974                                 starti, endi);
975                 if (err) {
976                         CERROR("Error punch objid "LPX64" subobj "LPX64
977                                " on OST idx %d: rc = %d\n",
978                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
979                         if (!rc)
980                                 rc = err;
981                 }
982         }
983         RETURN(rc);
984 }
985
986 static int lov_osc_brw_cb(struct brw_cb_data *brw_cbd, int err, int phase)
987 {
988         int ret = 0;
989         ENTRY;
990
991         if (phase == CB_PHASE_START)
992                 RETURN(0);
993
994         if (phase == CB_PHASE_FINISH) {
995                 if (err)
996                         brw_cbd->brw_err = err;
997                 if (atomic_dec_and_test(&brw_cbd->brw_refcount))
998                         ret = brw_cbd->brw_cb(brw_cbd->brw_data, brw_cbd->brw_err, phase);
999                 RETURN(ret);
1000         }
1001
1002         LBUG();
1003         return 0;
1004 }
1005
1006 static inline int lov_brw(int cmd, struct lustre_handle *conn,
1007                           struct lov_stripe_md *lsm, obd_count oa_bufs,
1008                           struct brw_page *pga,
1009                           brw_cb_t brw_cb, struct brw_cb_data *brw_cbd)
1010 {
1011         int stripe_count = lsm->lsm_stripe_count;
1012         struct obd_export *export = class_conn2export(conn);
1013         struct lov_obd *lov;
1014         struct {
1015                 int bufct;
1016                 int index;
1017                 int subcount;
1018                 struct lov_stripe_md lsm;
1019                 int ost_idx;
1020         } *stripeinfo, *si, *si_last;
1021         struct brw_page *ioarr;
1022         int rc, i;
1023         struct brw_cb_data *osc_brw_cbd;
1024         struct lov_oinfo *loi;
1025         int *where;
1026         ENTRY;
1027
1028         if (!lsm) {
1029                 CERROR("LOV requires striping ea\n");
1030                 RETURN(-EINVAL);
1031         }
1032
1033         if (lsm->lsm_magic != LOV_MAGIC) {
1034                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1035                        lsm->lsm_magic, LOV_MAGIC);
1036                 RETURN(-EINVAL);
1037         }
1038
1039         lov = &export->exp_obd->u.lov;
1040
1041         osc_brw_cbd = ll_init_brw_cb_data();
1042         if (!osc_brw_cbd)
1043                 RETURN(-ENOMEM);
1044
1045         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1046         if (!stripeinfo)
1047                 GOTO(out_cbdata, rc = -ENOMEM);
1048
1049         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1050         if (!where)
1051                 GOTO(out_sinfo, rc = -ENOMEM);
1052
1053         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1054         if (!ioarr)
1055                 GOTO(out_where, rc = -ENOMEM);
1056
1057         /* This is the only race-free way I can think of to get the refcount
1058          * correct. -phil */
1059         atomic_set(&osc_brw_cbd->brw_refcount, 0);
1060         osc_brw_cbd->brw_cb = brw_cb;
1061         osc_brw_cbd->brw_data = brw_cbd;
1062
1063         for (i = 0; i < oa_bufs; i++) {
1064                 where[i] = lov_stripe_number(lsm, pga[i].off);
1065                 if (stripeinfo[where[i]].bufct++ == 0)
1066                         atomic_inc(&osc_brw_cbd->brw_refcount);
1067         }
1068
1069         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1070              i < stripe_count; i++, loi++, si_last = si, si++) {
1071                 if (i > 0)
1072                         si->index = si_last->index + si_last->bufct;
1073                 si->lsm.lsm_object_id = loi->loi_id;
1074                 si->ost_idx = loi->loi_ost_idx;
1075         }
1076
1077         for (i = 0; i < oa_bufs; i++) {
1078                 int which = where[i];
1079                 int shift;
1080
1081                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1082                 LASSERT(shift < oa_bufs);
1083                 ioarr[shift] = pga[i];
1084                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
1085                 stripeinfo[which].subcount++;
1086         }
1087
1088         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1089                 int shift = si->index;
1090
1091                 if (si->bufct) {
1092                         LASSERT(shift < oa_bufs);
1093                         /* XXX handle error returns here */
1094                         obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1095                                 &si->lsm, si->bufct, &ioarr[shift],
1096                                 lov_osc_brw_cb, osc_brw_cbd);
1097                 }
1098         }
1099
1100         rc = brw_cb(brw_cbd, 0, CB_PHASE_START);
1101
1102         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1103  out_where:
1104         OBD_FREE(where, sizeof(*where) * oa_bufs);
1105  out_sinfo:
1106         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1107  out_cbdata:
1108         OBD_FREE(osc_brw_cbd, sizeof(*osc_brw_cbd));
1109         RETURN(rc);
1110 }
1111
1112 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1113                        struct lustre_handle *parent_lock,
1114                        __u32 type, void *cookie, int cookielen, __u32 mode,
1115                        int *flags, void *cb, void *data, int datalen,
1116                        struct lustre_handle *lockhs)
1117 {
1118         struct obd_export *export = class_conn2export(conn);
1119         struct lov_obd *lov;
1120         struct lov_oinfo *loi;
1121         int rc = 0, i;
1122         ENTRY;
1123
1124         if (!lsm) {
1125                 CERROR("LOV requires striping ea\n");
1126                 RETURN(-EINVAL);
1127         }
1128
1129         if (lsm->lsm_magic != LOV_MAGIC) {
1130                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1131                        lsm->lsm_magic, LOV_MAGIC);
1132                 RETURN(-EINVAL);
1133         }
1134
1135         if (!export || !export->exp_obd)
1136                 RETURN(-ENODEV);
1137
1138         lov = &export->exp_obd->u.lov;
1139         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1140                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1141                 struct ldlm_extent sub_ext;
1142                 struct lov_stripe_md submd;
1143
1144                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
1145                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
1146                 if (sub_ext.start == sub_ext.end)
1147                         continue;
1148
1149                 submd.lsm_object_id = loi->loi_id;
1150                 /* XXX submd lsm_mds_easize should be that from the subobj,
1151                  *     and the subobj should get it opaquely from the LOV.
1152                  */
1153                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1154                 submd.lsm_stripe_count = 0;
1155                 /* XXX submd is not fully initialized here */
1156                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1157                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
1158                                  mode, flags, cb, data, datalen, &(lockhs[i]));
1159                 // XXX add a lock debug statement here
1160                 if (rc)
1161                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
1162                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1163                                loi->loi_id, loi->loi_ost_idx, rc);
1164         }
1165         RETURN(rc);
1166 }
1167
1168 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1169                       __u32 mode, struct lustre_handle *lockhs)
1170 {
1171         struct obd_export *export = class_conn2export(conn);
1172         struct lov_obd *lov;
1173         struct lov_oinfo *loi;
1174         int rc = 0, i;
1175         ENTRY;
1176
1177         if (!lsm) {
1178                 CERROR("LOV requires striping ea\n");
1179                 RETURN(-EINVAL);
1180         }
1181
1182         if (lsm->lsm_magic != LOV_MAGIC) {
1183                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1184                        lsm->lsm_magic, LOV_MAGIC);
1185                 RETURN(-EINVAL);
1186         }
1187
1188         if (!export || !export->exp_obd)
1189                 RETURN(-ENODEV);
1190
1191         lov = &export->exp_obd->u.lov;
1192         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1193                 struct lov_stripe_md submd;
1194
1195                 if (lockhs[i].addr == 0)
1196                         continue;
1197
1198                 submd.lsm_object_id = loi->loi_id;
1199                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1200                 submd.lsm_stripe_count = 0;
1201                 rc = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1202                                 mode, &lockhs[i]);
1203                 if (rc)
1204                         CERROR("Error cancel objid "LPX64" subobj "LPX64
1205                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1206                                loi->loi_id, loi->loi_ost_idx, rc);
1207         }
1208         RETURN(rc);
1209 }
1210
1211 static int lov_cancel_unused(struct lustre_handle *conn,
1212                              struct lov_stripe_md *lsm, int flags)
1213 {
1214         struct obd_export *export = class_conn2export(conn);
1215         struct lov_obd *lov;
1216         struct lov_oinfo *loi;
1217         int rc = 0, i;
1218         ENTRY;
1219
1220         if (!lsm) {
1221                 CERROR("LOV requires striping ea for lock cancellation\n");
1222                 RETURN(-EINVAL);
1223         }
1224
1225         if (!export || !export->exp_obd)
1226                 RETURN(-ENODEV);
1227
1228         lov = &export->exp_obd->u.lov;
1229         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1230                 struct lov_stripe_md submd;
1231
1232                 submd.lsm_object_id = loi->loi_id;
1233                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1234                 submd.lsm_stripe_count = 0;
1235                 rc = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1236                                        &submd, flags);
1237                 if (rc)
1238                         CERROR("Error cancel unused objid "LPX64" subobj "LPX64
1239                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1240                                loi->loi_id, loi->loi_ost_idx, rc);
1241         }
1242         RETURN(rc);
1243 }
1244
1245 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1246 {
1247         struct obd_export *export = class_conn2export(conn);
1248         struct lov_obd *lov;
1249         struct obd_statfs lov_sfs;
1250         int set = 0;
1251         int rc = 0;
1252         int i;
1253         ENTRY;
1254
1255         if (!export || !export->exp_obd)
1256                 RETURN(-ENODEV);
1257
1258         lov = &export->exp_obd->u.lov;
1259
1260         /* We only get block data from the OBD */
1261         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1262                 int err;
1263
1264                 if (!lov->tgts[i].active)
1265                         continue;
1266
1267                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1268                 if (err) {
1269                         CERROR("Error statfs OSC %s idx %d: err = %d\n",
1270                                lov->tgts[i].uuid, i, err);
1271                         if (!rc)
1272                                 rc = err;
1273                         continue; /* XXX or break? - probably OK to continue */
1274                 }
1275                 if (!set) {
1276                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1277                         set = 1;
1278                 } else {
1279                         osfs->os_bfree += lov_sfs.os_bfree;
1280                         osfs->os_bavail += lov_sfs.os_bavail;
1281                         osfs->os_blocks += lov_sfs.os_blocks;
1282                         /* XXX not sure about this one - depends on policy.
1283                          *   - could be minimum if we always stripe on all OBDs
1284                          *     (but that would be wrong for any other policy,
1285                          *     if one of the OBDs has no more objects left)
1286                          *   - could be sum if we stripe whole objects
1287                          *   - could be average, just to give a nice number
1288                          *   - we just pick first OST and hope it is enough
1289                         sfs->f_ffree += lov_sfs.f_ffree;
1290                          */
1291                 }
1292         }
1293         RETURN(rc);
1294 }
1295
1296 static int lov_iocontrol(long cmd, struct lustre_handle *conn, int len,
1297                          void *karg, void *uarg)
1298 {
1299         struct obd_device *obddev = class_conn2obd(conn);
1300         struct obd_ioctl_data *data = karg;
1301         struct lov_obd *lov = &obddev->u.lov;
1302         struct lov_desc *desc;
1303         struct lov_tgt_desc *tgtdesc;
1304         obd_uuid_t *uuidp;
1305         char *buf;
1306         int rc, i, count;
1307         ENTRY;
1308
1309         switch (cmd) {
1310         case IOC_LOV_SET_OSC_ACTIVE:
1311                 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
1312                 break;
1313         case OBD_IOC_LOV_GET_CONFIG:
1314                 buf = NULL;
1315                 len = 0;
1316                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1317                         RETURN(-EINVAL);
1318
1319                 data = (struct obd_ioctl_data *)buf;
1320
1321                 if (sizeof(*desc) > data->ioc_inllen1) {
1322                         OBD_FREE(buf, len);
1323                         RETURN(-EINVAL);
1324                 }
1325
1326                 count = lov->desc.ld_tgt_count;
1327
1328                 if (sizeof(*uuidp) * count > data->ioc_inllen2) {
1329                         OBD_FREE(buf, len);
1330                         RETURN(-EINVAL);
1331                 }
1332
1333                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1334                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
1335                 memcpy(desc, &(lov->desc), sizeof(*desc));
1336
1337                 tgtdesc = lov->tgts;
1338                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1339                         memcpy(uuidp, tgtdesc->uuid, sizeof(*uuidp));
1340
1341                 rc = copy_to_user((void *)uarg, buf, len);
1342                 OBD_FREE(buf, len);
1343                 break;
1344         default:
1345                 if (lov->desc.ld_tgt_count == 0)
1346                         RETURN(-ENOTTY);
1347                 rc = 0;
1348                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1349                         int err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1350                                                 len, data, NULL);
1351                         if (err && !rc)
1352                                 rc = err;
1353                 }
1354         }
1355
1356         RETURN(rc);
1357 }
1358
1359 int lov_attach(struct obd_device *dev,
1360                obd_count len, void *data)
1361 {
1362         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
1363 }
1364
/*
 * Detach hook: remove the device's lprocfs entries.  Returns whatever
 * lprocfs_dereg_obd() returns.
 */
int lov_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
1369
1370 struct obd_ops lov_obd_ops = {
1371         o_attach:      lov_attach,
1372         o_detach:      lov_detach,
1373         o_setup:       lov_setup,
1374         o_connect:     lov_connect,
1375         o_disconnect:  lov_disconnect,
1376         o_create:      lov_create,
1377         o_destroy:     lov_destroy,
1378         o_getattr:     lov_getattr,
1379         o_setattr:     lov_setattr,
1380         o_statfs:      lov_statfs,
1381         o_open:        lov_open,
1382         o_close:       lov_close,
1383         o_brw:         lov_brw,
1384         o_punch:       lov_punch,
1385         o_enqueue:     lov_enqueue,
1386         o_cancel:      lov_cancel,
1387         o_cancel_unused: lov_cancel_unused,
1388         o_iocontrol:   lov_iocontrol
1389 };
1390
1391
1392 #define LOV_VERSION "v0.1"
1393
1394 static int __init lov_init(void)
1395 {
1396         int rc;
1397         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
1398                ", info@clusterfs.com\n");
1399         lov_file_cache = kmem_cache_create("ll_lov_file_data",
1400                                            sizeof(struct lov_file_handles),
1401                                            0, 0, NULL, NULL);
1402         if (!lov_file_cache)
1403                 RETURN(-ENOMEM);
1404         rc = class_register_type(&lov_obd_ops, status_class_var,
1405                                  OBD_LOV_DEVICENAME);
1406         RETURN(rc);
1407 }
1408
1409 static void __exit lov_exit(void)
1410 {
1411         if (kmem_cache_destroy(lov_file_cache))
1412                 CERROR("couldn't free LOV open cache\n");
1413         class_unregister_type(OBD_LOV_DEVICENAME);
1414 }
1415
1416 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1417 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION);
1418 MODULE_LICENSE("GPL");
1419
1420 module_init(lov_init);
1421 module_exit(lov_exit);