Whamcloud - gitweb
- define NOHIGHMEM for recently added highmem-split patch
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 /* obd methods */
55 #define MAX_STRING_SIZE 128
56 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
57                            int activate, struct obd_connect_data *conn_data,
58                            unsigned long connect_flags)
59 {
60         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
61         struct obd_uuid *tgt_uuid = &tgt->uuid;
62
63 #ifdef __KERNEL__
64         struct proc_dir_entry *lov_proc_dir;
65 #endif
66         struct lov_obd *lov = &obd->u.lov;
67         struct lustre_handle conn = {0, };
68         struct obd_device *tgt_obd;
69         int rc;
70         ENTRY;
71
72         tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
73                                         &obd->obd_uuid);
74
75         if (!tgt_obd) {
76                 CERROR("Target %s not attached\n", tgt_uuid->uuid);
77                 RETURN(-EINVAL);
78         }
79
80         if (!tgt_obd->obd_set_up) {
81                 CERROR("Target %s not set up\n", tgt_uuid->uuid);
82                 RETURN(-EINVAL);
83         }
84
85         if (activate) {
86                 tgt_obd->obd_no_recov = 0;
87                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
88         }
89
90         if (tgt_obd->u.cli.cl_import->imp_invalid) {
91                 CERROR("not connecting OSC %s; administratively "
92                        "disabled\n", tgt_uuid->uuid);
93                 rc = obd_register_observer(tgt_obd, obd);
94                 if (rc) {
95                         CERROR("Target %s register_observer error %d; "
96                                "will not be able to reactivate\n",
97                                tgt_uuid->uuid, rc);
98                 }
99                 RETURN(0);
100         }
101
102         rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
103                          connect_flags);
104         if (rc) {
105                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
106                 RETURN(rc);
107         }
108         tgt->ltd_exp = class_conn2export(&conn);
109
110         rc = obd_register_observer(tgt_obd, obd);
111         if (rc) {
112                 CERROR("Target %s register_observer error %d\n",
113                        tgt_uuid->uuid, rc);
114                 obd_disconnect(tgt->ltd_exp, 0);
115                 tgt->ltd_exp = NULL;
116                 RETURN(rc);
117         }
118
119         tgt->active = 1;
120         lov->desc.ld_active_tgt_count++;
121
122 #ifdef __KERNEL__
123         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
124         if (lov_proc_dir) {
125                 struct obd_device *osc_obd = class_conn2obd(&conn);
126                 struct proc_dir_entry *osc_symlink;
127                 char name[MAX_STRING_SIZE + 1];
128
129                 LASSERT(osc_obd != NULL);
130                 LASSERT(osc_obd->obd_type != NULL);
131                 LASSERT(osc_obd->obd_type->typ_name != NULL);
132                 name[MAX_STRING_SIZE] = '\0';
133                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
134                          osc_obd->obd_type->typ_name,
135                          osc_obd->obd_name);
136                 osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
137                                            name);
138                 if (osc_symlink == NULL) {
139                         CERROR("could not register LOV target "
140                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
141                                obd->obd_type->typ_name, obd->obd_name,
142                                osc_obd->obd_name);
143                         lprocfs_remove(lov_proc_dir);
144                         lov_proc_dir = NULL;
145                 }
146         }
147 #endif
148
149         RETURN(0);
150 }
151
152 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
153                        struct obd_uuid *cluuid, struct obd_connect_data *data,
154                        unsigned long flags)
155 {
156 #ifdef __KERNEL__
157         struct proc_dir_entry *lov_proc_dir;
158 #endif
159         struct lov_obd *lov = &obd->u.lov;
160         struct lov_tgt_desc *tgt;
161         struct obd_export *exp;
162         int rc, rc2, i;
163         ENTRY;
164
165         rc = class_connect(conn, obd, cluuid);
166         if (rc)
167                 RETURN(rc);
168
169         exp = class_conn2export(conn);
170
171         /* We don't want to actually do the underlying connections more than
172          * once, so keep track. */
173         lov->refcount++;
174         if (lov->refcount > 1) {
175                 class_export_put(exp);
176                 RETURN(0);
177         }
178
179 #ifdef __KERNEL__
180         lov_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
181                                         NULL, NULL);
182         if (IS_ERR(lov_proc_dir)) {
183                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
184                        obd->obd_type->typ_name, obd->obd_name);
185                 lov_proc_dir = NULL;
186         }
187 #endif
188
189         /* connect_flags is the MDS number, save for use in lov_add_obd */
190         lov->lov_connect_flags = flags;
191         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
192                 if (obd_uuid_empty(&tgt->uuid))
193                         continue;
194                 rc = lov_connect_obd(obd, tgt, 0, data, flags);
195                 if (rc)
196                         GOTO(out_disc, rc);
197         }
198
199         class_export_put(exp);
200         RETURN (0);
201
202  out_disc:
203 #ifdef __KERNEL__
204         if (lov_proc_dir)
205                 lprocfs_remove(lov_proc_dir);
206 #endif
207
208         while (i-- > 0) {
209                 struct obd_uuid uuid;
210                 --tgt;
211                 --lov->desc.ld_active_tgt_count;
212                 tgt->active = 0;
213                 /* save for CERROR below; (we know it's terminated) */
214                 uuid = tgt->uuid;
215                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
216                 if (rc2)
217                         CERROR("error: LOV target %s disconnect on OST idx %d: "
218                                "rc = %d\n", uuid.uuid, i, rc2);
219         }
220         class_disconnect(exp, 0);
221         RETURN (rc);
222 }
223
224 static int lov_disconnect_obd(struct obd_device *obd, 
225                               struct lov_tgt_desc *tgt,
226                               unsigned long flags)
227 {
228 #ifdef __KERNEL__
229         struct proc_dir_entry *lov_proc_dir;
230 #endif
231         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
232         struct lov_obd *lov = &obd->u.lov;
233         int rc;
234         ENTRY;
235
236 #ifdef __KERNEL__
237         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
238         if (lov_proc_dir) {
239                 struct proc_dir_entry *osc_symlink;
240
241                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
242                 if (osc_symlink) {
243                         lprocfs_remove(osc_symlink);
244                 } else {
245                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
246                                obd->obd_type->typ_name, obd->obd_name,
247                                osc_obd->obd_name);
248                 }
249         }
250 #endif
251         if (obd->obd_no_recov) {
252                 /* Pass it on to our clients.
253                  * XXX This should be an argument to disconnect,
254                  * XXX not a back-door flag on the OBD.  Ah well.
255                  */
256                 if (osc_obd)
257                         osc_obd->obd_no_recov = 1;
258         }
259
260         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
261         rc = obd_disconnect(tgt->ltd_exp, flags);
262         if (rc) {
263                 if (tgt->active) {
264                         CERROR("Target %s disconnect error %d\n",
265                                tgt->uuid.uuid, rc);
266                 }
267                 rc = 0;
268         }
269
270         if (tgt->active) {
271                 tgt->active = 0;
272                 lov->desc.ld_active_tgt_count--;
273         }
274         tgt->ltd_exp = NULL;
275         RETURN(0);
276 }
277
278 static int lov_disconnect(struct obd_export *exp, unsigned long flags)
279 {
280         struct obd_device *obd = class_exp2obd(exp);
281 #ifdef __KERNEL__
282         struct proc_dir_entry *lov_proc_dir;
283 #endif
284         struct lov_obd *lov = &obd->u.lov;
285         struct lov_tgt_desc *tgt;
286         int rc, i;
287         ENTRY;
288
289         if (!lov->tgts)
290                 goto out_local;
291
292         /* Only disconnect the underlying layers on the final disconnect. */
293         lov->refcount--;
294         if (lov->refcount != 0)
295                 goto out_local;
296
297         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
298                 if (tgt->ltd_exp)
299                         lov_disconnect_obd(obd, tgt, flags);
300         }
301
302 #ifdef __KERNEL__
303         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
304         if (lov_proc_dir) {
305                 lprocfs_remove(lov_proc_dir);
306         } else {
307                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing.",
308                        obd->obd_type->typ_name, obd->obd_name);
309         }
310 #endif
311         
312  out_local:
313         rc = class_disconnect(exp, 0);
314         RETURN(rc);
315 }
316
317 /* Error codes:
318  *
319  *  -EINVAL  : UUID can't be found in the LOV's target list
320  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
321  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
322  */
323 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
324                               int activate)
325 {
326         struct lov_tgt_desc *tgt;
327         int i, rc = 0;
328         ENTRY;
329
330         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
331                lov, uuid->uuid, activate);
332
333         spin_lock(&lov->lov_lock);
334         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
335                 if (tgt->ltd_exp == NULL)
336                         continue;
337
338                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
339                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
340                 
341                 if (obd_uuid_equals(uuid, &tgt->uuid))
342                         break;
343         }
344
345         if (i == lov->desc.ld_tgt_count)
346                 GOTO(out, rc = -EINVAL);
347
348
349         if (tgt->active == activate) {
350                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
351                         activate ? "" : "in");
352                 GOTO(out, rc);
353         }
354
355         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
356                activate ? "" : "in");
357
358         tgt->active = activate;
359         if (activate)
360                 lov->desc.ld_active_tgt_count++;
361         else
362                 lov->desc.ld_active_tgt_count--;
363
364         EXIT;
365  out:
366         spin_unlock(&lov->lov_lock);
367         return rc;
368 }
369
370 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
371                       int active, void *data)
372 {
373         struct obd_uuid *uuid;
374         int rc;
375         ENTRY;
376
377         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
378                 CERROR("unexpected notification of %s %s!\n",
379                        watched->obd_type->typ_name,
380                        watched->obd_name);
381                 return -EINVAL;
382         }
383         uuid = &watched->u.cli.cl_import->imp_target_uuid;
384
385         /* Set OSC as active before notifying the observer, so the
386          * observer can use the OSC normally.  
387          */
388         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
389         if (rc) {
390                 CERROR("%sactivation of %s failed: %d\n",
391                        active ? "" : "de", uuid->uuid, rc);
392                 RETURN(rc);
393         }
394
395         if (obd->obd_observer)
396                 /* Pass the notification up the chain. */
397                 rc = obd_notify(obd->obd_observer, watched, active, data);
398
399         RETURN(rc);
400 }
401
402 int lov_attach(struct obd_device *dev, obd_count len, void *data)
403 {
404         struct lprocfs_static_vars lvars;
405         int rc;
406
407         lprocfs_init_vars(lov, &lvars);
408         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
409         if (rc == 0) {
410 #ifdef __KERNEL__
411                 struct proc_dir_entry *entry;
412
413                 entry = create_proc_entry("target_obd_status", 0444, 
414                                           dev->obd_proc_entry);
415                 if (entry == NULL) {
416                         rc = -ENOMEM;
417                 } else {
418                         entry->proc_fops = &lov_proc_target_fops;
419                         entry->data = dev;
420                 }
421 #endif
422         }
423         return rc;
424 }
425
/*
 * Detach-time teardown of the LOV device @dev: hands the device to
 * lprocfs_obd_detach() and returns its result.
 */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
