Whamcloud - gitweb
* merge b_recovery into HEAD
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *         Mike Shaver <shaver@off.net>
10  *
11  * This code is issued under the GNU General Public License.
12  * See the file COPYING in this distribution
13  */
14
15 #define EXPORT_SYMTAB
16 #define DEBUG_SUBSYSTEM S_LOV
17
18 #include <linux/slab.h>
19 #include <linux/module.h>
20 #include <linux/obd_support.h>
21 #include <linux/lustre_lib.h>
22 #include <linux/lustre_net.h>
23 #include <linux/lustre_idl.h>
24 #include <linux/lustre_mds.h>
25 #include <linux/obd_class.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
28 #include <linux/random.h>
29 #include <linux/slab.h>
30 #include <asm/div64.h>
31 #include <linux/lprocfs_status.h>
32
/* lprocfs variable tables defined outside this file.  status_var_nm_1 is the
 * table lov_attach() registers for each device; status_class_var is not
 * referenced in the part of the file reviewed here. */
33 extern struct lprocfs_vars status_var_nm_1[];
34 extern struct lprocfs_vars status_class_var[];
35
/* Slab cache backing struct lov_file_handles: lov_open() allocates from it,
 * lov_disconnect() frees into it, and lov_handle2lfh() uses it to validate
 * pointers recovered from a lustre_handle. */
36 static kmem_cache_t *lov_file_cache;
37
/* Per-open bookkeeping for a striped file: one of these is created by
 * lov_open() and handed back to the caller encoded in a lustre_handle
 * (addr = pointer to this struct, cookie = lfh_cookie). */
38 struct lov_file_handles {
        /* linkage on the owning export's exp_lov_data.led_open_head list */
39         struct list_head lfh_list;
        /* random value filled in at open time; lov_handle2lfh() rejects a
         * handle whose cookie does not match */
40         __u64 lfh_cookie;
        /* number of entries in lfh_handles (set to the stripe count) */
41         int lfh_count;
        /* array of per-stripe handles, indexed by stripe number */
42         struct lustre_handle *lfh_handles;
43 };
44
/* Prototypes for routines implemented outside this file.
 * NOTE(review): judging by the signatures these presumably convert between
 * the in-memory lov_stripe_md and the lov_mds_md form - confirm against
 * their definitions. */
