Whamcloud - gitweb
land b1_4_smallfix on b1_4(20050202_1817)
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002-2004 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/lustre_debug.h>
48 #include <linux/obd_class.h>
49 #include <linux/obd_lov.h>
50 #include <linux/obd_ost.h>
51 #include <linux/lprocfs_status.h>
52
53 #include "lov_internal.h"
54
55 /* obd methods */
56 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
57                        struct obd_uuid *cluuid)
58 {
59         struct ptlrpc_request *req = NULL;
60         struct lov_obd *lov = &obd->u.lov;
61         struct lov_desc *desc = &lov->desc;
62         struct lov_tgt_desc *tgts;
63         struct obd_export *exp;
64         int rc, rc2, i;
65         ENTRY;
66
67         rc = class_connect(conn, obd, cluuid);
68         if (rc)
69                 RETURN(rc);
70
71         exp = class_conn2export(conn);
72
73         /* We don't want to actually do the underlying connections more than
74          * once, so keep track. */
75         lov->refcount++;
76         if (lov->refcount > 1) {
77                 class_export_put(exp);
78                 RETURN(0);
79         }
80
81         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
82                 struct obd_uuid *tgt_uuid = &tgts->uuid;
83                 struct obd_device *tgt_obd;
84                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
85                 struct lustre_handle conn = {0, };
86
87                 LASSERT( tgt_uuid != NULL);
88
89                 tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
90                                                 &obd->obd_uuid);
91
92                 if (!tgt_obd) {
93                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
94                         GOTO(out_disc, rc = -EINVAL);
95                 }
96
97                 if (!tgt_obd->obd_set_up) {
98                         CERROR("Target %s not set up\n", tgt_uuid->uuid);
99                         GOTO(out_disc, rc = -EINVAL);
100                 }
101
102                 if (tgt_obd->u.cli.cl_import->imp_invalid) {
103                         CERROR("not connecting OSC %s; administratively "
104                                "disabled\n", tgt_uuid->uuid);
105                         rc = obd_register_observer(tgt_obd, obd);
106                         if (rc) {
107                                 CERROR("Target %s register_observer error %d; "
108                                        "will not be able to reactivate\n",
109                                        tgt_uuid->uuid, rc);
110                         }
111                         continue;
112                 }
113
114                 rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid);
115                 if (rc) {
116                         CERROR("Target %s connect error %d\n", tgt_uuid->uuid,
117                                rc);
118                         GOTO(out_disc, rc);
119                 }
120                 tgts->ltd_exp = class_conn2export(&conn);
121
122                 rc = obd_register_observer(tgt_obd, obd);
123                 if (rc) {
124                         CERROR("Target %s register_observer error %d\n",
125                                tgt_uuid->uuid, rc);
126                         obd_disconnect(tgts->ltd_exp, 0);
127                         GOTO(out_disc, rc);
128                 }
129
130                 desc->ld_active_tgt_count++;
131                 tgts->active = 1;
132         }
133
134         ptlrpc_req_finished(req);
135         class_export_put(exp);
136         RETURN (0);
137
138  out_disc:
139         while (i-- > 0) {
140                 struct obd_uuid uuid;
141                 --tgts;
142                 --desc->ld_active_tgt_count;
143                 tgts->active = 0;
144                 /* save for CERROR below; (we know it's terminated) */
145                 uuid = tgts->uuid;
146                 rc2 = obd_disconnect(tgts->ltd_exp, 0);
147                 if (rc2)
148                         CERROR("error: LOV target %s disconnect on OST idx %d: "
149                                "rc = %d\n", uuid.uuid, i, rc2);
150         }
151         class_disconnect(exp, 0);
152         RETURN (rc);
153 }
154
155 static int lov_disconnect(struct obd_export *exp, int flags)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct lov_obd *lov = &obd->u.lov;
159         struct obd_export *osc_exp;
160         int rc, i;
161         ENTRY;
162
163         if (!lov->tgts)
164                 goto out_local;
165
166         /* Only disconnect the underlying layers on the final disconnect. */
167         lov->refcount--;
168         if (lov->refcount != 0)
169                 goto out_local;
170
171         spin_lock(&lov->lov_lock);
172         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
173                 if (lov->tgts[i].ltd_exp == NULL)
174                         continue;
175
176                 osc_exp = lov->tgts[i].ltd_exp;
177                 lov->tgts[i].ltd_exp = NULL;
178
179                 if (obd->obd_no_recov) {
180                         /* Pass it on to our clients.
181                          * XXX This should be an argument to disconnect,
182                          * XXX not a back-door flag on the OBD.  Ah well.
183                          */
184                         struct obd_device *osc_obd;
185                         osc_obd = class_exp2obd(osc_exp);
186                         if (osc_obd)
187                                 osc_obd->obd_no_recov = 1;
188                 }
189
190                 obd_register_observer(osc_exp->exp_obd, NULL);
191
192                 spin_unlock(&lov->lov_lock);
193                 rc = obd_disconnect(osc_exp, flags);
194                 spin_lock(&lov->lov_lock);
195                 if (rc) {
196                         if (lov->tgts[i].active) {
197                                 CERROR("Target %s disconnect error %d\n",
198                                        lov->tgts[i].uuid.uuid, rc);
199                         }
200                         rc = 0;
201                 }
202                 if (lov->tgts[i].active) {
203                         lov->desc.ld_active_tgt_count--;
204                         lov->tgts[i].active = 0;
205                 }
206         }
207         spin_unlock(&lov->lov_lock);
208
209  out_local:
210         rc = class_disconnect(exp, 0);
211         RETURN(rc);
212 }
213
214 /* Error codes:
215  *
216  *  -EINVAL  : UUID can't be found in the LOV's target list
217  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
218  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
219  */
220 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
221                               int activate)
222 {
223         struct lov_tgt_desc *tgt;
224         int i, rc = 0;
225         ENTRY;
226
227         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
228                lov, uuid->uuid, activate);
229
230         spin_lock(&lov->lov_lock);
231         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
232                 if (tgt->ltd_exp == NULL)
233                         continue;
234
235                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
236                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
237                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
238                         break;
239         }
240
241         if (i == lov->desc.ld_tgt_count)
242                 GOTO(out, rc = -EINVAL);
243
244         if (tgt->active == activate) {
245                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,
246                        activate ? "" : "in");
247                 GOTO(out, rc);
248         }
249
250         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid, 
251                activate ? "" : "in");
252
253         tgt->active = activate;
254         if (activate)
255                 lov->desc.ld_active_tgt_count++;
256         else
257                 lov->desc.ld_active_tgt_count--;
258
259         EXIT;
260  out:
261         spin_unlock(&lov->lov_lock);
262         return rc;
263 }
264
265 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
266                        int active)
267 {
268         int rc;
269         struct obd_uuid *uuid;
270
271         if (strcmp(watched->obd_type->typ_name, "osc")) {
272                 CERROR("unexpected notification of %s %s!\n",
273                        watched->obd_type->typ_name,
274                        watched->obd_name);
275                 return -EINVAL;
276         }
277         uuid = &watched->u.cli.cl_import->imp_target_uuid;
278
279         /* Set OSC as active before notifying the observer, so the
280          * observer can use the OSC normally.  
281          */
282         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
283         if (rc) {
284                 CERROR("%sactivation of %s failed: %d\n",
285                        active ? "" : "de", uuid->uuid, rc);
286                 RETURN(rc);
287         }
288
289         if (obd->obd_observer)
290                 /* Pass the notification up the chain. */
291                 rc = obd_notify(obd->obd_observer, watched, active);
292
293         RETURN(rc);
294 }
295
296 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
297 {
298         struct lprocfs_static_vars lvars;
299         struct lustre_cfg *lcfg = buf;
300         struct lov_desc *desc;
301         struct lov_obd *lov = &obd->u.lov;
302         struct obd_uuid *uuids;
303         struct lov_tgt_desc *tgts;
304         int i;
305         int count;
306         ENTRY;
307
308         if (lcfg->lcfg_inllen1 < 1) {
309                 CERROR("LOV setup requires a descriptor\n");
310                 RETURN(-EINVAL);
311         }
312
313         if (lcfg->lcfg_inllen2 < 1) {
314                 CERROR("LOV setup requires an OST UUID list\n");
315                 RETURN(-EINVAL);
316         }
317
318         desc = (struct lov_desc *)lcfg->lcfg_inlbuf1;
319         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
320                 CERROR("descriptor size wrong: %d > %d\n",
321                        (int)sizeof(*desc), lcfg->lcfg_inllen1);
322                 RETURN(-EINVAL);
323         }
324
325         count = desc->ld_tgt_count;
326         uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
327         if (sizeof(*uuids) * count != lcfg->lcfg_inllen2) {
328                 CERROR("UUID array size wrong: %u * %u != %u\n",
329                        (int)sizeof(*uuids), count, lcfg->lcfg_inllen2);
330                 RETURN(-EINVAL);
331         }
332
333         if (desc->ld_default_stripe_size < PTLRPC_MAX_BRW_SIZE) {
334                 CWARN("Increasing default_stripe_size "LPU64" to %u\n",
335                       desc->ld_default_stripe_size, PTLRPC_MAX_BRW_SIZE);
336                 CWARN("Please update config and run --write-conf on MDS\n");
337
338                 desc->ld_default_stripe_size = PTLRPC_MAX_BRW_SIZE;
339         }
340
341         /* Because of 64-bit divide/mod operations only work with a 32-bit
342          * divisor in a 32-bit kernel, we cannot support a stripe width
343          * of 4GB or larger on 32-bit CPUs.
344          */
345         if ((desc->ld_default_stripe_count ?
346              desc->ld_default_stripe_count : desc->ld_tgt_count) *
347              desc->ld_default_stripe_size > ~0UL) {
348                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
349                        desc->ld_default_stripe_size,
350                        desc->ld_default_stripe_count ?
351                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
352                 RETURN(-EINVAL);
353         }
354
355         lov->bufsize = sizeof(struct lov_tgt_desc) * count;
356         OBD_ALLOC(lov->tgts, lov->bufsize);
357         if (lov->tgts == NULL) {
358                 CERROR("Out of memory\n");
359                 RETURN(-EINVAL);
360         }
361
362         lov->desc = *desc;
363         spin_lock_init(&lov->lov_lock);
364
365         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
366                 struct obd_uuid *uuid = &tgts->uuid;
367
368                 /* NULL termination already checked */
369                 *uuid = uuids[i];
370         }
371
372         lprocfs_init_vars(lov, &lvars);
373         lprocfs_obd_setup(obd, lvars.obd_vars);
374 #ifdef __KERNEL__
375         {
376                 struct proc_dir_entry *entry;
377
378                 entry = create_proc_entry("target_obd", 0444,
379                                           obd->obd_proc_entry);
380                 if (entry != NULL) {
381                         entry->proc_fops = &lov_proc_target_fops;
382                         entry->data = obd;
383                 }
384         }
385 #endif
386
387         RETURN(0);
388 }
389
390 static int lov_cleanup(struct obd_device *obd, int flags)
391 {
392         struct lov_obd *lov = &obd->u.lov;
393
394         lprocfs_obd_cleanup(obd);
395         OBD_FREE(lov->tgts, lov->bufsize);
396
397         RETURN(0);
398 }
399
/* Fallback log2(): index of the first zero bit of ~n, as found by ffz().
 * Only defined when the environment does not already provide log2. */