430
431 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
432 {
433         struct lov_obd *lov = &obd->u.lov;
434         struct lustre_cfg *lcfg = buf;
435         struct lov_desc *desc;
436         int count;
437         ENTRY;
438
439         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
440                 CERROR("LOV setup requires a descriptor\n");
441                 RETURN(-EINVAL);
442         }
443
444         desc = (struct lov_desc *)lustre_cfg_string(lcfg, 1);
445         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
446                 CERROR("descriptor size wrong: %d > %d\n",
447                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
448                 RETURN(-EINVAL);
449         }
450  
451         /* Because of 64-bit divide/mod operations only work with a 32-bit
452          * divisor in a 32-bit kernel, we cannot support a stripe width
453          * of 4GB or larger on 32-bit CPUs.
454          */
455        
456         count = desc->ld_default_stripe_count;
457         if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
458                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
459                        desc->ld_default_stripe_size, count, ~0UL);
460                 RETURN(-EINVAL);
461         }
462         if (desc->ld_tgt_count > 0) {
463                 lov->bufsize= sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
464         } else {
465                 lov->bufsize = sizeof(struct lov_tgt_desc) * LOV_MAX_TGT_COUNT;  
466         }
467         OBD_ALLOC(lov->tgts, lov->bufsize);
468         if (lov->tgts == NULL) {
469                 lov->bufsize = 0;
470                 CERROR("couldn't allocate %d bytes for target table.\n",
471                        lov->bufsize);
472                 RETURN(-EINVAL);
473         }
474
475         desc->ld_tgt_count = 0;
476         desc->ld_active_tgt_count = 0;
477         lov->desc = *desc;
478         spin_lock_init(&lov->lov_lock);
479         sema_init(&lov->lov_llog_sem, 1);
480
481         RETURN(0);
482 }
483
484 static int lov_cleanup(struct obd_device *obd, int flags)
485 {
486         struct lov_obd *lov = &obd->u.lov;
487
488         OBD_FREE(lov->tgts, lov->bufsize);
489         RETURN(0);
490 }
491
492 static int
493 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
494 {
495         struct lov_obd *lov = &obd->u.lov;
496         struct lov_tgt_desc *tgt;
497         int rc;
498         ENTRY;
499
500         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
501                uuidp->uuid, index, gen);
502
503         if ((index < 0) || (index >= LOV_MAX_TGT_COUNT)) {
504                 CERROR("request to add OBD %s at invalid index: %d\n",
505                        uuidp->uuid, index);
506                 RETURN(-EINVAL);
507         }
508
509         if (gen <= 0) {
510                 CERROR("request to add OBD %s with invalid generation: %d\n",
511                        uuidp->uuid, gen);
512                 RETURN(-EINVAL);
513         }
514
515         tgt = lov->tgts + index;
516         if (!obd_uuid_empty(&tgt->uuid)) {
517                 CERROR("OBD already assigned at LOV target index %d\n",
518                        index);
519                 RETURN(-EEXIST);
520         }
521
522         tgt->uuid = *uuidp;
523         /* XXX - add a sanity check on the generation number. */
524         tgt->ltd_gen = gen;
525
526         if (index >= lov->desc.ld_tgt_count)
527                 lov->desc.ld_tgt_count = index + 1;
528
529         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
530                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
531
532         if (lov->refcount == 0)
533                 RETURN(0);
534
535         if (tgt->ltd_exp) {
536                 struct obd_device *osc_obd;
537
538                 osc_obd = class_exp2obd(tgt->ltd_exp);
539                 if (osc_obd)
540                         osc_obd->obd_no_recov = 0;
541         }
542
543         rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
544         if (rc)
545                 GOTO(out, rc);
546
547         if (obd->obd_observer) {
548                 /* tell the mds_lov about the new target */
549                 rc = obd_notify(obd->obd_observer, tgt->ltd_exp->exp_obd, 1,
550                                 (void *)index);
551         }
552
553         GOTO(out, rc);
554  out:
555         if (rc && tgt->ltd_exp != NULL)
556                 lov_disconnect_obd(obd, tgt, 0);
557         return rc;
558 }
559
560 static int
561 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
562 {
563         struct lov_obd *lov = &obd->u.lov;
564         struct lov_tgt_desc *tgt;
565         int count = lov->desc.ld_tgt_count;
566         int rc = 0;
567         ENTRY;
568
569         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
570                uuidp->uuid, index, gen);
571
572         if (index >= count) {
573                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
574                        index, count);
575                 RETURN(-EINVAL);
576         }
577
578         tgt = lov->tgts + index;
579
580         if (obd_uuid_empty(&tgt->uuid)) {
581                 CERROR("LOV target at index %d is not setup.\n", index);
582                 RETURN(-EINVAL);
583         }
584
585         if (!obd_uuid_equals(uuidp, &tgt->uuid)) {
586                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
587                        tgt->uuid.uuid, index, uuidp->uuid);
588                 RETURN(-EINVAL);
589         }
590
591         if (tgt->ltd_exp) {
592                 struct obd_device *osc_obd;
593
594                 osc_obd = class_exp2obd(tgt->ltd_exp);
595                 if (osc_obd) {
596                         osc_obd->obd_no_recov = 1;
597                         rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
598                         if (rc)
599                                 CERROR("osc_llog_finish error: %d\n", rc);
600                 }
601                 lov_disconnect_obd(obd, tgt, 0);
602         }
603
604         /* XXX - right now there is a dependency on ld_tgt_count being the
605          * maximum tgt index for computing the mds_max_easize. So we can't
606          * shrink it. */
607
608         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
609         memset(tgt, 0, sizeof(*tgt));
610
611         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
612                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
613
614         RETURN(rc);
615 }
616
617 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
618 {
619         struct lustre_cfg *lcfg = buf;
620         struct obd_uuid obd_uuid;
621         int cmd;
622         int index;
623         int gen;
624         int rc = 0;
625         ENTRY;
626
627         switch(cmd = lcfg->lcfg_command) {
628         case LCFG_LOV_ADD_OBD:
629         case LCFG_LOV_DEL_OBD: {
630                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
631                         GOTO(out, rc = -EINVAL);
632
633                 obd_str2uuid(&obd_uuid, lustre_cfg_string(lcfg, 1));
634
635                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
636                         GOTO(out, rc = -EINVAL);
637                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
638                         GOTO(out, rc = -EINVAL);
639                 if (cmd == LCFG_LOV_ADD_OBD)
640                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
641                 else
642                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
643                 GOTO(out, rc);
644         }
645         default: {
646                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
647                 GOTO(out, rc = -EINVAL);
648
649         }
650         }
651 out:
652         RETURN(rc);
653 }
654
/* Fallback log2(): defined via ffz() of the complement, used only when no
 * log2 macro is already defined. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
658
659 static int lov_clear_orphans(struct obd_export *export,
660                              struct obdo *src_oa,
661                              struct lov_stripe_md **ea,
662                              struct obd_trans_info *oti)
663 {
664         struct lov_obd *lov;
665         struct obdo *tmp_oa;
666         struct obd_uuid *ost_uuid = NULL;
667         int rc = 0, i;
668         ENTRY;
669
670         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
671                 src_oa->o_flags == OBD_FL_DELORPHAN);
672
673         lov = &export->exp_obd->u.lov;
674
675         tmp_oa = obdo_alloc();
676         if (tmp_oa == NULL)
677                 RETURN(-ENOMEM);
678
679         if (src_oa->o_valid & OBD_MD_FLINLINE) {
680                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
681                 CDEBUG(D_HA, "clearing orphans only for %s\n",
682                        ost_uuid->uuid);
683         }
684
685         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
686                 int err;
687                 struct lov_stripe_md obj_md;
688                 struct lov_stripe_md *obj_mdp = &obj_md;
689
690                 /*
691                  * if called for a specific target, we don't care if it is not
692                  * active.
693                  */
694                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
695                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
696                         continue;
697                 }
698
699                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
700                         continue;
701
702                 /* 
703                  * setting up objid OSS objects should be destroyed starting
704                  * from it.
705                  */
706                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
707                 tmp_oa->o_valid |= OBD_MD_FLID;
708                 tmp_oa->o_id = oti->oti_objid[i];
709
710                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
711                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
712                                  &obj_mdp, oti);
713                 if (err) {
714                         /*
715                          * this export will be disabled until it is recovered,
716                          * and then orphan recovery will be completed.
717                          */
718                         CERROR("error in orphan recovery on OST idx %d/%d: "
719                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
720                 }
721
722                 if (ost_uuid)
723                         break;
724         }
725         obdo_free(tmp_oa);
726         RETURN(rc);
727 }
728
729 /* the LOV expects oa->o_id to be set to the LOV object id */
730 static int
731 lov_create(struct obd_export *exp, struct obdo *src_oa,
732            void *acl, int acl_size, struct lov_stripe_md **ea,
733            struct obd_trans_info *oti)
734 {
735         struct lov_request_set *set = NULL;
736         struct list_head *pos;
737         struct lov_obd *lov;
738         int rc = 0;
739         ENTRY;
740
741         LASSERT(ea != NULL);
742         if (exp == NULL)
743                 RETURN(-EINVAL);
744
745         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
746             src_oa->o_flags == OBD_FL_DELORPHAN) {
747                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
748                 RETURN(rc);
749         }
750
751         lov = &exp->exp_obd->u.lov;
752         if (!lov->desc.ld_active_tgt_count)
753                 RETURN(-EIO);
754
755         LASSERT(oti->oti_flags & OBD_MODE_CROW);
756                 
757         /* main creation loop */
758         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
759         if (rc)
760                 RETURN(rc);
761
762         list_for_each (pos, &set->set_list) {
763                 struct lov_request *req = 
764                         list_entry(pos, struct lov_request, rq_link);
765
766                 obd_id *objids = oti->oti_objid;
767
768                 if (oti->oti_obj_alloc) {
769                         __u64 next_id;
770                                 
771                         /* 
772                          * allocating new objid. Here it is delegated to caller,
773                          * that is MDS in CROW case.
774                          */
775                         next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
776                         req->rq_oa->o_id = next_id;
777                 } else {
778                         /* and here is default "allocator" */
779                         req->rq_oa->o_id = ++objids[req->rq_idx];
780                 }
781                 lov_update_create_set(set, req, rc);
782         }
783         rc = lov_fini_create_set(set, ea);
784         RETURN(rc);
785 }
786
/*
 * Evaluates to 1 (after logging why) when the striping descriptor LSMP is
 * NULL or does not carry LOV_MAGIC, and to 0 otherwise.  LSMP is evaluated
 * exactly once.
 */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_md__ = (LSMP);                   \
        int _bad__ = 1;                                         \
        if (_md__ == NULL) {                                    \
                CERROR("LOV requires striping ea\n");           \
        } else if (_md__->lsm_magic != LOV_MAGIC) {             \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _md__->lsm_magic, LOV_MAGIC);            \
        } else {                                                \
                _bad__ = 0;                                     \
        }                                                       \
        _bad__;                                                 \
})
801
802 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
803                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
804 {
805         struct lov_request_set *set;
806         struct lov_request *req;
807         struct list_head *pos;
808         struct lov_obd *lov;
809         int rc = 0;
810         ENTRY;
811
812         if (lsm_bad_magic(lsm))
813                 RETURN(-EINVAL);
814
815         if (!exp || !exp->exp_obd)
816                 RETURN(-ENODEV);
817
818         lov = &exp->exp_obd->u.lov;
819         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
820         if (rc)
821                 RETURN(rc);
822
823         list_for_each (pos, &set->set_list) {
824                 int err;
825                 req = list_entry(pos, struct lov_request, rq_link);
826
827                 /* XXX update the cookie position */
828                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
829                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
830                                  NULL, oti);
831                 err = lov_update_common_set(set, req, rc);
832                 if (rc) {
833                         CERROR("error: destroying objid "LPX64" subobj "
834                                LPX64" on OST idx %d: rc = %d\n", 
835                                set->set_oa->o_id, req->rq_oa->o_id, 
836                                req->rq_idx, rc);
837                         if (!rc)
838                                 rc = err;
839                 }
840         }
841         lov_fini_destroy_set(set);
842         RETURN(rc);
843 }
844
845 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
846                        struct lov_stripe_md *lsm)
847 {
848         struct lov_request_set *set;
849         struct lov_request *req;
850         struct list_head *pos;
851         struct lov_obd *lov;
852         int err = 0, rc = 0;
853         ENTRY;
854
855         if (lsm_bad_magic(lsm))
856                 RETURN(-EINVAL);
857
858         if (!exp || !exp->exp_obd)
859                 RETURN(-ENODEV);
860
861         lov = &exp->exp_obd->u.lov;
862         
863         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
864         if (rc)
865                 RETURN(rc);
866
867         list_for_each (pos, &set->set_list) {
868                 req = list_entry(pos, struct lov_request, rq_link);
869                 
870                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
871                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
872                        req->rq_idx);
873
874                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
875                                  req->rq_oa, NULL);
876                 err = lov_update_common_set(set, req, rc);
877                 if (err) {
878                         CERROR("error: getattr objid "LPX64" subobj "
879                                LPX64" on OST idx %d: rc = %d\n",
880                                set->set_oa->o_id, req->rq_oa->o_id, 
881                                req->rq_idx, err);
882                         break;
883                 }
884         }
885         
886         rc = lov_fini_getattr_set(set);
887         if (err)
888                 rc = err;
889         RETURN(rc);
890 }
891
892 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
893                                  int rc)
894 {
895         struct lov_request_set *lovset = (struct lov_request_set *)data;
896         ENTRY;
897
898         /* don't do attribute merge if this aysnc op failed */
899         if (rc) {
900                 lovset->set_completes = 0;
901                 lov_fini_getattr_set(lovset);
902         } else {
903                 rc = lov_fini_getattr_set(lovset);
904         }
905         RETURN (rc);
906 }
907
908 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
909                               struct lov_stripe_md *lsm,
910                               struct ptlrpc_request_set *rqset)
911 {
912         struct lov_request_set *lovset;
913         struct lov_obd *lov;
914         struct list_head *pos;
915         struct lov_request *req;
916         int rc = 0;
917         ENTRY;
918
919         if (lsm_bad_magic(lsm))
920                 RETURN(-EINVAL);
921
922         if (!exp || !exp->exp_obd)
923                 RETURN(-ENODEV);
924
925         lov = &exp->exp_obd->u.lov;
926
927         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
928         if (rc)
929                 RETURN(rc);
930
931         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
932                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
933
934         list_for_each (pos, &lovset->set_list) {
935                 req = list_entry(pos, struct lov_request, rq_link);
936                 
937                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
938                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
939                        req->rq_idx);
940                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
941                                        req->rq_oa, NULL, rqset);
942                 if (rc) {
943                         CERROR("error: getattr objid "LPX64" subobj "
944                                LPX64" on OST idx %d: rc = %d\n",
945                                lovset->set_oa->o_id, req->rq_oa->o_id, 
946                                req->rq_idx, rc);
947                         GOTO(out, rc);
948                 }
949                 lov_update_common_set(lovset, req, rc);
950         }
951         
952         LASSERT(rc == 0);
953         LASSERT (rqset->set_interpret == NULL);
954         rqset->set_interpret = lov_getattr_interpret;
955         rqset->set_arg = (void *)lovset;
956         RETURN(rc);
957 out:
958         LASSERT(rc);
959         lov_fini_getattr_set(lovset);
960         RETURN(rc);
961 }
962
963 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
964                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
965 {
966         struct lov_request_set *set;
967         struct lov_obd *lov;
968         struct list_head *pos;
969         struct lov_request *req;
970         int err = 0, rc = 0;
971         ENTRY;
972
973         if (lsm_bad_magic(lsm))
974                 RETURN(-EINVAL);
975
976         if (!exp || !exp->exp_obd)
977                 RETURN(-ENODEV);
978
979         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
980                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
981                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
982                                       OBD_MD_FLSIZE | OBD_MD_FLGROUP |
983                                       OBD_MD_FLUID | OBD_MD_FLGID)));
984
985         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
986
987         lov = &exp->exp_obd->u.lov;
988         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
989         if (rc)
990                 RETURN(rc);
991
992         list_for_each (pos, &set->set_list) {
993                 req = list_entry(pos, struct lov_request, rq_link);
994                 
995                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
996                                  NULL, NULL);
997                 err = lov_update_common_set(set, req, rc);
998                 if (err) {
999                         CERROR("error: setattr objid "LPX64" subobj "
1000                                LPX64" on OST idx %d: rc = %d\n",
1001                                set->set_oa->o_id, req->rq_oa->o_id,
1002                                req->rq_idx, err);
1003                         if (!rc)
1004                                 rc = err;
1005                 }
1006         }
1007         err = lov_fini_setattr_set(set);
1008         if (!rc)
1009                 rc = err;
1010         RETURN(rc);
1011 }
1012
1013 static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
1014 {
1015         static int next_idx = 0;
1016         struct lov_tgt_desc *tgt;
1017         int i, count;
1018
1019         /* XXX - we should do something clever and take lsm
1020          * into account but just do round robin for now. */
1021
1022         /* last_idx must always be less that count because
1023          * ld_tgt_count currently cannot shrink. */
1024         count = lov->desc.ld_tgt_count;
1025
1026         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
1027                 if (tgt->active) {
1028                         next_idx = (i + 1) % count;
1029                         RETURN(i);
1030                 }
1031         }
1032
1033         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
1034                 if (tgt->active) {
1035                         next_idx = (i + 1) % count;
1036                         RETURN(i);
1037                 }
1038         }
1039
1040         RETURN(-EIO);
1041 }
1042
1043 static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
1044                              struct lov_stripe_md *ea,
1045                              struct obd_trans_info *oti)
1046 {
1047         struct obd_export *osc_exp;
1048         struct lov_obd *lov = &exp->exp_obd->u.lov;
1049         struct lov_stripe_md *lsm = ea;
1050         struct lov_stripe_md obj_md;
1051         struct lov_stripe_md *obj_mdp = &obj_md;
1052         struct lov_oinfo *loi;
1053         struct obdo *tmp_oa;
1054         int ost_idx, updates = 0, i;
1055         ENTRY;
1056
1057         tmp_oa = obdo_alloc();
1058         if (tmp_oa == NULL)
1059                 RETURN(-ENOMEM);
1060
1061         loi = lsm->lsm_oinfo;
1062         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1063                 int rc;
1064                 if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1065                         continue;
1066
1067                 ost_idx = lov_revalidate_policy(lov, lsm);
1068                 if (ost_idx < 0) {
1069                         /* FIXME: punt for now. */
1070                         CERROR("lov_revalidate_policy failed; no active "
1071                                "OSCs?\n");
1072                         continue;
1073                 }
1074
1075                 /* create a new object */
1076                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1077                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1078                 osc_exp = lov->tgts[ost_idx].ltd_exp;
1079                 rc = obd_create(osc_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
1080                 if (rc) {
1081                         CERROR("error creating new subobj at idx %d; "
1082                                "rc = %d\n", ost_idx, rc);
1083                         continue;
1084                 }
1085                 if (oti->oti_objid)
1086                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
1087                 loi->loi_id = tmp_oa->o_id;
1088                 loi->loi_gr = tmp_oa->o_gr;
1089                 loi->loi_ost_idx = ost_idx;
1090                 loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
1091                 CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
1092                        " with idx %d gen %d.\n", lsm->lsm_object_id,
1093                        loi->loi_id, ost_idx, loi->loi_ost_gen);
1094                 updates = 1;
1095         }
1096
1097         /* If we got an error revalidating an entry there's no need to
1098          * cleanup up objects we allocated here because the bad entry
1099          * still points to a deleted OST. */
1100
1101         obdo_free(tmp_oa);
1102         RETURN(updates);
1103 }
1104
1105 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1106  * we can send this 'punch' to just the authoritative node and the nodes
1107  * that the punch will affect. */
1108 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1109                      struct lov_stripe_md *lsm,
1110                      obd_off start, obd_off end, struct obd_trans_info *oti)
1111 {
1112         struct lov_request_set *set;
1113         struct lov_obd *lov;
1114         struct list_head *pos;
1115         struct lov_request *req;
1116         int err = 0, rc = 0;
1117         ENTRY;
1118
1119         if (lsm_bad_magic(lsm))
1120                 RETURN(-EINVAL);
1121
1122         if (!exp || !exp->exp_obd)
1123                 RETURN(-ENODEV);
1124
1125         lov = &exp->exp_obd->u.lov;
1126         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1127         if (rc)
1128                 RETURN(rc);
1129
1130         list_for_each (pos, &set->set_list) {
1131                 req = list_entry(pos, struct lov_request, rq_link);
1132
1133                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1134                                NULL, req->rq_extent.start, 
1135                                req->rq_extent.end, NULL);
1136                 err = lov_update_punch_set(set, req, rc);
1137                 if (err) {
1138                         CERROR("error: punch objid "LPX64" subobj "LPX64
1139                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1140                                req->rq_oa->o_id, req->rq_idx, rc);
1141                         if (!rc)
1142                                 rc = err;
1143                 }
1144         }
1145         err = lov_fini_punch_set(set);
1146         if (!rc)
1147                 rc = err;
1148         RETURN(rc);
1149 }
1150
1151 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1152                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1153 {
1154         struct lov_request_set *set;
1155         struct lov_obd *lov;
1156         struct list_head *pos;
1157         struct lov_request *req;
1158         int err = 0, rc = 0;
1159         ENTRY;
1160
1161         if (lsm_bad_magic(lsm))
1162                 RETURN(-EINVAL);
1163
1164         if (!exp->exp_obd)
1165                 RETURN(-ENODEV);
1166
1167         lov = &exp->exp_obd->u.lov;
1168         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1169         if (rc)
1170                 RETURN(rc);
1171
1172         list_for_each (pos, &set->set_list) {
1173                 req = list_entry(pos, struct lov_request, rq_link);
1174
1175                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1176                               NULL, req->rq_extent.start, req->rq_extent.end);
1177                 err = lov_update_common_set(set, req, rc);
1178                 if (err) {
1179                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1180                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1181                                req->rq_oa->o_id, req->rq_idx, rc);
1182                         if (!rc)
1183                                 rc = err;
1184                 }
1185         }
1186         err = lov_fini_sync_set(set);
1187         if (!rc)
1188                 rc = err;
1189         RETURN(rc);
1190 }
1191
1192 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1193                          struct lov_stripe_md *lsm,
1194                          obd_count oa_bufs, struct brw_page *pga)
1195 {
1196         int i, rc = 0;
1197         ENTRY;
1198
1199         /* The caller just wants to know if there's a chance that this
1200          * I/O can succeed */
1201         for (i = 0; i < oa_bufs; i++) {
1202                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
1203                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1204                 obd_off start, end;
1205
1206                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
1207                                            pga[i].disk_offset + pga[i].count,
1208                                            &start, &end))
1209                         continue;
1210
1211                 if (lov->tgts[ost].active == 0) {
1212                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1213                         RETURN(-EIO);
1214                 }
1215                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1216                              NULL, 1, &pga[i], NULL);
1217                 if (rc)
1218                         break;
1219         }
1220         RETURN(rc);
1221 }
1222
1223 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1224                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1225                    struct brw_page *pga, struct obd_trans_info *oti)
1226 {
1227         struct lov_request_set *set;
1228         struct lov_request *req;
1229         struct list_head *pos;
1230         struct lov_obd *lov = &exp->exp_obd->u.lov;
1231         int err, rc = 0;
1232         ENTRY;
1233
1234         if (lsm_bad_magic(lsm))
1235                 RETURN(-EINVAL);
1236
1237         if (cmd == OBD_BRW_CHECK) {
1238                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1239                 RETURN(rc);
1240         }
1241
1242         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1243         if (rc)
1244                 RETURN(rc);
1245
1246         list_for_each (pos, &set->set_list) {
1247                 struct obd_export *sub_exp;
1248                 struct brw_page *sub_pga;
1249                 req = list_entry(pos, struct lov_request, rq_link);
1250                 
1251                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1252                 sub_pga = set->set_pga + req->rq_pgaidx;
1253                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
1254                              req->rq_oabufs, sub_pga, oti);
1255                 if (rc)
1256                         break;
1257                 lov_update_common_set(set, req, rc);
1258         }
1259
1260         err = lov_fini_brw_set(set);
1261         if (!rc)
1262                 rc = err;
1263         RETURN(rc);
1264 }
1265
1266 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1267                              int rc)
1268 {
1269         struct lov_request_set *lovset = (struct lov_request_set *)data;
1270         ENTRY;
1271         
1272         if (rc) {
1273                 lovset->set_completes = 0;
1274                 lov_fini_brw_set(lovset);
1275         } else {
1276                 rc = lov_fini_brw_set(lovset);
1277         }
1278                 
1279         RETURN(rc);
1280 }
1281
1282 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1283                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1284                          struct brw_page *pga, struct ptlrpc_request_set *set,
1285                          struct obd_trans_info *oti)
1286 {
1287         struct lov_request_set *lovset;
1288         struct lov_request *req;
1289         struct list_head *pos;
1290         struct lov_obd *lov = &exp->exp_obd->u.lov;
1291         int rc = 0;
1292         ENTRY;
1293
1294         if (lsm_bad_magic(lsm))
1295                 RETURN(-EINVAL);
1296
1297         if (cmd == OBD_BRW_CHECK) {
1298                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1299                 RETURN(rc);
1300         }
1301
1302         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1303         if (rc)
1304                 RETURN(rc);
1305
1306         list_for_each (pos, &lovset->set_list) {
1307                 struct obd_export *sub_exp;
1308                 struct brw_page *sub_pga;
1309                 req = list_entry(pos, struct lov_request, rq_link);
1310                 
1311                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1312                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1313                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1314                                    req->rq_oabufs, sub_pga, set, oti);
1315                 if (rc)
1316                         GOTO(out, rc);
1317                 lov_update_common_set(lovset, req, rc);
1318         }
1319         LASSERT(rc == 0);
1320         LASSERT(set->set_interpret == NULL);
1321         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1322         set->set_arg = (void *)lovset;
1323         
1324         RETURN(rc);
1325 out:
1326         lov_fini_brw_set(lovset);
1327         RETURN(rc);
1328 }
1329
1330 static int lov_ap_make_ready(void *data, int cmd)
1331 {
1332         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1333
1334         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1335 }
1336 static int lov_ap_refresh_count(void *data, int cmd)
1337 {
1338         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1339
1340         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1341                                                      cmd);
1342 }
1343 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1344 {
1345         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1346
1347         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1348         /* XXX woah, shouldn't we be altering more here?  size? */
1349         oa->o_id = lap->lap_loi_id;
1350 }
1351
1352 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1353 {
1354         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1355
1356         /* in a raid1 regime this would down a count of many ios
1357          * in flight, onl calling the caller_ops completion when all
1358          * the raid1 ios are complete */
1359         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1360 }
1361
1362 static struct obd_async_page_ops lov_async_page_ops = {
1363         .ap_make_ready =        lov_ap_make_ready,
1364         .ap_refresh_count =     lov_ap_refresh_count,
1365         .ap_fill_obdo =         lov_ap_fill_obdo,
1366         .ap_completion =        lov_ap_completion,
1367 };
1368
1369 static int lov_prep_async_page(struct obd_export *exp,
1370                                struct lov_stripe_md *lsm,
1371                                struct lov_oinfo *loi, struct page *page,
1372                                obd_off offset, struct obd_async_page_ops *ops,
1373                                void *data, void **res)
1374 {
1375         struct lov_obd *lov = &exp->exp_obd->u.lov;
1376         struct lov_async_page *lap;
1377         int rc, stripe;
1378         ENTRY;
1379
1380         if (lsm_bad_magic(lsm))
1381                 RETURN(-EINVAL);
1382         LASSERT(loi == NULL);
1383
1384         stripe = lov_stripe_number(lsm, offset);
1385         loi = &lsm->lsm_oinfo[stripe];
1386
1387         if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1388                 RETURN(-EIO);
1389         if (lov->tgts[loi->loi_ost_idx].active == 0)
1390                 RETURN(-EIO);
1391         if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
1392                 CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
1393                        "deleted or inactive.\n", loi->loi_ost_idx);
1394                 RETURN(-EIO);
1395         }
1396
1397         OBD_ALLOC(lap, sizeof(*lap));
1398         if (lap == NULL)
1399                 RETURN(-ENOMEM);
1400
1401         lap->lap_magic = LAP_MAGIC;
1402         lap->lap_caller_ops = ops;
1403         lap->lap_caller_data = data;
1404
1405         /* FIXME handle multiple oscs after landing b_raid1 */
1406         lap->lap_stripe = stripe;
1407         switch (lsm->lsm_pattern) {
1408                 case LOV_PATTERN_RAID0:
1409                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
1410                                           &lap->lap_sub_offset);
1411                         break;
1412                 case LOV_PATTERN_CMOBD:
1413                         lap->lap_sub_offset = offset;
1414                         break;
1415                 default:
1416                         LBUG();
1417         }
1418
1419         /* so the callback doesn't need the lsm */
1420         lap->lap_loi_id = loi->loi_id;
1421
1422         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1423                                  lsm, loi, page, lap->lap_sub_offset,
1424                                  &lov_async_page_ops, lap,
1425                                  &lap->lap_sub_cookie);
1426         if (rc) {
1427                 OBD_FREE(lap, sizeof(*lap));
1428                 RETURN(rc);
1429         }
1430         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1431                lap->lap_sub_cookie, offset);
1432         *res = lap;
1433         RETURN(0);
1434 }
1435
1436 static int lov_queue_async_io(struct obd_export *exp,
1437                               struct lov_stripe_md *lsm,
1438                               struct lov_oinfo *loi, void *cookie,
1439                               int cmd, obd_off off, int count,
1440                               obd_flags brw_flags, obd_flags async_flags)
1441 {
1442         struct lov_obd *lov = &exp->exp_obd->u.lov;
1443         struct lov_async_page *lap;
1444         int rc;
1445
1446         LASSERT(loi == NULL);
1447
1448         if (lsm_bad_magic(lsm))
1449                 RETURN(-EINVAL);
1450
1451         lap = LAP_FROM_COOKIE(cookie);
1452
1453         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1454
1455         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1456                                 loi, lap->lap_sub_cookie, cmd, off, count,
1457                                 brw_flags, async_flags);
1458         RETURN(rc);
1459 }
1460
1461 static int lov_set_async_flags(struct obd_export *exp,
1462                                struct lov_stripe_md *lsm,
1463                                struct lov_oinfo *loi, void *cookie,
1464                                obd_flags async_flags)
1465 {
1466         struct lov_obd *lov = &exp->exp_obd->u.lov;
1467         struct lov_async_page *lap;
1468         int rc;
1469
1470         LASSERT(loi == NULL);
1471
1472         if (lsm_bad_magic(lsm))
1473                 RETURN(-EINVAL);
1474
1475         lap = LAP_FROM_COOKIE(cookie);
1476
1477         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1478
1479         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1480                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1481         RETURN(rc);
1482 }
1483
1484 static int lov_queue_group_io(struct obd_export *exp,
1485                               struct lov_stripe_md *lsm,
1486                               struct lov_oinfo *loi,
1487                               struct obd_io_group *oig, void *cookie,
1488                               int cmd, obd_off off, int count,
1489                               obd_flags brw_flags, obd_flags async_flags)
1490 {
1491         struct lov_obd *lov = &exp->exp_obd->u.lov;
1492         struct lov_async_page *lap;
1493         int rc;
1494
1495         LASSERT(loi == NULL);
1496
1497         if (lsm_bad_magic(lsm))
1498                 RETURN(-EINVAL);
1499
1500         lap = LAP_FROM_COOKIE(cookie);
1501
1502         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1503
1504         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1505                                 oig, lap->lap_sub_cookie, cmd, off, count,
1506                                 brw_flags, async_flags);
1507         RETURN(rc);
1508 }
1509
1510 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1511  * all stripes, but we don't record that fact at queue time.  so we
1512  * trigger sync io on all stripes. */
1513 static int lov_trigger_group_io(struct obd_export *exp,
1514                                 struct lov_stripe_md *lsm,
1515                                 struct lov_oinfo *loi,
1516                                 struct obd_io_group *oig)
1517 {
1518         struct lov_obd *lov = &exp->exp_obd->u.lov;
1519         int rc = 0, i, err;
1520
1521         LASSERT(loi == NULL);
1522
1523         if (lsm_bad_magic(lsm))
1524                 RETURN(-EINVAL);
1525
1526         loi = lsm->lsm_oinfo;
1527         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1528                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1529                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1530                         continue;
1531                 }
1532
1533                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1534                                            lsm, loi, oig);
1535                 if (rc == 0 && err != 0)
1536                         rc = err;
1537         };
1538         RETURN(rc);
1539 }
1540
1541 static int lov_teardown_async_page(struct obd_export *exp,
1542                                    struct lov_stripe_md *lsm,
1543                                    struct lov_oinfo *loi, void *cookie)
1544 {
1545         struct lov_obd *lov = &exp->exp_obd->u.lov;
1546         struct lov_async_page *lap;
1547         int rc;
1548
1549         LASSERT(loi == NULL);
1550
1551         if (lsm_bad_magic(lsm))
1552                 RETURN(-EINVAL);
1553
1554         lap = LAP_FROM_COOKIE(cookie);
1555
1556         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1557
1558         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1559                                      lsm, loi, lap->lap_sub_cookie);
1560         if (rc) {
1561                 CERROR("unable to teardown sub cookie %p: %d\n",
1562                        lap->lap_sub_cookie, rc);
1563                 RETURN(rc);
1564         }
1565         OBD_FREE(lap, sizeof(*lap));
1566         RETURN(rc);
1567 }
1568
1569 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1570                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1571                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1572                        void *data,__u32 lvb_len, void *lvb_swabber,
1573                        struct lustre_handle *lockh)
1574 {
1575         struct lov_request_set *set;
1576         struct lov_request *req;
1577         struct list_head *pos;
1578         struct lustre_handle *lov_lockhp;
1579         struct lov_obd *lov;
1580         ldlm_error_t rc;
1581         int save_flags = *flags;
1582         ENTRY;
1583
1584         if (lsm_bad_magic(lsm))
1585                 RETURN(-EINVAL);
1586
1587         /* we should never be asked to replay a lock this way. */
1588         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1589
1590         if (!exp || !exp->exp_obd)
1591                 RETURN(-ENODEV);
1592
1593         lov = &exp->exp_obd->u.lov;
1594         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1595         if (rc)
1596                 RETURN(rc);
1597
1598         list_for_each (pos, &set->set_list) {
1599                 ldlm_policy_data_t sub_policy;
1600                 req = list_entry(pos, struct lov_request, rq_link);
1601                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1602                 LASSERT(lov_lockhp);
1603
1604                 *flags = save_flags;
1605                 sub_policy.l_extent.start = req->rq_extent.start;
1606                 sub_policy.l_extent.end = req->rq_extent.end;
1607
1608                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1609                                  type, &sub_policy, mode, flags, bl_cb,
1610                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1611                                  lov_lockhp);
1612                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1613                 if (rc != ELDLM_OK)
1614                         break;
1615         }
1616
1617         lov_fini_enqueue_set(set, mode);
1618         RETURN(rc);
1619 }
1620
1621 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1622                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1623                      int *flags, void *data, struct lustre_handle *lockh)
1624 {
1625         struct lov_request_set *set;
1626         struct lov_request *req;
1627         struct list_head *pos;
1628         struct lov_obd *lov = &exp->exp_obd->u.lov;
1629         struct lustre_handle *lov_lockhp;
1630         int lov_flags, rc = 0;
1631         ENTRY;
1632
1633         if (lsm_bad_magic(lsm))
1634                 RETURN(-EINVAL);
1635
1636         if (!exp || !exp->exp_obd)
1637                 RETURN(-ENODEV);
1638
1639         lov = &exp->exp_obd->u.lov;
1640         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1641         if (rc)
1642                 RETURN(rc);
1643
1644         list_for_each (pos, &set->set_list) {
1645                 ldlm_policy_data_t sub_policy;
1646                 req = list_entry(pos, struct lov_request, rq_link);
1647                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1648                 LASSERT(lov_lockhp);
1649
1650                 sub_policy.l_extent.start = req->rq_extent.start;
1651                 sub_policy.l_extent.end = req->rq_extent.end;
1652                 lov_flags = *flags;
1653
1654                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1655                                type, &sub_policy, mode, &lov_flags, data,
1656                                lov_lockhp);
1657                 rc = lov_update_match_set(set, req, rc);
1658                 if (rc != 1)
1659                         break;
1660         }
1661         lov_fini_match_set(set, mode, *flags);
1662         RETURN(rc);
1663 }
1664
1665 static int lov_change_cbdata(struct obd_export *exp,
1666                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1667                              void *data)
1668 {
1669         struct lov_obd *lov;
1670         struct lov_oinfo *loi;
1671         int rc = 0, i;
1672         ENTRY;
1673
1674         if (lsm_bad_magic(lsm))
1675                 RETURN(-EINVAL);
1676
1677         if (!exp || !exp->exp_obd)
1678                 RETURN(-ENODEV);
1679
1680         LASSERT(lsm->lsm_object_gr > 0);
1681
1682         lov = &exp->exp_obd->u.lov;
1683         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1684                 struct lov_stripe_md submd;
1685                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1686                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1687                         continue;
1688                 }
1689
1690                 submd.lsm_object_id = loi->loi_id;
1691                 submd.lsm_object_gr = lsm->lsm_object_gr;
1692                 submd.lsm_stripe_count = 0;
1693                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1694                                        &submd, it, data);
1695         }
1696         RETURN(rc);
1697 }
1698
1699 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1700                       __u32 mode, struct lustre_handle *lockh)
1701 {
1702         struct lov_request_set *set;
1703         struct lov_request *req;
1704         struct list_head *pos;
1705         struct lov_obd *lov = &exp->exp_obd->u.lov;
1706         struct lustre_handle *lov_lockhp;
1707         int err = 0, rc = 0;
1708         ENTRY;
1709
1710         if (lsm_bad_magic(lsm))
1711                 RETURN(-EINVAL);
1712
1713         if (!exp || !exp->exp_obd)
1714                 RETURN(-ENODEV);
1715
1716         LASSERT(lsm->lsm_object_gr > 0);
1717
1718         LASSERT(lockh);
1719         lov = &exp->exp_obd->u.lov;
1720         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1721         if (rc)
1722                 RETURN(rc);
1723
1724         list_for_each (pos, &set->set_list) {
1725                 req = list_entry(pos, struct lov_request, rq_link);
1726                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1727
1728                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1729                                 mode, lov_lockhp);
1730                 rc = lov_update_common_set(set, req, rc);
1731                 if (rc) {
1732                         CERROR("error: cancel objid "LPX64" subobj "
1733                                LPX64" on OST idx %d: rc = %d\n",
1734                                lsm->lsm_object_id,
1735                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1736                         err = rc;
1737                 }
1738  
1739         }
1740         lov_fini_cancel_set(set);
1741         RETURN(err);
1742 }
1743
1744 static int lov_cancel_unused(struct obd_export *exp,
1745                              struct lov_stripe_md *lsm, 
1746                              int flags, void *opaque)
1747 {
1748         struct lov_obd *lov;
1749         struct lov_oinfo *loi;
1750         int rc = 0, i;
1751         ENTRY;
1752
1753         lov = &exp->exp_obd->u.lov;
1754         if (lsm == NULL) {
1755                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1756                         int err = obd_cancel_unused(lov->tgts[i].ltd_exp,
1757                                                     NULL, flags, opaque);
1758                         if (!rc)
1759                                 rc = err;
1760                 }
1761                 RETURN(rc);
1762         }
1763
1764         if (lsm_bad_magic(lsm))
1765                 RETURN(-EINVAL);
1766
1767         if (!exp || !exp->exp_obd)
1768                 RETURN(-ENODEV);
1769
1770         LASSERT(lsm->lsm_object_gr > 0);
1771
1772         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1773                 struct lov_stripe_md submd;
1774                 int err;
1775
1776                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1777                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1778
1779                 submd.lsm_object_id = loi->loi_id;
1780                 submd.lsm_object_gr = lsm->lsm_object_gr;
1781                 submd.lsm_stripe_count = 0;
1782                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1783                                         &submd, flags, opaque);
1784                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1785                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1786                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1787                                loi->loi_id, loi->loi_ost_idx, err);
1788                         if (!rc)
1789                                 rc = err;
1790                 }
1791         }
1792         RETURN(rc);
1793 }
1794
/* All-ones value of a __u64, used as the saturation limit below. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating add: tot += add, except that tot is pinned to LOV_U64_MAX when
 * the sum wraps around.  Both arguments are evaluated more than once, so
 * they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1803
1804 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1805                       unsigned long max_age)
1806 {
1807         struct lov_obd *lov = &obd->u.lov;
1808         struct obd_statfs lov_sfs;
1809         int set = 0;
1810         int rc = 0;
1811         int i;
1812         ENTRY;
1813
1814
1815         /* We only get block data from the OBD */
1816         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1817                 int err;
1818                 if (!lov->tgts[i].active) {
1819                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1820                         continue;
1821                 }
1822
1823                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1824                                  max_age);
1825                 if (err) {
1826                         if (lov->tgts[i].active && !rc)
1827                                 rc = err;
1828                         continue;
1829                 }
1830
1831                 if (!set) {
1832                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1833                         set = 1;
1834                 } else {
1835                         osfs->os_bfree += lov_sfs.os_bfree;
1836                         osfs->os_bavail += lov_sfs.os_bavail;
1837                         osfs->os_blocks += lov_sfs.os_blocks;
1838                         /* XXX not sure about this one - depends on policy.
1839                          *   - could be minimum if we always stripe on all OBDs
1840                          *     (but that would be wrong for any other policy,
1841                          *     if one of the OBDs has no more objects left)
1842                          *   - could be sum if we stripe whole objects
1843                          *   - could be average, just to give a nice number
1844                          *
1845                          * To give a "reasonable" (if not wholly accurate)
1846                          * number, we divide the total number of free objects
1847                          * by expected stripe count (watch out for overflow).
1848                          */
1849                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1850                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1851                 }
1852         }
1853
1854         if (set) {
1855                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1856                                          lov->desc.ld_default_stripe_count :
1857                                          lov->desc.ld_active_tgt_count;
1858
1859                 if (osfs->os_files != LOV_U64_MAX)
1860                         do_div(osfs->os_files, expected_stripes);
1861                 if (osfs->os_ffree != LOV_U64_MAX)
1862                         do_div(osfs->os_ffree, expected_stripes);
1863         } else if (!rc)
1864                 rc = -EIO;
1865
1866         RETURN(rc);
1867 }
1868
1869 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1870                          void *karg, void *uarg)
1871 {
1872         struct obd_device *obddev = class_exp2obd(exp);
1873         struct lov_obd *lov = &obddev->u.lov;
1874         int i, rc, count = lov->desc.ld_tgt_count;
1875         struct obd_uuid *uuidp;
1876         ENTRY;
1877
1878         switch (cmd) {
1879         case OBD_IOC_LOV_GET_CONFIG: {
1880                 struct obd_ioctl_data *data = karg;
1881                 struct lov_tgt_desc *tgtdesc;
1882                 struct lov_desc *desc;
1883                 char *buf = NULL;
1884                 __u32 *genp;
1885
1886                 buf = NULL;
1887                 len = 0;
1888                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1889                         RETURN(-EINVAL);
1890
1891                 data = (struct obd_ioctl_data *)buf;
1892
1893                 if (sizeof(*desc) > data->ioc_inllen1) {
1894                         obd_ioctl_freedata(buf, len);
1895                         RETURN(-EINVAL);
1896                 }
1897
1898                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1899                         obd_ioctl_freedata(buf, len);
1900                         RETURN(-EINVAL);
1901                 }
1902
1903                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1904                         obd_ioctl_freedata(buf, len);
1905                         RETURN(-EINVAL);
1906                 }
1907
1908                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1909                 memcpy(desc, &(lov->desc), sizeof(*desc));
1910
1911                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1912                 genp = (__u32 *)data->ioc_inlbuf3;
1913                 tgtdesc = lov->tgts;
1914                 /* the uuid will be empty for deleted OSTs */
1915                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1916                         obd_str2uuid(uuidp, (char *)tgtdesc->uuid.uuid);
1917                         *genp = tgtdesc->ltd_gen;
1918                 }
1919
1920                 rc = copy_to_user((void *)uarg, buf, len);
1921                 if (rc)
1922                         rc = -EFAULT;
1923                 obd_ioctl_freedata(buf, len);
1924                 break;
1925         }
1926         case LL_IOC_LOV_SETSTRIPE:
1927                 rc = lov_setstripe(exp, karg, uarg);
1928                 break;
1929         case LL_IOC_LOV_GETSTRIPE:
1930                 rc = lov_getstripe(exp, karg, uarg);
1931                 break;
1932         case LL_IOC_LOV_SETEA:
1933                 rc = lov_setea(exp, karg, uarg);
1934                 break;
1935         default: {
1936                 int set = 0;
1937                 if (count == 0)
1938                         RETURN(-ENOTTY);
1939                 rc = 0;
1940                 for (i = 0; i < count; i++) {
1941                         int err;
1942
1943                         /* OST was deleted */
1944                         if (obd_uuid_empty(&lov->tgts[i].uuid))
1945                                 continue;
1946
1947                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1948                                             len, karg, uarg);
1949                         if (err) {
1950                                 if (lov->tgts[i].active) {
1951                                         CERROR("error: iocontrol OSC %s on OST "
1952                                                "idx %d cmd %x: err = %d\n",
1953                                                lov->tgts[i].uuid.uuid, i,
1954                                                cmd, err);
1955                                         if (!rc)
1956                                                 rc = err;
1957                                 }
1958                         } else
1959                                 set = 1;
1960                 }
1961                 if (!set && !rc)
1962                         rc = -EIO;
1963         }
1964         }
1965
1966         RETURN(rc);
1967 }
1968
1969 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1970                         void *key, __u32 *vallen, void *val)
1971 {
1972         struct obd_device *obddev = class_exp2obd(exp);
1973         struct lov_obd *lov = &obddev->u.lov;
1974         int i;
1975         ENTRY;
1976
1977         if (!vallen || !val)
1978                 RETURN(-EFAULT);
1979
1980         if (keylen > strlen("lock_to_stripe") &&
1981             strcmp(key, "lock_to_stripe") == 0) {
1982                 struct {
1983                         char name[16];
1984                         struct ldlm_lock *lock;
1985                         struct lov_stripe_md *lsm;
1986                 } *data = key;
1987                 struct lov_oinfo *loi;
1988                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
1989                 __u32 *stripe = val;
1990
1991                 if (*vallen < sizeof(*stripe))
1992                         RETURN(-EFAULT);
1993                 *vallen = sizeof(*stripe);
1994
1995                 /* XXX This is another one of those bits that will need to
1996                  * change if we ever actually support nested LOVs.  It uses
1997                  * the lock's export to find out which stripe it is. */
1998                 /* XXX - it's assumed all the locks for deleted OSTs have
1999                  * been cancelled. Also, the export for deleted OSTs will
2000                  * be NULL and won't match the lock's export. */
2001                 for (i = 0, loi = data->lsm->lsm_oinfo;
2002                      i < data->lsm->lsm_stripe_count;
2003                      i++, loi++) {
2004                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
2005                                         data->lock->l_conn_export &&
2006                             loi->loi_id == res_id->name[0] &&
2007                             loi->loi_gr == res_id->name[2]) {
2008                                 *stripe = i;
2009                                 RETURN(0);
2010                         }
2011                 }
2012                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2013                 dump_lsm(D_ERROR, data->lsm);
2014                 RETURN(-ENXIO);
2015         } else if (keylen >= strlen("size_to_stripe") &&
2016                    strcmp(key, "size_to_stripe") == 0) {
2017                 struct {
2018                         int stripe_number;
2019                         __u64 size;
2020                         struct lov_stripe_md *lsm;
2021                 } *data = val;
2022
2023                 if (*vallen < sizeof(*data))
2024                         RETURN(-EFAULT);
2025
2026                 data->size = lov_size_to_stripe(data->lsm, data->size,
2027                                                 data->stripe_number);
2028                 RETURN(0);
2029         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2030                 __u32 size = sizeof(obd_id);
2031                 obd_id *ids = val;
2032                 int rc = 0;
2033
2034                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2035                         if (!lov->tgts[i].active)
2036                                 continue;
2037                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2038                                           keylen, key, &size, &(ids[i]));
2039                         if (rc != 0)
2040                                 RETURN(rc);
2041                 }
2042                 RETURN(0);
2043         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2044                 struct lov_desc *desc_ret = val;
2045                 *desc_ret = lov->desc;
2046
2047                 RETURN(0);
2048         }
2049
2050         RETURN(-EINVAL);
2051 }
2052
2053 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2054                         void *key, obd_count vallen, void *val)
2055 {
2056         struct obd_device *obddev = class_exp2obd(exp);
2057         struct lov_obd *lov = &obddev->u.lov;
2058         int i, rc = 0, err;
2059         ENTRY;
2060
2061 #define KEY_IS(str) \
2062         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2063
2064         if (KEY_IS("async")) {
2065                 struct lov_desc *desc = &lov->desc;
2066                 struct lov_tgt_desc *tgts = lov->tgts;
2067
2068                 if (vallen != sizeof(int))
2069                         RETURN(-EINVAL);
2070                 lov->async = *((int*) val);
2071
2072                 for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
2073                         struct obd_uuid *tgt_uuid = &tgts->uuid;
2074                         struct obd_device *tgt_obd;
2075
2076                         tgt_obd = class_find_client_obd(tgt_uuid,
2077                                                         LUSTRE_OSC_NAME,
2078                                                         &obddev->obd_uuid);
2079                         if (!tgt_obd) {
2080                                 CERROR("Target %s not attached\n",
2081                                         tgt_uuid->uuid);
2082                                 if (!rc)
2083                                         rc = -EINVAL;
2084                                 continue;
2085                         }
2086
2087                         err = obd_set_info(tgt_obd->obd_self_export,
2088                                            keylen, key, vallen, val);
2089                         if (err) {
2090                                 CERROR("Failed to set async on target %s\n",
2091                                         tgt_obd->obd_name);
2092                                 if (!rc)
2093                                         rc = err;
2094                         }
2095                 }
2096                 RETURN(rc);
2097         }
2098
2099         if (KEY_IS("mds_conn")) {
2100                 if (vallen != sizeof(__u32))
2101                         RETURN(-EINVAL);
2102         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
2103                 if (vallen != 0)
2104                         RETURN(-EINVAL);
2105         } else if (KEY_IS("sec")) {
2106                 struct lov_tgt_desc *tgt;
2107                 struct obd_export *exp;
2108                 int rc = 0, err, i;
2109
2110                 spin_lock(&lov->lov_lock);
2111                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2112                      i++, tgt++) {
2113                         exp = tgt->ltd_exp;
2114                         /* during setup time the connections to osc might
2115                          * haven't been established.
2116                          */
2117                         if (exp == NULL) {
2118                                 struct obd_device *tgt_obd;
2119
2120                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2121                                                                 LUSTRE_OSC_NAME,
2122                                                                 &obddev->obd_uuid);
2123                                 if (!tgt_obd) {
2124                                         CERROR("can't set security flavor, "
2125                                                "device %s not attached?\n",
2126                                                 tgt->uuid.uuid);
2127                                         rc = -EINVAL;
2128                                         continue;
2129                                 }
2130                                 exp = tgt_obd->obd_self_export;
2131                         }
2132
2133                         err = obd_set_info(exp, keylen, key, vallen, val);
2134                         if (!rc)
2135                                 rc = err;
2136                 }
2137                 spin_unlock(&lov->lov_lock);
2138
2139                 RETURN(rc);
2140         } else if (KEY_IS("flush_cred")) {
2141                 struct lov_tgt_desc *tgt;
2142                 int rc = 0, i;
2143
2144                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2145                      i++, tgt++) {
2146                         if (!tgt->ltd_exp)
2147                                 continue;
2148                         rc = obd_set_info(tgt->ltd_exp,
2149                                           keylen, key, vallen, val);
2150                         if (rc)
2151                                 RETURN(rc);
2152                 }
2153
2154                 RETURN(0);
2155         } else {
2156                 RETURN(-EINVAL);
2157         }
2158
2159         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2160                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2161                         continue;
2162
2163                 if (!val && !lov->tgts[i].active)
2164                         continue;
2165
2166                 err = obd_set_info(lov->tgts[i].ltd_exp,
2167                                   keylen, key, vallen, val);
2168                 if (!rc)
2169                         rc = err;
2170         }
2171         RETURN(rc);
2172 #undef KEY_IS
2173 }
2174
/*
 * Disabled code: everything up to the matching #endif is compiled out.
 *
 * lov_complete_many() appears to be an unfinished attempt to wait on the
 * per-stripe locks behind one LOV lock handle.  It is kept for reference
 * only and would not build if the #if 0 were removed:
 *   - `imp`, `lwi` and `lwd` are used but never declared in the function;
 *   - `lock` is declared inside the first loop yet used in the
 *     remove_wait_queue() loop after it;
 *   - check_multi_complete() is not defined in the visible part of the file.
 * In addition, `queues` is allocated with OBD_ALLOC() but no matching free is
 * visible, and the -EINTR / -ETIMEDOUT branch is empty.
 * NOTE(review): confirm whether this can simply be deleted.
 */