45 extern int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
46                        struct lov_stripe_md *lsm);
47 extern int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
48                          struct lov_mds_md *lmm);
49
50 /* obd methods */
51 int lov_attach(struct obd_device *dev, obd_count len, void *data)
52 {
53         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
54 }
55
/* Detach method: remove the lprocfs registration for this device. */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_dereg_obd(dev);

        return rc;
}
60
61 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
62                        obd_uuid_t cluuid, struct recovd_obd *recovd,
63                        ptlrpc_recovery_cb_t recover)
64 {
65         struct ptlrpc_request *req = NULL;
66         struct lov_obd *lov = &obd->u.lov;
67         struct client_obd *mdc = &lov->mdcobd->u.cli;
68         struct lov_desc *desc = &lov->desc;
69         struct obd_export *exp;
70         struct lustre_handle mdc_conn;
71         obd_uuid_t *uuidarray;
72         int rc, rc2, i;
73         ENTRY;
74
75         MOD_INC_USE_COUNT;
76         rc = class_connect(conn, obd, cluuid);
77         if (rc)
78                 GOTO(out_dec, rc);
79
80         /* We don't want to actually do the underlying connections more than
81          * once, so keep track. */
82         lov->refcount++;
83         if (lov->refcount > 1)
84                 RETURN(0);
85
86         exp = class_conn2export(conn);
87         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
88
89         /* retrieve LOV metadata from MDS */
90         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
91         if (rc) {
92                 CERROR("cannot connect to mdc: rc = %d\n", rc);
93                 GOTO(out_conn, rc);
94         }
95
96         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
97         rc2 = obd_disconnect(&mdc_conn);
98         if (rc) {
99                 CERROR("cannot get lov info %d\n", rc);
100                 GOTO(out_conn, rc);
101         }
102
103         if (rc2) {
104                 CERROR("error disconnecting from MDS %d\n", rc2);
105                 GOTO(out_conn, rc = rc2);
106         }
107
108         /* sanity... */
109         if (req->rq_repmsg->bufcount < 2 ||
110             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
111                 CERROR("LOV desc: invalid descriptor returned\n");
112                 GOTO(out_conn, rc = -EINVAL);
113         }
114
115         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
116         lov_unpackdesc(desc);
117
118         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
119                 CERROR("LOV desc: invalid uuid array returned\n");
120                 GOTO(out_conn, rc = -EINVAL);
121         }
122
123         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
124                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
125                        obd->obd_uuid, desc->ld_uuid);
126                 GOTO(out_conn, rc = -EINVAL);
127         }
128
129         if (desc->ld_tgt_count > 1000) {
130                 CERROR("LOV desc: target count > 1000 (%d)\n",
131                        desc->ld_tgt_count);
132                 GOTO(out_conn, rc = -EINVAL);
133         }
134
135         /* Because of 64-bit divide/mod operations only work with a 32-bit
136          * divisor in a 32-bit kernel, we cannot support a stripe width
137          * of 4GB or larger on 32-bit CPUs.
138          */
139         if ((desc->ld_default_stripe_count ?
140              desc->ld_default_stripe_count : desc->ld_tgt_count) *
141              desc->ld_default_stripe_size > ~0UL) {
142                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
143                        desc->ld_default_stripe_size,
144                        desc->ld_default_stripe_count ?
145                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
146                 GOTO(out_conn, rc = -EINVAL);
147         }
148
149         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
150         OBD_ALLOC(lov->tgts, lov->bufsize);
151         if (!lov->tgts) {
152                 CERROR("Out of memory\n");
153                 GOTO(out_conn, rc = -ENOMEM);
154         }
155
156         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
157         for (i = 0; i < desc->ld_tgt_count; i++)
158                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
159
160         for (i = 0; i < desc->ld_tgt_count; i++) {
161                 struct obd_device *tgt = client_tgtuuid2obd(uuidarray[i]);
162                 int rc2;
163
164                 if (!tgt) {
165                         CERROR("Target %s not attached\n", uuidarray[i]);
166                         GOTO(out_disc, rc = -EINVAL);
167                 }
168
169                 if (!(tgt->obd_flags & OBD_SET_UP)) {
170                         CERROR("Target %s not set up\n", uuidarray[i]);
171                         GOTO(out_disc, rc = -EINVAL);
172                 }
173
174                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
175                                  recover);
176
177                 /* Register even if connect failed, so that we get reactivation
178                  * notices.
179                  */
180                 rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
181                                     sizeof(struct obd_device *), obd, NULL);
182                 if (rc2) {
183                         CERROR("Target %s REGISTER_LOV error %d\n",
184                                uuidarray[i], rc2);
185                         GOTO(out_disc, rc2);
186                 }
187
188                 /* But mark failed-connect OSCs as inactive! */
189                 if (rc) {
190                         CDEBUG(D_INFO, "Target %s connect error %d\n",
191                                uuidarray[i], rc);
192                         LASSERT(lov->tgts[i].active == 0);
193                         rc = 0;
194                         continue;
195                 }
196                 
197                 desc->ld_active_tgt_count++;
198                 lov->tgts[i].active = 1;
199         }
200
201         mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
202
203  out:
204         ptlrpc_req_finished(req);
205         RETURN(rc);
206
207  out_disc:
208         while (i-- > 0) {
209                 desc->ld_active_tgt_count--;
210                 lov->tgts[i].active = 0;
211                 rc2 = obd_disconnect(&lov->tgts[i].conn);
212                 if (rc2)
213                         CERROR("LOV Target %s disconnect error: rc = %d\n",
214                                 uuidarray[i], rc2);
215         }
216         OBD_FREE(lov->tgts, lov->bufsize);
217  out_conn:
218         class_disconnect(conn);
219  out_dec:
220         MOD_DEC_USE_COUNT;
221         goto out;
222 }
223
224 static int lov_disconnect(struct lustre_handle *conn)
225 {
226         struct obd_device *obd = class_conn2obd(conn);
227         struct lov_obd *lov = &obd->u.lov;
228         struct obd_export *exp;
229         struct list_head *p, *n;
230         int rc, i;
231
232         if (!lov->tgts)
233                 goto out_local;
234
235         /* Only disconnect the underlying layers on the final disconnect. */
236         lov->refcount--;
237         if (lov->refcount != 0)
238                 goto out_local;
239
240         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
241                 rc = obd_disconnect(&lov->tgts[i].conn);
242                 if (rc) {
243                         if (lov->tgts[i].active) {
244                                 CERROR("Target %s disconnect error %d\n",
245                                        lov->tgts[i].uuid, rc);
246                         }
247                         rc = 0;
248                 }
249                 if (lov->tgts[i].active) {
250                         lov->desc.ld_active_tgt_count--;
251                         lov->tgts[i].active = 0;
252                 }
253         }
254         OBD_FREE(lov->tgts, lov->bufsize);
255         lov->bufsize = 0;
256         lov->tgts = NULL;
257
258         exp = class_conn2export(conn);
259         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
260                 /* XXX close these, instead of just discarding them? */
261                 struct lov_file_handles *lfh;
262                 lfh = list_entry(p, typeof(*lfh), lfh_list);
263                 CERROR("discarding open LOV handle %p:"LPX64"\n",
264                        lfh, lfh->lfh_cookie);
265                 list_del(&lfh->lfh_list);
266                 OBD_FREE(lfh->lfh_handles,
267                          lfh->lfh_count * sizeof(*lfh->lfh_handles));
268                 kmem_cache_free(lov_file_cache, lfh);
269         }
270
271  out_local:
272         rc = class_disconnect(conn);
273         if (!rc)
274                 MOD_DEC_USE_COUNT;
275         return rc;
276 }
277
278 /* Error codes:
279  *
280  *  -EINVAL  : UUID can't be found in the LOV's target list
281  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
282  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
283  *  -EALREADY: The OSC is already marked (in)active
284  */
285 static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
286                               int activate)
287 {
288         struct obd_device *obd;
289         int i, rc = 0;
290         ENTRY;
291
292         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
293                lov, uuid, activate);
294
295         spin_lock(&lov->lov_lock);
296         for (i = 0; i < lov->desc.ld_tgt_count; i++)
297                 if (strncmp(uuid, lov->tgts[i].uuid,
298                             sizeof(lov->tgts[i].uuid)) == 0)
299                         break;
300
301         if (i == lov->desc.ld_tgt_count)
302                 GOTO(out, rc = -EINVAL);
303
304         obd = class_conn2obd(&lov->tgts[i].conn);
305         if (obd == NULL) {
306                 LBUG();
307                 GOTO(out, rc = -ENOTCONN);
308         }
309
310         CDEBUG(D_INFO, "Found OBD %p type %s\n", obd, obd->obd_type->typ_name);
311         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
312                 LBUG();
313                 GOTO(out, rc = -EBADF);
314         }
315
316         if (lov->tgts[i].active == activate) {
317                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
318                        activate ? "" : "in");
319                 GOTO(out, rc = -EALREADY);
320         }
321
322         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
323
324         lov->tgts[i].active = activate;
325         if (activate) {
326                 /*
327                  * foreach(export)
328                  *     foreach(open_file)
329                  *         if (file_handle uses this_osc)
330                  *             if (has_no_filehandle)
331                  *                 open(file_handle, this_osc);
332                  */
333                 /* XXX reconnect? */
334                 lov->desc.ld_active_tgt_count++;
335         } else {
336                 /*
337                  * Should I invalidate filehandles that refer to this OSC, so
338                  * that I reopen them during reactivation?
339                  */
340                 /* XXX disconnect from OSC? */
341                 lov->desc.ld_active_tgt_count--;
342         }
343
344         EXIT;
345  out:
346         spin_unlock(&lov->lov_lock);
347         return rc;
348 }
349
350 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
351 {
352         struct obd_ioctl_data *data = buf;
353         struct lov_obd *lov = &obd->u.lov;
354         int rc = 0;
355         ENTRY;
356
357         if (data->ioc_inllen1 < 1) {
358                 CERROR("LOV setup requires an MDC UUID\n");
359                 RETURN(-EINVAL);
360         }
361
362         if (data->ioc_inllen1 > 37) {
363                 CERROR("mdc UUID must be 36 characters or less\n");
364                 RETURN(-EINVAL);
365         }
366
367         spin_lock_init(&lov->lov_lock);
368         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
369         if (!lov->mdcobd) {
370                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
371                        data->ioc_inlbuf1);
372                 rc = -EINVAL;
373         }
374         RETURN(rc);
375 }
376
377 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
378 {
379         struct lov_file_handles *lfh = NULL;
380
381         if (!handle || !handle->addr)
382                 RETURN(NULL);
383
384         lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
385         if (!kmem_cache_validate(lov_file_cache, lfh))
386                 RETURN(NULL);
387
388         if (lfh->lfh_cookie != handle->cookie)
389                 RETURN(NULL);
390
391         return lfh;
392 }
393
394 /* the LOV expects oa->o_id to be set to the LOV object id */
395 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
396                       struct lov_stripe_md **ea)
397 {
398         struct obd_export *export = class_conn2export(conn);
399         struct lov_obd *lov;
400         struct lov_stripe_md *lsm;
401         struct lov_oinfo *loi;
402         struct obdo *tmp;
403         int ost_count, ost_idx = 1;
404         int rc = 0, i;
405         ENTRY;
406
407         LASSERT(ea);
408
409         if (!export)
410                 RETURN(-EINVAL);
411
412         tmp = obdo_alloc();
413         if (!tmp)
414                 RETURN(-ENOMEM);
415
416         lov = &export->exp_obd->u.lov;
417
418         if (!lov->desc.ld_active_tgt_count)
419                 RETURN(-EIO);
420
421         spin_lock(&lov->lov_lock);
422         ost_count = lov->desc.ld_tgt_count;
423
424         lsm = *ea;
425
426         /* Can't create more stripes than we have targets (incl inactive). */
427         if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count)
428                 GOTO(out_tmp, rc = -EINVAL);
429
430         /* Free the user lsm if it needs to be changed, to avoid memory leaks */
431         if (!lsm || (lsm &&
432                      lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) {
433                 struct lov_stripe_md *lsm_new = NULL;
434                 rc = obd_alloc_memmd(conn, &lsm_new);
435                 if (rc < 0) {
436                         spin_unlock(&lov->lov_lock);
437                         if (lsm)
438                                 obd_free_memmd(conn, &lsm);
439                         GOTO(out_tmp, rc);
440                 }
441                 if (lsm) {
442                         LASSERT(lsm->lsm_magic == LOV_MAGIC);
443                         CERROR("replace user LOV MD: stripes %u > %u active\n",
444                                lsm->lsm_stripe_count,
445                                lov->desc.ld_active_tgt_count);
446                         lsm_new->lsm_stripe_offset = lsm->lsm_stripe_offset;
447                         lsm_new->lsm_stripe_size = lsm->lsm_stripe_size;
448                         lsm_new->lsm_stripe_pattern = lsm->lsm_stripe_pattern;
449                         obd_free_memmd(conn, &lsm);
450                 }
451                 lsm = lsm_new;
452                 ost_idx = 0; /* if lsm->lsm_stripe_offset is set yet */
453                 lsm->lsm_magic = LOV_MAGIC;
454         }
455
456         LASSERT(oa->o_valid & OBD_MD_FLID);
457         lsm->lsm_object_id = oa->o_id;
458         if (!lsm->lsm_stripe_size)
459                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
460
461         /* Because of 64-bit divide/mod operations only work with a 32-bit
462          * divisor in a 32-bit kernel, we cannot support a stripe width
463          * of 4GB or larger on 32-bit CPUs.
464          */
465         if (lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL) {
466                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
467                        lsm->lsm_stripe_size, lsm->lsm_stripe_count, ~0UL);
468                 spin_unlock(&lov->lov_lock);
469                 GOTO(out_free, rc = -EINVAL);
470         }
471
472         if (!ost_idx || lsm->lsm_stripe_offset >= ost_count) {
473                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
474                 int stripe_offset = mult % ost_count;
475                 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
476
477                 lsm->lsm_stripe_offset = stripe_offset + sub_offset;
478         }
479
480         /* Start with lsm_stripe_offset on an active OSC to avoid confusion */
481         while (!lov->tgts[lsm->lsm_stripe_offset].active)
482                 lsm->lsm_stripe_offset = (lsm->lsm_stripe_offset+1) % ost_count;
483
484         /* Pick the OSTs before we release the lock */
485         ost_idx = lsm->lsm_stripe_offset;
486         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
487                 CDEBUG(D_INODE, "objid "LPX64"[%d] is ost_idx %d (uuid %s)\n",
488                        lsm->lsm_object_id, i, ost_idx, lov->tgts[ost_idx].uuid);
489                 loi->loi_ost_idx = ost_idx;
490                 do {
491                         ost_idx = (ost_idx + 1) % ost_count;
492                 } while (!lov->tgts[ost_idx].active);
493         }
494
495         spin_unlock(&lov->lov_lock);
496
497         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
498                lsm->lsm_stripe_count,lsm->lsm_object_id,lsm->lsm_stripe_offset);
499
500         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
501                 struct lov_stripe_md obj_md;
502                 struct lov_stripe_md *obj_mdp = &obj_md;
503
504                 ost_idx = loi->loi_ost_idx;
505
506                 /* create data objects with "parent" OA */
507                 memcpy(tmp, oa, sizeof(*tmp));
508                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
509                 rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
510                 if (rc) {
511                         CERROR("error creating objid "LPX64" sub-object on "
512                                "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
513                         GOTO(out_cleanup, rc);
514                 }
515                 loi->loi_id = tmp->o_id;
516                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
517                        lsm->lsm_object_id, loi->loi_id, ost_idx);
518         }
519
520         *ea = lsm;
521
522  out_tmp:
523         obdo_free(tmp);
524         RETURN(rc);
525
526  out_cleanup:
527         while (i-- > 0) {
528                 int err;
529
530                 --loi;
531                 /* destroy already created objects here */
532                 memcpy(tmp, oa, sizeof(*tmp));
533                 tmp->o_id = loi->loi_id;
534                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
535                 if (err)
536                         CERROR("Failed to uncreate objid "LPX64" subobj "
537                                LPX64" on OST idx %d: rc = %d\n",
538                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
539                                err);
540         }
541  out_free:
542         if (!*ea)
543                 obd_free_memmd(conn, &lsm);
544         goto out_tmp;
545 }
546
547 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
548                        struct lov_stripe_md *lsm)
549 {
550         struct obdo tmp;
551         struct obd_export *export = class_conn2export(conn);
552         struct lov_obd *lov;
553         struct lov_oinfo *loi;
554         struct lov_file_handles *lfh = NULL;
555         int rc = 0, i;
556         ENTRY;
557
558         if (!lsm) {
559                 CERROR("LOV requires striping ea for destruction\n");
560                 RETURN(-EINVAL);
561         }
562
563         if (lsm->lsm_magic != LOV_MAGIC) {
564                 CERROR("LOV striping magic bad %#lx != %#lx\n",
565                        lsm->lsm_magic, LOV_MAGIC);
566                 RETURN(-EINVAL);
567         }
568
569         if (!export || !export->exp_obd)
570                 RETURN(-ENODEV);
571
572         if (oa->o_valid & OBD_MD_FLHANDLE)
573                 lfh = lov_handle2lfh(obdo_handle(oa));
574
575         lov = &export->exp_obd->u.lov;
576         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
577                 int err;
578                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
579                         /* Orphan clean up will (someday) fix this up. */
580                         continue;
581                 }
582
583                 memcpy(&tmp, oa, sizeof(tmp));
584                 tmp.o_id = loi->loi_id;
585                 if (lfh)
586                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
587                                sizeof(lfh->lfh_handles[i]));
588                 else
589                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
590                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
591                                   NULL);
592                 if (err && lov->tgts[loi->loi_ost_idx].active) {
593                         CERROR("Error destroying objid "LPX64" subobj "
594                                LPX64" on OST idx %d\n: rc = %d",
595                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
596                         if (!rc)
597                                 rc = err;
598                 }
599         }
600         RETURN(rc);
601 }
602
603 /* compute object size given "stripeno" and the ost size */
604 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
605                                 int stripeno)
606 {
607         unsigned long ssize  = lsm->lsm_stripe_size;
608         unsigned long swidth = ssize * lsm->lsm_stripe_count;
609         unsigned long stripe_size;
610         obd_size lov_size;
611
612         if (ost_size == 0)
613                 return 0;
614
615         /* do_div(a, b) returns a % b, and a = a / b */
616         stripe_size = do_div(ost_size, ssize);
617
618         if (stripe_size)
619                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
620         else
621                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
622
623         return lov_size;
624 }
625
626 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
627                             struct lov_stripe_md *lsm, int stripeno, int *new)
628 {
629         if (*new) {
630                 obdo_cpy_md(tgt, src, valid);
631                 if (valid & OBD_MD_FLSIZE)
632                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
633                 *new = 0;
634         } else {
635                 if (valid & OBD_MD_FLSIZE) {
636                         /* this handles sparse files properly */
637                         obd_size lov_size;
638
639                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
640                         if (lov_size > tgt->o_size)
641                                 tgt->o_size = lov_size;
642                 }
643                 if (valid & OBD_MD_FLBLOCKS)
644                         tgt->o_blocks += src->o_blocks;
645                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
646                         tgt->o_ctime = src->o_ctime;
647                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
648                         tgt->o_mtime = src->o_mtime;
649         }
650 }
651
652 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
653                        struct lov_stripe_md *lsm)
654 {
655         struct obdo tmp;
656         struct obd_export *export = class_conn2export(conn);
657         struct lov_obd *lov;
658         struct lov_oinfo *loi;
659         struct lov_file_handles *lfh = NULL;
660         int i;
661         int new = 1;
662         ENTRY;
663
664         if (!lsm) {
665                 CERROR("LOV requires striping ea\n");
666                 RETURN(-EINVAL);
667         }
668
669         if (lsm->lsm_magic != LOV_MAGIC) {
670                 CERROR("LOV striping magic bad %#lx != %#lx\n",
671                        lsm->lsm_magic, LOV_MAGIC);
672                 RETURN(-EINVAL);
673         }
674
675         if (!export || !export->exp_obd)
676                 RETURN(-ENODEV);
677
678         lov = &export->exp_obd->u.lov;
679
680         if (oa->o_valid & OBD_MD_FLHANDLE)
681                 lfh = lov_handle2lfh(obdo_handle(oa));
682
683         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
684                 int err;
685
686                 if (loi->loi_id == 0)
687                         continue;
688
689                 if (lov->tgts[loi->loi_ost_idx].active == 0)
690                         continue;
691
692                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
693                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
694                 /* create data objects with "parent" OA */
695                 memcpy(&tmp, oa, sizeof(tmp));
696                 tmp.o_id = loi->loi_id;
697                 if (lfh)
698                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
699                                sizeof(lfh->lfh_handles[i]));
700                 else
701                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
702
703                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
704                 if (err) {
705                         if (lov->tgts[loi->loi_ost_idx].active) {
706                                 CERROR("Error getattr objid "LPX64" subobj "
707                                        LPX64" on OST idx %d: rc = %d\n",
708                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
709                                        err);
710                                 RETURN(err);
711                         }
712                 } else {
713                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
714                 }
715         }
716
717         RETURN(0);
718 }
719
720 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
721                        struct lov_stripe_md *lsm)
722 {
723         struct obdo *tmp;
724         struct obd_export *export = class_conn2export(conn);
725         struct lov_obd *lov;
726         struct lov_oinfo *loi;
727         struct lov_file_handles *lfh = NULL;
728         int rc = 0, i;
729         ENTRY;
730
731         /* Note that this code is currently unused, hence LBUG(), just
732          * to know when/if it is ever revived that it needs cleanups.
733          */
734         LBUG();
735
736         if (!lsm) {
737                 CERROR("LOV requires striping ea\n");
738                 RETURN(-EINVAL);
739         }
740
741         if (lsm->lsm_magic != LOV_MAGIC) {
742                 CERROR("LOV striping magic bad %#lx != %#lx\n",
743                        lsm->lsm_magic, LOV_MAGIC);
744                 RETURN(-EINVAL);
745         }
746
747         if (!export || !export->exp_obd)
748                 RETURN(-ENODEV);
749
750         /* size changes should go through punch and not setattr */
751         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
752
753         tmp = obdo_alloc();
754         if (!tmp)
755                 RETURN(-ENOMEM);
756
757         if (oa->o_valid & OBD_MD_FLHANDLE)
758                 lfh = lov_handle2lfh(obdo_handle(oa));
759
760         lov = &export->exp_obd->u.lov;
761         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
762                 int err;
763
764                 obdo_cpy_md(tmp, oa, oa->o_valid);
765
766                 if (lfh)
767                         memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
768                                 sizeof(lfh->lfh_handles[i]));
769                 else
770                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
771
772                 tmp->o_id = loi->loi_id;
773
774                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
775                 if (err) {
776                         CERROR("Error setattr objid "LPX64" subobj "LPX64
777                                " on OST idx %d: rc = %d\n",
778                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
779                         if (!rc)
780                                 rc = err;
781                 }
782         }
783         obdo_free(tmp);
784         RETURN(rc);
785 }
786
787 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
788                     struct lov_stripe_md *lsm)
789 {
790         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
791         struct obd_export *export = class_conn2export(conn);
792         struct lov_obd *lov;
793         struct lov_oinfo *loi;
794         struct lov_file_handles *lfh = NULL;
795         struct lustre_handle *handle;
796         int new = 1;
797         int rc = 0, i;
798         ENTRY;
799
800         if (!lsm) {
801                 CERROR("LOV requires striping ea for opening\n");
802                 RETURN(-EINVAL);
803         }
804
805         if (lsm->lsm_magic != LOV_MAGIC) {
806                 CERROR("LOV striping magic bad %#lx != %#lx\n",
807                        lsm->lsm_magic, LOV_MAGIC);
808                 RETURN(-EINVAL);
809         }
810
811         if (!export || !export->exp_obd)
812                 RETURN(-ENODEV);
813
814         tmp = obdo_alloc();
815         if (!tmp)
816                 RETURN(-ENOMEM);
817
818         lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
819         if (!lfh)
820                 GOTO(out_tmp, rc = -ENOMEM);
821         OBD_ALLOC(lfh->lfh_handles,
822                   lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
823         if (!lfh->lfh_handles)
824                 GOTO(out_lfh, rc = -ENOMEM);
825
826         lov = &export->exp_obd->u.lov;
827         oa->o_size = 0;
828         oa->o_blocks = 0;
829         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
830
831                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
832                         continue;
833                 }
834
835                 /* create data objects with "parent" OA */
836                 memcpy(tmp, oa, sizeof(*tmp));
837                 tmp->o_id = loi->loi_id;
838
839                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
840                 if (rc) {
841                         if (lov->tgts[loi->loi_ost_idx].active) {
842                                 CERROR("Error open objid "LPX64" subobj "LPX64
843                                        " on OST idx %d: rc = %d\n",
844                                        oa->o_id, lsm->lsm_oinfo[i].loi_id,
845                                        loi->loi_ost_idx, rc);
846                                 goto out_handles;
847                         }
848                         continue;
849                 }
850
851                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
852
853                 if (tmp->o_valid & OBD_MD_FLHANDLE)
854                         memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
855                                sizeof(lfh->lfh_handles[i]));
856         }
857
858         handle = obdo_handle(oa);
859         
860         lfh->lfh_count = lsm->lsm_stripe_count;
861         get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
862         
863         handle->addr = (__u64)(unsigned long)lfh;
864         handle->cookie = lfh->lfh_cookie;
865         oa->o_valid |= OBD_MD_FLHANDLE;
866         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
867
868 out_tmp:
869         obdo_free(tmp);
870         RETURN(rc);
871
872 out_handles:
873         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
874                 int err;
875
876                 if (lov->tgts[loi->loi_ost_idx].active == 0)
877                         continue;
878
879                 memcpy(tmp, oa, sizeof(*tmp));
880                 tmp->o_id = loi->loi_id;
881                 memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
882                        sizeof(lfh->lfh_handles[i]));
883
884                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
885                 if (err) {
886                         CERROR("Error closing objid "LPX64" subobj "LPX64
887                                " on OST idx %d after open error: rc = %d\n",
888                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
889                 }
890         }
891        
892         OBD_FREE(lfh->lfh_handles,
893                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
894 out_lfh:
895         lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
896         kmem_cache_free(lov_file_cache, lfh);
897         goto out_tmp;
898 }
899
900 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
901                      struct lov_stripe_md *lsm)
902 {
903         struct obdo tmp;
904         struct obd_export *export = class_conn2export(conn);
905         struct lov_obd *lov;
906         struct lov_oinfo *loi;
907         struct lov_file_handles *lfh = NULL;
908         int rc = 0, i;
909         ENTRY;
910
911         if (!lsm) {
912                 CERROR("LOV requires striping ea\n");
913                 RETURN(-EINVAL);
914         }
915
916         if (lsm->lsm_magic != LOV_MAGIC) {
917                 CERROR("LOV striping magic bad %#lx != %#lx\n",
918                        lsm->lsm_magic, LOV_MAGIC);
919                 RETURN(-EINVAL);
920         }
921
922         if (!export || !export->exp_obd)
923                 RETURN(-ENODEV);
924
925         if (oa->o_valid & OBD_MD_FLHANDLE)
926                 lfh = lov_handle2lfh(obdo_handle(oa));
927
928         lov = &export->exp_obd->u.lov;
929         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
930                 int err;
931                 
932                 if (lov->tgts[loi->loi_ost_idx].active == 0)
933                         continue;
934
935                 /* create data objects with "parent" OA */
936                 memcpy(&tmp, oa, sizeof(tmp));
937                 tmp.o_id = loi->loi_id;
938                 if (lfh)
939                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
940                                sizeof(lfh->lfh_handles[i]));
941                 else
942                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
943
944                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
945                 if (err) {
946                         CERROR("Error close objid "LPX64" subobj "LPX64
947                                " on OST idx %d: rc = %d\n",
948                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
949                         if (!rc)
950                                 rc = err;
951                 }
952         }
953         if (lfh) {
954                 list_del(&lfh->lfh_list);
955                 OBD_FREE(lfh->lfh_handles,
956                          lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
957                 lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
958                 kmem_cache_free(lov_file_cache, lfh);
959         }
960
961         RETURN(rc);
962 }
963
/* Supply an ffz()-based log2() only when no log2 macro is already defined. */
#if !defined(log2)
#define log2(n) ffz(~(n))
#endif