#ifndef log2
#define log2(n) ffz(~(n))
#endif
403
404 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
405                              struct lov_stripe_md **ea,
406                              struct obd_trans_info *oti)
407 {
408         struct lov_obd *lov;
409         struct obdo *tmp_oa;
410         struct obd_uuid *ost_uuid = NULL;
411         int rc = 0, i;
412         ENTRY;
413
414         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
415                 src_oa->o_flags == OBD_FL_DELORPHAN);
416
417         lov = &export->exp_obd->u.lov;
418
419         tmp_oa = obdo_alloc();
420         if (tmp_oa == NULL)
421                 RETURN(-ENOMEM);
422
423         if (src_oa->o_valid & OBD_MD_FLINLINE) {
424                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
425                 CDEBUG(D_HA, "clearing orphans only for %s\n",
426                        ost_uuid->uuid);
427         }
428
429         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
430                 struct lov_stripe_md obj_md;
431                 struct lov_stripe_md *obj_mdp = &obj_md;
432                 int err;
433
434                 /* if called for a specific target, we don't
435                    care if it is not active. */
436                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
437                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
438                         continue;
439                 }
440
441                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
442                         continue;
443
444                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
445
446                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
447                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, &obj_mdp, oti);
448                 if (err)
449                         /* This export will be disabled until it is recovered,
450                            and then orphan recovery will be completed. */
451                         CERROR("error in orphan recovery on OST idx %d/%d: "
452                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
453
454                 if (ost_uuid)
455                         break;
456         }
457         obdo_free(tmp_oa);
458         RETURN(rc);
459 }
460
461 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
462                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
463 {
464         struct lov_stripe_md *obj_mdp, *lsm;
465         struct lov_obd *lov = &exp->exp_obd->u.lov;
466         unsigned ost_idx;
467         int rc, i;
468         ENTRY;
469
470         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
471                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
472
473         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
474         if (obj_mdp == NULL)
475                 RETURN(-ENOMEM);
476
477         ost_idx = src_oa->o_nlink;
478         lsm = *ea;
479         if (lsm == NULL)
480                 GOTO(out, rc = -EINVAL);
481         if (ost_idx >= lov->desc.ld_tgt_count)
482                 GOTO(out, rc = -EINVAL);
483
484         for (i = 0; i < lsm->lsm_stripe_count; i++) {
485                 if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
486                         if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
487                                 GOTO(out, rc = -EINVAL);
488                         break;
489                 }
490         }
491         if (i == lsm->lsm_stripe_count)
492                 GOTO(out, rc = -EINVAL);
493
494         rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti);
495 out:
496         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
497         RETURN(rc);
498 }
499
500 /* the LOV expects oa->o_id to be set to the LOV object id */
501 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
502                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
503 {
504         struct lov_obd *lov;
505         struct lov_request_set *set = NULL;
506         struct list_head *pos;
507         int rc = 0;
508         ENTRY;
509
510         LASSERT(ea != NULL);
511         if (exp == NULL)
512                 RETURN(-EINVAL);
513
514         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
515             src_oa->o_flags == OBD_FL_DELORPHAN) {
516                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
517                 RETURN(rc);
518         }
519
520         lov = &exp->exp_obd->u.lov;
521         if (!lov->desc.ld_active_tgt_count)
522                 RETURN(-EIO);
523
524         /* Recreate a specific object id at the given OST index */
525         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
526             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
527                  rc = lov_recreate(exp, src_oa, ea, oti);
528                  RETURN(rc);
529         }
530
531         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
532         if (rc)
533                 RETURN(rc);
534
535         list_for_each (pos, &set->set_list) {
536                 struct lov_request *req = 
537                         list_entry(pos, struct lov_request, rq_link);
538
539                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
540                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, 
541                                 req->rq_oa, &req->rq_md, oti);
542                 lov_update_create_set(set, req, rc);
543         }
544         rc = lov_fini_create_set(set, ea);
545         RETURN(rc);
546 }
547
/* Assert that @lsmp is a non-NULL stripe md whose lsm_magic is LOV_MAGIC;
 * on a magic mismatch the pointer and the magic found are reported.
 * Note that @lsmp is evaluated more than once. */
