Whamcloud - gitweb
land b_hd_sec: perm/acl authorization for remote users.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 /* obd methods */
55 #define MAX_STRING_SIZE 128
56 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
57                            int activate, struct obd_connect_data *conn_data,
58                            unsigned long connect_flags)
59 {
60         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
61         struct obd_uuid *tgt_uuid = &tgt->uuid;
62
63 #ifdef __KERNEL__
64         struct proc_dir_entry *lov_proc_dir;
65 #endif
66         struct lov_obd *lov = &obd->u.lov;
67         struct lustre_handle conn = {0, };
68         struct obd_device *tgt_obd;
69         int rc;
70         ENTRY;
71
72         tgt_obd = class_find_client_obd(tgt_uuid, OBD_OSC_DEVICENAME,
73                                         &obd->obd_uuid);
74
75         if (!tgt_obd) {
76                 CERROR("Target %s not attached\n", tgt_uuid->uuid);
77                 RETURN(-EINVAL);
78         }
79
80         if (!tgt_obd->obd_set_up) {
81                 CERROR("Target %s not set up\n", tgt_uuid->uuid);
82                 RETURN(-EINVAL);
83         }
84
85         if (activate) {
86                 tgt_obd->obd_no_recov = 0;
87                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
88         }
89
90         if (tgt_obd->u.cli.cl_import->imp_invalid) {
91                 CERROR("not connecting OSC %s; administratively "
92                        "disabled\n", tgt_uuid->uuid);
93                 rc = obd_register_observer(tgt_obd, obd);
94                 if (rc) {
95                         CERROR("Target %s register_observer error %d; "
96                                "will not be able to reactivate\n",
97                                tgt_uuid->uuid, rc);
98                 }
99                 RETURN(0);
100         }
101
102         rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
103                          connect_flags);
104         if (rc) {
105                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
106                 RETURN(rc);
107         }
108         tgt->ltd_exp = class_conn2export(&conn);
109
110         rc = obd_register_observer(tgt_obd, obd);
111         if (rc) {
112                 CERROR("Target %s register_observer error %d\n",
113                        tgt_uuid->uuid, rc);
114                 obd_disconnect(tgt->ltd_exp, 0);
115                 tgt->ltd_exp = NULL;
116                 RETURN(rc);
117         }
118
119         tgt->active = 1;
120         lov->desc.ld_active_tgt_count++;
121
122 #ifdef __KERNEL__
123         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
124         if (lov_proc_dir) {
125                 struct obd_device *osc_obd = class_conn2obd(&conn);
126                 struct proc_dir_entry *osc_symlink;
127                 char name[MAX_STRING_SIZE + 1];
128
129                 LASSERT(osc_obd != NULL);
130                 LASSERT(osc_obd->obd_type != NULL);
131                 LASSERT(osc_obd->obd_type->typ_name != NULL);
132                 name[MAX_STRING_SIZE] = '\0';
133                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
134                          osc_obd->obd_type->typ_name,
135                          osc_obd->obd_name);
136                 osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
137                                            name);
138                 if (osc_symlink == NULL) {
139                         CERROR("could not register LOV target "
140                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
141                                obd->obd_type->typ_name, obd->obd_name,
142                                osc_obd->obd_name);
143                         lprocfs_remove(lov_proc_dir);
144                         lov_proc_dir = NULL;
145                 }
146         }
147 #endif
148
149         RETURN(0);
150 }
151
152 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
153                        struct obd_uuid *cluuid, struct obd_connect_data *data,
154                        unsigned long flags)
155 {
156 #ifdef __KERNEL__
157         struct proc_dir_entry *lov_proc_dir;
158 #endif
159         struct lov_obd *lov = &obd->u.lov;
160         struct lov_tgt_desc *tgt;
161         struct obd_export *exp;
162         int rc, rc2, i;
163         ENTRY;
164
165         rc = class_connect(conn, obd, cluuid);
166         if (rc)
167                 RETURN(rc);
168
169         exp = class_conn2export(conn);
170
171         /* We don't want to actually do the underlying connections more than
172          * once, so keep track. */
173         lov->refcount++;
174         if (lov->refcount > 1) {
175                 class_export_put(exp);
176                 RETURN(0);
177         }
178
179 #ifdef __KERNEL__
180         lov_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
181                                         NULL, NULL);
182         if (IS_ERR(lov_proc_dir)) {
183                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
184                        obd->obd_type->typ_name, obd->obd_name);
185                 lov_proc_dir = NULL;
186         }
187 #endif
188
189         /* connect_flags is the MDS number, save for use in lov_add_obd */
190         lov->lov_connect_flags = flags;
191         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
192                 if (obd_uuid_empty(&tgt->uuid))
193                         continue;
194                 rc = lov_connect_obd(obd, tgt, 0, data, flags);
195                 if (rc)
196                         GOTO(out_disc, rc);
197         }
198
199         class_export_put(exp);
200         RETURN (0);
201
202  out_disc:
203 #ifdef __KERNEL__
204         if (lov_proc_dir)
205                 lprocfs_remove(lov_proc_dir);
206 #endif
207
208         while (i-- > 0) {
209                 struct obd_uuid uuid;
210                 --tgt;
211                 --lov->desc.ld_active_tgt_count;
212                 tgt->active = 0;
213                 /* save for CERROR below; (we know it's terminated) */
214                 uuid = tgt->uuid;
215                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
216                 if (rc2)
217                         CERROR("error: LOV target %s disconnect on OST idx %d: "
218                                "rc = %d\n", uuid.uuid, i, rc2);
219         }
220         class_disconnect(exp, 0);
221         RETURN (rc);
222 }
223
224 static int lov_disconnect_obd(struct obd_device *obd, 
225                               struct lov_tgt_desc *tgt,
226                               unsigned long flags)
227 {
228 #ifdef __KERNEL__
229         struct proc_dir_entry *lov_proc_dir;
230 #endif
231         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
232         struct lov_obd *lov = &obd->u.lov;
233         int rc;
234         ENTRY;
235
236 #ifdef __KERNEL__
237         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
238         if (lov_proc_dir) {
239                 struct proc_dir_entry *osc_symlink;
240
241                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
242                 if (osc_symlink) {
243                         lprocfs_remove(osc_symlink);
244                 } else {
245                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
246                                obd->obd_type->typ_name, obd->obd_name,
247                                osc_obd->obd_name);
248                 }
249         }
250 #endif
251         if (obd->obd_no_recov) {
252                 /* Pass it on to our clients.
253                  * XXX This should be an argument to disconnect,
254                  * XXX not a back-door flag on the OBD.  Ah well.
255                  */
256                 if (osc_obd)
257                         osc_obd->obd_no_recov = 1;
258         }
259
260         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
261         rc = obd_disconnect(tgt->ltd_exp, flags);
262         if (rc) {
263                 if (tgt->active) {
264                         CERROR("Target %s disconnect error %d\n",
265                                tgt->uuid.uuid, rc);
266                 }
267                 rc = 0;
268         }
269
270         if (tgt->active) {
271                 tgt->active = 0;
272                 lov->desc.ld_active_tgt_count--;
273         }
274         tgt->ltd_exp = NULL;
275         RETURN(0);
276 }
277
278 static int lov_disconnect(struct obd_export *exp, unsigned long flags)
279 {
280         struct obd_device *obd = class_exp2obd(exp);
281 #ifdef __KERNEL__
282         struct proc_dir_entry *lov_proc_dir;
283 #endif
284         struct lov_obd *lov = &obd->u.lov;
285         struct lov_tgt_desc *tgt;
286         int rc, i;
287         ENTRY;
288
289         if (!lov->tgts)
290                 goto out_local;
291
292         /* Only disconnect the underlying layers on the final disconnect. */
293         lov->refcount--;
294         if (lov->refcount != 0)
295                 goto out_local;
296
297         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
298                 if (tgt->ltd_exp)
299                         lov_disconnect_obd(obd, tgt, flags);
300         }
301
302 #ifdef __KERNEL__
303         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
304         if (lov_proc_dir) {
305                 lprocfs_remove(lov_proc_dir);
306         } else {
307                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing.",
308                        obd->obd_type->typ_name, obd->obd_name);
309         }
310 #endif
311         
312  out_local:
313         rc = class_disconnect(exp, 0);
314         RETURN(rc);
315 }
316
317 /* Error codes:
318  *
319  *  -EINVAL  : UUID can't be found in the LOV's target list
320  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
321  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
322  */
323 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
324                               int activate)
325 {
326         struct lov_tgt_desc *tgt;
327         int i, rc = 0;
328         ENTRY;
329
330         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
331                lov, uuid->uuid, activate);
332
333         spin_lock(&lov->lov_lock);
334         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
335                 if (tgt->ltd_exp == NULL)
336                         continue;
337
338                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
339                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
340                 
341                 if (obd_uuid_equals(uuid, &tgt->uuid))
342                         break;
343         }
344
345         if (i == lov->desc.ld_tgt_count)
346                 GOTO(out, rc = -EINVAL);
347
348
349         if (tgt->active == activate) {
350                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
351                         activate ? "" : "in");
352                 GOTO(out, rc);
353         }
354
355         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
356                activate ? "" : "in");
357
358         tgt->active = activate;
359         if (activate)
360                 lov->desc.ld_active_tgt_count++;
361         else
362                 lov->desc.ld_active_tgt_count--;
363
364         EXIT;
365  out:
366         spin_unlock(&lov->lov_lock);
367         return rc;
368 }
369
370 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
371                       int active, void *data)
372 {
373         struct obd_uuid *uuid;
374         int rc;
375         ENTRY;
376
377         if (strcmp(watched->obd_type->typ_name, OBD_OSC_DEVICENAME)) {
378                 CERROR("unexpected notification of %s %s!\n",
379                        watched->obd_type->typ_name,
380                        watched->obd_name);
381                 return -EINVAL;
382         }
383         uuid = &watched->u.cli.cl_import->imp_target_uuid;
384
385         /* Set OSC as active before notifying the observer, so the
386          * observer can use the OSC normally.  
387          */
388         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
389         if (rc) {
390                 CERROR("%sactivation of %s failed: %d\n",
391                        active ? "" : "de", uuid->uuid, rc);
392                 RETURN(rc);
393         }
394
395         if (obd->obd_observer)
396                 /* Pass the notification up the chain. */
397                 rc = obd_notify(obd->obd_observer, watched, active, data);
398
399         RETURN(rc);
400 }
401
402 int lov_attach(struct obd_device *dev, obd_count len, void *data)
403 {
404         struct lprocfs_static_vars lvars;
405         int rc;
406
407         lprocfs_init_vars(lov, &lvars);
408         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
409         if (rc == 0) {
410 #ifdef __KERNEL__
411                 struct proc_dir_entry *entry;
412
413                 entry = create_proc_entry("target_obd_status", 0444, 
414                                           dev->obd_proc_entry);
415                 if (entry == NULL) {
416                         rc = -ENOMEM;
417                 } else {
418                         entry->proc_fops = &lov_proc_target_fops;
419                         entry->data = dev;
420                 }
421 #endif
422         }
423         return rc;
424 }
425
/*
 * Detach-time hook for the LOV device: hands @dev to lprocfs_obd_detach()
 * and returns its result unchanged.
 */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