2175 #if 0
2176 struct lov_multi_wait {
2177         struct ldlm_lock *lock;
2178         wait_queue_t      wait;
2179         int               completed;
2180         int               generation;
2181 };
2182
2183 int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
2184                       struct lustre_handle *lockh)
2185 {
2186         struct lov_lock_handles *lov_lockh = NULL;
2187         struct lustre_handle *lov_lockhp;
2188         struct lov_obd *lov;
2189         struct lov_oinfo *loi;
2190         struct lov_multi_wait *queues;
2191         int rc = 0, i;
2192         ENTRY;
2193
2194         if (lsm_bad_magic(lsm))
2195                 RETURN(-EINVAL);
2196
2197         if (!exp || !exp->exp_obd)
2198                 RETURN(-ENODEV);
2199
2200         LASSERT(lockh != NULL);
2201         if (lsm->lsm_stripe_count > 1) {
2202                 lov_lockh = lov_handle2llh(lockh);
2203                 if (lov_lockh == NULL) {
2204                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
2205                         RETURN(-EINVAL);
2206                 }
2207
2208                 lov_lockhp = lov_lockh->llh_handles;
2209         } else {
2210                 lov_lockhp = lockh;
2211         }
2212
2213         OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
2214         if (queues == NULL)
2215                 GOTO(out, rc = -ENOMEM);
2216
2217         lov = &exp->exp_obd->u.lov;
2218         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2219              i++, loi++, lov_lockhp++) {
2220                 struct ldlm_lock *lock;
2221                 struct obd_device *obd;
2222                 unsigned long irqflags;
2223
2224                 lock = ldlm_handle2lock(lov_lockhp);
2225                 if (lock == NULL) {
2226                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
2227                                loi->loi_ost_idx, loi->loi_id);
2228                         queues[i].completed = 1;
2229                         continue;
2230                 }
2231
2232                 queues[i].lock = lock;
2233                 init_waitqueue_entry(&(queues[i].wait), current);
2234                 add_wait_queue(lock->l_waitq, &(queues[i].wait));
2235
2236                 obd = class_exp2obd(lock->l_conn_export);
2237                 if (obd != NULL)
2238                         imp = obd->u.cli.cl_import;
2239                 if (imp != NULL) {
2240                         spin_lock_irqsave(&imp->imp_lock, irqflags);
2241                         queues[i].generation = imp->imp_generation;
2242                         spin_unlock_irqrestore(&imp->imp_lock, irqflags);
2243                 }
2244         }
2245
2246         lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
2247                                interrupted_completion_wait, &lwd);
2248         rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
2249
2250         for (i = 0; i < lsm->lsm_stripe_count; i++)
2251                 remove_wait_queue(lock->l_waitq, &(queues[i].wait));
2252
2253         if (rc == -EINTR || rc == -ETIMEDOUT) {
2254
2255
2256         }
2257
2258  out:
2259         if (lov_lockh != NULL)
2260                 lov_llh_put(lov_lockh);
2261         RETURN(rc);
2262 }
2263 #endif
2264
2265 struct obd_ops lov_obd_ops = {
2266         .o_owner               = THIS_MODULE,
2267         .o_attach              = lov_attach,
2268         .o_detach              = lov_detach,
2269         .o_setup               = lov_setup,
2270         .o_cleanup             = lov_cleanup,
2271         .o_process_config      = lov_process_config,
2272         .o_connect             = lov_connect,
2273         .o_disconnect          = lov_disconnect,
2274         .o_statfs              = lov_statfs,
2275         .o_packmd              = lov_packmd,
2276         .o_unpackmd            = lov_unpackmd,
2277         .o_revalidate_md       = lov_revalidate_md,
2278         .o_create              = lov_create,
2279         .o_destroy             = lov_destroy,
2280         .o_getattr             = lov_getattr,
2281         .o_getattr_async       = lov_getattr_async,
2282         .o_setattr             = lov_setattr,
2283         .o_brw                 = lov_brw,
2284         .o_brw_async           = lov_brw_async,
2285         .o_prep_async_page     = lov_prep_async_page,
2286         .o_queue_async_io      = lov_queue_async_io,
2287         .o_set_async_flags     = lov_set_async_flags,
2288         .o_queue_group_io      = lov_queue_group_io,
2289         .o_trigger_group_io    = lov_trigger_group_io,
2290         .o_teardown_async_page = lov_teardown_async_page,
2291         .o_adjust_kms          = lov_adjust_kms,
2292         .o_punch               = lov_punch,
2293         .o_sync                = lov_sync,
2294         .o_enqueue             = lov_enqueue,
2295         .o_match               = lov_match,
2296         .o_change_cbdata       = lov_change_cbdata,
2297         .o_cancel              = lov_cancel,
2298         .o_cancel_unused       = lov_cancel_unused,
2299         .o_iocontrol           = lov_iocontrol,
2300         .o_get_info            = lov_get_info,
2301         .o_set_info            = lov_set_info,
2302         .o_llog_init           = lov_llog_init,
2303         .o_llog_finish         = lov_llog_finish,
2304         .o_notify              = lov_notify,
2305 };
2306
2307 int __init lov_init(void)
2308 {
2309         struct lprocfs_static_vars lvars;
2310         int rc;
2311         ENTRY;
2312
2313         lprocfs_init_vars(lov, &lvars);
2314         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2315                                  OBD_LOV_DEVICENAME);
2316         RETURN(rc);
2317 }
2318
#ifdef __KERNEL__
/*
 * Module teardown: drop the device type that lov_init() registered under
 * OBD_LOV_DEVICENAME.  Only built for the kernel module.
 */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

module_init(lov_init);
module_exit(lov_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
#endif