#define ASSERT_LSM_MAGIC(lsmp)                                  \
do {                                                            \
        LASSERT((lsmp) != NULL);                                \
        LASSERTF((lsmp)->lsm_magic == LOV_MAGIC, "%p, %x",      \
                 (lsmp), (lsmp)->lsm_magic);                    \
} while (0)
554
555 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
556                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
557 {
558         struct lov_request_set *set;
559         struct lov_request *req;
560         struct list_head *pos;
561         struct lov_obd *lov;
562         int rc = 0;
563         ENTRY;
564
565         ASSERT_LSM_MAGIC(lsm);
566
567         if (!exp || !exp->exp_obd)
568                 RETURN(-ENODEV);
569
570         lov = &exp->exp_obd->u.lov;
571         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
572         if (rc)
573                 RETURN(rc);
574
575         list_for_each (pos, &set->set_list) {
576                 int err;
577                 req = list_entry(pos, struct lov_request, rq_link);
578
579                 /* XXX update the cookie position */
580                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
581                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
582                                  NULL, oti);
583                 err = lov_update_common_set(set, req, rc);
584                 if (rc) {
585                         CERROR("error: destroying objid "LPX64" subobj "
586                                LPX64" on OST idx %d: rc = %d\n", 
587                                set->set_oa->o_id, req->rq_oa->o_id, 
588                                req->rq_idx, rc);
589                         if (!rc)
590                                 rc = err;
591                 }
592         }
593         lov_fini_destroy_set(set);
594         RETURN(rc);
595 }
596
597 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
598                        struct lov_stripe_md *lsm)
599 {
600         struct lov_request_set *set;
601         struct lov_request *req;
602         struct list_head *pos;
603         struct lov_obd *lov;
604         int err = 0, rc = 0;
605         ENTRY;
606
607         ASSERT_LSM_MAGIC(lsm);
608
609         if (!exp || !exp->exp_obd)
610                 RETURN(-ENODEV);
611
612         lov = &exp->exp_obd->u.lov;
613         
614         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
615         if (rc)
616                 RETURN(rc);
617
618         list_for_each (pos, &set->set_list) {
619                 req = list_entry(pos, struct lov_request, rq_link);
620                 
621                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
622                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
623                        req->rq_idx);
624
625                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
626                                  req->rq_oa, NULL);
627                 err = lov_update_common_set(set, req, rc);
628                 if (err) {
629                         CERROR("error: getattr objid "LPX64" subobj "
630                                LPX64" on OST idx %d: rc = %d\n",
631                                set->set_oa->o_id, req->rq_oa->o_id, 
632                                req->rq_idx, err);
633                         break;
634                 }
635         }
636         
637         rc = lov_fini_getattr_set(set);
638         if (err)
639                 rc = err;
640         RETURN(rc);
641 }
642
643 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
644                                  int rc)
645 {
646         struct lov_request_set *lovset = (struct lov_request_set *)data;
647         ENTRY;
648
649         /* don't do attribute merge if this aysnc op failed */
650         if (rc) {
651                 lovset->set_completes = 0;
652                 lov_fini_getattr_set(lovset);
653         } else {
654                 rc = lov_fini_getattr_set(lovset);
655         }
656         RETURN (rc);
657 }
658
659 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
660                               struct lov_stripe_md *lsm,
661                               struct ptlrpc_request_set *rqset)
662 {
663         struct lov_request_set *lovset;
664         struct lov_obd *lov;
665         struct list_head *pos;
666         struct lov_request *req;
667         int rc = 0;
668         ENTRY;
669
670         ASSERT_LSM_MAGIC(lsm);
671
672         if (!exp || !exp->exp_obd)
673                 RETURN(-ENODEV);
674
675         lov = &exp->exp_obd->u.lov;
676
677         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
678         if (rc)
679                 RETURN(rc);
680
681         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
682                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
683
684         list_for_each (pos, &lovset->set_list) {
685                 req = list_entry(pos, struct lov_request, rq_link);
686                 
687                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
688                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
689                        req->rq_idx);
690                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
691                                        req->rq_oa, NULL, rqset);
692                 if (rc) {
693                         CERROR("error: getattr objid "LPX64" subobj "
694                                LPX64" on OST idx %d: rc = %d\n",
695                                lovset->set_oa->o_id, req->rq_oa->o_id, 
696                                req->rq_idx, rc);
697                         GOTO(out, rc);
698                 }
699                 lov_update_common_set(lovset, req, rc);
700         }
701         
702         LASSERT(rc == 0);
703         LASSERT (rqset->set_interpret == NULL);
704         rqset->set_interpret = lov_getattr_interpret;
705         rqset->set_arg = (void *)lovset;
706         RETURN(rc);
707 out:
708         LASSERT(rc);
709         lov_fini_getattr_set(lovset);
710         RETURN(rc);
711 }
712
713 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
714                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
715 {
716         struct lov_request_set *set;
717         struct lov_obd *lov;
718         struct list_head *pos;
719         struct lov_request *req;
720         int err = 0, rc = 0;
721         ENTRY;
722
723         ASSERT_LSM_MAGIC(lsm);
724
725         if (!exp || !exp->exp_obd)
726                 RETURN(-ENODEV);
727
728         /* for now, we only expect time updates here */
729         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
730                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
731                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
732                                       OBD_MD_FLSIZE)));
733         lov = &exp->exp_obd->u.lov;
734         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
735         if (rc)
736                 RETURN(rc);
737
738         list_for_each (pos, &set->set_list) {
739                 req = list_entry(pos, struct lov_request, rq_link);
740                 
741                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
742                                  NULL, NULL);
743                 err = lov_update_common_set(set, req, rc);
744                 if (err) {
745                         CERROR("error: setattr objid "LPX64" subobj "
746                                LPX64" on OST idx %d: rc = %d\n",
747                                set->set_oa->o_id, req->rq_oa->o_id,
748                                req->rq_idx, err);
749                         if (!rc)
750                                 rc = err;
751                 }
752         }
753         err = lov_fini_setattr_set(set);
754         if (!rc)
755                 rc = err;
756         RETURN(rc);
757 }
758
759 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
760  * we can send this 'punch' to just the authoritative node and the nodes
761  * that the punch will affect. */
762 static int lov_punch(struct obd_export *exp, struct obdo *oa,
763                      struct lov_stripe_md *lsm,
764                      obd_off start, obd_off end, struct obd_trans_info *oti)
765 {
766         struct lov_request_set *set;
767         struct lov_obd *lov;
768         struct list_head *pos;
769         struct lov_request *req;
770         int err = 0, rc = 0;
771         ENTRY;
772
773         ASSERT_LSM_MAGIC(lsm);
774
775         if (!exp || !exp->exp_obd)
776                 RETURN(-ENODEV);
777
778         lov = &exp->exp_obd->u.lov;
779         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
780         if (rc)
781                 RETURN(rc);
782
783         list_for_each (pos, &set->set_list) {
784                 req = list_entry(pos, struct lov_request, rq_link);
785
786                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
787                                NULL, req->rq_extent.start, 
788                                req->rq_extent.end, NULL);
789                 err = lov_update_punch_set(set, req, rc);
790                 if (err) {
791                         CERROR("error: punch objid "LPX64" subobj "LPX64
792                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
793                                req->rq_oa->o_id, req->rq_idx, rc);
794                         if (!rc)
795                                 rc = err;
796                 }
797         }
798         err = lov_fini_punch_set(set);
799         if (!rc)
800                 rc = err;
801         RETURN(rc);
802 }
803
804 static int lov_sync(struct obd_export *exp, struct obdo *oa,
805                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
806 {
807         struct lov_request_set *set;
808         struct lov_obd *lov;
809         struct list_head *pos;
810         struct lov_request *req;
811         int err = 0, rc = 0;
812         ENTRY;
813
814         ASSERT_LSM_MAGIC(lsm);
815
816         if (!exp->exp_obd)
817                 RETURN(-ENODEV);
818
819         lov = &exp->exp_obd->u.lov;
820         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
821         if (rc)
822                 RETURN(rc);
823
824         list_for_each (pos, &set->set_list) {
825                 req = list_entry(pos, struct lov_request, rq_link);
826
827                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
828                               NULL, req->rq_extent.start, req->rq_extent.end);
829                 err = lov_update_common_set(set, req, rc);
830                 if (err) {
831                         CERROR("error: fsync objid "LPX64" subobj "LPX64
832                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
833                                req->rq_oa->o_id, req->rq_idx, rc);
834                         if (!rc)
835                                 rc = err;
836                 }
837         }
838         err = lov_fini_sync_set(set);
839         if (!rc)
840                 rc = err;
841         RETURN(rc);
842 }
843
844 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
845                          struct lov_stripe_md *lsm,
846                          obd_count oa_bufs, struct brw_page *pga)
847 {
848         int i, rc = 0;
849
850         /* The caller just wants to know if there's a chance that this
851          * I/O can succeed */
852         for (i = 0; i < oa_bufs; i++) {
853                 int stripe = lov_stripe_number(lsm, pga[i].off);
854                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
855                 obd_off start, end;
856
857                 if (!lov_stripe_intersects(lsm, i, pga[i].off,
858                                            pga[i].off + pga[i].count,
859                                            &start, &end))
860                         continue;
861
862                 if (lov->tgts[ost].active == 0) {
863                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
864                         return -EIO;
865                 }
866                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[stripe].ltd_exp, oa,
867                              NULL, 1, &pga[i], NULL);
868                 if (rc)
869                         break;
870         }
871         return rc;
872 }
873
874 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
875                    struct lov_stripe_md *lsm, obd_count oa_bufs,
876                    struct brw_page *pga, struct obd_trans_info *oti)
877 {
878         struct lov_request_set *set;
879         struct lov_request *req;
880         struct list_head *pos;
881         struct lov_obd *lov = &exp->exp_obd->u.lov;
882         int err, rc = 0;
883         ENTRY;
884
885         ASSERT_LSM_MAGIC(lsm);
886
887         if (cmd == OBD_BRW_CHECK) {
888                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
889                 RETURN(rc);
890         }
891
892         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
893         if (rc)
894                 RETURN(rc);
895
896         list_for_each (pos, &set->set_list) {
897                 struct obd_export *sub_exp;
898                 struct brw_page *sub_pga;
899                 req = list_entry(pos, struct lov_request, rq_link);
900                 
901                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
902                 sub_pga = set->set_pga + req->rq_pgaidx;
903                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
904                              req->rq_oabufs, sub_pga, oti);
905                 if (rc)
906                         break;
907                 lov_update_common_set(set, req, rc);
908         }
909
910         err = lov_fini_brw_set(set);
911         if (!rc)
912                 rc = err;
913         RETURN(rc);
914 }
915
916 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
917                              int rc)
918 {
919         struct lov_request_set *lovset = (struct lov_request_set *)data;
920         ENTRY;
921         
922         if (rc) {
923                 lovset->set_completes = 0;
924                 lov_fini_brw_set(lovset);
925         } else {
926                 rc = lov_fini_brw_set(lovset);
927         }
928                 
929         RETURN(rc);
930 }
931
932 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
933                          struct lov_stripe_md *lsm, obd_count oa_bufs,
934                          struct brw_page *pga, struct ptlrpc_request_set *set,
935                          struct obd_trans_info *oti)
936 {
937         struct lov_request_set *lovset;
938         struct lov_request *req;
939         struct list_head *pos;
940         struct lov_obd *lov = &exp->exp_obd->u.lov;
941         int rc = 0;
942         ENTRY;
943
944         ASSERT_LSM_MAGIC(lsm);
945
946         if (cmd == OBD_BRW_CHECK) {
947                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
948                 RETURN(rc);
949         }
950
951         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
952         if (rc)
953                 RETURN(rc);
954
955         list_for_each (pos, &lovset->set_list) {
956                 struct obd_export *sub_exp;
957                 struct brw_page *sub_pga;
958                 req = list_entry(pos, struct lov_request, rq_link);
959                 
960                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
961                 sub_pga = lovset->set_pga + req->rq_pgaidx;
962                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
963                                    req->rq_oabufs, sub_pga, set, oti);
964                 if (rc)
965                         GOTO(out, rc);
966                 lov_update_common_set(lovset, req, rc);
967         }
968         LASSERT(rc == 0);
969         LASSERT(set->set_interpret == NULL);
970         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
971         set->set_arg = (void *)lovset;
972         
973         RETURN(rc);
974 out:
975         lov_fini_brw_set(lovset);
976         RETURN(rc);
977 }
978
979 static int lov_ap_make_ready(void *data, int cmd)
980 {
981         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
982
983         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
984 }
985 static int lov_ap_refresh_count(void *data, int cmd)
986 {
987         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
988
989         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
990                                                      cmd);
991 }
992 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
993 {
994         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
995
996         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
997         /* XXX woah, shouldn't we be altering more here?  size? */
998         oa->o_id = lap->lap_loi_id;
999 }
1000
1001 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1002 {
1003         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1004
1005         /* in a raid1 regime this would down a count of many ios
1006          * in flight, onl calling the caller_ops completion when all
1007          * the raid1 ios are complete */
1008         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1009 }
1010
1011 static struct obd_async_page_ops lov_async_page_ops = {
1012         .ap_make_ready =        lov_ap_make_ready,
1013         .ap_refresh_count =     lov_ap_refresh_count,
1014         .ap_fill_obdo =         lov_ap_fill_obdo,
1015         .ap_completion =        lov_ap_completion,
1016 };
1017
1018 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1019                            struct lov_oinfo *loi, struct page *page,
1020                            obd_off offset, struct obd_async_page_ops *ops,
1021                            void *data, void **res)
1022 {
1023         struct lov_obd *lov = &exp->exp_obd->u.lov;
1024         struct lov_async_page *lap;
1025         int rc;
1026         ENTRY;
1027
1028         if (!page)
1029                 return size_round(sizeof(*lap)) +
1030                        obd_prep_async_page(lov->tgts[0].ltd_exp, NULL, NULL,
1031                                            NULL, 0, NULL, NULL, NULL);
1032
1033         ASSERT_LSM_MAGIC(lsm);
1034         LASSERT(loi == NULL);
1035
1036         lap = *res;
1037         lap->lap_magic = LAP_MAGIC;
1038         lap->lap_caller_ops = ops;
1039         lap->lap_caller_data = data;
1040
1041         /* for now only raid 0 which passes through */
1042         lap->lap_stripe = lov_stripe_number(lsm, offset);
1043         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
1044         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1045
1046         /* so the callback doesn't need the lsm */
1047         lap->lap_loi_id = loi->loi_id;
1048
1049         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
1050
1051         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1052                                  lsm, loi, page, lap->lap_sub_offset,
1053                                  &lov_async_page_ops, lap,
1054                                  &lap->lap_sub_cookie);
1055         if (rc)
1056                 RETURN(rc);
1057         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1058                lap->lap_sub_cookie, offset);
1059         RETURN(0);
1060 }
1061
1062 static int lov_queue_async_io(struct obd_export *exp,
1063                               struct lov_stripe_md *lsm,
1064                               struct lov_oinfo *loi, void *cookie,
1065                               int cmd, obd_off off, int count,
1066                               obd_flag brw_flags, obd_flag async_flags)
1067 {
1068         struct lov_obd *lov = &exp->exp_obd->u.lov;
1069         struct lov_async_page *lap;
1070         int rc;
1071
1072         LASSERT(loi == NULL);
1073
1074         ASSERT_LSM_MAGIC(lsm);
1075
1076         lap = LAP_FROM_COOKIE(cookie);
1077
1078         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1079         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1080                                 loi, lap->lap_sub_cookie, cmd, off, count,
1081                                 brw_flags, async_flags);
1082         RETURN(rc);
1083 }
1084
1085 static int lov_set_async_flags(struct obd_export *exp,
1086                                struct lov_stripe_md *lsm,
1087                                struct lov_oinfo *loi, void *cookie,
1088                                obd_flag async_flags)
1089 {
1090         struct lov_obd *lov = &exp->exp_obd->u.lov;
1091         struct lov_async_page *lap;
1092         int rc;
1093
1094         LASSERT(loi == NULL);
1095
1096         ASSERT_LSM_MAGIC(lsm);
1097
1098         lap = LAP_FROM_COOKIE(cookie);
1099
1100         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1101         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1102                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1103         RETURN(rc);
1104 }
1105
1106 static int lov_queue_group_io(struct obd_export *exp,
1107                               struct lov_stripe_md *lsm,
1108                               struct lov_oinfo *loi,
1109                               struct obd_io_group *oig, void *cookie,
1110                               int cmd, obd_off off, int count,
1111                               obd_flag brw_flags, obd_flag async_flags)
1112 {
1113         struct lov_obd *lov = &exp->exp_obd->u.lov;
1114         struct lov_async_page *lap;
1115         int rc;
1116
1117         LASSERT(loi == NULL);
1118
1119         ASSERT_LSM_MAGIC(lsm);
1120
1121         lap = LAP_FROM_COOKIE(cookie);
1122
1123         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1124         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1125                                 oig, lap->lap_sub_cookie, cmd, off, count,
1126                                 brw_flags, async_flags);
1127         RETURN(rc);
1128 }
1129
1130 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1131  * all stripes, but we don't record that fact at queue time.  so we
1132  * trigger sync io on all stripes. */
1133 static int lov_trigger_group_io(struct obd_export *exp,
1134                                 struct lov_stripe_md *lsm,
1135                                 struct lov_oinfo *loi,
1136                                 struct obd_io_group *oig)
1137 {
1138         struct lov_obd *lov = &exp->exp_obd->u.lov;
1139         int rc = 0, i, err;
1140
1141         LASSERT(loi == NULL);
1142
1143         ASSERT_LSM_MAGIC(lsm);
1144
1145         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1146              i++, loi++) {
1147                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1148                                            lsm, loi, oig);
1149                 if (rc == 0 && err != 0)
1150                         rc = err;
1151         };
1152         RETURN(rc);
1153 }
1154
1155 static int lov_teardown_async_page(struct obd_export *exp,
1156                                    struct lov_stripe_md *lsm,
1157                                    struct lov_oinfo *loi, void *cookie)
1158 {
1159         struct lov_obd *lov = &exp->exp_obd->u.lov;
1160         struct lov_async_page *lap;
1161         int rc;
1162
1163         LASSERT(loi == NULL);
1164
1165         ASSERT_LSM_MAGIC(lsm);
1166
1167         lap = LAP_FROM_COOKIE(cookie);
1168
1169         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1170         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1171                                      lsm, loi, lap->lap_sub_cookie);
1172         if (rc) {
1173                 CERROR("unable to teardown sub cookie %p: %d\n",
1174                        lap->lap_sub_cookie, rc);
1175                 RETURN(rc);
1176         }
1177         RETURN(rc);
1178 }
1179
1180 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1181                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1182                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1183                        void *data,__u32 lvb_len, void *lvb_swabber,
1184                        struct lustre_handle *lockh)
1185 {
1186         struct lov_request_set *set;
1187         struct lov_request *req;
1188         struct list_head *pos;
1189         struct lustre_handle *lov_lockhp;
1190         struct lov_obd *lov;
1191         ldlm_error_t rc;
1192         int save_flags = *flags;
1193         ENTRY;
1194
1195         ASSERT_LSM_MAGIC(lsm);
1196
1197         /* we should never be asked to replay a lock this way. */
1198         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1199
1200         if (!exp || !exp->exp_obd)
1201                 RETURN(-ENODEV);
1202
1203         lov = &exp->exp_obd->u.lov;
1204         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1205         if (rc)
1206                 RETURN(rc);
1207
1208         list_for_each (pos, &set->set_list) {
1209                 ldlm_policy_data_t sub_policy;
1210                 req = list_entry(pos, struct lov_request, rq_link);
1211                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1212                 LASSERT(lov_lockhp);
1213
1214                 *flags = save_flags;
1215                 sub_policy.l_extent.start = req->rq_extent.start;
1216                 sub_policy.l_extent.end = req->rq_extent.end;
1217
1218                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1219                                  type, &sub_policy, mode, flags, bl_cb,
1220                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1221                                  lov_lockhp);
1222                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1223                 if (rc != ELDLM_OK)
1224                         break;
1225         }
1226
1227         lov_fini_enqueue_set(set, mode);
1228         RETURN(rc);
1229 }
1230
1231 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1232                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1233                      int *flags, void *data, struct lustre_handle *lockh)
1234 {
1235         struct lov_request_set *set;
1236         struct lov_request *req;
1237         struct list_head *pos;
1238         struct lov_obd *lov = &exp->exp_obd->u.lov;
1239         struct lustre_handle *lov_lockhp;
1240         int lov_flags, rc = 0;
1241         ENTRY;
1242
1243         ASSERT_LSM_MAGIC(lsm);
1244
1245         if (!exp || !exp->exp_obd)
1246                 RETURN(-ENODEV);
1247
1248         lov = &exp->exp_obd->u.lov;
1249         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1250         if (rc)
1251                 RETURN(rc);
1252
1253         list_for_each (pos, &set->set_list) {
1254                 ldlm_policy_data_t sub_policy;
1255                 req = list_entry(pos, struct lov_request, rq_link);
1256                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1257                 LASSERT(lov_lockhp);
1258
1259                 sub_policy.l_extent.start = req->rq_extent.start;
1260                 sub_policy.l_extent.end = req->rq_extent.end;
1261                 lov_flags = *flags;
1262
1263                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1264                                type, &sub_policy, mode, &lov_flags, data,
1265                                lov_lockhp);
1266                 rc = lov_update_match_set(set, req, rc);
1267                 if (rc != 1)
1268                         break;
1269         }
1270         lov_fini_match_set(set, mode, *flags);
1271         RETURN(rc);
1272 }
1273
1274 static int lov_change_cbdata(struct obd_export *exp,
1275                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1276                              void *data)
1277 {
1278         struct lov_obd *lov;
1279         struct lov_oinfo *loi;
1280         int rc = 0, i;
1281         ENTRY;
1282
1283         ASSERT_LSM_MAGIC(lsm);
1284
1285         if (!exp || !exp->exp_obd)
1286                 RETURN(-ENODEV);
1287
1288         lov = &exp->exp_obd->u.lov;
1289         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1290                 struct lov_stripe_md submd;
1291                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1292                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1293
1294                 submd.lsm_object_id = loi->loi_id;
1295                 submd.lsm_stripe_count = 0;
1296                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1297                                        &submd, it, data);
1298         }
1299         RETURN(rc);
1300 }
1301
1302 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1303                       __u32 mode, struct lustre_handle *lockh)
1304 {
1305         struct lov_request_set *set;
1306         struct lov_request *req;
1307         struct list_head *pos;
1308         struct lov_obd *lov = &exp->exp_obd->u.lov;
1309         struct lustre_handle *lov_lockhp;
1310         int err = 0, rc = 0;
1311         ENTRY;
1312
1313         ASSERT_LSM_MAGIC(lsm);
1314
1315         if (!exp || !exp->exp_obd)
1316                 RETURN(-ENODEV);
1317
1318         LASSERT(lockh);
1319         lov = &exp->exp_obd->u.lov;
1320         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1321         if (rc)
1322                 RETURN(rc);
1323
1324         list_for_each (pos, &set->set_list) {
1325                 req = list_entry(pos, struct lov_request, rq_link);
1326                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1327
1328                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1329                                 mode, lov_lockhp);
1330                 rc = lov_update_common_set(set, req, rc);
1331                 if (rc) {
1332                         CERROR("error: cancel objid "LPX64" subobj "
1333                                LPX64" on OST idx %d: rc = %d\n",
1334                                lsm->lsm_object_id,
1335                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1336                         err = rc;
1337                 }
1338  
1339         }
1340         lov_fini_cancel_set(set);
1341         RETURN(err);
1342 }
1343
1344 static int lov_cancel_unused(struct obd_export *exp,
1345                              struct lov_stripe_md *lsm, int flags, void *opaque)
1346 {
1347         struct lov_obd *lov;
1348         struct lov_oinfo *loi;
1349         int rc = 0, i;
1350         ENTRY;
1351
1352         ASSERT_LSM_MAGIC(lsm);
1353
1354         if (!exp || !exp->exp_obd)
1355                 RETURN(-ENODEV);
1356
1357         lov = &exp->exp_obd->u.lov;
1358         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1359                 struct lov_stripe_md submd;
1360                 int err;
1361
1362                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1363                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1364
1365                 submd.lsm_object_id = loi->loi_id;
1366                 submd.lsm_stripe_count = 0;
1367                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1368                                         &submd, flags, opaque);
1369                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1370                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1371                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1372                                loi->loi_id, loi->loi_ost_idx, err);
1373                         if (!rc)
1374                                 rc = err;
1375                 }
1376         }
1377         RETURN(rc);
1378 }
1379
/* All-ones __u64 value, used as the saturation limit below. */
#define LOV_U64_MAX ((__u64)~0ULL)
/* Saturating add: tot += add, pinned to LOV_U64_MAX when the sum wraps
 * around.  Both arguments are evaluated more than once. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while(0)
1388
1389 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1390                       unsigned long max_age)
1391 {
1392         struct lov_obd *lov = &obd->u.lov;
1393         struct obd_statfs lov_sfs;
1394         int set = 0;
1395         int rc = 0;
1396         int i;
1397         ENTRY;
1398
1399
1400         /* We only get block data from the OBD */
1401         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1402                 int err;
1403
1404                 if (!lov->tgts[i].active) {
1405                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1406                         continue;
1407                 }
1408
1409                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1410                                  max_age);
1411                 if (err) {
1412                         if (lov->tgts[i].active && !rc)
1413                                 rc = err;
1414                         continue;
1415                 }
1416
1417                 if (!set) {
1418                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1419                         set = 1;
1420                 } else {
1421                         osfs->os_bfree += lov_sfs.os_bfree;
1422                         osfs->os_bavail += lov_sfs.os_bavail;
1423                         osfs->os_blocks += lov_sfs.os_blocks;
1424                         /* XXX not sure about this one - depends on policy.
1425                          *   - could be minimum if we always stripe on all OBDs
1426                          *     (but that would be wrong for any other policy,
1427                          *     if one of the OBDs has no more objects left)
1428                          *   - could be sum if we stripe whole objects
1429                          *   - could be average, just to give a nice number
1430                          *
1431                          * To give a "reasonable" (if not wholly accurate)
1432                          * number, we divide the total number of free objects
1433                          * by expected stripe count (watch out for overflow).
1434                          */
1435                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1436                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1437                 }
1438         }
1439
1440         if (set) {
1441                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1442                                          lov->desc.ld_default_stripe_count :
1443                                          lov->desc.ld_active_tgt_count;
1444
1445                 if (osfs->os_files != LOV_U64_MAX)
1446                         do_div(osfs->os_files, expected_stripes);
1447                 if (osfs->os_ffree != LOV_U64_MAX)
1448                         do_div(osfs->os_ffree, expected_stripes);
1449         } else if (!rc)
1450                 rc = -EIO;
1451
1452         RETURN(rc);
1453 }
1454
1455 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1456                          void *karg, void *uarg)
1457 {
1458         struct obd_device *obddev = class_exp2obd(exp);
1459         struct lov_obd *lov = &obddev->u.lov;
1460         int i, count = lov->desc.ld_tgt_count;
1461         struct obd_uuid *uuidp;
1462         int rc;
1463
1464         ENTRY;
1465
1466         switch (cmd) {
1467         case OBD_IOC_LOV_GET_CONFIG: {
1468                 struct obd_ioctl_data *data = karg;
1469                 struct lov_tgt_desc *tgtdesc;
1470                 struct lov_desc *desc;
1471                 char *buf = NULL;
1472
1473                 buf = NULL;
1474                 len = 0;
1475                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1476                         RETURN(-EINVAL);
1477
1478                 data = (struct obd_ioctl_data *)buf;
1479
1480                 if (sizeof(*desc) > data->ioc_inllen1) {
1481                         OBD_FREE(buf, len);
1482                         RETURN(-EINVAL);
1483                 }
1484
1485                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1486                         OBD_FREE(buf, len);
1487                         RETURN(-EINVAL);
1488                 }
1489
1490                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1491                 memcpy(desc, &(lov->desc), sizeof(*desc));
1492
1493                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1494                 tgtdesc = lov->tgts;
1495                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1496                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
1497
1498                 rc = copy_to_user((void *)uarg, buf, len);
1499                 if (rc)
1500                         rc = -EFAULT;
1501                 obd_ioctl_freedata(buf, len);
1502                 break;
1503         }
1504         case LL_IOC_LOV_SETSTRIPE:
1505                 rc = lov_setstripe(exp, karg, uarg);
1506                 break;
1507         case LL_IOC_LOV_GETSTRIPE:
1508                 rc = lov_getstripe(exp, karg, uarg);
1509                 break;
1510         case LL_IOC_LOV_SETEA:
1511                 rc = lov_setea(exp, karg, uarg);
1512                 break;
1513         default: {
1514                 int set = 0;
1515                 if (count == 0)
1516                         RETURN(-ENOTTY);
1517                 rc = 0;
1518                 for (i = 0; i < count; i++) {
1519                         int err;
1520
1521                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1522                                             len, karg, uarg);
1523                         if (err) {
1524                                 if (lov->tgts[i].active) {
1525                                         CERROR("error: iocontrol OSC %s on OST "
1526                                                "idx %d cmd %x: err = %d\n",
1527                                                lov->tgts[i].uuid.uuid, i,
1528                                                cmd, err);
1529                                         if (!rc)
1530                                                 rc = err;
1531                                 }
1532                         } else
1533                                 set = 1;
1534                 }
1535                 if (!set && !rc)
1536                         rc = -EIO;
1537         }
1538         }
1539
1540         RETURN(rc);
1541 }
1542
1543 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1544                         void *key, __u32 *vallen, void *val)
1545 {
1546         struct obd_device *obddev = class_exp2obd(exp);
1547         struct lov_obd *lov = &obddev->u.lov;
1548         int i;
1549         ENTRY;
1550
1551         if (!vallen || !val)
1552                 RETURN(-EFAULT);
1553
1554         if (keylen > strlen("lock_to_stripe") &&
1555             strcmp(key, "lock_to_stripe") == 0) {
1556                 struct {
1557                         char name[16];
1558                         struct ldlm_lock *lock;
1559                         struct lov_stripe_md *lsm;
1560                 } *data = key;
1561                 struct lov_oinfo *loi;
1562                 __u32 *stripe = val;
1563
1564                 if (*vallen < sizeof(*stripe))
1565                         RETURN(-EFAULT);
1566                 *vallen = sizeof(*stripe);
1567
1568                 /* XXX This is another one of those bits that will need to
1569                  * change if we ever actually support nested LOVs.  It uses
1570                  * the lock's export to find out which stripe it is. */
1571                 for (i = 0, loi = data->lsm->lsm_oinfo;
1572                      i < data->lsm->lsm_stripe_count;
1573                      i++, loi++) {
1574                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
1575                             data->lock->l_conn_export) {
1576                                 *stripe = i;
1577                                 RETURN(0);
1578                         }
1579                 }
1580                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
1581                 dump_lsm(D_ERROR, data->lsm);
1582                 RETURN(-ENXIO);
1583         } else if (keylen >= strlen("size_to_stripe") &&
1584                    strcmp(key, "size_to_stripe") == 0) {
1585                 struct {
1586                         int stripe_number;
1587                         __u64 size;
1588                         struct lov_stripe_md *lsm;
1589                 } *data = val;
1590
1591                 if (*vallen < sizeof(*data))
1592                         RETURN(-EFAULT);
1593
1594                 data->size = lov_size_to_stripe(data->lsm, data->size,
1595                                                 data->stripe_number);
1596                 RETURN(0);
1597         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
1598                 obd_id *ids = val;
1599                 int rc, size = sizeof(obd_id);
1600                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1601                         if (!lov->tgts[i].active)
1602                                 continue;
1603                         rc = obd_get_info(lov->tgts[i].ltd_exp,
1604                                           keylen, key, &size, &(ids[i]));
1605                         if (rc != 0)
1606                                 RETURN(rc);
1607                 }
1608                 RETURN(0);
1609         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
1610                 struct lov_desc *desc_ret = val;
1611                 *desc_ret = lov->desc;
1612
1613                 RETURN(0);
1614         }
1615
1616         RETURN(-EINVAL);
1617 }
1618
1619 static int lov_set_info(struct obd_export *exp, obd_count keylen,
1620                         void *key, obd_count vallen, void *val)
1621 {
1622         struct obd_device *obddev = class_exp2obd(exp);
1623         struct lov_obd *lov = &obddev->u.lov;
1624         int i, rc = 0, err;
1625         ENTRY;
1626
1627 #define KEY_IS(str) \
1628         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
1629
1630         if (KEY_IS("next_id")) {
1631                 if (vallen != lov->desc.ld_tgt_count)
1632                         RETURN(-EINVAL);
1633                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1634                         /* initialize all OSCs, even inactive ones */
1635
1636                         err = obd_set_info(lov->tgts[i].ltd_exp,
1637                                           keylen, key, sizeof(obd_id),
1638                                           ((obd_id*)val) + i);
1639                         if (!rc)
1640                                 rc = err;
1641                 }
1642                 RETURN(rc);
1643         }
1644
1645         if (KEY_IS("mds_conn") || KEY_IS("unlinked")) {
1646                 if (vallen != 0)
1647                         RETURN(-EINVAL);
1648         } else {
1649                 RETURN(-EINVAL);
1650         }
1651
1652         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1653                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
1654                         continue;
1655
1656                 if (!val && !lov->tgts[i].active)
1657                         continue;
1658
1659                 err = obd_set_info(lov->tgts[i].ltd_exp,
1660                                   keylen, key, vallen, val);
1661                 if (!rc)
1662                         rc = err;
1663         }
1664         RETURN(rc);
1665 #undef KEY_IS
1666 }
1667
1668 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
1669 {
1670         struct lov_oinfo *loi;
1671         int i, rc = 0;
1672         ENTRY;
1673
1674         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1675              i++, loi++) {
1676                 if (loi->loi_ar.ar_rc && !rc)
1677                         rc = loi->loi_ar.ar_rc;
1678                 loi->loi_ar.ar_rc = 0;
1679         }
1680         RETURN(rc);
1681 }
1682 EXPORT_SYMBOL(lov_test_and_clear_async_rc);
1683
/*
 * Everything from here to the matching #endif is compiled out by "#if 0".
 *
 * NOTE(review): the disabled code is not self-contained as written:
 *  - imp, lwi and lwd are used but not declared inside lov_complete_many();
 *  - "lock" is declared inside the per-stripe loop yet is also used by the
 *    remove_wait_queue() loop after it;
 *  - "queues" is allocated with OBD_ALLOC() and no matching free is visible;
 *  - the -EINTR / -ETIMEDOUT branch has an empty body;
 *  - check_multi_complete() is not defined in the visible part of the file.
 * Confirm and fix these before re-enabling.
 */