430
431 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
432 {
433         struct lov_obd *lov = &obd->u.lov;
434         struct lustre_cfg *lcfg = buf;
435         struct lov_desc *desc;
436         int count;
437         ENTRY;
438
439         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
440                 CERROR("LOV setup requires a descriptor\n");
441                 RETURN(-EINVAL);
442         }
443
444         desc = (struct lov_desc *)lustre_cfg_string(lcfg, 1);
445         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
446                 CERROR("descriptor size wrong: %d > %d\n",
447                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
448                 RETURN(-EINVAL);
449         }
450  
451         /* Because of 64-bit divide/mod operations only work with a 32-bit
452          * divisor in a 32-bit kernel, we cannot support a stripe width
453          * of 4GB or larger on 32-bit CPUs.
454          */
455        
456         count = desc->ld_default_stripe_count;
457         if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
458                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
459                        desc->ld_default_stripe_size, count, ~0UL);
460                 RETURN(-EINVAL);
461         }
462         if (desc->ld_tgt_count > 0) {
463                 lov->bufsize= sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
464         } else {
465                 lov->bufsize = sizeof(struct lov_tgt_desc) * LOV_MAX_TGT_COUNT;  
466         }
467         OBD_ALLOC(lov->tgts, lov->bufsize);
468         if (lov->tgts == NULL) {
469                 lov->bufsize = 0;
470                 CERROR("couldn't allocate %d bytes for target table.\n",
471                        lov->bufsize);
472                 RETURN(-EINVAL);
473         }
474
475         desc->ld_tgt_count = 0;
476         desc->ld_active_tgt_count = 0;
477         lov->desc = *desc;
478         spin_lock_init(&lov->lov_lock);
479         sema_init(&lov->lov_llog_sem, 1);
480
481         RETURN(0);
482 }
483
484 static int lov_cleanup(struct obd_device *obd, int flags)
485 {
486         struct lov_obd *lov = &obd->u.lov;
487
488         OBD_FREE(lov->tgts, lov->bufsize);
489         RETURN(0);
490 }
491
492 static int
493 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
494 {
495         struct lov_obd *lov = &obd->u.lov;
496         struct lov_tgt_desc *tgt;
497         int rc;
498         ENTRY;
499
500         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
501                uuidp->uuid, index, gen);
502
503         if ((index < 0) || (index >= LOV_MAX_TGT_COUNT)) {
504                 CERROR("request to add OBD %s at invalid index: %d\n",
505                        uuidp->uuid, index);
506                 RETURN(-EINVAL);
507         }
508
509         if (gen <= 0) {
510                 CERROR("request to add OBD %s with invalid generation: %d\n",
511                        uuidp->uuid, gen);
512                 RETURN(-EINVAL);
513         }
514
515         tgt = lov->tgts + index;
516         if (!obd_uuid_empty(&tgt->uuid)) {
517                 CERROR("OBD already assigned at LOV target index %d\n",
518                        index);
519                 RETURN(-EEXIST);
520         }
521
522         tgt->uuid = *uuidp;
523         /* XXX - add a sanity check on the generation number. */
524         tgt->ltd_gen = gen;
525
526         if (index >= lov->desc.ld_tgt_count)
527                 lov->desc.ld_tgt_count = index + 1;
528
529         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
530                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
531
532         if (lov->refcount == 0)
533                 RETURN(0);
534
535         if (tgt->ltd_exp) {
536                 struct obd_device *osc_obd;
537
538                 osc_obd = class_exp2obd(tgt->ltd_exp);
539                 if (osc_obd)
540                         osc_obd->obd_no_recov = 0;
541         }
542
543         rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
544         if (rc)
545                 GOTO(out, rc);
546
547         if (obd->obd_observer) {
548                 /* tell the mds_lov about the new target */
549                 rc = obd_notify(obd->obd_observer, tgt->ltd_exp->exp_obd, 1,
550                                 (void *)index);
551         }
552
553         GOTO(out, rc);
554  out:
555         if (rc && tgt->ltd_exp != NULL)
556                 lov_disconnect_obd(obd, tgt, 0);
557         return rc;
558 }
559
560 static int
561 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
562 {
563         struct lov_obd *lov = &obd->u.lov;
564         struct lov_tgt_desc *tgt;
565         int count = lov->desc.ld_tgt_count;
566         int rc = 0;
567         ENTRY;
568
569         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
570                uuidp->uuid, index, gen);
571
572         if (index >= count) {
573                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
574                        index, count);
575                 RETURN(-EINVAL);
576         }
577
578         tgt = lov->tgts + index;
579
580         if (obd_uuid_empty(&tgt->uuid)) {
581                 CERROR("LOV target at index %d is not setup.\n", index);
582                 RETURN(-EINVAL);
583         }
584
585         if (!obd_uuid_equals(uuidp, &tgt->uuid)) {
586                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
587                        tgt->uuid.uuid, index, uuidp->uuid);
588                 RETURN(-EINVAL);
589         }
590
591         if (tgt->ltd_exp) {
592                 struct obd_device *osc_obd;
593
594                 osc_obd = class_exp2obd(tgt->ltd_exp);
595                 if (osc_obd) {
596                         osc_obd->obd_no_recov = 1;
597                         rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
598                         if (rc)
599                                 CERROR("osc_llog_finish error: %d\n", rc);
600                 }
601                 lov_disconnect_obd(obd, tgt, 0);
602         }
603
604         /* XXX - right now there is a dependency on ld_tgt_count being the
605          * maximum tgt index for computing the mds_max_easize. So we can't
606          * shrink it. */
607
608         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
609         memset(tgt, 0, sizeof(*tgt));
610
611         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
612                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
613
614         RETURN(rc);
615 }
616
617 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
618 {
619         struct lustre_cfg *lcfg = buf;
620         struct obd_uuid obd_uuid;
621         int cmd;
622         int index;
623         int gen;
624         int rc = 0;
625         ENTRY;
626
627         switch(cmd = lcfg->lcfg_command) {
628         case LCFG_LOV_ADD_OBD:
629         case LCFG_LOV_DEL_OBD: {
630                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
631                         GOTO(out, rc = -EINVAL);
632
633                 obd_str2uuid(&obd_uuid, lustre_cfg_string(lcfg, 1));
634
635                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
636                         GOTO(out, rc = -EINVAL);
637                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
638                         GOTO(out, rc = -EINVAL);
639                 if (cmd == LCFG_LOV_ADD_OBD)
640                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
641                 else
642                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
643                 GOTO(out, rc);
644         }
645         default: {
646                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
647                 GOTO(out, rc = -EINVAL);
648
649         }
650         }
651 out:
652         RETURN(rc);
653 }
654
/*
 * Fallback log2(), defined only when no log2 macro exists yet: applies
 * ffz() to the bitwise complement of n.
 * NOTE(review): presumably ffz() is the kernel "find first zero bit"
 * helper, in which case this yields the index of the lowest set bit of n
 * and equals log2(n) only for powers of two -- confirm before relying on it.
 */