#warning FIXME: merge these two functions now that they are nearly the same
969
970 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
971 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
972                                  int stripeno)
973 {
974         unsigned long ssize  = lsm->lsm_stripe_size;
975         unsigned long swidth = ssize * lsm->lsm_stripe_count;
976         unsigned long stripe_off, this_stripe;
977
978         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
979                 return lov_off;
980
981         /* do_div(a, b) returns a % b, and a = a / b */
982         stripe_off = do_div(lov_off, swidth);
983
984         this_stripe = stripeno * ssize;
985         if (stripe_off <= this_stripe)
986                 stripe_off = 0;
987         else {
988                 stripe_off -= this_stripe;
989
990                 if (stripe_off > ssize)
991                         stripe_off = ssize;
992         }
993
994
995         return lov_off * ssize + stripe_off;
996 }
997
998 /* compute which stripe number "lov_off" will be written into */
999 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1000 {
1001         unsigned long ssize  = lsm->lsm_stripe_size;
1002         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1003         unsigned long stripe_off;
1004
1005         stripe_off = do_div(lov_off, swidth);
1006
1007         return stripe_off / ssize;
1008 }
1009
1010
1011 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1012  * we can send this 'punch' to just the authoritative node and the nodes
1013  * that the punch will affect. */
1014 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1015                      struct lov_stripe_md *lsm,
1016                      obd_off start, obd_off end)
1017 {
1018         struct obdo tmp;
1019         struct obd_export *export = class_conn2export(conn);
1020         struct lov_obd *lov;
1021         struct lov_oinfo *loi;
1022         struct lov_file_handles *lfh = NULL;
1023         int rc = 0, i;
1024         ENTRY;
1025
1026         if (!lsm) {
1027                 CERROR("LOV requires striping ea\n");
1028                 RETURN(-EINVAL);
1029         }
1030
1031         if (lsm->lsm_magic != LOV_MAGIC) {
1032                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1033                        lsm->lsm_magic, LOV_MAGIC);
1034                 RETURN(-EINVAL);
1035         }
1036
1037         if (!export || !export->exp_obd)
1038                 RETURN(-ENODEV);
1039
1040         if (oa->o_valid & OBD_MD_FLHANDLE)
1041                 lfh = lov_handle2lfh(obdo_handle(oa));
1042
1043         lov = &export->exp_obd->u.lov;
1044         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1045                 obd_off starti = lov_stripe_offset(lsm, start, i);
1046                 obd_off endi = lov_stripe_offset(lsm, end, i);
1047                 int err;
1048
1049                 if (starti == endi)
1050                         continue;
1051                 /* create data objects with "parent" OA */
1052                 memcpy(&tmp, oa, sizeof(tmp));
1053                 tmp.o_id = loi->loi_id;
1054                 if (lfh)
1055                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
1056                                sizeof(lfh->lfh_handles[i]));
1057                 else
1058                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1059
1060                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1061                                 starti, endi);
1062                 if (err) {
1063                         CERROR("Error punch objid "LPX64" subobj "LPX64
1064                                " on OST idx %d: rc = %d\n",
1065                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
1066                         if (!rc)
1067                                 rc = err;
1068                 }
1069         }
1070         RETURN(rc);
1071 }
1072
1073 static inline int lov_brw(int cmd, struct lustre_handle *conn,
1074                           struct lov_stripe_md *lsm, obd_count oa_bufs,
1075                           struct brw_page *pga, struct obd_brw_set *set)
1076 {
1077         struct {
1078                 int bufct;
1079                 int index;
1080                 int subcount;
1081                 struct lov_stripe_md lsm;
1082                 int ost_idx;
1083         } *stripeinfo, *si, *si_last;
1084         struct obd_export *export = class_conn2export(conn);
1085         struct lov_obd *lov;
1086         struct brw_page *ioarr;
1087         struct lov_oinfo *loi;
1088         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1089         ENTRY;
1090
1091         if (!lsm) {
1092                 CERROR("LOV requires striping ea\n");
1093                 RETURN(-EINVAL);
1094         }
1095
1096         if (lsm->lsm_magic != LOV_MAGIC) {
1097                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1098                        lsm->lsm_magic, LOV_MAGIC);
1099                 RETURN(-EINVAL);
1100         }
1101
1102         lov = &export->exp_obd->u.lov;
1103
1104         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1105         if (!stripeinfo)
1106                 GOTO(out_cbdata, rc = -ENOMEM);
1107
1108         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1109         if (!where)
1110                 GOTO(out_sinfo, rc = -ENOMEM);
1111
1112         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1113         if (!ioarr)
1114                 GOTO(out_where, rc = -ENOMEM);
1115
1116         for (i = 0; i < oa_bufs; i++) {
1117                 where[i] = lov_stripe_number(lsm, pga[i].off);
1118                 stripeinfo[where[i]].bufct++;
1119         }
1120
1121         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1122              i < stripe_count; i++, loi++, si_last = si, si++) {
1123                 if (i > 0)
1124                         si->index = si_last->index + si_last->bufct;
1125                 si->lsm.lsm_object_id = loi->loi_id;
1126                 si->ost_idx = loi->loi_ost_idx;
1127         }
1128
1129         for (i = 0; i < oa_bufs; i++) {
1130                 int which = where[i];
1131                 int shift;
1132
1133                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1134                 LASSERT(shift < oa_bufs);
1135                 ioarr[shift] = pga[i];
1136                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
1137                 stripeinfo[which].subcount++;
1138         }
1139
1140         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1141                 int shift = si->index;
1142
1143                 if (si->bufct) {
1144                         LASSERT(shift < oa_bufs);
1145                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1146                                      &si->lsm, si->bufct, &ioarr[shift], set);
1147                         if (rc)
1148                                 GOTO(out_ioarr, rc);
1149                 }
1150         }
1151
1152  out_ioarr:
1153         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1154  out_where:
1155         OBD_FREE(where, sizeof(*where) * oa_bufs);
1156  out_sinfo:
1157         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1158  out_cbdata:
1159         RETURN(rc);
1160 }
1161
1162 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1163                        struct lustre_handle *parent_lock,
1164                        __u32 type, void *cookie, int cookielen, __u32 mode,
1165                        int *flags, void *cb, void *data, int datalen,
1166                        struct lustre_handle *lockhs)
1167 {
1168         struct obd_export *export = class_conn2export(conn);
1169         struct lov_obd *lov;
1170         struct lov_oinfo *loi;
1171         struct lov_stripe_md submd;
1172         int rc = 0, i;
1173         ENTRY;
1174
1175         if (!lsm) {
1176                 CERROR("LOV requires striping ea\n");
1177                 RETURN(-EINVAL);
1178         }
1179
1180         if (lsm->lsm_magic != LOV_MAGIC) {
1181                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1182                        lsm->lsm_magic, LOV_MAGIC);
1183                 RETURN(-EINVAL);
1184         }
1185
1186         /* we should never be asked to replay a lock. */
1187
1188         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1189
1190         if (!export || !export->exp_obd)
1191                 RETURN(-ENODEV);
1192
1193         memset(lockhs, 0, sizeof(*lockhs) * lsm->lsm_stripe_count);
1194
1195         lov = &export->exp_obd->u.lov;
1196         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1197                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1198                 struct ldlm_extent sub_ext;
1199
1200                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1201                         continue;
1202
1203                 *flags = 0;
1204                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
1205                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
1206                 if (sub_ext.start == sub_ext.end)
1207                         continue;
1208
1209                 submd.lsm_object_id = loi->loi_id;
1210                 /* XXX submd should be that from the subobj, it should come
1211                  *     opaquely from the LOV.
1212                  */
1213                 submd.lsm_stripe_count = 0;
1214                 /* XXX submd is not fully initialized here */
1215                 *flags = 0;
1216                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1217                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
1218                                  mode, flags, cb, data, datalen, &(lockhs[i]));
1219                 // XXX add a lock debug statement here
1220                 if (rc && lov->tgts[loi->loi_ost_idx].active) {
1221                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
1222                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1223                                loi->loi_id, loi->loi_ost_idx, rc);
1224                         goto out_locks;
1225                 }
1226         }
1227
1228         RETURN(0);
1229
1230  out_locks:
1231         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
1232                 int err;
1233                 
1234                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1235                         continue;
1236
1237                 submd.lsm_object_id = loi->loi_id;
1238                 submd.lsm_stripe_count = 0;
1239                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1240                                  mode, &lockhs[i]);
1241                 if (err) {
1242                         CERROR("Error cancelling objid "LPX64" subobj "LPX64
1243                                " on OST idx %d after enqueue error: rc = %d\n",
1244                                loi->loi_id, loi->loi_ost_idx, err);
1245                 }
1246         }
1247         RETURN(rc);
1248 }
1249
1250 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1251                       __u32 mode, struct lustre_handle *lockhs)
1252 {
1253         struct obd_export *export = class_conn2export(conn);
1254         struct lov_obd *lov;
1255         struct lov_oinfo *loi;
1256         int rc = 0, i;
1257         ENTRY;
1258
1259         if (!lsm) {
1260                 CERROR("LOV requires striping ea\n");
1261                 RETURN(-EINVAL);
1262         }
1263
1264         if (lsm->lsm_magic != LOV_MAGIC) {
1265                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1266                        lsm->lsm_magic, LOV_MAGIC);
1267                 RETURN(-EINVAL);
1268         }
1269
1270         if (!export || !export->exp_obd)
1271                 RETURN(-ENODEV);
1272
1273         lov = &export->exp_obd->u.lov;
1274         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1275                 struct lov_stripe_md submd;
1276                 int err;
1277
1278                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1279                         continue;
1280
1281                 if (lockhs[i].addr == 0)
1282                         continue;
1283
1284                 submd.lsm_object_id = loi->loi_id;
1285                 submd.lsm_stripe_count = 0;
1286                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1287                                 mode, &lockhs[i]);
1288                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1289                         CERROR("Error cancel objid "LPX64" subobj "LPX64
1290                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1291                                loi->loi_id, loi->loi_ost_idx, err);
1292                         if (!rc)
1293                                 rc = err;
1294                 }
1295         }
1296         RETURN(rc);
1297 }
1298
1299 static int lov_cancel_unused(struct lustre_handle *conn,
1300                              struct lov_stripe_md *lsm, int flags)
1301 {
1302         struct obd_export *export = class_conn2export(conn);
1303         struct lov_obd *lov;
1304         struct lov_oinfo *loi;
1305         int rc = 0, i, err;
1306         ENTRY;
1307
1308         if (!lsm) {
1309                 CERROR("LOV requires striping ea for lock cancellation\n");
1310                 RETURN(-EINVAL);
1311         }
1312
1313         if (!export || !export->exp_obd)
1314                 RETURN(-ENODEV);
1315
1316         lov = &export->exp_obd->u.lov;
1317         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1318                 struct lov_stripe_md submd;
1319
1320                 submd.lsm_object_id = loi->loi_id;
1321                 submd.lsm_stripe_count = 0;
1322                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1323                                        &submd, flags);
1324                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1325                         CERROR("Error cancel unused objid "LPX64" subobj "LPX64
1326                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1327                                loi->loi_id, loi->loi_ost_idx, err);
1328                         if (!rc)
1329                                 rc = err;
1330                 }
1331         }
1332
1333         RETURN(rc);
1334 }
1335
1336 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1337 {
1338         struct obd_export *export = class_conn2export(conn);
1339         struct lov_obd *lov;
1340         struct obd_statfs lov_sfs;
1341         int set = 0;
1342         int rc = 0;
1343         int i;
1344         ENTRY;
1345
1346         if (!export || !export->exp_obd)
1347                 RETURN(-ENODEV);
1348
1349         lov = &export->exp_obd->u.lov;
1350
1351         /* We only get block data from the OBD */
1352         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1353                 int err;
1354
1355                 if (!lov->tgts[i].active)
1356                         continue;
1357
1358                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1359                 if (err) {
1360                         CERROR("Error statfs OSC %s idx %d: err = %d\n",
1361                                lov->tgts[i].uuid, i, err);
1362                         if (!rc)
1363                                 rc = err;
1364                         continue; /* XXX or break? - probably OK to continue */
1365                 }
1366                 if (!set) {
1367                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1368                         set = 1;
1369                 } else {
1370                         osfs->os_bfree += lov_sfs.os_bfree;
1371                         osfs->os_bavail += lov_sfs.os_bavail;
1372                         osfs->os_blocks += lov_sfs.os_blocks;
1373                         /* XXX not sure about this one - depends on policy.
1374                          *   - could be minimum if we always stripe on all OBDs
1375                          *     (but that would be wrong for any other policy,
1376                          *     if one of the OBDs has no more objects left)
1377                          *   - could be sum if we stripe whole objects
1378                          *   - could be average, just to give a nice number
1379                          *   - we just pick first OST and hope it is enough
1380                         sfs->f_ffree += lov_sfs.f_ffree;
1381                          */
1382                 }
1383         }
1384         RETURN(rc);
1385 }
1386
1387 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1388                          void *karg, void *uarg)
1389 {
1390         struct obd_device *obddev = class_conn2obd(conn);
1391         struct lov_obd *lov = &obddev->u.lov;
1392         struct obd_ioctl_data *data = karg;
1393         int i, count = lov->desc.ld_tgt_count;
1394         int rc;
1395
1396         ENTRY;
1397
1398         switch (cmd) {
1399         case IOC_LOV_SET_OSC_ACTIVE: {
1400                 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
1401                 break;
1402         }
1403         case OBD_IOC_LOV_GET_CONFIG: {
1404                 struct lov_tgt_desc *tgtdesc;
1405                 struct lov_desc *desc;
1406                 obd_uuid_t *uuidp;
1407                 char *buf = NULL;
1408
1409                 buf = NULL;
1410                 len = 0;
1411                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1412                         RETURN(-EINVAL);
1413
1414                 data = (struct obd_ioctl_data *)buf;
1415
1416                 if (sizeof(*desc) > data->ioc_inllen1) {
1417                         OBD_FREE(buf, len);
1418                         RETURN(-EINVAL);
1419                 }
1420
1421                 if (sizeof(*uuidp) * count > data->ioc_inllen2) {
1422                         OBD_FREE(buf, len);
1423                         RETURN(-EINVAL);
1424                 }
1425
1426                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1427                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
1428                 memcpy(desc, &(lov->desc), sizeof(*desc));
1429
1430                 tgtdesc = lov->tgts;
1431                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1432                         memcpy(uuidp, tgtdesc->uuid, sizeof(*uuidp));
1433
1434                 rc = copy_to_user((void *)uarg, buf, len);
1435                 if (rc)
1436                         rc = -EFAULT;
1437                 OBD_FREE(buf, len);
1438                 break;
1439         }
1440         default:
1441                 if (count == 0)
1442                         RETURN(-ENOTTY);
1443                 rc = 0;
1444                 for (i = 0; i < count; i++) {
1445                         int err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1446                                                 len, karg, uarg);
1447                         if (err && !rc)
1448                                 rc = err;
1449                 }
1450         }
1451
1452         RETURN(rc);
1453 }
1454
1455 struct obd_ops lov_obd_ops = {
1456         o_attach:      lov_attach,
1457         o_detach:      lov_detach,
1458         o_setup:       lov_setup,
1459         o_connect:     lov_connect,
1460         o_disconnect:  lov_disconnect,
1461         o_statfs:      lov_statfs,
1462         o_packmd:      lov_packmd,
1463         o_unpackmd:    lov_unpackmd,
1464         o_create:      lov_create,
1465         o_destroy:     lov_destroy,
1466         o_getattr:     lov_getattr,
1467         o_setattr:     lov_setattr,
1468         o_open:        lov_open,
1469         o_close:       lov_close,
1470         o_brw:         lov_brw,
1471         o_punch:       lov_punch,
1472         o_enqueue:     lov_enqueue,
1473         o_cancel:      lov_cancel,
1474         o_cancel_unused: lov_cancel_unused,
1475         o_iocontrol:   lov_iocontrol
1476 };
1477
1478
1479 #define LOV_VERSION "v0.1"
1480
1481 static int __init lov_init(void)
1482 {
1483         int rc;
1484         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
1485                ", info@clusterfs.com\n");
1486         lov_file_cache = kmem_cache_create("ll_lov_file_data",
1487                                            sizeof(struct lov_file_handles),
1488                                            0, 0, NULL, NULL);
1489         if (!lov_file_cache)
1490                 RETURN(-ENOMEM);
1491
1492         rc = class_register_type(&lov_obd_ops, status_class_var,
1493                                  OBD_LOV_DEVICENAME);
1494         RETURN(rc);
1495 }
1496
1497 static void __exit lov_exit(void)
1498 {
1499         if (kmem_cache_destroy(lov_file_cache))
1500                 CERROR("couldn't free LOV open cache\n");
1501         class_unregister_type(OBD_LOV_DEVICENAME);
1502 }
1503
1504 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1505 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION);
1506 MODULE_LICENSE("GPL");
1507
1508 module_init(lov_init);
1509 module_exit(lov_exit);