1684 #if 0
/* Per-stripe wait bookkeeping used by lov_complete_many(). */
1685 struct lov_multi_wait {
1686         struct ldlm_lock *lock;
1687         wait_queue_t      wait;
1688         int               completed;
1689         int               generation;
1690 };
1691
/*
 * Wait on the locks behind every stripe of @lsm.
 *
 * For a multi-stripe lsm, @lockh is translated with lov_handle2llh() and
 * its llh_handles array is walked; otherwise @lockh itself is the single
 * stripe handle.  One struct lov_multi_wait is allocated per stripe.
 *
 * Returns -ENODEV if @exp or its obd is missing, -EINVAL if the lov lock
 * handle cannot be resolved, -ENOMEM if the allocation fails, otherwise
 * the result of l_wait_event_added().
 */
1692 int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
1693                       struct lustre_handle *lockh)
1694 {
1695         struct lov_lock_handles *lov_lockh = NULL;
1696         struct lustre_handle *lov_lockhp;
1697         struct lov_obd *lov;
1698         struct lov_oinfo *loi;
1699         struct lov_multi_wait *queues;
1700         int rc = 0, i;
1701         ENTRY;
1702
1703         ASSERT_LSM_MAGIC(lsm);
1704
1705         if (!exp || !exp->exp_obd)
1706                 RETURN(-ENODEV);
1707
1708         LASSERT(lockh != NULL);
        /* pick the array of per-stripe lock handles to iterate over */
1709         if (lsm->lsm_stripe_count > 1) {
1710                 lov_lockh = lov_handle2llh(lockh);
1711                 if (lov_lockh == NULL) {
1712                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1713                         RETURN(-EINVAL);
1714                 }
1715
1716                 lov_lockhp = lov_lockh->llh_handles;
1717         } else {
1718                 lov_lockhp = lockh;
1719         }
1720
1721         OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
1722         if (queues == NULL)
1723                 GOTO(out, rc = -ENOMEM);
1724
1725         lov = &exp->exp_obd->u.lov;
        /* one pass per stripe: resolve its lock and queue ourselves on it */
1726         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1727              i++, loi++, lov_lockhp++) {
1728                 struct ldlm_lock *lock;
1729                 struct obd_device *obd;
1730                 unsigned long irqflags;
1731
1732                 lock = ldlm_handle2lock(lov_lockhp);
1733                 if (lock == NULL) {
                        /* nothing to wait for on this stripe */
1734                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
1735                                loi->loi_ost_idx, loi->loi_id);
1736                         queues[i].completed = 1;
1737                         continue;
1738                 }
1739
1740                 queues[i].lock = lock;
1741                 init_waitqueue_entry(&(queues[i].wait), current);
1742                 add_wait_queue(lock->l_waitq, &(queues[i].wait));
1743
                /* NOTE(review): imp has no declaration in this function */