655 #ifndef log2
656 #define log2(n) ffz(~(n))
657 #endif
658
659 static int lov_clear_orphans(struct obd_export *export,
660                              struct obdo *src_oa,
661                              struct lov_stripe_md **ea,
662                              struct obd_trans_info *oti)
663 {
664         struct lov_obd *lov;
665         struct obdo *tmp_oa;
666         struct obd_uuid *ost_uuid = NULL;
667         int rc = 0, i;
668         ENTRY;
669
670         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
671                 src_oa->o_flags == OBD_FL_DELORPHAN);
672
673         lov = &export->exp_obd->u.lov;
674
675         tmp_oa = obdo_alloc();
676         if (tmp_oa == NULL)
677                 RETURN(-ENOMEM);
678
679         if (src_oa->o_valid & OBD_MD_FLINLINE) {
680                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
681                 CDEBUG(D_HA, "clearing orphans only for %s\n",
682                        ost_uuid->uuid);
683         }
684
685         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
686                 int err;
687                 struct lov_stripe_md obj_md;
688                 struct lov_stripe_md *obj_mdp = &obj_md;
689
690                 /*
691                  * if called for a specific target, we don't care if it is not
692                  * active.
693                  */
694                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
695                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
696                         continue;
697                 }
698
699                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
700                         continue;
701
702                 /* 
703                  * setting up objid OSS objects should be destroyed starting
704                  * from it.
705                  */
706                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
707                 tmp_oa->o_valid |= OBD_MD_FLID;
708                 tmp_oa->o_id = oti->oti_objid[i];
709
710                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
711                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
712                                  &obj_mdp, oti);
713                 if (err) {
714                         /*
715                          * this export will be disabled until it is recovered,
716                          * and then orphan recovery will be completed.
717                          */
718                         CERROR("error in orphan recovery on OST idx %d/%d: "
719                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
720                 }
721
722                 if (ost_uuid)
723                         break;
724         }
725         obdo_free(tmp_oa);
726         RETURN(rc);
727 }
728
729 /* the LOV expects oa->o_id to be set to the LOV object id */
730 static int
731 lov_create(struct obd_export *exp, struct obdo *src_oa,
732            void *acl, int acl_size, struct lov_stripe_md **ea,
733            struct obd_trans_info *oti)
734 {
735         struct lov_request_set *set = NULL;
736         struct list_head *pos;
737         struct lov_obd *lov;
738         int rc = 0;
739         ENTRY;
740
741         LASSERT(ea != NULL);
742         if (exp == NULL)
743                 RETURN(-EINVAL);
744
745         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
746             src_oa->o_flags == OBD_FL_DELORPHAN) {
747                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
748                 RETURN(rc);
749         }
750
751         lov = &exp->exp_obd->u.lov;
752         if (!lov->desc.ld_active_tgt_count)
753                 RETURN(-EIO);
754
755         LASSERT(oti->oti_flags & OBD_MODE_CROW);
756                 
757         /* main creation loop */
758         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
759         if (rc)
760                 RETURN(rc);
761
762         list_for_each (pos, &set->set_list) {
763                 struct lov_request *req = 
764                         list_entry(pos, struct lov_request, rq_link);
765
766                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
767                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp,
768                                 req->rq_oa, NULL, 0, &req->rq_md, oti);
769                 lov_update_create_set(set, req, rc);
770         }
771         rc = lov_fini_create_set(set, ea);
772         RETURN(rc);
773 }
774
/*
 * Sanity-check a striping descriptor.  Evaluates to 1, after logging an
 * error, when LSMP is NULL or its lsm_magic is not LOV_MAGIC; evaluates to
 * 0 otherwise.  LSMP is evaluated exactly once.
 */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_md__ = (LSMP);                   \
        int _bad__ = 1;                                         \
                                                                \
        if (_md__ == NULL) {                                    \
                CERROR("LOV requires striping ea\n");           \
        } else if (_md__->lsm_magic != LOV_MAGIC) {             \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _md__->lsm_magic, LOV_MAGIC);            \
        } else {                                                \
                _bad__ = 0;                                     \
        }                                                       \
        _bad__;                                                 \
})
789
790 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
791                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
792 {
793         struct lov_request_set *set;
794         struct lov_request *req;
795         struct list_head *pos;
796         struct lov_obd *lov;
797         int rc = 0;
798         ENTRY;
799
800         if (lsm_bad_magic(lsm))
801                 RETURN(-EINVAL);
802
803         if (!exp || !exp->exp_obd)
804                 RETURN(-ENODEV);
805
806         lov = &exp->exp_obd->u.lov;
807         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
808         if (rc)
809                 RETURN(rc);
810
811         list_for_each (pos, &set->set_list) {
812                 int err;
813                 req = list_entry(pos, struct lov_request, rq_link);
814
815                 /* XXX update the cookie position */
816                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
817                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
818                                  NULL, oti);
819                 err = lov_update_common_set(set, req, rc);
820                 if (rc) {
821                         CERROR("error: destroying objid "LPX64" subobj "
822                                LPX64" on OST idx %d: rc = %d\n", 
823                                set->set_oa->o_id, req->rq_oa->o_id, 
824                                req->rq_idx, rc);
825                         if (!rc)
826                                 rc = err;
827                 }
828         }
829         lov_fini_destroy_set(set);
830         RETURN(rc);
831 }
832
833 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
834                        struct lov_stripe_md *lsm)
835 {
836         struct lov_request_set *set;
837         struct lov_request *req;
838         struct list_head *pos;
839         struct lov_obd *lov;
840         int err = 0, rc = 0;
841         ENTRY;
842
843         if (lsm_bad_magic(lsm))
844                 RETURN(-EINVAL);
845
846         if (!exp || !exp->exp_obd)
847                 RETURN(-ENODEV);
848
849         lov = &exp->exp_obd->u.lov;
850         
851         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
852         if (rc)
853                 RETURN(rc);
854
855         list_for_each (pos, &set->set_list) {
856                 req = list_entry(pos, struct lov_request, rq_link);
857                 
858                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
859                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
860                        req->rq_idx);
861
862                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
863                                  req->rq_oa, NULL);
864                 err = lov_update_common_set(set, req, rc);
865                 if (err) {
866                         CERROR("error: getattr objid "LPX64" subobj "
867                                LPX64" on OST idx %d: rc = %d\n",
868                                set->set_oa->o_id, req->rq_oa->o_id, 
869                                req->rq_idx, err);
870                         break;
871                 }
872         }
873         
874         rc = lov_fini_getattr_set(set);
875         if (err)
876                 rc = err;
877         RETURN(rc);
878 }
879
880 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
881                                  int rc)
882 {
883         struct lov_request_set *lovset = (struct lov_request_set *)data;
884         ENTRY;
885
886         /* don't do attribute merge if this aysnc op failed */
887         if (rc) {
888                 lovset->set_completes = 0;
889                 lov_fini_getattr_set(lovset);
890         } else {
891                 rc = lov_fini_getattr_set(lovset);
892         }
893         RETURN (rc);
894 }
895
896 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
897                               struct lov_stripe_md *lsm,
898                               struct ptlrpc_request_set *rqset)
899 {
900         struct lov_request_set *lovset;
901         struct lov_obd *lov;
902         struct list_head *pos;
903         struct lov_request *req;
904         int rc = 0;
905         ENTRY;
906
907         if (lsm_bad_magic(lsm))
908                 RETURN(-EINVAL);
909
910         if (!exp || !exp->exp_obd)
911                 RETURN(-ENODEV);
912
913         lov = &exp->exp_obd->u.lov;
914
915         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
916         if (rc)
917                 RETURN(rc);
918
919         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
920                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
921
922         list_for_each (pos, &lovset->set_list) {
923                 req = list_entry(pos, struct lov_request, rq_link);
924                 
925                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
926                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
927                        req->rq_idx);
928                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
929                                        req->rq_oa, NULL, rqset);
930                 if (rc) {
931                         CERROR("error: getattr objid "LPX64" subobj "
932                                LPX64" on OST idx %d: rc = %d\n",
933                                lovset->set_oa->o_id, req->rq_oa->o_id, 
934                                req->rq_idx, rc);
935                         GOTO(out, rc);
936                 }
937                 lov_update_common_set(lovset, req, rc);
938         }
939         
940         LASSERT(rc == 0);
941         LASSERT (rqset->set_interpret == NULL);
942         rqset->set_interpret = lov_getattr_interpret;
943         rqset->set_arg = (void *)lovset;
944         RETURN(rc);
945 out:
946         LASSERT(rc);
947         lov_fini_getattr_set(lovset);
948         RETURN(rc);
949 }
950
951 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
952                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
953 {
954         struct lov_request_set *set;
955         struct lov_obd *lov;
956         struct list_head *pos;
957         struct lov_request *req;
958         int err = 0, rc = 0;
959         ENTRY;
960
961         if (lsm_bad_magic(lsm))
962                 RETURN(-EINVAL);
963
964         if (!exp || !exp->exp_obd)
965                 RETURN(-ENODEV);
966
967         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
968                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
969                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
970                                       OBD_MD_FLSIZE | OBD_MD_FLGROUP |
971                                       OBD_MD_FLUID | OBD_MD_FLGID |
972                                       OBD_MD_FLINLINE)));
973
974         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
975
976         lov = &exp->exp_obd->u.lov;
977         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
978         if (rc)
979                 RETURN(rc);
980
981         list_for_each (pos, &set->set_list) {
982                 req = list_entry(pos, struct lov_request, rq_link);
983                 
984                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
985                                  NULL, NULL);
986                 err = lov_update_common_set(set, req, rc);
987                 if (err) {
988                         CERROR("error: setattr objid "LPX64" subobj "
989                                LPX64" on OST idx %d: rc = %d\n",
990                                set->set_oa->o_id, req->rq_oa->o_id,
991                                req->rq_idx, err);
992                         if (!rc)
993                                 rc = err;
994                 }
995         }
996         err = lov_fini_setattr_set(set);
997         if (!rc)
998                 rc = err;
999         RETURN(rc);
1000 }
1001
1002 static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
1003 {
1004         static int next_idx = 0;
1005         struct lov_tgt_desc *tgt;
1006         int i, count;
1007
1008         /* XXX - we should do something clever and take lsm
1009          * into account but just do round robin for now. */
1010
1011         /* last_idx must always be less that count because
1012          * ld_tgt_count currently cannot shrink. */
1013         count = lov->desc.ld_tgt_count;
1014
1015         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
1016                 if (tgt->active) {
1017                         next_idx = (i + 1) % count;
1018                         RETURN(i);
1019                 }
1020         }
1021
1022         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
1023                 if (tgt->active) {
1024                         next_idx = (i + 1) % count;
1025                         RETURN(i);
1026                 }
1027         }
1028
1029         RETURN(-EIO);
1030 }
1031
1032 static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
1033                              struct lov_stripe_md *ea,
1034                              struct obd_trans_info *oti)
1035 {
1036         struct obd_export *osc_exp;
1037         struct lov_obd *lov = &exp->exp_obd->u.lov;
1038         struct lov_stripe_md *lsm = ea;
1039         struct lov_stripe_md obj_md;
1040         struct lov_stripe_md *obj_mdp = &obj_md;
1041         struct lov_oinfo *loi;
1042         struct obdo *tmp_oa;
1043         int ost_idx, updates = 0, i;
1044         ENTRY;
1045
1046         tmp_oa = obdo_alloc();
1047         if (tmp_oa == NULL)
1048                 RETURN(-ENOMEM);
1049
1050         loi = lsm->lsm_oinfo;
1051         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1052                 int rc;
1053                 if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1054                         continue;
1055
1056                 ost_idx = lov_revalidate_policy(lov, lsm);
1057                 if (ost_idx < 0) {
1058                         /* FIXME: punt for now. */
1059                         CERROR("lov_revalidate_policy failed; no active "
1060                                "OSCs?\n");
1061                         continue;
1062                 }
1063
1064                 /* create a new object */
1065                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1066                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1067                 osc_exp = lov->tgts[ost_idx].ltd_exp;
1068                 rc = obd_create(osc_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
1069                 if (rc) {
1070                         CERROR("error creating new subobj at idx %d; "
1071                                "rc = %d\n", ost_idx, rc);
1072                         continue;
1073                 }
1074                 if (oti->oti_objid)
1075                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
1076                 loi->loi_id = tmp_oa->o_id;
1077                 loi->loi_gr = tmp_oa->o_gr;
1078                 loi->loi_ost_idx = ost_idx;
1079                 loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
1080                 CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
1081                        " with idx %d gen %d.\n", lsm->lsm_object_id,
1082                        loi->loi_id, ost_idx, loi->loi_ost_gen);
1083                 updates = 1;
1084         }
1085
1086         /* If we got an error revalidating an entry there's no need to
1087          * cleanup up objects we allocated here because the bad entry
1088          * still points to a deleted OST. */
1089
1090         obdo_free(tmp_oa);
1091         RETURN(updates);
1092 }
1093
1094 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1095  * we can send this 'punch' to just the authoritative node and the nodes
1096  * that the punch will affect. */
1097 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1098                      struct lov_stripe_md *lsm,
1099                      obd_off start, obd_off end, struct obd_trans_info *oti)
1100 {
1101         struct lov_request_set *set;
1102         struct lov_obd *lov;
1103         struct list_head *pos;
1104         struct lov_request *req;
1105         int err = 0, rc = 0;
1106         ENTRY;
1107
1108         if (lsm_bad_magic(lsm))
1109                 RETURN(-EINVAL);
1110
1111         if (!exp || !exp->exp_obd)
1112                 RETURN(-ENODEV);
1113
1114         lov = &exp->exp_obd->u.lov;
1115         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1116         if (rc)
1117                 RETURN(rc);
1118
1119         list_for_each (pos, &set->set_list) {
1120                 req = list_entry(pos, struct lov_request, rq_link);
1121
1122                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1123                                NULL, req->rq_extent.start, 
1124                                req->rq_extent.end, NULL);
1125                 err = lov_update_punch_set(set, req, rc);
1126                 if (err) {
1127                         CERROR("error: punch objid "LPX64" subobj "LPX64
1128                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1129                                req->rq_oa->o_id, req->rq_idx, rc);
1130                         if (!rc)
1131                                 rc = err;
1132                 }
1133         }
1134         err = lov_fini_punch_set(set);
1135         if (!rc)
1136                 rc = err;
1137         RETURN(rc);
1138 }
1139
1140 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1141                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1142 {
1143         struct lov_request_set *set;
1144         struct lov_obd *lov;
1145         struct list_head *pos;
1146         struct lov_request *req;
1147         int err = 0, rc = 0;
1148         ENTRY;
1149
1150         if (lsm_bad_magic(lsm))
1151                 RETURN(-EINVAL);
1152
1153         if (!exp->exp_obd)
1154                 RETURN(-ENODEV);
1155
1156         lov = &exp->exp_obd->u.lov;
1157         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1158         if (rc)
1159                 RETURN(rc);
1160
1161         list_for_each (pos, &set->set_list) {
1162                 req = list_entry(pos, struct lov_request, rq_link);
1163
1164                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1165                               NULL, req->rq_extent.start, req->rq_extent.end);
1166                 err = lov_update_common_set(set, req, rc);
1167                 if (err) {
1168                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1169                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1170                                req->rq_oa->o_id, req->rq_idx, rc);
1171                         if (!rc)
1172                                 rc = err;
1173                 }
1174         }
1175         err = lov_fini_sync_set(set);
1176         if (!rc)
1177                 rc = err;
1178         RETURN(rc);
1179 }
1180
1181 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1182                          struct lov_stripe_md *lsm,
1183                          obd_count oa_bufs, struct brw_page *pga)
1184 {
1185         int i, rc = 0;
1186         ENTRY;
1187
1188         /* The caller just wants to know if there's a chance that this
1189          * I/O can succeed */
1190         for (i = 0; i < oa_bufs; i++) {
1191                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
1192                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1193                 obd_off start, end;
1194
1195                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
1196                                            pga[i].disk_offset + pga[i].count,
1197                                            &start, &end))
1198                         continue;
1199
1200                 if (lov->tgts[ost].active == 0) {
1201                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1202                         RETURN(-EIO);
1203                 }
1204                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1205                              NULL, 1, &pga[i], NULL);
1206                 if (rc)
1207                         break;
1208         }
1209         RETURN(rc);
1210 }
1211
1212 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1213                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1214                    struct brw_page *pga, struct obd_trans_info *oti)
1215 {
1216         struct lov_request_set *set;
1217         struct lov_request *req;
1218         struct list_head *pos;
1219         struct lov_obd *lov = &exp->exp_obd->u.lov;
1220         int err, rc = 0;
1221         ENTRY;
1222
1223         if (lsm_bad_magic(lsm))
1224                 RETURN(-EINVAL);
1225
1226         if (cmd == OBD_BRW_CHECK) {
1227                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1228                 RETURN(rc);
1229         }
1230
1231         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1232         if (rc)
1233                 RETURN(rc);
1234
1235         list_for_each (pos, &set->set_list) {
1236                 struct obd_export *sub_exp;
1237                 struct brw_page *sub_pga;
1238                 req = list_entry(pos, struct lov_request, rq_link);
1239                 
1240                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1241                 sub_pga = set->set_pga + req->rq_pgaidx;
1242                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
1243                              req->rq_oabufs, sub_pga, oti);
1244                 if (rc)
1245                         break;
1246                 lov_update_common_set(set, req, rc);
1247         }
1248
1249         err = lov_fini_brw_set(set);
1250         if (!rc)
1251                 rc = err;
1252         RETURN(rc);
1253 }
1254
1255 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1256                              int rc)
1257 {
1258         struct lov_request_set *lovset = (struct lov_request_set *)data;
1259         ENTRY;
1260         
1261         if (rc) {
1262                 lovset->set_completes = 0;
1263                 lov_fini_brw_set(lovset);
1264         } else {
1265                 rc = lov_fini_brw_set(lovset);
1266         }
1267                 
1268         RETURN(rc);
1269 }
1270
1271 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1272                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1273                          struct brw_page *pga, struct ptlrpc_request_set *set,
1274                          struct obd_trans_info *oti)
1275 {
1276         struct lov_request_set *lovset;
1277         struct lov_request *req;
1278         struct list_head *pos;
1279         struct lov_obd *lov = &exp->exp_obd->u.lov;
1280         int rc = 0;
1281         ENTRY;
1282
1283         if (lsm_bad_magic(lsm))
1284                 RETURN(-EINVAL);
1285
1286         if (cmd == OBD_BRW_CHECK) {
1287                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1288                 RETURN(rc);
1289         }
1290
1291         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1292         if (rc)
1293                 RETURN(rc);
1294
1295         list_for_each (pos, &lovset->set_list) {
1296                 struct obd_export *sub_exp;
1297                 struct brw_page *sub_pga;
1298                 req = list_entry(pos, struct lov_request, rq_link);
1299                 
1300                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1301                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1302                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1303                                    req->rq_oabufs, sub_pga, set, oti);
1304                 if (rc)
1305                         GOTO(out, rc);
1306                 lov_update_common_set(lovset, req, rc);
1307         }
1308         LASSERT(rc == 0);
1309         LASSERT(set->set_interpret == NULL);
1310         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1311         set->set_arg = (void *)lovset;
1312         
1313         RETURN(rc);
1314 out:
1315         lov_fini_brw_set(lovset);
1316         RETURN(rc);
1317 }
1318
1319 static int lov_ap_make_ready(void *data, int cmd)
1320 {
1321         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1322
1323         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1324 }
1325 static int lov_ap_refresh_count(void *data, int cmd)
1326 {
1327         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1328
1329         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1330                                                      cmd);
1331 }
1332 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1333 {
1334         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1335
1336         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1337         /* XXX woah, shouldn't we be altering more here?  size? */
1338         oa->o_id = lap->lap_loi_id;
1339 }
1340
1341 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1342 {
1343         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1344
1345         /* in a raid1 regime this would down a count of many ios
1346          * in flight, onl calling the caller_ops completion when all
1347          * the raid1 ios are complete */
1348         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1349 }
1350
1351 static struct obd_async_page_ops lov_async_page_ops = {
1352         .ap_make_ready =        lov_ap_make_ready,
1353         .ap_refresh_count =     lov_ap_refresh_count,
1354         .ap_fill_obdo =         lov_ap_fill_obdo,
1355         .ap_completion =        lov_ap_completion,
1356 };
1357
1358 static int lov_prep_async_page(struct obd_export *exp,
1359                                struct lov_stripe_md *lsm,
1360                                struct lov_oinfo *loi, struct page *page,
1361                                obd_off offset, struct obd_async_page_ops *ops,
1362                                void *data, void **res)
1363 {
1364         struct lov_obd *lov = &exp->exp_obd->u.lov;
1365         struct lov_async_page *lap;
1366         int rc, stripe;
1367         ENTRY;
1368
1369         if (lsm_bad_magic(lsm))
1370                 RETURN(-EINVAL);
1371         LASSERT(loi == NULL);
1372
1373         stripe = lov_stripe_number(lsm, offset);
1374         loi = &lsm->lsm_oinfo[stripe];
1375
1376         if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1377                 RETURN(-EIO);
1378         if (lov->tgts[loi->loi_ost_idx].active == 0)
1379                 RETURN(-EIO);
1380         if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
1381                 CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
1382                        "deleted or inactive.\n", loi->loi_ost_idx);
1383                 RETURN(-EIO);
1384         }
1385
1386         OBD_ALLOC(lap, sizeof(*lap));
1387         if (lap == NULL)
1388                 RETURN(-ENOMEM);
1389
1390         lap->lap_magic = LAP_MAGIC;
1391         lap->lap_caller_ops = ops;
1392         lap->lap_caller_data = data;
1393
1394         /* FIXME handle multiple oscs after landing b_raid1 */
1395         lap->lap_stripe = stripe;
1396         switch (lsm->lsm_pattern) {
1397                 case LOV_PATTERN_RAID0:
1398                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
1399                                           &lap->lap_sub_offset);
1400                         break;
1401                 case LOV_PATTERN_CMOBD:
1402                         lap->lap_sub_offset = offset;
1403                         break;
1404                 default:
1405                         LBUG();
1406         }
1407
1408         /* so the callback doesn't need the lsm */
1409         lap->lap_loi_id = loi->loi_id;
1410
1411         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1412                                  lsm, loi, page, lap->lap_sub_offset,
1413                                  &lov_async_page_ops, lap,
1414                                  &lap->lap_sub_cookie);
1415         if (rc) {
1416                 OBD_FREE(lap, sizeof(*lap));
1417                 RETURN(rc);
1418         }
1419         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1420                lap->lap_sub_cookie, offset);
1421         *res = lap;
1422         RETURN(0);
1423 }
1424
1425 static int lov_queue_async_io(struct obd_export *exp,
1426                               struct lov_stripe_md *lsm,
1427                               struct lov_oinfo *loi, void *cookie,
1428                               int cmd, obd_off off, int count,
1429                               obd_flags brw_flags, obd_flags async_flags)
1430 {
1431         struct lov_obd *lov = &exp->exp_obd->u.lov;
1432         struct lov_async_page *lap;
1433         int rc;
1434
1435         LASSERT(loi == NULL);
1436
1437         if (lsm_bad_magic(lsm))
1438                 RETURN(-EINVAL);
1439
1440         lap = LAP_FROM_COOKIE(cookie);
1441
1442         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1443
1444         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1445                                 loi, lap->lap_sub_cookie, cmd, off, count,
1446                                 brw_flags, async_flags);
1447         RETURN(rc);
1448 }
1449
1450 static int lov_set_async_flags(struct obd_export *exp,
1451                                struct lov_stripe_md *lsm,
1452                                struct lov_oinfo *loi, void *cookie,
1453                                obd_flags async_flags)
1454 {
1455         struct lov_obd *lov = &exp->exp_obd->u.lov;
1456         struct lov_async_page *lap;
1457         int rc;
1458
1459         LASSERT(loi == NULL);
1460
1461         if (lsm_bad_magic(lsm))
1462                 RETURN(-EINVAL);
1463
1464         lap = LAP_FROM_COOKIE(cookie);
1465
1466         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1467
1468         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1469                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1470         RETURN(rc);
1471 }
1472
1473 static int lov_queue_group_io(struct obd_export *exp,
1474                               struct lov_stripe_md *lsm,
1475                               struct lov_oinfo *loi,
1476                               struct obd_io_group *oig, void *cookie,
1477                               int cmd, obd_off off, int count,
1478                               obd_flags brw_flags, obd_flags async_flags)
1479 {
1480         struct lov_obd *lov = &exp->exp_obd->u.lov;
1481         struct lov_async_page *lap;
1482         int rc;
1483
1484         LASSERT(loi == NULL);
1485
1486         if (lsm_bad_magic(lsm))
1487                 RETURN(-EINVAL);
1488
1489         lap = LAP_FROM_COOKIE(cookie);
1490
1491         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1492
1493         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1494                                 oig, lap->lap_sub_cookie, cmd, off, count,
1495                                 brw_flags, async_flags);
1496         RETURN(rc);
1497 }
1498
1499 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1500  * all stripes, but we don't record that fact at queue time.  so we
1501  * trigger sync io on all stripes. */
1502 static int lov_trigger_group_io(struct obd_export *exp,
1503                                 struct lov_stripe_md *lsm,
1504                                 struct lov_oinfo *loi,
1505                                 struct obd_io_group *oig)
1506 {
1507         struct lov_obd *lov = &exp->exp_obd->u.lov;
1508         int rc = 0, i, err;
1509
1510         LASSERT(loi == NULL);
1511
1512         if (lsm_bad_magic(lsm))
1513                 RETURN(-EINVAL);
1514
1515         loi = lsm->lsm_oinfo;
1516         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1517                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1518                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1519                         continue;
1520                 }
1521
1522                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1523                                            lsm, loi, oig);
1524                 if (rc == 0 && err != 0)
1525                         rc = err;
1526         };
1527         RETURN(rc);
1528 }
1529
1530 static int lov_teardown_async_page(struct obd_export *exp,
1531                                    struct lov_stripe_md *lsm,
1532                                    struct lov_oinfo *loi, void *cookie)
1533 {
1534         struct lov_obd *lov = &exp->exp_obd->u.lov;
1535         struct lov_async_page *lap;
1536         int rc;
1537
1538         LASSERT(loi == NULL);
1539
1540         if (lsm_bad_magic(lsm))
1541                 RETURN(-EINVAL);
1542
1543         lap = LAP_FROM_COOKIE(cookie);
1544
1545         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1546
1547         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1548                                      lsm, loi, lap->lap_sub_cookie);
1549         if (rc) {
1550                 CERROR("unable to teardown sub cookie %p: %d\n",
1551                        lap->lap_sub_cookie, rc);
1552                 RETURN(rc);
1553         }
1554         OBD_FREE(lap, sizeof(*lap));
1555         RETURN(rc);
1556 }
1557
1558 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1559                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1560                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1561                        void *data,__u32 lvb_len, void *lvb_swabber,
1562                        struct lustre_handle *lockh)
1563 {
1564         struct lov_request_set *set;
1565         struct lov_request *req;
1566         struct list_head *pos;
1567         struct lustre_handle *lov_lockhp;
1568         struct lov_obd *lov;
1569         ldlm_error_t rc;
1570         int save_flags = *flags;
1571         ENTRY;
1572
1573         if (lsm_bad_magic(lsm))
1574                 RETURN(-EINVAL);
1575
1576         /* we should never be asked to replay a lock this way. */
1577         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1578
1579         if (!exp || !exp->exp_obd)
1580                 RETURN(-ENODEV);
1581
1582         lov = &exp->exp_obd->u.lov;
1583         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1584         if (rc)
1585                 RETURN(rc);
1586
1587         list_for_each (pos, &set->set_list) {
1588                 ldlm_policy_data_t sub_policy;
1589                 req = list_entry(pos, struct lov_request, rq_link);
1590                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1591                 LASSERT(lov_lockhp);
1592
1593                 *flags = save_flags;
1594                 sub_policy.l_extent.start = req->rq_extent.start;
1595                 sub_policy.l_extent.end = req->rq_extent.end;
1596
1597                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1598                                  type, &sub_policy, mode, flags, bl_cb,
1599                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1600                                  lov_lockhp);
1601                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1602                 if (rc != ELDLM_OK)
1603                         break;
1604         }
1605
1606         lov_fini_enqueue_set(set, mode);
1607         RETURN(rc);
1608 }
1609
1610 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1611                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1612                      int *flags, void *data, struct lustre_handle *lockh)
1613 {
1614         struct lov_request_set *set;
1615         struct lov_request *req;
1616         struct list_head *pos;
1617         struct lov_obd *lov = &exp->exp_obd->u.lov;
1618         struct lustre_handle *lov_lockhp;
1619         int lov_flags, rc = 0;
1620         ENTRY;
1621
1622         if (lsm_bad_magic(lsm))
1623                 RETURN(-EINVAL);
1624
1625         if (!exp || !exp->exp_obd)
1626                 RETURN(-ENODEV);
1627
1628         lov = &exp->exp_obd->u.lov;
1629         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1630         if (rc)
1631                 RETURN(rc);
1632
1633         list_for_each (pos, &set->set_list) {
1634                 ldlm_policy_data_t sub_policy;
1635                 req = list_entry(pos, struct lov_request, rq_link);
1636                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1637                 LASSERT(lov_lockhp);
1638
1639                 sub_policy.l_extent.start = req->rq_extent.start;
1640                 sub_policy.l_extent.end = req->rq_extent.end;
1641                 lov_flags = *flags;
1642
1643                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1644                                type, &sub_policy, mode, &lov_flags, data,
1645                                lov_lockhp);
1646                 rc = lov_update_match_set(set, req, rc);
1647                 if (rc != 1)
1648                         break;
1649         }
1650         lov_fini_match_set(set, mode, *flags);
1651         RETURN(rc);
1652 }
1653
1654 static int lov_change_cbdata(struct obd_export *exp,
1655                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1656                              void *data)
1657 {
1658         struct lov_obd *lov;
1659         struct lov_oinfo *loi;
1660         int rc = 0, i;
1661         ENTRY;
1662
1663         if (lsm_bad_magic(lsm))
1664                 RETURN(-EINVAL);
1665
1666         if (!exp || !exp->exp_obd)
1667                 RETURN(-ENODEV);
1668
1669         LASSERT(lsm->lsm_object_gr > 0);
1670
1671         lov = &exp->exp_obd->u.lov;
1672         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1673                 struct lov_stripe_md submd;
1674                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1675                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1676                         continue;
1677                 }
1678
1679                 submd.lsm_object_id = loi->loi_id;
1680                 submd.lsm_object_gr = lsm->lsm_object_gr;
1681                 submd.lsm_stripe_count = 0;
1682                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1683                                        &submd, it, data);
1684         }
1685         RETURN(rc);
1686 }
1687
1688 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1689                       __u32 mode, struct lustre_handle *lockh)
1690 {
1691         struct lov_request_set *set;
1692         struct lov_request *req;
1693         struct list_head *pos;
1694         struct lov_obd *lov = &exp->exp_obd->u.lov;
1695         struct lustre_handle *lov_lockhp;
1696         int err = 0, rc = 0;
1697         ENTRY;
1698
1699         if (lsm_bad_magic(lsm))
1700                 RETURN(-EINVAL);
1701
1702         if (!exp || !exp->exp_obd)
1703                 RETURN(-ENODEV);
1704
1705         LASSERT(lsm->lsm_object_gr > 0);
1706
1707         LASSERT(lockh);
1708         lov = &exp->exp_obd->u.lov;
1709         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1710         if (rc)
1711                 RETURN(rc);
1712
1713         list_for_each (pos, &set->set_list) {
1714                 req = list_entry(pos, struct lov_request, rq_link);
1715                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1716
1717                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1718                                 mode, lov_lockhp);
1719                 rc = lov_update_common_set(set, req, rc);
1720                 if (rc) {
1721                         CERROR("error: cancel objid "LPX64" subobj "
1722                                LPX64" on OST idx %d: rc = %d\n",
1723                                lsm->lsm_object_id,
1724                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1725                         err = rc;
1726                 }
1727  
1728         }
1729         lov_fini_cancel_set(set);
1730         RETURN(err);
1731 }
1732
1733 static int lov_cancel_unused(struct obd_export *exp,
1734                              struct lov_stripe_md *lsm, 
1735                              int flags, void *opaque)
1736 {
1737         struct lov_obd *lov;
1738         struct lov_oinfo *loi;
1739         int rc = 0, i;
1740         ENTRY;
1741
1742         lov = &exp->exp_obd->u.lov;
1743         if (lsm == NULL) {
1744                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1745                         int err = obd_cancel_unused(lov->tgts[i].ltd_exp,
1746                                                     NULL, flags, opaque);
1747                         if (!rc)
1748                                 rc = err;
1749                 }
1750                 RETURN(rc);
1751         }
1752
1753         if (lsm_bad_magic(lsm))
1754                 RETURN(-EINVAL);
1755
1756         if (!exp || !exp->exp_obd)
1757                 RETURN(-ENODEV);
1758
1759         LASSERT(lsm->lsm_object_gr > 0);
1760
1761         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1762                 struct lov_stripe_md submd;
1763                 int err;
1764
1765                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1766                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1767
1768                 submd.lsm_object_id = loi->loi_id;
1769                 submd.lsm_object_gr = lsm->lsm_object_gr;
1770                 submd.lsm_stripe_count = 0;
1771                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1772                                         &submd, flags, opaque);
1773                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1774                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1775                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1776                                loi->loi_id, loi->loi_ost_idx, err);
1777                         if (!rc)
1778                                 rc = err;
1779                 }
1780         }
1781         RETURN(rc);
1782 }
1783
/* All-ones __u64: the ceiling used by LOV_SUM_MAX when a sum would wrap. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating add: tot += add, clamped to LOV_U64_MAX instead of wrapping.
 * Both arguments are evaluated more than once, so they must be free of
 * side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((add) > LOV_U64_MAX - (tot))                        \
                        (tot) = LOV_U64_MAX;                            \
                else                                                    \
                        (tot) += (add);                                 \
        } while (0)
1792
1793 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1794                       unsigned long max_age)
1795 {
1796         struct lov_obd *lov = &obd->u.lov;
1797         struct obd_statfs lov_sfs;
1798         int set = 0;
1799         int rc = 0;
1800         int i;
1801         ENTRY;
1802
1803
1804         /* We only get block data from the OBD */
1805         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1806                 int err;
1807                 if (!lov->tgts[i].active) {
1808                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1809                         continue;
1810                 }
1811
1812                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1813                                  max_age);
1814                 if (err) {
1815                         if (lov->tgts[i].active && !rc)
1816                                 rc = err;
1817                         continue;
1818                 }
1819
1820                 if (!set) {
1821                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1822                         set = 1;
1823                 } else {
1824                         osfs->os_bfree += lov_sfs.os_bfree;
1825                         osfs->os_bavail += lov_sfs.os_bavail;
1826                         osfs->os_blocks += lov_sfs.os_blocks;
1827                         /* XXX not sure about this one - depends on policy.
1828                          *   - could be minimum if we always stripe on all OBDs
1829                          *     (but that would be wrong for any other policy,
1830                          *     if one of the OBDs has no more objects left)
1831                          *   - could be sum if we stripe whole objects
1832                          *   - could be average, just to give a nice number
1833                          *
1834                          * To give a "reasonable" (if not wholly accurate)
1835                          * number, we divide the total number of free objects
1836                          * by expected stripe count (watch out for overflow).
1837                          */
1838                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1839                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1840                 }
1841         }
1842
1843         if (set) {
1844                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1845                                          lov->desc.ld_default_stripe_count :
1846                                          lov->desc.ld_active_tgt_count;
1847
1848                 if (osfs->os_files != LOV_U64_MAX)
1849                         do_div(osfs->os_files, expected_stripes);
1850                 if (osfs->os_ffree != LOV_U64_MAX)
1851                         do_div(osfs->os_ffree, expected_stripes);
1852         } else if (!rc)
1853                 rc = -EIO;
1854
1855         RETURN(rc);
1856 }
1857
1858 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1859                          void *karg, void *uarg)
1860 {
1861         struct obd_device *obddev = class_exp2obd(exp);
1862         struct lov_obd *lov = &obddev->u.lov;
1863         int i, rc, count = lov->desc.ld_tgt_count;
1864         struct obd_uuid *uuidp;
1865         ENTRY;
1866
1867         switch (cmd) {
1868         case OBD_IOC_LOV_GET_CONFIG: {
1869                 struct obd_ioctl_data *data = karg;
1870                 struct lov_tgt_desc *tgtdesc;
1871                 struct lov_desc *desc;
1872                 char *buf = NULL;
1873                 __u32 *genp;
1874
1875                 buf = NULL;
1876                 len = 0;
1877                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1878                         RETURN(-EINVAL);
1879
1880                 data = (struct obd_ioctl_data *)buf;
1881
1882                 if (sizeof(*desc) > data->ioc_inllen1) {
1883                         obd_ioctl_freedata(buf, len);
1884                         RETURN(-EINVAL);
1885                 }
1886
1887                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1888                         obd_ioctl_freedata(buf, len);
1889                         RETURN(-EINVAL);
1890                 }
1891
1892                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1893                         obd_ioctl_freedata(buf, len);
1894                         RETURN(-EINVAL);
1895                 }
1896
1897                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1898                 memcpy(desc, &(lov->desc), sizeof(*desc));
1899
1900                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1901                 genp = (__u32 *)data->ioc_inlbuf3;
1902                 tgtdesc = lov->tgts;
1903                 /* the uuid will be empty for deleted OSTs */
1904                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1905                         obd_str2uuid(uuidp, (char *)tgtdesc->uuid.uuid);
1906                         *genp = tgtdesc->ltd_gen;
1907                 }
1908
1909                 rc = copy_to_user((void *)uarg, buf, len);
1910                 if (rc)
1911                         rc = -EFAULT;
1912                 obd_ioctl_freedata(buf, len);
1913                 break;
1914         }
1915         case LL_IOC_LOV_SETSTRIPE:
1916                 rc = lov_setstripe(exp, karg, uarg);
1917                 break;
1918         case LL_IOC_LOV_GETSTRIPE:
1919                 rc = lov_getstripe(exp, karg, uarg);
1920                 break;
1921         case LL_IOC_LOV_SETEA:
1922                 rc = lov_setea(exp, karg, uarg);
1923                 break;
1924         default: {
1925                 int set = 0;
1926                 if (count == 0)
1927                         RETURN(-ENOTTY);
1928                 rc = 0;
1929                 for (i = 0; i < count; i++) {
1930                         int err;
1931
1932                         /* OST was deleted */
1933                         if (obd_uuid_empty(&lov->tgts[i].uuid))
1934                                 continue;
1935
1936                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1937                                             len, karg, uarg);
1938                         if (err) {
1939                                 if (lov->tgts[i].active) {
1940                                         CERROR("error: iocontrol OSC %s on OST "
1941                                                "idx %d cmd %x: err = %d\n",
1942                                                lov->tgts[i].uuid.uuid, i,
1943                                                cmd, err);
1944                                         if (!rc)
1945                                                 rc = err;
1946                                 }
1947                         } else
1948                                 set = 1;
1949                 }
1950                 if (!set && !rc)
1951                         rc = -EIO;
1952         }
1953         }
1954
1955         RETURN(rc);
1956 }
1957
1958 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1959                         void *key, __u32 *vallen, void *val)
1960 {
1961         struct obd_device *obddev = class_exp2obd(exp);
1962         struct lov_obd *lov = &obddev->u.lov;
1963         int i;
1964         ENTRY;
1965
1966         if (!vallen || !val)
1967                 RETURN(-EFAULT);
1968
1969         if (keylen > strlen("lock_to_stripe") &&
1970             strcmp(key, "lock_to_stripe") == 0) {
1971                 struct {
1972                         char name[16];
1973                         struct ldlm_lock *lock;
1974                         struct lov_stripe_md *lsm;
1975                 } *data = key;
1976                 struct lov_oinfo *loi;
1977                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
1978                 __u32 *stripe = val;
1979
1980                 if (*vallen < sizeof(*stripe))
1981                         RETURN(-EFAULT);
1982                 *vallen = sizeof(*stripe);
1983
1984                 /* XXX This is another one of those bits that will need to
1985                  * change if we ever actually support nested LOVs.  It uses
1986                  * the lock's export to find out which stripe it is. */
1987                 /* XXX - it's assumed all the locks for deleted OSTs have
1988                  * been cancelled. Also, the export for deleted OSTs will
1989                  * be NULL and won't match the lock's export. */
1990                 for (i = 0, loi = data->lsm->lsm_oinfo;
1991                      i < data->lsm->lsm_stripe_count;
1992                      i++, loi++) {
1993                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
1994                                         data->lock->l_conn_export &&
1995                             loi->loi_id == res_id->name[0] &&
1996                             loi->loi_gr == res_id->name[2]) {
1997                                 *stripe = i;
1998                                 RETURN(0);
1999                         }
2000                 }
2001                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2002                 dump_lsm(D_ERROR, data->lsm);
2003                 RETURN(-ENXIO);
2004         } else if (keylen >= strlen("size_to_stripe") &&
2005                    strcmp(key, "size_to_stripe") == 0) {
2006                 struct {
2007                         int stripe_number;
2008                         __u64 size;
2009                         struct lov_stripe_md *lsm;
2010                 } *data = val;
2011
2012                 if (*vallen < sizeof(*data))
2013                         RETURN(-EFAULT);
2014
2015                 data->size = lov_size_to_stripe(data->lsm, data->size,
2016                                                 data->stripe_number);
2017                 RETURN(0);
2018         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2019                 __u32 size = sizeof(obd_id);
2020                 obd_id *ids = val;
2021                 int rc = 0;
2022
2023                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2024                         if (!lov->tgts[i].active)
2025                                 continue;
2026                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2027                                           keylen, key, &size, &(ids[i]));
2028                         if (rc != 0)
2029                                 RETURN(rc);
2030                 }
2031                 RETURN(0);
2032         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2033                 struct lov_desc *desc_ret = val;
2034                 *desc_ret = lov->desc;
2035
2036                 RETURN(0);
2037         }
2038
2039         RETURN(-EINVAL);
2040 }
2041
2042 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2043                         void *key, obd_count vallen, void *val)
2044 {
2045         struct obd_device *obddev = class_exp2obd(exp);
2046         struct lov_obd *lov = &obddev->u.lov;
2047         int i, rc = 0, err;
2048         ENTRY;
2049
2050 #define KEY_IS(str) \
2051         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2052
2053         if (KEY_IS("async")) {
2054                 struct lov_desc *desc = &lov->desc;
2055                 struct lov_tgt_desc *tgts = lov->tgts;
2056
2057                 if (vallen != sizeof(int))
2058                         RETURN(-EINVAL);
2059                 lov->async = *((int*) val);
2060
2061                 for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
2062                         struct obd_uuid *tgt_uuid = &tgts->uuid;
2063                         struct obd_device *tgt_obd;
2064
2065                         tgt_obd = class_find_client_obd(tgt_uuid,
2066                                                         OBD_OSC_DEVICENAME,
2067                                                         &obddev->obd_uuid);
2068                         if (!tgt_obd) {
2069                                 CERROR("Target %s not attached\n",
2070                                         tgt_uuid->uuid);
2071                                 if (!rc)
2072                                         rc = -EINVAL;
2073                                 continue;
2074                         }
2075
2076                         err = obd_set_info(tgt_obd->obd_self_export,
2077                                            keylen, key, vallen, val);
2078                         if (err) {
2079                                 CERROR("Failed to set async on target %s\n",
2080                                         tgt_obd->obd_name);
2081                                 if (!rc)
2082                                         rc = err;
2083                         }
2084                 }
2085                 RETURN(rc);
2086         }
2087
2088         if (KEY_IS("mds_conn")) {
2089                 if (vallen != sizeof(__u32))
2090                         RETURN(-EINVAL);
2091         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
2092                 if (vallen != 0)
2093                         RETURN(-EINVAL);
2094         } else if (KEY_IS("sec") || KEY_IS("sec_flags")) {
2095                 struct lov_tgt_desc *tgt;
2096                 struct obd_export *exp;
2097                 int rc = 0, err, i;
2098
2099                 spin_lock(&lov->lov_lock);
2100                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2101                      i++, tgt++) {
2102                         exp = tgt->ltd_exp;
2103                         /* during setup time the connections to osc might
2104                          * haven't been established.
2105                          */
2106                         if (exp == NULL) {
2107                                 struct obd_device *tgt_obd;
2108
2109                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2110                                                                 OBD_OSC_DEVICENAME,
2111                                                                 &obddev->obd_uuid);
2112                                 if (!tgt_obd) {
2113                                         CERROR("can't set security flavor, "
2114                                                "device %s not attached?\n",
2115                                                 tgt->uuid.uuid);
2116                                         rc = -EINVAL;
2117                                         continue;
2118                                 }
2119                                 exp = tgt_obd->obd_self_export;
2120                         }
2121
2122                         err = obd_set_info(exp, keylen, key, vallen, val);
2123                         if (!rc)
2124                                 rc = err;
2125                 }
2126                 spin_unlock(&lov->lov_lock);
2127
2128                 RETURN(rc);
2129         } else if (KEY_IS("flush_cred")) {
2130                 struct lov_tgt_desc *tgt;
2131                 int rc = 0, i;
2132
2133                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2134                      i++, tgt++) {
2135                         if (!tgt->ltd_exp)
2136                                 continue;
2137                         rc = obd_set_info(tgt->ltd_exp,
2138                                           keylen, key, vallen, val);
2139                         if (rc)
2140                                 RETURN(rc);
2141                 }
2142
2143                 RETURN(0);
2144         } else {
2145                 RETURN(-EINVAL);
2146         }
2147
2148         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2149                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2150                         continue;
2151
2152                 if (!val && !lov->tgts[i].active)
2153                         continue;
2154
2155                 err = obd_set_info(lov->tgts[i].ltd_exp,
2156                                   keylen, key, vallen, val);
2157                 if (!rc)
2158                         rc = err;
2159         }
2160         RETURN(rc);
2161 #undef KEY_IS
2162 }
2163
/*
 * NOTE: everything down to the matching #endif is excluded from the build
 * by "#if 0".  The function is unfinished as written: it uses identifiers
 * that are not declared in it (imp, lwi, lwd, and "lock" outside the loop
 * that declares it), the -EINTR/-ETIMEDOUT branch is empty, and "queues"
 * is never released on any path shown here.
 */
