Whamcloud - gitweb
Landing b_bug974 onto HEAD (20040213_1538).
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
55                              int stripeno, obd_off *obd_off);
56
57 /* obd methods */
58 int lov_attach(struct obd_device *dev, obd_count len, void *data)
59 {
60         struct lprocfs_static_vars lvars;
61         int rc;
62
63         lprocfs_init_vars(lov, &lvars);
64         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
65         if (rc == 0) {
66 #ifdef __KERNEL__
67                 struct proc_dir_entry *entry;
68
69                 entry = create_proc_entry("target_obd", 0444, 
70                                           dev->obd_proc_entry);
71                 if (entry == NULL) {
72                         rc = -ENOMEM;
73                 } else {
74                         entry->proc_fops = &lov_proc_target_fops;
75                         entry->data = dev;
76                 }
77 #endif
78         }
79         return rc;
80 }
81
/* Detach-time hook: pass @dev to lprocfs_obd_detach() and return its result. */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
86
87 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
88                        struct obd_uuid *cluuid)
89 {
90         struct ptlrpc_request *req = NULL;
91         struct lov_obd *lov = &obd->u.lov;
92         struct lov_desc *desc = &lov->desc;
93         struct lov_tgt_desc *tgts;
94         struct obd_export *exp;
95         int rc, rc2, i;
96         ENTRY;
97
98         rc = class_connect(conn, obd, cluuid);
99         if (rc)
100                 RETURN(rc);
101
102         exp = class_conn2export(conn);
103
104         /* We don't want to actually do the underlying connections more than
105          * once, so keep track. */
106         lov->refcount++;
107         if (lov->refcount > 1) {
108                 class_export_put(exp);
109                 RETURN(0);
110         }
111
112         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
113                 struct obd_uuid *tgt_uuid = &tgts->uuid;
114                 struct obd_device *tgt_obd;
115                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
116                 struct lustre_handle conn = {0, };
117
118                 LASSERT( tgt_uuid != NULL);
119
120                 tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME, 
121                                                 &obd->obd_uuid);
122
123                 if (!tgt_obd) {
124                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
125                         GOTO(out_disc, rc = -EINVAL);
126                 }
127
128                 if (!tgt_obd->obd_set_up) {
129                         CERROR("Target %s not set up\n", tgt_uuid->uuid);
130                         GOTO(out_disc, rc = -EINVAL);
131                 }
132
133                 if (tgt_obd->u.cli.cl_import->imp_invalid) {
134                         CERROR("not connecting OSC %s; administratively "
135                                "disabled\n", tgt_uuid->uuid);
136                         rc = obd_register_observer(tgt_obd, obd);
137                         if (rc) {
138                                 CERROR("Target %s register_observer error %d; "
139                                        "will not be able to reactivate\n",
140                                        tgt_uuid->uuid, rc);
141                         }
142                         continue;
143                 }
144
145                 rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid);
146                 if (rc) {
147                         CERROR("Target %s connect error %d\n", tgt_uuid->uuid,
148                                rc);
149                         GOTO(out_disc, rc);
150                 }
151                 tgts->ltd_exp = class_conn2export(&conn);
152
153                 rc = obd_register_observer(tgt_obd, obd);
154                 if (rc) {
155                         CERROR("Target %s register_observer error %d\n",
156                                tgt_uuid->uuid, rc);
157                         obd_disconnect(tgts->ltd_exp, 0);
158                         GOTO(out_disc, rc);
159                 }
160
161                 desc->ld_active_tgt_count++;
162                 tgts->active = 1;
163         }
164
165         ptlrpc_req_finished(req);
166         class_export_put(exp);
167         RETURN (0);
168
169  out_disc:
170         while (i-- > 0) {
171                 struct obd_uuid uuid;
172                 --tgts;
173                 --desc->ld_active_tgt_count;
174                 tgts->active = 0;
175                 /* save for CERROR below; (we know it's terminated) */
176                 uuid = tgts->uuid;
177                 rc2 = obd_disconnect(tgts->ltd_exp, 0);
178                 if (rc2)
179                         CERROR("error: LOV target %s disconnect on OST idx %d: "
180                                "rc = %d\n", uuid.uuid, i, rc2);
181         }
182         class_disconnect(exp, 0);
183         RETURN (rc);
184 }
185
186 static int lov_disconnect(struct obd_export *exp, int flags)
187 {
188         struct obd_device *obd = class_exp2obd(exp);
189         struct lov_obd *lov = &obd->u.lov;
190         int rc, i;
191         ENTRY;
192
193         if (!lov->tgts)
194                 goto out_local;
195
196         /* Only disconnect the underlying layers on the final disconnect. */
197         lov->refcount--;
198         if (lov->refcount != 0)
199                 goto out_local;
200
201         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
202                 if (lov->tgts[i].ltd_exp == NULL)
203                         continue;
204
205                 if (obd->obd_no_recov) {
206                         /* Pass it on to our clients.
207                          * XXX This should be an argument to disconnect,
208                          * XXX not a back-door flag on the OBD.  Ah well.
209                          */
210                         struct obd_device *osc_obd;
211                         osc_obd = class_exp2obd(lov->tgts[i].ltd_exp);
212                         if (osc_obd)
213                                 osc_obd->obd_no_recov = 1;
214                 }
215
216                 obd_register_observer(lov->tgts[i].ltd_exp->exp_obd, NULL);
217
218                 rc = obd_disconnect(lov->tgts[i].ltd_exp, flags);
219                 if (rc) {
220                         if (lov->tgts[i].active) {
221                                 CERROR("Target %s disconnect error %d\n",
222                                        lov->tgts[i].uuid.uuid, rc);
223                         }
224                         rc = 0;
225                 }
226                 if (lov->tgts[i].active) {
227                         lov->desc.ld_active_tgt_count--;
228                         lov->tgts[i].active = 0;
229                 }
230                 lov->tgts[i].ltd_exp = NULL;
231         }
232
233  out_local:
234         rc = class_disconnect(exp, 0);
235         RETURN(rc);
236 }
237
238 /* Error codes:
239  *
240  *  -EINVAL  : UUID can't be found in the LOV's target list
241  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
242  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
243  */
244 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
245                               int activate)
246 {
247         struct obd_device *obd;
248         struct lov_tgt_desc *tgt;
249         int i, rc = 0;
250         ENTRY;
251
252         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
253                lov, uuid->uuid, activate);
254
255         spin_lock(&lov->lov_lock);
256         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
257                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
258                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
259                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
260                         break;
261         }
262
263         if (i == lov->desc.ld_tgt_count)
264                 GOTO(out, rc = -EINVAL);
265
266         obd = class_exp2obd(tgt->ltd_exp);
267         if (obd == NULL) {
268                 /* This can happen if OST failure races with node shutdown */
269                 GOTO(out, rc = -ENOTCONN);
270         }
271
272         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
273                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
274                obd->obd_type->typ_name, i);
275         LASSERT(strcmp(obd->obd_type->typ_name, "osc") == 0);
276
277         if (tgt->active == activate) {
278                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
279                        activate ? "" : "in");
280                 GOTO(out, rc);
281         }
282
283         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
284
285         tgt->active = activate;
286         if (activate)
287                 lov->desc.ld_active_tgt_count++;
288         else
289                 lov->desc.ld_active_tgt_count--;
290
291         EXIT;
292  out:
293         spin_unlock(&lov->lov_lock);
294         return rc;
295 }
296
297 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
298                        int active)
299 {
300         int rc;
301         struct obd_uuid *uuid;
302
303         if (strcmp(watched->obd_type->typ_name, "osc")) {
304                 CERROR("unexpected notification of %s %s!\n",
305                        watched->obd_type->typ_name,
306                        watched->obd_name);
307                 return -EINVAL;
308         }
309         uuid = &watched->u.cli.cl_import->imp_target_uuid;
310
311         /*
312          * Must notify (MDS) before we mark the OSC as active, so that
313          * the orphan deletion happens without interference from racing
314          * creates.
315          */
316         if (obd->obd_observer) {
317                 /* Pass the notification up the chain. */
318                 rc = obd_notify(obd->obd_observer, watched, active);
319                 if (rc)
320                         RETURN(rc);
321         }
322
323         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
324
325         if (rc) {
326                 CERROR("%sactivation of %s failed: %d\n",
327                        active ? "" : "de", uuid->uuid, rc);
328         }
329         RETURN(rc);
330 }
331
332 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
333 {
334         struct lustre_cfg *lcfg = buf;
335         struct lov_desc *desc;
336         struct lov_obd *lov = &obd->u.lov;
337         struct obd_uuid *uuids;
338         struct lov_tgt_desc *tgts;
339         int i;
340         int count;
341         int rc = 0;
342         ENTRY;
343
344         if (lcfg->lcfg_inllen1 < 1) {
345                 CERROR("LOV setup requires a descriptor\n");
346                 RETURN(-EINVAL);
347         }
348
349         if (lcfg->lcfg_inllen2 < 1) {
350                 CERROR("LOV setup requires an OST UUID list\n");
351                 RETURN(-EINVAL);
352         }
353
354         desc = (struct lov_desc *)lcfg->lcfg_inlbuf1;
355         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
356                 CERROR("descriptor size wrong: %d > %d\n",
357                        (int)sizeof(*desc), lcfg->lcfg_inllen1);
358                 RETURN(-EINVAL);
359         }
360
361         count = desc->ld_tgt_count;
362         uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
363         if (sizeof(*uuids) * count != lcfg->lcfg_inllen2) {
364                 CERROR("UUID array size wrong: %u * %u != %u\n",
365                        (int)sizeof(*uuids), count, lcfg->lcfg_inllen2);
366                 RETURN(-EINVAL);
367         }
368
369         /* Because of 64-bit divide/mod operations only work with a 32-bit
370          * divisor in a 32-bit kernel, we cannot support a stripe width
371          * of 4GB or larger on 32-bit CPUs.
372          */
373         if ((desc->ld_default_stripe_count ?
374              desc->ld_default_stripe_count : desc->ld_tgt_count) *
375              desc->ld_default_stripe_size > ~0UL) {
376                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
377                        desc->ld_default_stripe_size,
378                        desc->ld_default_stripe_count ?
379                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
380                 RETURN(-EINVAL);
381         }
382
383         lov->bufsize = sizeof(struct lov_tgt_desc) * count;
384         OBD_ALLOC(lov->tgts, lov->bufsize);
385         if (lov->tgts == NULL) {
386                 CERROR("Out of memory\n");
387                 RETURN(-EINVAL);
388         }
389
390         lov->desc = *desc;
391         spin_lock_init(&lov->lov_lock);
392
393         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
394                 struct obd_uuid *uuid = &tgts->uuid;
395
396                 /* NULL termination already checked */
397                 *uuid = uuids[i];
398         }
399
400
401         RETURN(rc);
402 }
403
404 static int lov_cleanup(struct obd_device *obd, int flags) 
405 {
406         struct lov_obd *lov = &obd->u.lov;
407
408         OBD_FREE(lov->tgts, lov->bufsize);
409         RETURN(0);
410 }
411
412
413 /* compute object size given "stripeno" and the ost size */
414 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
415                                 int stripeno)
416 {
417         unsigned long ssize  = lsm->lsm_stripe_size;
418         unsigned long swidth = ssize * lsm->lsm_stripe_count;
419         unsigned long stripe_size;
420         obd_size lov_size;
421
422         if (ost_size == 0)
423                 return 0;
424
425         /* do_div(a, b) returns a % b, and a = a / b */
426         stripe_size = do_div(ost_size, ssize);
427
428         if (stripe_size)
429                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
430         else
431                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
432
433         return lov_size;
434 }
435
436 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
437                             struct lov_stripe_md *lsm, int stripeno, int *set)
438 {
439         valid &= src->o_valid;
440
441         if (*set) {
442                 if (valid & OBD_MD_FLSIZE) {
443                         /* this handles sparse files properly */
444                         obd_size lov_size;
445
446                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
447                         if (lov_size > tgt->o_size)
448                                 tgt->o_size = lov_size;
449                 }
450                 if (valid & OBD_MD_FLBLOCKS)
451                         tgt->o_blocks += src->o_blocks;
452                 if (valid & OBD_MD_FLBLKSZ)
453                         tgt->o_blksize += src->o_blksize;
454                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
455                         tgt->o_ctime = src->o_ctime;
456                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
457                         tgt->o_mtime = src->o_mtime;
458         } else {
459                 memcpy(tgt, src, sizeof(*tgt));
460                 tgt->o_id = lsm->lsm_object_id;
461                 if (valid & OBD_MD_FLSIZE)
462                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
463                 *set = 1;
464         }
465 }
466
/* Fallback log2(): used only when no log2 macro is already defined. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
470
471 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
472                              struct lov_stripe_md **ea,
473                              struct obd_trans_info *oti)
474 {
475         struct lov_obd *lov;
476         struct obdo *tmp_oa;
477         struct obd_uuid *ost_uuid = NULL;
478         int rc = 0, i;
479         ENTRY;
480
481         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
482                 src_oa->o_flags == OBD_FL_DELORPHAN);
483
484         lov = &export->exp_obd->u.lov;
485
486         tmp_oa = obdo_alloc();
487         if (tmp_oa == NULL)
488                 RETURN(-ENOMEM);
489
490         if (src_oa->o_valid & OBD_MD_FLINLINE) {
491                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
492                 CDEBUG(D_HA, "clearing orphans only for %s\n",
493                        ost_uuid->uuid);
494         }
495
496         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
497                 struct lov_stripe_md obj_md;
498                 struct lov_stripe_md *obj_mdp = &obj_md;
499                 int err;
500
501                 /* if called for a specific target, we don't 
502                    care if it is not active. */
503                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
504                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
505                         continue;
506                 }
507
508                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
509                         continue;
510                 
511                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
512
513                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
514                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, &obj_mdp, oti);
515                 if (err) {
516                         CERROR("error in orphan recovery on OST idx %d/%d: "
517                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
518                         if (!rc)
519                                 rc = err;
520                 }
521
522                 if (ost_uuid)
523                         break;
524         }
525         obdo_free(tmp_oa);
526         RETURN(rc);
527 }
528
529 #define LOV_CREATE_RESEED_INTERVAL 1000
530
531 /* the LOV expects oa->o_id to be set to the LOV object id */
532 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
533                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
534 {
535         static int ost_start_idx, ost_start_count;
536         struct lov_obd *lov;
537         struct lov_stripe_md *lsm;
538         struct lov_oinfo *loi = NULL;
539         struct obdo *tmp_oa, *ret_oa;
540         struct llog_cookie *cookies = NULL;
541         unsigned ost_count, ost_idx;
542         int set = 0, obj_alloc = 0, cookie_sent = 0, rc = 0, i;
543         ENTRY;
544
545         LASSERT(ea != NULL);
546
547         if ((src_oa->o_valid & OBD_MD_FLFLAGS) && 
548             src_oa->o_flags == OBD_FL_DELORPHAN) {
549                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
550                 RETURN(rc);
551         }
552
553         if (exp == NULL)
554                 RETURN(-EINVAL);
555
556         lov = &exp->exp_obd->u.lov;
557
558         if (!lov->desc.ld_active_tgt_count)
559                 RETURN(-EIO);
560
561         /* Recreate a specific object id at the given OST index */ 
562         if (src_oa->o_valid & OBD_MD_FLFLAGS && src_oa->o_flags &
563                                                 OBD_FL_RECREATE_OBJS) {
564                  struct lov_stripe_md obj_md;
565                  struct lov_stripe_md *obj_mdp = &obj_md;
566
567                  ost_idx = src_oa->o_nlink;
568                  lsm = *ea;
569                  if (lsm == NULL)
570                         RETURN(-EINVAL);
571                  if (ost_idx >= lov->desc.ld_tgt_count)
572                          RETURN(-EINVAL);
573                  for (i = 0; i < lsm->lsm_stripe_count; i++) {
574                          if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
575                                  if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
576                                          RETURN(-EINVAL);
577                                  break;
578                          }
579                  }
580                  if (i == lsm->lsm_stripe_count)
581                          RETURN(-EINVAL);
582
583                  rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti);
584                  RETURN(rc);
585         }
586
587         ret_oa = obdo_alloc();
588         if (!ret_oa)
589                 RETURN(-ENOMEM);
590
591         tmp_oa = obdo_alloc();
592         if (!tmp_oa)
593                 GOTO(out_oa, rc = -ENOMEM);
594
595         lsm = *ea;
596         if (lsm == NULL) {
597                 int stripes;
598                 ost_count = lov_get_stripecnt(lov, 0);
599
600                 /* If the MDS file was truncated up to some size, stripe over
601                  * enough OSTs to allow the file to be created at that size. */
602                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
603                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
604                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
605
606                         if (stripes > lov->desc.ld_active_tgt_count)
607                                 RETURN(-EFBIG);
608                         if (stripes > ost_count)
609                                 stripes = ost_count;
610                 } else {
611                         stripes = ost_count;
612                 }
613
614                 rc = lov_alloc_memmd(&lsm, stripes, lov->desc.ld_pattern ?
615                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
616                 if (rc < 0)
617                         GOTO(out_tmp, rc);
618
619                 rc = 0;
620         }
621
622         ost_count = lov->desc.ld_tgt_count;
623
624         LASSERT(src_oa->o_valid & OBD_MD_FLID);
625         lsm->lsm_object_id = src_oa->o_id;
626         if (!lsm->lsm_stripe_size)
627                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
628         if (!lsm->lsm_pattern) {
629                 lsm->lsm_pattern = lov->desc.ld_pattern ?
630                         lov->desc.ld_pattern : LOV_PATTERN_RAID0;
631         }
632
633         if (*ea == NULL || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) {
634                 if (ost_start_count <= 0) {
635                         ost_start_idx = ll_insecure_random_int();
636                         ost_start_count = LOV_CREATE_RESEED_INTERVAL;
637                 } else {
638                         --ost_start_count;
639                         ost_start_idx += lsm->lsm_stripe_count;
640                 }
641                 ost_idx = ost_start_idx % ost_count;
642         } else {
643                 ost_idx = lsm->lsm_oinfo[0].loi_ost_idx;
644         }
645
646         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
647                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
648
649         /* XXX LOV STACKING: need to figure out how many real OSCs */
650         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
651                 oti_alloc_cookies(oti, lsm->lsm_stripe_count);
652                 if (!oti->oti_logcookies)
653                         GOTO(out_cleanup, rc = -ENOMEM);
654                 cookies = oti->oti_logcookies;
655         }
656
657         loi = lsm->lsm_oinfo;
658         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
659                 struct lov_stripe_md obj_md;
660                 struct lov_stripe_md *obj_mdp = &obj_md;
661                 int err;
662
663                 if (lov->tgts[ost_idx].active == 0) {
664                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
665                         continue;
666                 }
667
668                 /* create data objects with "parent" OA */
669                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
670
671                 /* XXX When we start creating objects on demand, we need to
672                  *     make sure that we always create the object on the
673                  *     stripe which holds the existing file size.
674                  */
675                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
676                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
677                                               &tmp_oa->o_size) < 0 &&
678                             tmp_oa->o_size)
679                                 tmp_oa->o_size--;
680
681                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
682                                i, tmp_oa->o_size, src_oa->o_size);
683                 }
684
685
686                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
687                 err = obd_create(lov->tgts[ost_idx].ltd_exp, tmp_oa, &obj_mdp,
688                                  oti);
689                 if (err) {
690                         if (lov->tgts[ost_idx].active) {
691                                 CERROR("error creating objid "LPX64" sub-object"
692                                        " on OST idx %d/%d: rc = %d\n",
693                                        src_oa->o_id, ost_idx,
694                                        lsm->lsm_stripe_count, err);
695                                 if (err > 0) {
696                                         CERROR("obd_create returned invalid "
697                                                "err %d\n", err);
698                                         err = -EIO;
699                                 }
700                         }
701                         if (!rc)
702                                 rc = err;
703                         continue;
704                 }
705                 if (oti->oti_objid)
706                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
707                 loi->loi_id = tmp_oa->o_id;
708                 loi->loi_ost_idx = ost_idx;
709                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
710                        lsm->lsm_object_id, loi->loi_id, ost_idx);
711
712                 lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
713                                 obj_alloc, &set);
714                 loi_init(loi);
715
716                 if (cookies)
717                         ++oti->oti_logcookies;
718                 if (tmp_oa->o_valid & OBD_MD_FLCOOKIE)
719                         ++cookie_sent;
720                 ++obj_alloc;
721                 ++loi;
722
723                 /* If we have allocated enough objects, we are OK */
724                 if (obj_alloc == lsm->lsm_stripe_count)
725                         GOTO(out_done, rc = 0);
726         }
727
728         if (obj_alloc == 0) {
729                 if (rc == 0)
730                         rc = -EIO;
731                 GOTO(out_cleanup, rc);
732         }
733
734         /* If we were passed specific striping params, then a failure to
735          * meet those requirements is an error, since we can't reallocate
736          * that memory (it might be part of a larger array or something).
737          *
738          * We can only get here if lsm_stripe_count was originally > 1.
739          */
740         if (*ea != NULL) {
741                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
742                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
743                 if (rc == 0)
744                         rc = -EFBIG;
745                 GOTO(out_cleanup, rc);
746         } else {
747                 struct lov_stripe_md *lsm_new;
748                 /* XXX LOV STACKING call into osc for sizes */
749                 unsigned oldsize, newsize;
750
751                 if (oti && cookies && cookie_sent) {
752                         oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
753                         newsize = obj_alloc * sizeof(*cookies);
754
755                         oti_alloc_cookies(oti, obj_alloc);
756                         if (oti->oti_logcookies) {
757                                 memcpy(oti->oti_logcookies, cookies, newsize);
758                                 OBD_FREE(cookies, oldsize);
759                                 cookies = oti->oti_logcookies;
760                         } else {
761                                 CWARN("'leaking' %d bytes\n", oldsize-newsize);
762                         }
763                 }
764
765                 CWARN("using fewer stripes for object "LPX64": old %u new %u\n",
766                       lsm->lsm_object_id, lsm->lsm_stripe_count, obj_alloc);
767                 oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
768                 newsize = lov_stripe_md_size(obj_alloc);
769                 OBD_ALLOC(lsm_new, newsize);
770                 if (lsm_new != NULL) {
771                         memcpy(lsm_new, lsm, newsize);
772                         lsm_new->lsm_stripe_count = obj_alloc;
773                         OBD_FREE(lsm, newsize);
774                         lsm = lsm_new;
775                 } else {
776                         CWARN("'leaking' %d bytes\n", oldsize - newsize);
777                 }
778                 rc = 0;
779         }
780         EXIT;
781  out_done:
782         *ea = lsm;
783         if (src_oa->o_valid & OBD_MD_FLSIZE &&
784             ret_oa->o_size != src_oa->o_size) {
785                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
786                        src_oa->o_size, ret_oa->o_size);
787                 LBUG();
788         }
789         ret_oa->o_id = src_oa->o_id;
790         memcpy(src_oa, ret_oa, sizeof(*src_oa));
791
792  out_tmp:
793         obdo_free(tmp_oa);
794  out_oa:
795         obdo_free(ret_oa);
796         if (oti && cookies) {
797                 oti->oti_logcookies = cookies;
798                 if (!cookie_sent) {
799                         oti_free_cookies(oti);
800                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
801                 } else {
802                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
803                 }
804         }
805         RETURN(rc);
806
807  out_cleanup:
808         while (obj_alloc-- > 0) {
809                 struct obd_export *sub_exp;
810                 int err;
811
812                 --loi;
813                 sub_exp = lov->tgts[loi->loi_ost_idx].ltd_exp;
814                 /* destroy already created objects here */
815                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
816                 tmp_oa->o_id = loi->loi_id;
817
818                 err = obd_destroy(sub_exp, tmp_oa, NULL, oti);
819                 if (err)
820                         CERROR("Failed to uncreate objid "LPX64" subobj "LPX64
821                                " on OST idx %d: rc = %d\n", src_oa->o_id,
822                                loi->loi_id, loi->loi_ost_idx, err);
823         }
824         if (*ea == NULL)
825                 obd_free_memmd(exp, &lsm);
826         goto out_tmp;
827 }
828
/* Sanity-check a striping descriptor before it is used.
 *
 * Evaluates to 1 (after reporting through CERROR) when LSMP is NULL or when
 * its lsm_magic field is not LOV_MAGIC, and to 0 otherwise.  LSMP is
 * evaluated exactly once. */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_md__ = (LSMP);                   \
        int _bad__ = 1;                                         \
        if (_md__ == NULL) {                                    \
                CERROR("LOV requires striping ea\n");           \
        } else if (_md__->lsm_magic != LOV_MAGIC) {             \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _md__->lsm_magic, LOV_MAGIC);            \
        } else {                                                \
                _bad__ = 0;                                     \
        }                                                       \
        _bad__;                                                 \
})
843
844 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
845                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
846 {
847         struct obdo tmp;
848         struct lov_obd *lov;
849         struct lov_oinfo *loi;
850         int rc = 0, i;
851         ENTRY;
852
853         if (lsm_bad_magic(lsm))
854                 RETURN(-EINVAL);
855
856         if (!exp || !exp->exp_obd)
857                 RETURN(-ENODEV);
858
859         lov = &exp->exp_obd->u.lov;
860         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
861                 int err;
862                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
863                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
864                         /* Orphan clean up will (someday) fix this up. */
865                         continue;
866                 }
867
868                 memcpy(&tmp, oa, sizeof(tmp));
869                 tmp.o_id = loi->loi_id;
870                 err = obd_destroy(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp,
871                                   NULL, oti);
872                 if (err && lov->tgts[loi->loi_ost_idx].active) {
873                         CERROR("error: destroying objid "LPX64" subobj "
874                                LPX64" on OST idx %d: rc = %d\n",
875                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
876                         if (!rc)
877                                 rc = err;
878                 }
879         }
880         RETURN(rc);
881 }
882
883 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
884                        struct lov_stripe_md *lsm)
885 {
886         struct obdo tmp;
887         struct lov_obd *lov;
888         struct lov_oinfo *loi;
889         int i, rc = 0, set = 0;
890         ENTRY;
891
892         if (lsm_bad_magic(lsm))
893                 RETURN(-EINVAL);
894
895         if (!exp || !exp->exp_obd)
896                 RETURN(-ENODEV);
897
898         lov = &exp->exp_obd->u.lov;
899
900         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
901                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
902         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
903                 int err;
904
905                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
906                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
907                         continue;
908                 }
909
910                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
911                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
912                 /* create data objects with "parent" OA */
913                 memcpy(&tmp, oa, sizeof(tmp));
914                 tmp.o_id = loi->loi_id;
915
916                 err = obd_getattr(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp,
917                                   NULL);
918                 if (err) {
919                         if (lov->tgts[loi->loi_ost_idx].active) {
920                                 CERROR("error: getattr objid "LPX64" subobj "
921                                        LPX64" on OST idx %d: rc = %d\n",
922                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
923                                        err);
924                                 RETURN(err);
925                         }
926                 } else {
927                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
928                 }
929         }
930         if (!set)
931                 rc = -EIO;
932         RETURN(rc);
933 }
934
935 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, 
936                                  int rc)
937 {
938         struct lov_getattr_async_args *aa = data;
939         struct lov_stripe_md *lsm = aa->aa_lsm;
940         struct obdo          *oa = aa->aa_oa;
941         struct obdo          *obdos = aa->aa_obdos;
942         struct lov_oinfo     *loi;
943         int                   i;
944         int                   set = 0;
945         ENTRY;
946
947         if (rc == 0) {
948                 /* NB all stripe requests succeeded to get here */
949
950                 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
951                      i++, loi++) {
952                         if (obdos[i].o_valid == 0)      /* inactive stripe */
953                                 continue;
954
955                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
956                                         i, &set);
957                 }
958
959                 if (!set) {
960                         CERROR ("No stripes had valid attrs\n");
961                         rc = -EIO;
962                 }
963         }
964
965         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
966         RETURN (rc);
967 }
968
969 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
970                               struct lov_stripe_md *lsm,
971                               struct ptlrpc_request_set *rqset)
972 {
973         struct obdo *obdos;
974         struct lov_obd *lov;
975         struct lov_oinfo *loi;
976         struct lov_getattr_async_args *aa;
977         int i, rc = 0, set = 0;
978         ENTRY;
979
980         if (lsm_bad_magic(lsm))
981                 RETURN(-EINVAL);
982
983         if (!exp || !exp->exp_obd)
984                 RETURN(-ENODEV);
985
986         lov = &exp->exp_obd->u.lov;
987
988         OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
989         if (obdos == NULL)
990                 RETURN(-ENOMEM);
991
992         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
993                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
994         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
995                 int err;
996
997                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
998                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
999                         /* leaves obdos[i].obd_valid unset */
1000                         continue;
1001                 }
1002
1003                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1004                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
1005                 /* create data objects with "parent" OA */
1006                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
1007                 obdos[i].o_id = loi->loi_id;
1008
1009                 err = obd_getattr_async(lov->tgts[loi->loi_ost_idx].ltd_exp,
1010                                          &obdos[i], NULL, rqset);
1011                 if (err) {
1012                         CERROR("error: getattr objid "LPX64" subobj "
1013                                LPX64" on OST idx %d: rc = %d\n",
1014                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
1015                                err);
1016                         GOTO(out_obdos, rc = err);
1017                 }
1018                 set = 1;
1019         }
1020         if (!set)
1021                 GOTO (out_obdos, rc = -EIO);
1022
1023         LASSERT (rqset->set_interpret == NULL);
1024         rqset->set_interpret = lov_getattr_interpret;
1025         LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
1026         aa = (struct lov_getattr_async_args *)&rqset->set_args;
1027         aa->aa_lsm = lsm;
1028         aa->aa_oa = oa;
1029         aa->aa_obdos = obdos;
1030         aa->aa_lov = lov;
1031         GOTO(out, rc = 0);
1032
1033 out_obdos:
1034         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
1035 out:
1036         RETURN(rc);
1037 }
1038
1039
1040 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
1041                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1042 {
1043         struct obdo *tmp_oa, *ret_oa;
1044         struct lov_obd *lov;
1045         struct lov_oinfo *loi;
1046         int rc = 0, i, set = 0;
1047         ENTRY;
1048
1049         if (lsm_bad_magic(lsm))
1050                 RETURN(-EINVAL);
1051
1052         if (!exp || !exp->exp_obd)
1053                 RETURN(-ENODEV);
1054
1055         /* for now, we only expect time updates here */
1056         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
1057                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
1058                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
1059                                       OBD_MD_FLSIZE)));
1060         ret_oa = obdo_alloc();
1061         if (!ret_oa)
1062                 RETURN(-ENOMEM);
1063
1064         tmp_oa = obdo_alloc();
1065         if (!tmp_oa)
1066                 GOTO(out_oa, rc = -ENOMEM);
1067
1068         lov = &exp->exp_obd->u.lov;
1069         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1070                 int err;
1071
1072                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1073                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1074                         continue;
1075                 }
1076
1077                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1078                 tmp_oa->o_id = loi->loi_id;
1079
1080                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1081                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1082                                               &tmp_oa->o_size) < 0 &&
1083                             tmp_oa->o_size)
1084                                 tmp_oa->o_size--;
1085
1086                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1087                                i, tmp_oa->o_size, src_oa->o_size);
1088                 }
1089
1090                 err = obd_setattr(lov->tgts[loi->loi_ost_idx].ltd_exp, tmp_oa,
1091                                   NULL, NULL);
1092                 if (err) {
1093                         if (lov->tgts[loi->loi_ost_idx].active) {
1094                                 CERROR("error: setattr objid "LPX64" subobj "
1095                                        LPX64" on OST idx %d: rc = %d\n",
1096                                        src_oa->o_id, loi->loi_id,
1097                                        loi->loi_ost_idx, err);
1098                                 if (!rc)
1099                                         rc = err;
1100                         }
1101                         continue;
1102                 }
1103                 lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
1104         }
1105         if (!set && !rc)
1106                 rc = -EIO;
1107
1108         ret_oa->o_id = src_oa->o_id;
1109         memcpy(src_oa, ret_oa, sizeof(*src_oa));
1110         GOTO(out_tmp, rc);
1111 out_tmp:
1112         obdo_free(tmp_oa);
1113 out_oa:
1114         obdo_free(ret_oa);
1115         return rc;
1116 }
1117
1118 /* we have an offset in file backed by an lov and want to find out where
1119  * that offset lands in our given stripe of the file.  for the easy
1120  * case where the offset is within the stripe, we just have to scale the
1121  * offset down to make it relative to the stripe instead of the lov.
1122  *
1123  * the harder case is what to do when the offset doesn't intersect the
1124  * stripe.  callers will want start offsets clamped ahead to the start
1125  * of the nearest stripe in the file.  end offsets similarly clamped to the
1126  * nearest ending byte of a stripe in the file:
1127  *
1128  * all this function does is move offsets to the nearest region of the
1129  * stripe, and it does its work "mod" the full length of all the stripes.
1130  * consider a file with 3 stripes:
1131  *
1132  *             S                                              E
1133  * ---------------------------------------------------------------------
1134  * |    0    |     1     |     2     |    0    |     1     |     2     |
1135  * ---------------------------------------------------------------------
1136  *
1137  * to find stripe 1's offsets for S and E, it divides by the full stripe
1138  * width and does its math in the context of a single set of stripes:
1139  *
1140  *             S         E
1141  * -----------------------------------
1142  * |    0    |     1     |     2     |
1143  * -----------------------------------
1144  *
1145  * it'll notice that E is outside stripe 1 and clamp it to the end of the
1146  * stripe, then multiply it back out by lov_off to give the real offsets in
1147  * the stripe:
1148  *
1149  *   S                   E
1150  * ---------------------------------------------------------------------
1151  * |    1    |     1     |     1     |    1    |     1     |     1     |
1152  * ---------------------------------------------------------------------
1153  *
1154  * it would have done similarly and pulled S forward to the start of a 1
1155  * stripe if, say, S had landed in a 0 stripe.
1156  *
1157  * this rounding isn't always correct.  consider an E lov offset that lands
1158  * on a 0 stripe, the "mod stripe width" math will pull it forward to the
1159  * start of a 1 stripe, when in fact it wanted to be rounded back to the end
1160  * of a previous 1 stripe.  this logic is handled by callers and this is why:
1161  *
1162  * this function returns < 0 when the offset was "before" the stripe and
1163  * was moved forward to the start of the stripe in question;  0 when it
1164  * falls in the stripe and no shifting was done; > 0 when the offset
1165  * was outside the stripe and was pulled back to its final byte. */
1166 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1167                              int stripeno, obd_off *obd_off)
1168 {
1169         unsigned long ssize  = lsm->lsm_stripe_size;
1170         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1171         unsigned long stripe_off, this_stripe;
1172         int ret = 0;
1173
1174         if (lov_off == OBD_OBJECT_EOF) {
1175                 *obd_off = OBD_OBJECT_EOF;
1176                 return 0;
1177         }
1178
1179         /* do_div(a, b) returns a % b, and a = a / b */
1180         stripe_off = do_div(lov_off, swidth);
1181
1182         this_stripe = stripeno * ssize;
1183         if (stripe_off < this_stripe) {
1184                 stripe_off = 0;
1185                 ret = -1;
1186         } else {
1187                 stripe_off -= this_stripe;
1188
1189                 if (stripe_off >= ssize) {
1190                         stripe_off = ssize;
1191                         ret = 1;
1192                 }
1193         }
1194
1195         *obd_off = lov_off * ssize + stripe_off;
1196         return ret;
1197 }
1198
1199 /* given an extent in an lov and a stripe, calculate the extent of the stripe
1200  * that is contained within the lov extent.  this returns true if the given
1201  * stripe does intersect with the lov extent. */
1202 static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
1203                                  obd_off start, obd_off end,
1204                                  obd_off *obd_start, obd_off *obd_end)
1205 {
1206         int start_side, end_side;
1207
1208         start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
1209         end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
1210
1211         CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
1212                start, end, start_side, *obd_start, *obd_end, end_side);
1213
1214         /* this stripe doesn't intersect the file extent when neither
1215          * start or the end intersected the stripe and obd_start and
1216          * obd_end got rounded up to the save value. */
1217         if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
1218                 return 0;
1219
1220         /* as mentioned in the lov_stripe_offset commentary, end
1221          * might have been shifted in the wrong direction.  This
1222          * happens when an end offset is before the stripe when viewed
1223          * through the "mod stripe size" math. we detect it being shifted
1224          * in the wrong direction and touch it up.
1225          * interestingly, this can't underflow since end must be > start
1226          * if we passed through the previous check.
1227          * (should we assert for that somewhere?) */
1228         if (end_side != 0)
1229                 (*obd_end)--;
1230
1231         return 1;
1232 }
1233
1234 /* compute which stripe number "lov_off" will be written into */
1235 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1236 {
1237         unsigned long ssize  = lsm->lsm_stripe_size;
1238         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1239         unsigned long stripe_off;
1240
1241         stripe_off = do_div(lov_off, swidth);
1242
1243         return stripe_off / ssize;
1244 }
1245
1246 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1247  * we can send this 'punch' to just the authoritative node and the nodes
1248  * that the punch will affect. */
1249 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1250                      struct lov_stripe_md *lsm,
1251                      obd_off start, obd_off end, struct obd_trans_info *oti)
1252 {
1253         struct obdo tmp;
1254         struct lov_obd *lov;
1255         struct lov_oinfo *loi;
1256         int rc = 0, i;
1257         ENTRY;
1258
1259         if (lsm_bad_magic(lsm))
1260                 RETURN(-EINVAL);
1261
1262         if (!exp || !exp->exp_obd)
1263                 RETURN(-ENODEV);
1264
1265         lov = &exp->exp_obd->u.lov;
1266         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1267                 obd_off starti, endi;
1268                 int err;
1269
1270                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1271                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1272                         continue;
1273                 }
1274
1275                 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1276                         continue;
1277
1278                 /* create data objects with "parent" OA */
1279                 memcpy(&tmp, oa, sizeof(tmp));
1280                 tmp.o_id = loi->loi_id;
1281
1282                 err = obd_punch(lov->tgts[loi->loi_ost_idx].ltd_exp, &tmp, NULL,
1283                                 starti, endi, NULL);
1284                 if (err) {
1285                         if (lov->tgts[loi->loi_ost_idx].active) {
1286                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1287                                        " on OST idx %d: rc = %d\n", oa->o_id,
1288                                        loi->loi_id, loi->loi_ost_idx, err);
1289                         }
1290                         if (!rc)
1291                                 rc = err;
1292                 }
1293         }
1294         RETURN(rc);
1295 }
1296
1297 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1298                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1299 {
1300         struct obdo *tmp;
1301         struct lov_obd *lov;
1302         struct lov_oinfo *loi;
1303         int rc = 0, i;
1304         ENTRY;
1305
1306         if (lsm_bad_magic(lsm))
1307                 RETURN(-EINVAL);
1308
1309         if (!exp->exp_obd)
1310                 RETURN(-ENODEV);
1311
1312         tmp = obdo_alloc();
1313         if (!tmp)
1314                 RETURN(-ENOMEM);
1315
1316         lov = &exp->exp_obd->u.lov;
1317         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1318                 obd_off starti, endi;
1319                 int err;
1320
1321                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1322                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1323                         continue;
1324                 }
1325
1326                 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1327                         continue;
1328
1329                 memcpy(tmp, oa, sizeof(*tmp));
1330                 tmp->o_id = loi->loi_id;
1331
1332                 err = obd_sync(lov->tgts[loi->loi_ost_idx].ltd_exp, tmp, NULL,
1333                                starti, endi);
1334                 if (err) {
1335                         if (lov->tgts[loi->loi_ost_idx].active) {
1336                                 CERROR("error: fsync objid "LPX64" subobj "LPX64
1337                                        " on OST idx %d: rc = %d\n", oa->o_id,
1338                                        loi->loi_id, loi->loi_ost_idx, err);
1339                         }
1340                         if (!rc)
1341                                 rc = err;
1342                 }
1343         }
1344
1345         obdo_free(tmp);
1346         RETURN(rc);
1347 }
1348
1349 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1350                          struct lov_stripe_md *lsm,
1351                          obd_count oa_bufs, struct brw_page *pga)
1352 {
1353         int i, rc = 0;
1354
1355         /* The caller just wants to know if there's a chance that this
1356          * I/O can succeed */
1357         for (i = 0; i < oa_bufs; i++) {
1358                 int stripe = lov_stripe_number(lsm, pga[i].off);
1359                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1360                 struct ldlm_extent ext, subext;
1361                 ext.start = pga[i].off;
1362                 ext.end = pga[i].off + pga[i].count;
1363
1364                 if (!lov_stripe_intersects(lsm, i, ext.start, ext.end,
1365                                            &subext.start, &subext.end))
1366                         continue;
1367
1368                 if (lov->tgts[ost].active == 0) {
1369                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1370                         return -EIO;
1371                 }
1372                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[stripe].ltd_exp, oa,
1373                              NULL, 1, &pga[i], NULL);
1374                 if (rc)
1375                         break;
1376         }
1377         return rc;
1378 }
1379
1380 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1381                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1382                    struct brw_page *pga, struct obd_trans_info *oti)
1383 {
1384         struct {
1385                 int bufct;
1386                 int index;
1387                 int subcount;
1388                 struct lov_stripe_md lsm;
1389                 int ost_idx;
1390         } *stripeinfo, *si, *si_last;
1391         struct obdo *ret_oa = NULL, *tmp_oa = NULL;
1392         struct lov_obd *lov;
1393         struct brw_page *ioarr;
1394         struct lov_oinfo *loi;
1395         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count, set = 0;
1396         ENTRY;
1397
1398         if (lsm_bad_magic(lsm))
1399                 RETURN(-EINVAL);
1400
1401         lov = &exp->exp_obd->u.lov;
1402
1403         if (cmd == OBD_BRW_CHECK) {
1404                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1405                 RETURN(rc);
1406         }
1407
1408         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1409         if (!stripeinfo)
1410                 RETURN(-ENOMEM);
1411
1412         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1413         if (!where)
1414                 GOTO(out_sinfo, rc = -ENOMEM);
1415
1416         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1417         if (!ioarr)
1418                 GOTO(out_where, rc = -ENOMEM);
1419
1420         if (src_oa) {
1421                 ret_oa = obdo_alloc();
1422                 if (!ret_oa)
1423                         GOTO(out_ioarr, rc = -ENOMEM);
1424
1425                 tmp_oa = obdo_alloc();
1426                 if (!tmp_oa)
1427                         GOTO(out_oa, rc = -ENOMEM);
1428         }
1429
1430         for (i = 0; i < oa_bufs; i++) {
1431                 where[i] = lov_stripe_number(lsm, pga[i].off);
1432                 stripeinfo[where[i]].bufct++;
1433         }
1434
1435         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1436              i < stripe_count; i++, loi++, si_last = si, si++) {
1437                 if (i > 0)
1438                         si->index = si_last->index + si_last->bufct;
1439                 si->lsm.lsm_object_id = loi->loi_id;
1440                 si->ost_idx = loi->loi_ost_idx;
1441         }
1442
1443         for (i = 0; i < oa_bufs; i++) {
1444                 int which = where[i];
1445                 int shift;
1446
1447                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1448                 LASSERT(shift < oa_bufs);
1449                 ioarr[shift] = pga[i];
1450                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1451                 stripeinfo[which].subcount++;
1452         }
1453
1454         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1455                 int shift = si->index;
1456
1457                 if (lov->tgts[si->ost_idx].active == 0) {
1458                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1459                         GOTO(out_oa, rc = -EIO);
1460                 }
1461
1462                 if (si->bufct) {
1463                         LASSERT(shift < oa_bufs);
1464                         if (src_oa)
1465                                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1466
1467                         tmp_oa->o_id = si->lsm.lsm_object_id;
1468                         rc = obd_brw(cmd, lov->tgts[si->ost_idx].ltd_exp, 
1469                                      tmp_oa, &si->lsm, si->bufct, 
1470                                      &ioarr[shift], oti);
1471                         if (rc)
1472                                 GOTO(out_ioarr, rc);
1473
1474                         lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
1475                                         i, &set);
1476                 }
1477         }
1478
1479         ret_oa->o_id = src_oa->o_id;
1480         memcpy(src_oa, ret_oa, sizeof(*src_oa));
1481
1482         GOTO(out_oa, rc);
1483  out_oa:
1484         if (tmp_oa)
1485                 obdo_free(tmp_oa);
1486         if (ret_oa)
1487                 obdo_free(ret_oa);
1488  out_ioarr:
1489         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1490  out_where:
1491         OBD_FREE(where, sizeof(*where) * oa_bufs);
1492  out_sinfo:
1493         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1494         return rc;
1495 }
1496
1497 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1498                              int rc)
1499 {
1500         struct lov_brw_async_args *aa = data;
1501         struct lov_stripe_md *lsm = aa->aa_lsm;
1502         obd_count             oa_bufs = aa->aa_oa_bufs;
1503         struct obdo          *oa = aa->aa_oa;
1504         struct obdo          *obdos = aa->aa_obdos;
1505         struct brw_page      *ioarr = aa->aa_ioarr;
1506         struct lov_oinfo     *loi;
1507         int i, set = 0;
1508         ENTRY;
1509
1510         if (rc == 0) {
1511                 /* NB all stripe requests succeeded to get here */
1512
1513                 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1514                      i++, loi++) {
1515                         if (obdos[i].o_valid == 0)      /* inactive stripe */
1516                                 continue;
1517
1518                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
1519                                         i, &set);
1520                 }
1521
1522                 if (!set) {
1523                         CERROR("No stripes had valid attrs\n");
1524                         rc = -EIO;
1525                 }
1526         }
1527         oa->o_id = lsm->lsm_object_id;
1528
1529         OBD_FREE(obdos, lsm->lsm_stripe_count * sizeof(*obdos));
1530         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1531         RETURN(rc);
1532 }
1533
1534 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1535                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1536                          struct brw_page *pga, struct ptlrpc_request_set *set,
1537                          struct obd_trans_info *oti)
1538 {
1539         struct {
1540                 int bufct;
1541                 int index;
1542                 int subcount;
1543                 struct lov_stripe_md lsm;
1544                 int ost_idx;
1545         } *stripeinfo, *si, *si_last;
1546         struct lov_obd *lov;
1547         struct brw_page *ioarr;
1548         struct obdo *obdos = NULL;
1549         struct lov_oinfo *loi;
1550         struct lov_brw_async_args *aa;
1551         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1552         ENTRY;
1553
1554         if (lsm_bad_magic(lsm))
1555                 RETURN(-EINVAL);
1556
1557         lov = &exp->exp_obd->u.lov;
1558
1559         if (cmd == OBD_BRW_CHECK) {
1560                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1561                 RETURN(rc);
1562         }
1563
1564         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1565         if (!stripeinfo)
1566                 RETURN(-ENOMEM);
1567
1568         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1569         if (!where)
1570                 GOTO(out_sinfo, rc = -ENOMEM);
1571
1572         if (oa) {
1573                 OBD_ALLOC(obdos, sizeof(*obdos) * stripe_count);
1574                 if (!obdos)
1575                         GOTO(out_where, rc = -ENOMEM);
1576         }
1577
1578         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1579         if (!ioarr)
1580                 GOTO(out_obdos, rc = -ENOMEM);
1581
1582         for (i = 0; i < oa_bufs; i++) {
1583                 where[i] = lov_stripe_number(lsm, pga[i].off);
1584                 stripeinfo[where[i]].bufct++;
1585         }
1586
1587         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1588              i < stripe_count; i++, loi++, si_last = si, si++) {
1589                 if (i > 0)
1590                         si->index = si_last->index + si_last->bufct;
1591                 si->lsm.lsm_object_id = loi->loi_id;
1592                 si->ost_idx = loi->loi_ost_idx;
1593
1594                 if (oa) {
1595                         memcpy(&obdos[i], oa, sizeof(*obdos));
1596                         obdos[i].o_id = si->lsm.lsm_object_id;
1597                 }
1598         }
1599
1600         for (i = 0; i < oa_bufs; i++) {
1601                 int which = where[i];
1602                 int shift;
1603
1604                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1605                 LASSERT(shift < oa_bufs);
1606                 ioarr[shift] = pga[i];
1607                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1608                 stripeinfo[which].subcount++;
1609         }
1610
1611         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1612                 int shift = si->index;
1613
1614                 if (si->bufct == 0)
1615                         continue;
1616
1617                 if (lov->tgts[si->ost_idx].active == 0) {
1618                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1619                         GOTO(out_ioarr, rc = -EIO);
1620                 }
1621
1622                 LASSERT(shift < oa_bufs);
1623
1624                 rc = obd_brw_async(cmd, lov->tgts[si->ost_idx].ltd_exp,
1625                                    &obdos[i], &si->lsm, si->bufct,
1626                                    &ioarr[shift], set, oti);
1627                 if (rc)
1628                         GOTO(out_ioarr, rc);
1629         }
1630         LASSERT(rc == 0);
1631         LASSERT(set->set_interpret == NULL);
1632         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1633         LASSERT(sizeof(set->set_args) >= sizeof(struct lov_brw_async_args));
1634         aa = (struct lov_brw_async_args *)&set->set_args;
1635         aa->aa_lsm = lsm;
1636         aa->aa_obdos = obdos;
1637         aa->aa_oa = oa;
1638         aa->aa_ioarr = ioarr;
1639         aa->aa_oa_bufs = oa_bufs;
1640
1641         /* Don't free ioarr or obdos - that's done in lov_brw_interpret */
1642         GOTO(out_where, rc);
1643
1644  out_ioarr:
1645         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1646  out_obdos:
1647         OBD_FREE(obdos, stripe_count * sizeof(*obdos));
1648  out_where:
1649         OBD_FREE(where, sizeof(*where) * oa_bufs);
1650  out_sinfo:
1651         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1652         return rc;
1653 }
1654
1655 struct lov_async_page *lap_from_cookie(void *cookie)
1656 {
1657         struct lov_async_page *lap = cookie;
1658         if (lap->lap_magic != LAP_MAGIC)
1659                 return ERR_PTR(-EINVAL);
1660         return lap;
1661 };
1662
1663 static int lov_ap_make_ready(void *data, int cmd)
1664 {
1665         struct lov_async_page *lap = lap_from_cookie(data);
1666         /* XXX should these assert? */
1667         if (IS_ERR(lap))
1668                 return -EINVAL;
1669
1670         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1671 }
1672 static int lov_ap_refresh_count(void *data, int cmd)
1673 {
1674         struct lov_async_page *lap = lap_from_cookie(data);
1675         if (IS_ERR(lap))
1676                 return -EINVAL;
1677
1678         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data, 
1679                                                      cmd);
1680 }
1681 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1682 {
1683         struct lov_async_page *lap = lap_from_cookie(data);
1684         /* XXX should these assert? */
1685         if (IS_ERR(lap))
1686                 return;
1687
1688         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1689         /* XXX woah, shouldn't we be altering more here?  size? */
1690         oa->o_id = lap->lap_loi_id;
1691 }
1692 static void lov_ap_completion(void *data, int cmd, int rc)
1693 {
1694         struct lov_async_page *lap = lap_from_cookie(data);
1695         if (IS_ERR(lap))
1696                 return;
1697
1698         /* in a raid1 regime this would down a count of many ios
1699          * in flight, onl calling the caller_ops completion when all
1700          * the raid1 ios are complete */
1701         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, rc);
1702 }
1703
1704 static struct obd_async_page_ops lov_async_page_ops = {
1705         .ap_make_ready =        lov_ap_make_ready,
1706         .ap_refresh_count =     lov_ap_refresh_count,
1707         .ap_fill_obdo =         lov_ap_fill_obdo,
1708         .ap_completion =        lov_ap_completion,
1709 };
1710
1711 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1712                            struct lov_oinfo *loi, struct page *page,
1713                            obd_off offset, struct obd_async_page_ops *ops, 
1714                            void *data, void **res)
1715 {
1716         struct lov_obd *lov = &exp->exp_obd->u.lov;
1717         struct lov_async_page *lap;
1718         int rc;
1719         ENTRY;
1720
1721         if (lsm_bad_magic(lsm))
1722                 RETURN(-EINVAL);
1723         LASSERT(loi == NULL);
1724
1725         OBD_ALLOC(lap, sizeof(*lap));
1726         if (lap == NULL)
1727                 RETURN(-ENOMEM);
1728
1729         lap->lap_magic = LAP_MAGIC;
1730         lap->lap_caller_ops = ops;
1731         lap->lap_caller_data = data;
1732
1733         /* for now only raid 0 which passes through */
1734         lap->lap_stripe = lov_stripe_number(lsm, offset);
1735         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
1736         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1737
1738         /* so the callback doesn't need the lsm */ 
1739         lap->lap_loi_id = loi->loi_id;
1740
1741         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1742                                  lsm, loi, page, lap->lap_sub_offset,
1743                                  &lov_async_page_ops, lap,
1744                                  &lap->lap_sub_cookie);
1745         if (rc) {
1746                 OBD_FREE(lap, sizeof(*lap));
1747                 RETURN(rc);
1748         }
1749         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1750                lap->lap_sub_cookie, offset);
1751         *res = lap;
1752         RETURN(0);
1753 }
1754
1755 static int lov_queue_async_io(struct obd_export *exp,
1756                               struct lov_stripe_md *lsm,
1757                               struct lov_oinfo *loi, void *cookie,
1758                               int cmd, obd_off off, int count,
1759                               obd_flag brw_flags, obd_flag async_flags)
1760 {
1761         struct lov_obd *lov = &exp->exp_obd->u.lov;
1762         struct lov_async_page *lap;
1763         int rc;
1764
1765         LASSERT(loi == NULL);
1766
1767         if (lsm_bad_magic(lsm))
1768                 RETURN(-EINVAL);
1769
1770         lap = lap_from_cookie(cookie);
1771         if (IS_ERR(lap))
1772                 RETURN(PTR_ERR(lap));
1773
1774         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1775         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1776                                 loi, lap->lap_sub_cookie, cmd, off, count,
1777                                 brw_flags, async_flags);
1778         RETURN(rc);
1779 }
1780
1781 static int lov_set_async_flags(struct obd_export *exp,
1782                                struct lov_stripe_md *lsm,
1783                                struct lov_oinfo *loi, void *cookie,
1784                                obd_flag async_flags)
1785 {
1786         struct lov_obd *lov = &exp->exp_obd->u.lov;
1787         struct lov_async_page *lap;
1788         int rc;
1789
1790         LASSERT(loi == NULL);
1791
1792         if (lsm_bad_magic(lsm))
1793                 RETURN(-EINVAL);
1794
1795         lap = lap_from_cookie(cookie);
1796         if (IS_ERR(lap))
1797                 RETURN(PTR_ERR(lap));
1798
1799         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1800         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1801                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1802         RETURN(rc);
1803 }
1804
1805 static int lov_queue_sync_io(struct obd_export *exp,
1806                              struct lov_stripe_md *lsm,
1807                              struct lov_oinfo *loi,
1808                              struct obd_sync_io_container *osic, void *cookie,
1809                              int cmd, obd_off off, int count,
1810                              obd_flag brw_flags)
1811 {
1812         struct lov_obd *lov = &exp->exp_obd->u.lov;
1813         struct lov_async_page *lap;
1814         int rc;
1815
1816         LASSERT(loi == NULL);
1817
1818         if (lsm_bad_magic(lsm))
1819                 RETURN(-EINVAL);
1820
1821         lap = lap_from_cookie(cookie);
1822         if (IS_ERR(lap))
1823                 RETURN(PTR_ERR(lap));
1824
1825         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1826         rc = obd_queue_sync_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1827                                osic, lap->lap_sub_cookie, cmd, off, count,
1828                                brw_flags);
1829         RETURN(rc);
1830 }
1831
1832 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1833  * all stripes, but we don't record that fact at queue time.  so we
1834  * trigger sync io on all stripes. */
1835 static int lov_trigger_sync_io(struct obd_export *exp,
1836                                struct lov_stripe_md *lsm,
1837                                struct lov_oinfo *loi,
1838                                struct obd_sync_io_container *osic)
1839 {
1840         struct lov_obd *lov = &exp->exp_obd->u.lov;
1841         int rc = 0, i, err;
1842
1843         LASSERT(loi == NULL);
1844
1845         if (lsm_bad_magic(lsm))
1846                 RETURN(-EINVAL);
1847
1848         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1849              i++, loi++) {
1850                 err = obd_trigger_sync_io(lov->tgts[loi->loi_ost_idx].ltd_exp, 
1851                                           lsm, loi, osic);
1852                 if (rc == 0 && err != 0)
1853                         rc = err;
1854         };
1855         RETURN(rc);
1856 }
1857
1858 static int lov_teardown_async_page(struct obd_export *exp,
1859                                    struct lov_stripe_md *lsm,
1860                                    struct lov_oinfo *loi, void *cookie)
1861 {
1862         struct lov_obd *lov = &exp->exp_obd->u.lov;
1863         struct lov_async_page *lap;
1864         int rc;
1865
1866         LASSERT(loi == NULL);
1867
1868         if (lsm_bad_magic(lsm))
1869                 RETURN(-EINVAL);
1870
1871         lap = lap_from_cookie(cookie);
1872         if (IS_ERR(lap))
1873                 RETURN(PTR_ERR(lap));
1874
1875         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1876         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp, 
1877                                      lsm, loi, lap->lap_sub_cookie);
1878         if (rc) {
1879                 CERROR("unable to teardown sub cookie %p: %d\n", 
1880                        lap->lap_sub_cookie, rc);
1881                 RETURN(rc);
1882         }
1883         OBD_FREE(lap, sizeof(*lap));
1884         RETURN(rc);
1885 }
1886
1887 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1888                        struct lustre_handle *parent_lock,
1889                        __u32 type, void *cookie, int cookielen, __u32 mode,
1890                        int *flags, void *cb, void *data,
1891                        struct lustre_handle *lockh)
1892 {
1893         struct lov_obd *lov;
1894         struct lov_oinfo *loi;
1895         struct lov_stripe_md submd;
1896         struct ldlm_extent *extent = cookie;
1897         int rc;
1898         ENTRY;
1899
1900         if (lsm_bad_magic(lsm))
1901                 RETURN(-EINVAL);
1902
1903         /* we should never be asked to replay a lock this way. */
1904         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1905
1906         if (!exp || !exp->exp_obd)
1907                 RETURN(-ENODEV);
1908
1909         lov = &exp->exp_obd->u.lov;
1910         loi = lsm->lsm_oinfo;
1911         if (lov->tgts[loi->loi_ost_idx].active == 0) {
1912                  CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1913                  RETURN(-EIO);
1914         }
1915         
1916         /* XXX LOV STACKING: submd should be from the subobj */
1917         submd.lsm_object_id = loi->loi_id;
1918         submd.lsm_stripe_count = 0;
1919         /* XXX submd is not fully initialized here */
1920         *flags = 0;
1921         rc = obd_enqueue(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
1922                          parent_lock, type, extent, sizeof(*extent),
1923                          mode, flags, cb, data, lockh);
1924
1925         if (rc != ELDLM_OK) {
1926                 memset(lockh, 0, sizeof(*lockh));
1927                 if (lov->tgts[loi->loi_ost_idx].active)
1928                         CERROR("error: enqueue objid "LPX64" subobj "
1929                                LPX64" on OST idx %d: rc = %d\n",
1930                                lsm->lsm_object_id, loi->loi_id,
1931                                loi->loi_ost_idx, rc);
1932         }
1933         RETURN(rc);
1934 }
1935
1936 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1937                      __u32 type, void *cookie, int cookielen, __u32 mode,
1938                      int *flags, void *data, struct lustre_handle *lockh)
1939 {
1940         struct lov_obd *lov;
1941         struct lov_oinfo *loi;
1942         struct lov_stripe_md submd;
1943         struct ldlm_extent *extent = cookie;
1944         int rc = 0;
1945         ENTRY;
1946
1947         if (lsm_bad_magic(lsm))
1948                 RETURN(-EINVAL);
1949
1950         if (!exp || !exp->exp_obd)
1951                 RETURN(-ENODEV);
1952
1953         lov = &exp->exp_obd->u.lov;
1954         loi = lsm->lsm_oinfo;
1955         if (lov->tgts[loi->loi_ost_idx].active == 0) {
1956                 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1957                 RETURN(-EIO);
1958         }
1959
1960         /* XXX LOV STACKING: submd should be from the subobj */
1961         submd.lsm_object_id = loi->loi_id;
1962         submd.lsm_stripe_count = 0;
1963         /* XXX submd is not fully initialized here */
1964         rc = obd_match(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd, type,
1965                        extent, sizeof(*extent), mode, flags, data, lockh);
1966         RETURN(rc);
1967 }
1968
1969 static int lov_change_cbdata(struct obd_export *exp,
1970                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1971                              void *data)
1972 {
1973         struct lov_obd *lov;
1974         struct lov_oinfo *loi;
1975         int rc = 0, i;
1976         ENTRY;
1977
1978         if (lsm_bad_magic(lsm))
1979                 RETURN(-EINVAL);
1980
1981         if (!exp || !exp->exp_obd)
1982                 RETURN(-ENODEV);
1983
1984         lov = &exp->exp_obd->u.lov;
1985         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1986                 struct lov_stripe_md submd;
1987                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1988                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1989
1990                 submd.lsm_object_id = loi->loi_id;
1991                 submd.lsm_stripe_count = 0;
1992                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1993                                        &submd, it, data);
1994         }
1995         RETURN(rc);
1996 }
1997
1998 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1999                       __u32 mode, struct lustre_handle *lockh)
2000 {
2001         struct lov_obd *lov;
2002         struct lov_oinfo *loi;
2003         struct lov_stripe_md submd;
2004         int rc = 0;
2005         ENTRY;
2006
2007         if (lsm_bad_magic(lsm))
2008                 RETURN(-EINVAL);
2009
2010         if (!exp || !exp->exp_obd)
2011                 RETURN(-ENODEV);
2012
2013         lov = &exp->exp_obd->u.lov;
2014         loi = lsm->lsm_oinfo;
2015
2016         /* XXX LOV STACKING: submd should be from the subobj */
2017         submd.lsm_object_id = loi->loi_id;
2018         submd.lsm_stripe_count = 0;
2019         rc = obd_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
2020                          mode, lockh);
2021         if (rc && lov->tgts[loi->loi_ost_idx].active)
2022                 CERROR("error: cancel objid "LPX64" subobj "
2023                        LPX64" on OST idx %d: rc = %d\n",
2024                        lsm->lsm_object_id, loi->loi_id, loi->loi_ost_idx, rc);
2025         GOTO(out, rc);
2026 out:
2027         RETURN(rc);
2028 }
2029
2030 static int lov_cancel_unused(struct obd_export *exp,
2031                              struct lov_stripe_md *lsm, int flags, void *opaque)
2032 {
2033         struct lov_obd *lov;
2034         struct lov_oinfo *loi;
2035         int rc = 0, i;
2036         ENTRY;
2037
2038         if (lsm_bad_magic(lsm))
2039                 RETURN(-EINVAL);
2040
2041         if (!exp || !exp->exp_obd)
2042                 RETURN(-ENODEV);
2043
2044         lov = &exp->exp_obd->u.lov;
2045         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
2046                 struct lov_stripe_md submd;
2047                 int err;
2048
2049                 if (lov->tgts[loi->loi_ost_idx].active == 0)
2050                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2051
2052                 submd.lsm_object_id = loi->loi_id;
2053                 submd.lsm_stripe_count = 0;
2054                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
2055                                         &submd, flags, opaque);
2056                 if (err && lov->tgts[loi->loi_ost_idx].active) {
2057                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2058                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2059                                loi->loi_id, loi->loi_ost_idx, err);
2060                         if (!rc)
2061                                 rc = err;
2062                 }
2063         }
2064         RETURN(rc);
2065 }
2066
2067 #define LOV_U64_MAX ((__u64)~0ULL)
2068 #define LOV_SUM_MAX(tot, add)                                           \
2069         do {                                                            \
2070                 if ((tot) + (add) < (tot))                              \
2071                         (tot) = LOV_U64_MAX;                            \
2072                 else                                                    \
2073                         (tot) += (add);                                 \
2074         } while(0)
2075
2076 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2077                       unsigned long max_age)
2078 {
2079         struct lov_obd *lov = &obd->u.lov;
2080         struct obd_statfs lov_sfs;
2081         int set = 0;
2082         int rc = 0;
2083         int i;
2084         ENTRY;
2085
2086
2087         /* We only get block data from the OBD */
2088         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2089                 int err;
2090
2091                 if (!lov->tgts[i].active) {
2092                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
2093                         continue;
2094                 }
2095
2096                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
2097                                  max_age);
2098                 if (err) {
2099                         if (lov->tgts[i].active && !rc)
2100                                 rc = err;
2101                         continue;
2102                 }
2103
2104                 if (!set) {
2105                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
2106                         set = 1;
2107                 } else {
2108                         osfs->os_bfree += lov_sfs.os_bfree;
2109                         osfs->os_bavail += lov_sfs.os_bavail;
2110                         osfs->os_blocks += lov_sfs.os_blocks;
2111                         /* XXX not sure about this one - depends on policy.
2112                          *   - could be minimum if we always stripe on all OBDs
2113                          *     (but that would be wrong for any other policy,
2114                          *     if one of the OBDs has no more objects left)
2115                          *   - could be sum if we stripe whole objects
2116                          *   - could be average, just to give a nice number
2117                          *
2118                          * To give a "reasonable" (if not wholly accurate)
2119                          * number, we divide the total number of free objects
2120                          * by expected stripe count (watch out for overflow).
2121                          */
2122                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
2123                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
2124                 }
2125         }
2126
2127         if (set) {
2128                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
2129                                          lov->desc.ld_default_stripe_count :
2130                                          lov->desc.ld_active_tgt_count;
2131
2132                 if (osfs->os_files != LOV_U64_MAX)
2133                         do_div(osfs->os_files, expected_stripes);
2134                 if (osfs->os_ffree != LOV_U64_MAX)
2135                         do_div(osfs->os_ffree, expected_stripes);
2136         } else if (!rc)
2137                 rc = -EIO;
2138
2139         RETURN(rc);
2140 }
2141
2142 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2143                          void *karg, void *uarg)
2144 {
2145         struct obd_device *obddev = class_exp2obd(exp);
2146         struct lov_obd *lov = &obddev->u.lov;
2147         int i, count = lov->desc.ld_tgt_count;
2148         struct obd_uuid *uuidp;
2149         int rc;
2150
2151         ENTRY;
2152
2153         switch (cmd) {
2154         case OBD_IOC_LOV_GET_CONFIG: {
2155                 struct obd_ioctl_data *data = karg;
2156                 struct lov_tgt_desc *tgtdesc;
2157                 struct lov_desc *desc;
2158                 char *buf = NULL;
2159
2160                 buf = NULL;
2161                 len = 0;
2162                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2163                         RETURN(-EINVAL);
2164
2165                 data = (struct obd_ioctl_data *)buf;
2166
2167                 if (sizeof(*desc) > data->ioc_inllen1) {
2168                         OBD_FREE(buf, len);
2169                         RETURN(-EINVAL);
2170                 }
2171
2172                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2173                         OBD_FREE(buf, len);
2174                         RETURN(-EINVAL);
2175                 }
2176
2177                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2178                 memcpy(desc, &(lov->desc), sizeof(*desc));
2179
2180                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2181                 tgtdesc = lov->tgts;
2182                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
2183                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
2184
2185                 rc = copy_to_user((void *)uarg, buf, len);
2186                 if (rc)
2187                         rc = -EFAULT;
2188                 obd_ioctl_freedata(buf, len);
2189                 break;
2190         }
2191         case LL_IOC_LOV_SETSTRIPE:
2192                 rc = lov_setstripe(exp, karg, uarg);
2193                 break;
2194         case LL_IOC_LOV_GETSTRIPE:
2195                 rc = lov_getstripe(exp, karg, uarg);
2196                 break;
2197         case LL_IOC_LOV_SETEA:
2198                 rc = lov_setea(exp, karg, uarg);
2199                 break;
2200         default: {
2201                 int set = 0;
2202                 if (count == 0)
2203                         RETURN(-ENOTTY);
2204                 rc = 0;
2205                 for (i = 0; i < count; i++) {
2206                         int err;
2207
2208                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
2209                                             len, karg, uarg);
2210                         if (err) {
2211                                 if (lov->tgts[i].active) {
2212                                         CERROR("error: iocontrol OSC %s on OST"
2213                                                "idx %d: err = %d\n",
2214                                                lov->tgts[i].uuid.uuid, i, err);
2215                                         if (!rc)
2216                                                 rc = err;
2217                                 }
2218                         } else
2219                                 set = 1;
2220                 }
2221                 if (!set && !rc)
2222                         rc = -EIO;
2223         }
2224         }
2225
2226         RETURN(rc);
2227 }
2228
2229 static int lov_get_info(struct obd_export *exp, __u32 keylen,
2230                         void *key, __u32 *vallen, void *val)
2231 {       
2232         struct obd_device *obddev = class_exp2obd(exp);
2233         struct lov_obd *lov = &obddev->u.lov;
2234         int i;
2235         ENTRY;
2236
2237         if (!vallen || !val)
2238                 RETURN(-EFAULT);
2239
2240         if (keylen > strlen("lock_to_stripe") &&
2241             strcmp(key, "lock_to_stripe") == 0) {
2242                 struct {
2243                         char name[16];
2244                         struct ldlm_lock *lock;
2245                         struct lov_stripe_md *lsm;
2246                 } *data = key;
2247                 struct lov_oinfo *loi;
2248                 __u32 *stripe = val;
2249
2250                 if (*vallen < sizeof(*stripe))
2251                         RETURN(-EFAULT);
2252                 *vallen = sizeof(*stripe);
2253
2254                 /* XXX This is another one of those bits that will need to
2255                  * change if we ever actually support nested LOVs.  It uses
2256                  * the lock's export to find out which stripe it is. */
2257                 for (i = 0, loi = data->lsm->lsm_oinfo;
2258                      i < data->lsm->lsm_stripe_count;
2259                      i++, loi++) {
2260                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
2261                             data->lock->l_conn_export) {
2262                                 *stripe = i;
2263                                 RETURN(0);
2264                         }
2265                 }
2266                 RETURN(-ENXIO);
2267         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2268                 obd_id *ids = val;
2269                 int rc, size = sizeof(obd_id);
2270                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2271                         if (!lov->tgts[i].active)
2272                                 continue;
2273                         rc = obd_get_info(lov->tgts[i].ltd_exp, keylen, key,
2274                                           &size, &(ids[i]));
2275                         if (rc != 0)
2276                                 RETURN(rc);
2277                 }
2278                 RETURN(0);
2279         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2280                 struct lov_desc *desc_ret = val;
2281                 *desc_ret = lov->desc;
2282                 
2283                 RETURN(0);
2284         }
2285
2286         RETURN(-EINVAL);
2287 }
2288
2289 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2290                         void *key, obd_count vallen, void *val)
2291 {
2292         struct obd_device *obddev = class_exp2obd(exp);
2293         struct lov_obd *lov = &obddev->u.lov;
2294         int i, rc = 0;
2295         ENTRY;
2296
2297 #define KEY_IS(str) \
2298         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2299
2300         if (KEY_IS("next_id")) {
2301                 if (vallen != lov->desc.ld_tgt_count)
2302                         RETURN(-EINVAL);
2303                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2304                         int er;
2305
2306                         if (!lov->tgts[i].active)
2307                                 continue;
2308
2309                         er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key,
2310                                           sizeof(obd_id), ((obd_id*)val) + i);
2311                         if (!rc)
2312                                 rc = er;
2313                 }
2314                 RETURN(rc);
2315         }
2316
2317         if (KEY_IS("growth_count")) {
2318                 if (vallen != sizeof(int))
2319                         RETURN(-EINVAL);
2320         } else if (KEY_IS("mds_conn") || KEY_IS("unlinked")) {
2321                 if (vallen != 0)
2322                         RETURN(-EINVAL);
2323         } else {
2324                 RETURN(-EINVAL);
2325         }
2326
2327         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2328                 int er;
2329
2330                 if (!lov->tgts[i].active)
2331                         continue;
2332
2333                 er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen,
2334                                    val);
2335                 if (!rc)
2336                         rc = er;
2337         }
2338         RETURN(rc);
2339 #undef KEY_IS
2340
2341 }
2342
2343
2344 static int lov_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm,
2345                              struct ldlm_lock *lock, obd_off offset)
2346 {
2347         struct lov_obd *lov;
2348         struct lov_oinfo *loi;
2349         struct lov_stripe_md submd;
2350         int rc;
2351         ENTRY;
2352
2353         LASSERT(lsm != NULL);
2354         if (exp == NULL)
2355                 RETURN(-ENODEV);
2356
2357         lov = &exp->exp_obd->u.lov;
2358         loi = lsm->lsm_oinfo;
2359
2360         if (lov->tgts[loi->loi_ost_idx].active == 0) {
2361                  CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2362                  RETURN(-EIO);
2363         }
2364         
2365         /* XXX submd is not fully initialized here */
2366         /* XXX LOV STACKING: submd should be from the subobj */
2367         submd.lsm_object_id = loi->loi_id;
2368         submd.lsm_stripe_count = 0;
2369
2370         rc = obd_lock_contains(lov->tgts[loi->loi_ost_idx].ltd_exp, &submd,
2371                                lock, offset);
2372         RETURN(rc);
2373 }
2374
2375 struct obd_ops lov_obd_ops = {
2376         o_owner:       THIS_MODULE,
2377         o_attach:      lov_attach,
2378         o_detach:      lov_detach,
2379         o_setup:       lov_setup,
2380         o_cleanup:     lov_cleanup,
2381         o_connect:     lov_connect,
2382         o_disconnect:  lov_disconnect,
2383         o_statfs:      lov_statfs,
2384         o_packmd:      lov_packmd,
2385         o_unpackmd:    lov_unpackmd,
2386         o_create:      lov_create,
2387         o_destroy:     lov_destroy,
2388         o_getattr:     lov_getattr,
2389         o_getattr_async: lov_getattr_async,
2390         o_setattr:     lov_setattr,
2391         o_brw:         lov_brw,
2392         o_brw_async:   lov_brw_async,
2393         .o_prep_async_page =    lov_prep_async_page,
2394         .o_queue_async_io =     lov_queue_async_io,
2395         .o_set_async_flags =    lov_set_async_flags,
2396         .o_queue_sync_io =      lov_queue_sync_io,
2397         .o_trigger_sync_io =    lov_trigger_sync_io,
2398         .o_teardown_async_page  lov_teardown_async_page,
2399         o_punch:       lov_punch,
2400         o_sync:        lov_sync,
2401         o_enqueue:     lov_enqueue,
2402         o_match:       lov_match,
2403         o_change_cbdata: lov_change_cbdata,
2404         o_cancel:      lov_cancel,
2405         o_cancel_unused: lov_cancel_unused,
2406         o_iocontrol:   lov_iocontrol,
2407         o_get_info:    lov_get_info,
2408         o_set_info:    lov_set_info,
2409         o_llog_init:   lov_llog_init,
2410         o_llog_finish: lov_llog_finish,
2411         o_lock_contains:lov_lock_contains,
2412         o_notify: lov_notify,
2413 };
2414
2415 int __init lov_init(void)
2416 {
2417         struct lprocfs_static_vars lvars;
2418         int rc;
2419
2420         lprocfs_init_vars(lov, &lvars);
2421         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2422                                  OBD_LOV_DEVICENAME);
2423         RETURN(rc);
2424 }
2425
#ifdef __KERNEL__
/* Unload hook: unregister the OBD_LOV_DEVICENAME device type. */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

/* Module load/unload entry points. */
module_init(lov_init);
module_exit(lov_exit);
#endif /* __KERNEL__ */