1744                 obd = class_exp2obd(lock->l_conn_export);
1745                 if (obd != NULL)
1746                         imp = obd->u.cli.cl_import;
1747                 if (imp != NULL) {
                        /* snapshot the import generation under imp_lock */
1748                         spin_lock_irqsave(&imp->imp_lock, irqflags);
1749                         queues[i].generation = imp->imp_generation;
1750                         spin_unlock_irqrestore(&imp->imp_lock, irqflags);
1751                 }
1752         }
1753
        /* NOTE(review): lwi and lwd have no declaration in this function */
1754         lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
1755                                interrupted_completion_wait, &lwd);
1756         rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
1757
        /*
         * NOTE(review): "lock" is scoped to the loop above; this looks like
         * it was meant to use queues[i].lock -- confirm.
         */
1758         for (i = 0; i < lsm->lsm_stripe_count; i++)
1759                 remove_wait_queue(lock->l_waitq, &(queues[i].wait));
1760
        /* interrupted / timed-out handling was never filled in */
1761         if (rc == -EINTR || rc == -ETIMEDOUT) {
1762
1763
1764         }
1765
1766  out:
        /* drop the reference taken by lov_handle2llh(), if any */
1767         if (lov_lockh != NULL)
1768                 lov_llh_put(lov_lockh);
1769         RETURN(rc);
1770 }
1771 #endif
1772
1773 int lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
1774                       obd_off size);
1775
1776 struct obd_ops lov_obd_ops = {
1777         .o_owner               = THIS_MODULE,
1778         .o_setup               = lov_setup,
1779         .o_cleanup             = lov_cleanup,
1780         .o_connect             = lov_connect,
1781         .o_disconnect          = lov_disconnect,
1782         .o_statfs              = lov_statfs,
1783         .o_packmd              = lov_packmd,
1784         .o_unpackmd            = lov_unpackmd,
1785         .o_create              = lov_create,
1786         .o_destroy             = lov_destroy,
1787         .o_getattr             = lov_getattr,
1788         .o_getattr_async       = lov_getattr_async,
1789         .o_setattr             = lov_setattr,
1790         .o_brw                 = lov_brw,
1791         .o_brw_async           = lov_brw_async,
1792         .o_prep_async_page     = lov_prep_async_page,
1793         .o_queue_async_io      = lov_queue_async_io,
1794         .o_set_async_flags     = lov_set_async_flags,
1795         .o_queue_group_io      = lov_queue_group_io,
1796         .o_trigger_group_io    = lov_trigger_group_io,
1797         .o_teardown_async_page = lov_teardown_async_page,
1798         .o_increase_kms        = lov_increase_kms,
1799         .o_punch               = lov_punch,
1800         .o_sync                = lov_sync,
1801         .o_enqueue             = lov_enqueue,
1802         .o_match               = lov_match,
1803         .o_change_cbdata       = lov_change_cbdata,
1804         .o_cancel              = lov_cancel,
1805         .o_cancel_unused       = lov_cancel_unused,
1806         .o_iocontrol           = lov_iocontrol,
1807         .o_get_info            = lov_get_info,
1808         .o_set_info            = lov_set_info,
1809         .o_llog_init           = lov_llog_init,
1810         .o_llog_finish         = lov_llog_finish,
1811         .o_notify              = lov_notify,
1812 };
1813
1814 int __init lov_init(void)
1815 {
1816         struct lprocfs_static_vars lvars;
1817         int rc;
1818
1819         lprocfs_init_vars(lov, &lvars);
1820         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
1821                                  OBD_LOV_DEVICENAME);
1822         RETURN(rc);
1823 }
1824
#ifdef __KERNEL__
/*
 * Module unload hook: unregisters the OBD_LOV_DEVICENAME type that
 * lov_init() registered.  The __exit annotation is left commented out,
 * as in the original.
 */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

/* module load / unload entry points */
module_init(lov_init);
module_exit(lov_exit);

/* module metadata */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");
#endif /* __KERNEL__ */