#if 0
/* Per-stripe wait state used by lov_complete_many(). */
struct lov_multi_wait {
        struct ldlm_lock *lock;
        wait_queue_t      wait;
        int               completed;
        int               generation;
};

/*
 * Queue the current task on the wait queue of every stripe lock behind
 * @lockh, then wait (with a timeout) for check_multi_complete() to report
 * completion.
 */
int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
                      struct lustre_handle *lockh)
{
        struct lov_lock_handles *lov_lockh = NULL;
        struct lustre_handle *lov_lockhp;
        struct lov_obd *lov;
        struct lov_oinfo *loi;
        struct lov_multi_wait *queues;
        int rc = 0, i;
        ENTRY;

        if (lsm_bad_magic(lsm))
                RETURN(-EINVAL);

        if (!exp || !exp->exp_obd)
                RETURN(-ENODEV);

        LASSERT(lockh != NULL);
        /* a multi-stripe handle refers to a table of per-stripe handles;
         * a single-stripe handle is used directly */
        if (lsm->lsm_stripe_count > 1) {
                lov_lockh = lov_handle2llh(lockh);
                if (lov_lockh == NULL) {
                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
                        RETURN(-EINVAL);
                }

                lov_lockhp = lov_lockh->llh_handles;
        } else {
                lov_lockhp = lockh;
        }

        OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
        if (queues == NULL)
                GOTO(out, rc = -ENOMEM);

        lov = &exp->exp_obd->u.lov;
        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
             i++, loi++, lov_lockhp++) {
                struct ldlm_lock *lock;
                struct obd_device *obd;
                unsigned long irqflags;

                lock = ldlm_handle2lock(lov_lockhp);
                if (lock == NULL) {
                        /* no lock for this stripe: nothing to wait for */
                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
                               loi->loi_ost_idx, loi->loi_id);
                        queues[i].completed = 1;
                        continue;
                }

                queues[i].lock = lock;
                init_waitqueue_entry(&(queues[i].wait), current);
                add_wait_queue(lock->l_waitq, &(queues[i].wait));

                /* snapshot the import generation under the import lock */
                obd = class_exp2obd(lock->l_conn_export);
                if (obd != NULL)
                        imp = obd->u.cli.cl_import;
                if (imp != NULL) {
                        spin_lock_irqsave(&imp->imp_lock, irqflags);
                        queues[i].generation = imp->imp_generation;
                        spin_unlock_irqrestore(&imp->imp_lock, irqflags);
                }
        }

        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
                               interrupted_completion_wait, &lwd);
        rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);

        for (i = 0; i < lsm->lsm_stripe_count; i++)
                remove_wait_queue(lock->l_waitq, &(queues[i].wait));

        /* interrupted / timed-out handling was never written */
        if (rc == -EINTR || rc == -ETIMEDOUT) {


        }

 out:
        if (lov_lockh != NULL)
                lov_llh_put(lov_lockh);
        RETURN(rc);
}
#endif
2253
2254 struct obd_ops lov_obd_ops = {
2255         .o_owner               = THIS_MODULE,
2256         .o_attach              = lov_attach,
2257         .o_detach              = lov_detach,
2258         .o_setup               = lov_setup,
2259         .o_cleanup             = lov_cleanup,
2260         .o_process_config      = lov_process_config,
2261         .o_connect             = lov_connect,
2262         .o_disconnect          = lov_disconnect,
2263         .o_statfs              = lov_statfs,
2264         .o_packmd              = lov_packmd,
2265         .o_unpackmd            = lov_unpackmd,
2266         .o_revalidate_md       = lov_revalidate_md,
2267         .o_create              = lov_create,
2268         .o_destroy             = lov_destroy,
2269         .o_getattr             = lov_getattr,
2270         .o_getattr_async       = lov_getattr_async,
2271         .o_setattr             = lov_setattr,
2272         .o_brw                 = lov_brw,
2273         .o_brw_async           = lov_brw_async,
2274         .o_prep_async_page     = lov_prep_async_page,
2275         .o_queue_async_io      = lov_queue_async_io,
2276         .o_set_async_flags     = lov_set_async_flags,
2277         .o_queue_group_io      = lov_queue_group_io,
2278         .o_trigger_group_io    = lov_trigger_group_io,
2279         .o_teardown_async_page = lov_teardown_async_page,
2280         .o_adjust_kms          = lov_adjust_kms,
2281         .o_punch               = lov_punch,
2282         .o_sync                = lov_sync,
2283         .o_enqueue             = lov_enqueue,
2284         .o_match               = lov_match,
2285         .o_change_cbdata       = lov_change_cbdata,
2286         .o_cancel              = lov_cancel,
2287         .o_cancel_unused       = lov_cancel_unused,
2288         .o_iocontrol           = lov_iocontrol,
2289         .o_get_info            = lov_get_info,
2290         .o_set_info            = lov_set_info,
2291         .o_llog_init           = lov_llog_init,
2292         .o_llog_finish         = lov_llog_finish,
2293         .o_notify              = lov_notify,
2294 };
2295
2296 int __init lov_init(void)
2297 {
2298         struct lprocfs_static_vars lvars;
2299         int rc;
2300         ENTRY;
2301
2302         lprocfs_init_vars(lov, &lvars);
2303         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2304                                  OBD_LOV_DEVICENAME);
2305         RETURN(rc);
2306 }
2307
/* Module unload hook and module metadata; built only for the kernel. */
#ifdef __KERNEL__
/* Undo lov_init(): unregister the LOV device type. */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

/* lov_init() runs at module load, lov_exit() at unload */
module_init(lov_init);
module_exit(lov_exit);
#endif