Whamcloud - gitweb
land b_hd_sec onto HEAD:
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 /* obd methods */
/* Size of the buffer used to build the procfs symlink target path below. */
#define MAX_STRING_SIZE 128
/*
 * Connect the LOV device @obd to the OSC device backing target @tgt.
 *
 * @obd:           the LOV device
 * @tgt:           target slot; tgt->uuid selects the OSC, tgt->ltd_exp and
 *                 tgt->active are filled in on success
 * @activate:      when non-zero, clear obd_no_recov on the OSC and activate
 *                 its import before connecting
 * @conn_data:     passed through unchanged to obd_connect()
 * @connect_flags: passed through unchanged to obd_connect()
 *
 * Returns 0 on success, and also 0 when the OSC import is marked invalid
 * (the target is then left unconnected with only the observer registered).
 * Returns -EINVAL when the OSC is not attached or not set up, otherwise the
 * error from obd_connect() or obd_register_observer().
 */
static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
                           int activate, struct obd_connect_data *conn_data,
                           unsigned long connect_flags)
{
        struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
        struct obd_uuid *tgt_uuid = &tgt->uuid;

#ifdef __KERNEL__
        struct proc_dir_entry *lov_proc_dir;
#endif
        struct lov_obd *lov = &obd->u.lov;
        struct lustre_handle conn = {0, };
        struct obd_device *tgt_obd;
        int rc;
        ENTRY;

        /* Locate the OSC client device for this target UUID. */
        tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
                                        &obd->obd_uuid);

        if (!tgt_obd) {
                CERROR("Target %s not attached\n", tgt_uuid->uuid);
                RETURN(-EINVAL);
        }

        if (!tgt_obd->obd_set_up) {
                CERROR("Target %s not set up\n", tgt_uuid->uuid);
                RETURN(-EINVAL);
        }

        /* Activation must happen before the imp_invalid test below, since
         * that test reads the import state. */
        if (activate) {
                tgt_obd->obd_no_recov = 0;
                ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
        }

        /* An invalid import is not connected; the observer is still
         * registered (the log text below suggests this is what allows a
         * later reactivation) and the call reports success. */
        if (tgt_obd->u.cli.cl_import->imp_invalid) {
                CERROR("not connecting OSC %s; administratively "
                       "disabled\n", tgt_uuid->uuid);
                rc = obd_register_observer(tgt_obd, obd);
                if (rc) {
                        CERROR("Target %s register_observer error %d; "
                               "will not be able to reactivate\n",
                               tgt_uuid->uuid, rc);
                }
                RETURN(0);
        }

        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
                         connect_flags);
        if (rc) {
                CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
                RETURN(rc);
        }
        /* Remember the export obtained from the new connection handle. */
        tgt->ltd_exp = class_conn2export(&conn);

        rc = obd_register_observer(tgt_obd, obd);
        if (rc) {
                CERROR("Target %s register_observer error %d\n",
                       tgt_uuid->uuid, rc);
                /* Undo the connection so the slot is left unconnected. */
                obd_disconnect(tgt->ltd_exp, 0);
                tgt->ltd_exp = NULL;
                RETURN(rc);
        }

        tgt->active = 1;
        lov->desc.ld_active_tgt_count++;

#ifdef __KERNEL__
        /* If the LOV has a "target_obds" proc directory, add a symlink in it
         * named after the OSC and pointing at "../../../<type>/<name>". */
        lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
        if (lov_proc_dir) {
                struct obd_device *osc_obd = class_conn2obd(&conn);
                struct proc_dir_entry *osc_symlink;
                char name[MAX_STRING_SIZE + 1];

                LASSERT(osc_obd != NULL);
                LASSERT(osc_obd->obd_type != NULL);
                LASSERT(osc_obd->obd_type->typ_name != NULL);
                /* The extra byte guarantees termination even if snprintf()
                 * fills all MAX_STRING_SIZE bytes. */
                name[MAX_STRING_SIZE] = '\0';
                snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
                         osc_obd->obd_type->typ_name,
                         osc_obd->obd_name);
                osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
                                           name);
                if (osc_symlink == NULL) {
                        CERROR("could not register LOV target "
                               "/proc/fs/lustre/%s/%s/target_obds/%s\n",
                               obd->obd_type->typ_name, obd->obd_name,
                               osc_obd->obd_name);
                        /* NOTE(review): this removes the whole target_obds
                         * entry, not just the failed symlink, and the
                         * failure is not reported to the caller — confirm
                         * that this is intended. */
                        lprocfs_remove(lov_proc_dir);
                        lov_proc_dir = NULL;
                }
        }
#endif

        RETURN(0);
}
151
152 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
153                        struct obd_uuid *cluuid, struct obd_connect_data *data,
154                        unsigned long flags)
155 {
156 #ifdef __KERNEL__
157         struct proc_dir_entry *lov_proc_dir;
158 #endif
159         struct lov_obd *lov = &obd->u.lov;
160         struct lov_tgt_desc *tgt;
161         struct obd_export *exp;
162         int rc, rc2, i;
163         ENTRY;
164
165         rc = class_connect(conn, obd, cluuid);
166         if (rc)
167                 RETURN(rc);
168
169         exp = class_conn2export(conn);
170
171         /* We don't want to actually do the underlying connections more than
172          * once, so keep track. */
173         lov->refcount++;
174         if (lov->refcount > 1) {
175                 class_export_put(exp);
176                 RETURN(0);
177         }
178
179 #ifdef __KERNEL__
180         lov_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
181                                         NULL, NULL);
182         if (IS_ERR(lov_proc_dir)) {
183                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
184                        obd->obd_type->typ_name, obd->obd_name);
185                 lov_proc_dir = NULL;
186         }
187 #endif
188
189         /* connect_flags is the MDS number, save for use in lov_add_obd */
190         lov->lov_connect_flags = flags;
191         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
192                 if (obd_uuid_empty(&tgt->uuid))
193                         continue;
194                 rc = lov_connect_obd(obd, tgt, 0, data, flags);
195                 if (rc)
196                         GOTO(out_disc, rc);
197         }
198
199         class_export_put(exp);
200         RETURN (0);
201
202  out_disc:
203 #ifdef __KERNEL__
204         if (lov_proc_dir)
205                 lprocfs_remove(lov_proc_dir);
206 #endif
207
208         while (i-- > 0) {
209                 struct obd_uuid uuid;
210                 --tgt;
211                 --lov->desc.ld_active_tgt_count;
212                 tgt->active = 0;
213                 /* save for CERROR below; (we know it's terminated) */
214                 uuid = tgt->uuid;
215                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
216                 if (rc2)
217                         CERROR("error: LOV target %s disconnect on OST idx %d: "
218                                "rc = %d\n", uuid.uuid, i, rc2);
219         }
220         class_disconnect(exp, 0);
221         RETURN (rc);
222 }
223
224 static int lov_disconnect_obd(struct obd_device *obd, 
225                               struct lov_tgt_desc *tgt,
226                               unsigned long flags)
227 {
228 #ifdef __KERNEL__
229         struct proc_dir_entry *lov_proc_dir;
230 #endif
231         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
232         struct lov_obd *lov = &obd->u.lov;
233         int rc;
234         ENTRY;
235
236 #ifdef __KERNEL__
237         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
238         if (lov_proc_dir) {
239                 struct proc_dir_entry *osc_symlink;
240
241                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
242                 if (osc_symlink) {
243                         lprocfs_remove(osc_symlink);
244                 } else {
245                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
246                                obd->obd_type->typ_name, obd->obd_name,
247                                osc_obd->obd_name);
248                 }
249         }
250 #endif
251         if (obd->obd_no_recov) {
252                 /* Pass it on to our clients.
253                  * XXX This should be an argument to disconnect,
254                  * XXX not a back-door flag on the OBD.  Ah well.
255                  */
256                 if (osc_obd)
257                         osc_obd->obd_no_recov = 1;
258         }
259
260         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
261         rc = obd_disconnect(tgt->ltd_exp, flags);
262         if (rc) {
263                 if (tgt->active) {
264                         CERROR("Target %s disconnect error %d\n",
265                                tgt->uuid.uuid, rc);
266                 }
267                 rc = 0;
268         }
269
270         if (tgt->active) {
271                 tgt->active = 0;
272                 lov->desc.ld_active_tgt_count--;
273         }
274         tgt->ltd_exp = NULL;
275         RETURN(0);
276 }
277
278 static int lov_disconnect(struct obd_export *exp, unsigned long flags)
279 {
280         struct obd_device *obd = class_exp2obd(exp);
281 #ifdef __KERNEL__
282         struct proc_dir_entry *lov_proc_dir;
283 #endif
284         struct lov_obd *lov = &obd->u.lov;
285         struct lov_tgt_desc *tgt;
286         int rc, i;
287         ENTRY;
288
289         if (!lov->tgts)
290                 goto out_local;
291
292         /* Only disconnect the underlying layers on the final disconnect. */
293         lov->refcount--;
294         if (lov->refcount != 0)
295                 goto out_local;
296
297         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
298                 if (tgt->ltd_exp)
299                         lov_disconnect_obd(obd, tgt, flags);
300         }
301
302 #ifdef __KERNEL__
303         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
304         if (lov_proc_dir) {
305                 lprocfs_remove(lov_proc_dir);
306         } else {
307                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing.",
308                        obd->obd_type->typ_name, obd->obd_name);
309         }
310 #endif
311         
312  out_local:
313         rc = class_disconnect(exp, 0);
314         RETURN(rc);
315 }
316
317 /* Error codes:
318  *
319  *  -EINVAL  : UUID can't be found in the LOV's target list
320  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
321  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
322  */
323 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
324                               int activate)
325 {
326         struct lov_tgt_desc *tgt;
327         int i, rc = 0;
328         ENTRY;
329
330         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
331                lov, uuid->uuid, activate);
332
333         spin_lock(&lov->lov_lock);
334         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
335                 if (tgt->ltd_exp == NULL)
336                         continue;
337
338                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
339                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
340                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
341                         break;
342         }
343
344         if (i == lov->desc.ld_tgt_count)
345                 GOTO(out, rc = -EINVAL);
346
347
348         if (tgt->active == activate) {
349                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
350                         activate ? "" : "in");
351                 GOTO(out, rc);
352         }
353
354         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
355                activate ? "" : "in");
356
357         tgt->active = activate;
358         if (activate)
359                 lov->desc.ld_active_tgt_count++;
360         else
361                 lov->desc.ld_active_tgt_count--;
362
363         EXIT;
364  out:
365         spin_unlock(&lov->lov_lock);
366         return rc;
367 }
368
369 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
370                       int active, void *data)
371 {
372         struct obd_uuid *uuid;
373         int rc;
374         ENTRY;
375
376         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
377                 CERROR("unexpected notification of %s %s!\n",
378                        watched->obd_type->typ_name,
379                        watched->obd_name);
380                 return -EINVAL;
381         }
382         uuid = &watched->u.cli.cl_import->imp_target_uuid;
383
384         /* Set OSC as active before notifying the observer, so the
385          * observer can use the OSC normally.  
386          */
387         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
388         if (rc) {
389                 CERROR("%sactivation of %s failed: %d\n",
390                        active ? "" : "de", uuid->uuid, rc);
391                 RETURN(rc);
392         }
393
394         if (obd->obd_observer)
395                 /* Pass the notification up the chain. */
396                 rc = obd_notify(obd->obd_observer, watched, active, data);
397
398         RETURN(rc);
399 }
400
401 int lov_attach(struct obd_device *dev, obd_count len, void *data)
402 {
403         struct lprocfs_static_vars lvars;
404         int rc;
405
406         lprocfs_init_vars(lov, &lvars);
407         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
408         if (rc == 0) {
409 #ifdef __KERNEL__
410                 struct proc_dir_entry *entry;
411
412                 entry = create_proc_entry("target_obd_status", 0444, 
413                                           dev->obd_proc_entry);
414                 if (entry == NULL) {
415                         rc = -ENOMEM;
416                 } else {
417                         entry->proc_fops = &lov_proc_target_fops;
418                         entry->data = dev;
419                 }
420 #endif
421         }
422         return rc;
423 }
424
/* Detach hook: hands the device to lprocfs_obd_detach() and returns its
 * result. */
int lov_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
429
430 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
431 {
432         struct lov_obd *lov = &obd->u.lov;
433         struct lustre_cfg *lcfg = buf;
434         struct lov_desc *desc;
435         int count;
436         ENTRY;
437
438         if (lcfg->lcfg_inllen1 < 1) {
439                 CERROR("LOV setup requires a descriptor\n");
440                 RETURN(-EINVAL);
441         }
442
443         desc = (struct lov_desc *)lcfg->lcfg_inlbuf1;
444         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
445                 CERROR("descriptor size wrong: %d > %d\n",
446                        (int)sizeof(*desc), lcfg->lcfg_inllen1);
447                 RETURN(-EINVAL);
448         }
449  
450         /* Because of 64-bit divide/mod operations only work with a 32-bit
451          * divisor in a 32-bit kernel, we cannot support a stripe width
452          * of 4GB or larger on 32-bit CPUs.
453          */
454        
455         count = desc->ld_default_stripe_count;
456         if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
457                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
458                        desc->ld_default_stripe_size, count, ~0UL);
459                 RETURN(-EINVAL);
460         }
461         if (desc->ld_tgt_count > 0) {
462                 lov->bufsize= sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
463         } else {
464                 lov->bufsize = sizeof(struct lov_tgt_desc) * LOV_MAX_TGT_COUNT;  
465         }
466         OBD_ALLOC(lov->tgts, lov->bufsize);
467         if (lov->tgts == NULL) {
468                 lov->bufsize = 0;
469                 CERROR("couldn't allocate %d bytes for target table.\n",
470                        lov->bufsize);
471                 RETURN(-EINVAL);
472         }
473
474         desc->ld_tgt_count = 0;
475         desc->ld_active_tgt_count = 0;
476         lov->desc = *desc;
477         spin_lock_init(&lov->lov_lock);
478         sema_init(&lov->lov_llog_sem, 1);
479
480         RETURN(0);
481 }
482
483 static int lov_cleanup(struct obd_device *obd, int flags)
484 {
485         struct lov_obd *lov = &obd->u.lov;
486
487         OBD_FREE(lov->tgts, lov->bufsize);
488         RETURN(0);
489 }
490
491 static int
492 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
493 {
494         struct lov_obd *lov = &obd->u.lov;
495         struct lov_tgt_desc *tgt;
496         int rc;
497         ENTRY;
498
499         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
500                uuidp->uuid, index, gen);
501
502         if ((index < 0) || (index >= LOV_MAX_TGT_COUNT)) {
503                 CERROR("request to add OBD %s at invalid index: %d\n",
504                        uuidp->uuid, index);
505                 RETURN(-EINVAL);
506         }
507
508         if (gen <= 0) {
509                 CERROR("request to add OBD %s with invalid generation: %d\n",
510                        uuidp->uuid, gen);
511                 RETURN(-EINVAL);
512         }
513
514         tgt = lov->tgts + index;
515         if (!obd_uuid_empty(&tgt->uuid)) {
516                 CERROR("OBD already assigned at LOV target index %d\n",
517                        index);
518                 RETURN(-EEXIST);
519         }
520
521         tgt->uuid = *uuidp;
522         /* XXX - add a sanity check on the generation number. */
523         tgt->ltd_gen = gen;
524
525         if (index >= lov->desc.ld_tgt_count)
526                 lov->desc.ld_tgt_count = index + 1;
527
528         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
529                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
530
531         if (lov->refcount == 0)
532                 RETURN(0);
533
534         if (tgt->ltd_exp) {
535                 struct obd_device *osc_obd;
536
537                 osc_obd = class_exp2obd(tgt->ltd_exp);
538                 if (osc_obd)
539                         osc_obd->obd_no_recov = 0;
540         }
541
542         rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
543         if (rc)
544                 GOTO(out, rc);
545
546         if (obd->obd_observer) {
547                 /* tell the mds_lov about the new target */
548                 rc = obd_notify(obd->obd_observer, tgt->ltd_exp->exp_obd, 1,
549                                 (void *)index);
550         }
551
552         GOTO(out, rc);
553  out:
554         if (rc && tgt->ltd_exp != NULL)
555                 lov_disconnect_obd(obd, tgt, 0);
556         return rc;
557 }
558
559 static int
560 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
561 {
562         struct lov_obd *lov = &obd->u.lov;
563         struct lov_tgt_desc *tgt;
564         int count = lov->desc.ld_tgt_count;
565         int rc = 0;
566         ENTRY;
567
568         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
569                uuidp->uuid, index, gen);
570
571         if (index >= count) {
572                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
573                        index, count);
574                 RETURN(-EINVAL);
575         }
576
577         tgt = lov->tgts + index;
578
579         if (obd_uuid_empty(&tgt->uuid)) {
580                 CERROR("LOV target at index %d is not setup.\n", index);
581                 RETURN(-EINVAL);
582         }
583
584         if (strncmp(uuidp->uuid, tgt->uuid.uuid, sizeof uuidp->uuid) != 0) {
585                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
586                        tgt->uuid.uuid, index, uuidp->uuid);
587                 RETURN(-EINVAL);
588         }
589
590         if (tgt->ltd_exp) {
591                 struct obd_device *osc_obd;
592
593                 osc_obd = class_exp2obd(tgt->ltd_exp);
594                 if (osc_obd) {
595                         osc_obd->obd_no_recov = 1;
596                         rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
597                         if (rc)
598                                 CERROR("osc_llog_finish error: %d\n", rc);
599                 }
600                 lov_disconnect_obd(obd, tgt, 0);
601         }
602
603         /* XXX - right now there is a dependency on ld_tgt_count being the
604          * maximum tgt index for computing the mds_max_easize. So we can't
605          * shrink it. */
606
607         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
608         memset(tgt, 0, sizeof(*tgt));
609
610         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
611                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
612
613         RETURN(rc);
614 }
615
616 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
617 {
618         struct lustre_cfg *lcfg = buf;
619         struct obd_uuid obd_uuid;
620         int cmd;
621         int index;
622         int gen;
623         int rc = 0;
624         ENTRY;
625
626         switch(cmd = lcfg->lcfg_command) {
627         case LCFG_LOV_ADD_OBD:
628         case LCFG_LOV_DEL_OBD: {
629                 if (lcfg->lcfg_inllen1 > sizeof(obd_uuid.uuid))
630                         GOTO(out, rc = -EINVAL);
631
632                 obd_str2uuid(&obd_uuid, lcfg->lcfg_inlbuf1);
633
634                 if (sscanf(lcfg->lcfg_inlbuf2, "%d", &index) != 1)
635                         GOTO(out, rc = -EINVAL);
636                 if (sscanf(lcfg->lcfg_inlbuf3, "%d", &gen) != 1)
637                         GOTO(out, rc = -EINVAL);
638                 if (cmd == LCFG_LOV_ADD_OBD)
639                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
640                 else
641                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
642                 GOTO(out, rc);
643         }
644         default: {
645                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
646                 GOTO(out, rc = -EINVAL);
647
648         }
649         }
650 out:
651         RETURN(rc);
652 }
653
/* Fallback used only when no log2 macro is already defined.
 * Expands to ffz(~(n)); presumably this is the index of the lowest set bit
 * of n, which equals log2(n) only when n is a power of two — TODO confirm
 * against the ffz() definition in use. */
#ifndef log2
#define log2(n) ffz(~(n))
#endif
657
658 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
659                              struct lov_stripe_md **ea,
660                              struct obd_trans_info *oti)
661 {
662         struct lov_obd *lov;
663         struct obdo *tmp_oa;
664         struct obd_uuid *ost_uuid = NULL;
665         int rc = 0, i;
666         ENTRY;
667
668         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
669                 src_oa->o_flags == OBD_FL_DELORPHAN);
670
671         lov = &export->exp_obd->u.lov;
672
673         tmp_oa = obdo_alloc();
674         if (tmp_oa == NULL)
675                 RETURN(-ENOMEM);
676
677         if (src_oa->o_valid & OBD_MD_FLINLINE) {
678                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
679                 CDEBUG(D_HA, "clearing orphans only for %s\n",
680                        ost_uuid->uuid);
681         }
682
683         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
684                 struct lov_stripe_md obj_md;
685                 struct lov_stripe_md *obj_mdp = &obj_md;
686                 int err;
687
688                 /* if called for a specific target, we don't
689                    care if it is not active. */
690                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
691                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
692                         continue;
693                 }
694
695                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
696                         continue;
697
698                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
699
700                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
701                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, &obj_mdp, oti);
702                 if (err)
703                         /* This export will be disabled until it is recovered,
704                            and then orphan recovery will be completed. */
705                         CERROR("error in orphan recovery on OST idx %d/%d: "
706                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
707
708                 if (ost_uuid)
709                         break;
710         }
711         obdo_free(tmp_oa);
712         RETURN(rc);
713 }
714
715 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
716                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
717 {
718         struct lov_stripe_md *obj_mdp, *lsm;
719         struct lov_obd *lov = &exp->exp_obd->u.lov;
720         unsigned ost_idx;
721         int rc, i;
722         ENTRY;
723
724         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
725                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
726
727         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
728         if (obj_mdp == NULL)
729                 RETURN(-ENOMEM);
730
731         ost_idx = src_oa->o_nlink;
732         lsm = *ea;
733         if (lsm == NULL)
734                 GOTO(out, rc = -EINVAL);
735         if (ost_idx >= lov->desc.ld_tgt_count)
736                 GOTO(out, rc = -EINVAL);
737
738         for (i = 0; i < lsm->lsm_stripe_count; i++) {
739                 if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
740                         if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
741                                 GOTO(out, rc = -EINVAL);
742                         break;
743                 }
744         }
745         if (i == lsm->lsm_stripe_count)
746                 GOTO(out, rc = -EINVAL);
747
748         rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti);
749 out:
750         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
751         RETURN(rc);
752 }
753
754 /* the LOV expects oa->o_id to be set to the LOV object id */
755 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
756                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
757 {
758         struct lov_request_set *set = NULL;
759         struct list_head *pos;
760         struct lov_obd *lov;
761         int rc = 0;
762         ENTRY;
763
764         LASSERT(ea != NULL);
765         if (exp == NULL)
766                 RETURN(-EINVAL);
767
768         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
769             src_oa->o_flags == OBD_FL_DELORPHAN) {
770                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
771                 RETURN(rc);
772         }
773
774         lov = &exp->exp_obd->u.lov;
775         if (!lov->desc.ld_active_tgt_count)
776                 RETURN(-EIO);
777
778         /* Recreate a specific object id at the given OST index */
779         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
780             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
781                  rc = lov_recreate(exp, src_oa, ea, oti);
782                  RETURN(rc);
783         }
784
785         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
786         if (rc)
787                 RETURN(rc);
788
789         list_for_each (pos, &set->set_list) {
790                 struct lov_request *req = 
791                         list_entry(pos, struct lov_request, rq_link);
792
793                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
794                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, 
795                                 req->rq_oa, &req->rq_md, oti);
796                 lov_update_create_set(set, req, rc);
797         }
798         rc = lov_fini_create_set(set, ea);
799         RETURN(rc);
800 }
801
/* Evaluates to 1 (after logging the reason) when LSMP is NULL or its
 * lsm_magic is not LOV_MAGIC, and to 0 otherwise.  LSMP is evaluated
 * exactly once. */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_lsm__ = (LSMP);                  \
        int _ret__ = 1;                                         \
        if (!_lsm__)                                            \
                CERROR("LOV requires striping ea\n");           \
        else if (_lsm__->lsm_magic != LOV_MAGIC)                \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _lsm__->lsm_magic, LOV_MAGIC);           \
        else                                                    \
                _ret__ = 0;                                     \
        _ret__;                                                 \
})
816
817 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
818                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
819 {
820         struct lov_request_set *set;
821         struct lov_request *req;
822         struct list_head *pos;
823         struct lov_obd *lov;
824         int rc = 0;
825         ENTRY;
826
827         if (lsm_bad_magic(lsm))
828                 RETURN(-EINVAL);
829
830         if (!exp || !exp->exp_obd)
831                 RETURN(-ENODEV);
832
833         lov = &exp->exp_obd->u.lov;
834         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
835         if (rc)
836                 RETURN(rc);
837
838         list_for_each (pos, &set->set_list) {
839                 int err;
840                 req = list_entry(pos, struct lov_request, rq_link);
841
842                 /* XXX update the cookie position */
843                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
844                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
845                                  NULL, oti);
846                 err = lov_update_common_set(set, req, rc);
847                 if (rc) {
848                         CERROR("error: destroying objid "LPX64" subobj "
849                                LPX64" on OST idx %d: rc = %d\n", 
850                                set->set_oa->o_id, req->rq_oa->o_id, 
851                                req->rq_idx, rc);
852                         if (!rc)
853                                 rc = err;
854                 }
855         }
856         lov_fini_destroy_set(set);
857         RETURN(rc);
858 }
859
860 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
861                        struct lov_stripe_md *lsm)
862 {
863         struct lov_request_set *set;
864         struct lov_request *req;
865         struct list_head *pos;
866         struct lov_obd *lov;
867         int err = 0, rc = 0;
868         ENTRY;
869
870         if (lsm_bad_magic(lsm))
871                 RETURN(-EINVAL);
872
873         if (!exp || !exp->exp_obd)
874                 RETURN(-ENODEV);
875
876         lov = &exp->exp_obd->u.lov;
877         
878         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
879         if (rc)
880                 RETURN(rc);
881
882         list_for_each (pos, &set->set_list) {
883                 req = list_entry(pos, struct lov_request, rq_link);
884                 
885                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
886                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
887                        req->rq_idx);
888
889                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
890                                  req->rq_oa, NULL);
891                 err = lov_update_common_set(set, req, rc);
892                 if (err) {
893                         CERROR("error: getattr objid "LPX64" subobj "
894                                LPX64" on OST idx %d: rc = %d\n",
895                                set->set_oa->o_id, req->rq_oa->o_id, 
896                                req->rq_idx, err);
897                         break;
898                 }
899         }
900         
901         rc = lov_fini_getattr_set(set);
902         if (err)
903                 rc = err;
904         RETURN(rc);
905 }
906
907 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
908                                  int rc)
909 {
910         struct lov_request_set *lovset = (struct lov_request_set *)data;
911         ENTRY;
912
913         /* don't do attribute merge if this aysnc op failed */
914         if (rc) {
915                 lovset->set_completes = 0;
916                 lov_fini_getattr_set(lovset);
917         } else {
918                 rc = lov_fini_getattr_set(lovset);
919         }
920         RETURN (rc);
921 }
922
923 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
924                               struct lov_stripe_md *lsm,
925                               struct ptlrpc_request_set *rqset)
926 {
927         struct lov_request_set *lovset;
928         struct lov_obd *lov;
929         struct list_head *pos;
930         struct lov_request *req;
931         int rc = 0;
932         ENTRY;
933
934         if (lsm_bad_magic(lsm))
935                 RETURN(-EINVAL);
936
937         if (!exp || !exp->exp_obd)
938                 RETURN(-ENODEV);
939
940         lov = &exp->exp_obd->u.lov;
941
942         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
943         if (rc)
944                 RETURN(rc);
945
946         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
947                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
948
949         list_for_each (pos, &lovset->set_list) {
950                 req = list_entry(pos, struct lov_request, rq_link);
951                 
952                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
953                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
954                        req->rq_idx);
955                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
956                                        req->rq_oa, NULL, rqset);
957                 if (rc) {
958                         CERROR("error: getattr objid "LPX64" subobj "
959                                LPX64" on OST idx %d: rc = %d\n",
960                                lovset->set_oa->o_id, req->rq_oa->o_id, 
961                                req->rq_idx, rc);
962                         GOTO(out, rc);
963                 }
964                 lov_update_common_set(lovset, req, rc);
965         }
966         
967         LASSERT(rc == 0);
968         LASSERT (rqset->set_interpret == NULL);
969         rqset->set_interpret = lov_getattr_interpret;
970         rqset->set_arg = (void *)lovset;
971         RETURN(rc);
972 out:
973         LASSERT(rc);
974         lov_fini_getattr_set(lovset);
975         RETURN(rc);
976 }
977
978 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
979                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
980 {
981         struct lov_request_set *set;
982         struct lov_obd *lov;
983         struct list_head *pos;
984         struct lov_request *req;
985         int err = 0, rc = 0;
986         ENTRY;
987
988         if (lsm_bad_magic(lsm))
989                 RETURN(-EINVAL);
990
991         if (!exp || !exp->exp_obd)
992                 RETURN(-ENODEV);
993
994         /* for now, we only expect time updates here */
995         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
996                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
997                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
998                                       OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
999
1000         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
1001
1002         lov = &exp->exp_obd->u.lov;
1003         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
1004         if (rc)
1005                 RETURN(rc);
1006
1007         list_for_each (pos, &set->set_list) {
1008                 req = list_entry(pos, struct lov_request, rq_link);
1009                 
1010                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
1011                                  NULL, NULL);
1012                 err = lov_update_common_set(set, req, rc);
1013                 if (err) {
1014                         CERROR("error: setattr objid "LPX64" subobj "
1015                                LPX64" on OST idx %d: rc = %d\n",
1016                                set->set_oa->o_id, req->rq_oa->o_id,
1017                                req->rq_idx, err);
1018                         if (!rc)
1019                                 rc = err;
1020                 }
1021         }
1022         err = lov_fini_setattr_set(set);
1023         if (!rc)
1024                 rc = err;
1025         RETURN(rc);
1026 }
1027
1028 static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
1029 {
1030         static int next_idx = 0;
1031         struct lov_tgt_desc *tgt;
1032         int i, count;
1033
1034         /* XXX - we should do something clever and take lsm
1035          * into account but just do round robin for now. */
1036
1037         /* last_idx must always be less that count because
1038          * ld_tgt_count currently cannot shrink. */
1039         count = lov->desc.ld_tgt_count;
1040
1041         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
1042                 if (tgt->active) {
1043                         next_idx = (i + 1) % count;
1044                         RETURN(i);
1045                 }
1046         }
1047
1048         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
1049                 if (tgt->active) {
1050                         next_idx = (i + 1) % count;
1051                         RETURN(i);
1052                 }
1053         }
1054
1055         RETURN(-EIO);
1056 }
1057
1058 static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
1059                              struct lov_stripe_md *ea,
1060                              struct obd_trans_info *oti)
1061 {
1062         struct obd_export *osc_exp;
1063         struct lov_obd *lov = &exp->exp_obd->u.lov;
1064         struct lov_stripe_md *lsm = ea;
1065         struct lov_stripe_md obj_md;
1066         struct lov_stripe_md *obj_mdp = &obj_md;
1067         struct lov_oinfo *loi;
1068         struct obdo *tmp_oa;
1069         int ost_idx, updates = 0, i;
1070         ENTRY;
1071
1072         tmp_oa = obdo_alloc();
1073         if (tmp_oa == NULL)
1074                 RETURN(-ENOMEM);
1075
1076         loi = lsm->lsm_oinfo;
1077         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1078                 int rc;
1079                 if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1080                         continue;
1081
1082                 ost_idx = lov_revalidate_policy(lov, lsm);
1083                 if (ost_idx < 0) {
1084                         /* FIXME: punt for now. */
1085                         CERROR("lov_revalidate_policy failed; no active "
1086                                "OSCs?\n");
1087                         continue;
1088                 }
1089
1090                 /* create a new object */
1091                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1092                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1093                 osc_exp = lov->tgts[ost_idx].ltd_exp;
1094                 rc = obd_create(osc_exp, tmp_oa, &obj_mdp, oti);
1095                 if (rc) {
1096                         CERROR("error creating new subobj at idx %d; "
1097                                "rc = %d\n", ost_idx, rc);
1098                         continue;
1099                 }
1100                 if (oti->oti_objid)
1101                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
1102                 loi->loi_id = tmp_oa->o_id;
1103                 loi->loi_gr = tmp_oa->o_gr;
1104                 loi->loi_ost_idx = ost_idx;
1105                 loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
1106                 CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
1107                        " with idx %d gen %d.\n", lsm->lsm_object_id,
1108                        loi->loi_id, ost_idx, loi->loi_ost_gen);
1109                 updates = 1;
1110         }
1111
1112         /* If we got an error revalidating an entry there's no need to
1113          * cleanup up objects we allocated here because the bad entry
1114          * still points to a deleted OST. */
1115
1116         obdo_free(tmp_oa);
1117         RETURN(updates);
1118 }
1119
1120 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1121  * we can send this 'punch' to just the authoritative node and the nodes
1122  * that the punch will affect. */
1123 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1124                      struct lov_stripe_md *lsm,
1125                      obd_off start, obd_off end, struct obd_trans_info *oti)
1126 {
1127         struct lov_request_set *set;
1128         struct lov_obd *lov;
1129         struct list_head *pos;
1130         struct lov_request *req;
1131         int err = 0, rc = 0;
1132         ENTRY;
1133
1134         if (lsm_bad_magic(lsm))
1135                 RETURN(-EINVAL);
1136
1137         if (!exp || !exp->exp_obd)
1138                 RETURN(-ENODEV);
1139
1140         lov = &exp->exp_obd->u.lov;
1141         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1142         if (rc)
1143                 RETURN(rc);
1144
1145         list_for_each (pos, &set->set_list) {
1146                 req = list_entry(pos, struct lov_request, rq_link);
1147
1148                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1149                                NULL, req->rq_extent.start, 
1150                                req->rq_extent.end, NULL);
1151                 err = lov_update_punch_set(set, req, rc);
1152                 if (err) {
1153                         CERROR("error: punch objid "LPX64" subobj "LPX64
1154                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1155                                req->rq_oa->o_id, req->rq_idx, rc);
1156                         if (!rc)
1157                                 rc = err;
1158                 }
1159         }
1160         err = lov_fini_punch_set(set);
1161         if (!rc)
1162                 rc = err;
1163         RETURN(rc);
1164 }
1165
1166 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1167                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1168 {
1169         struct lov_request_set *set;
1170         struct lov_obd *lov;
1171         struct list_head *pos;
1172         struct lov_request *req;
1173         int err = 0, rc = 0;
1174         ENTRY;
1175
1176         if (lsm_bad_magic(lsm))
1177                 RETURN(-EINVAL);
1178
1179         if (!exp->exp_obd)
1180                 RETURN(-ENODEV);
1181
1182         lov = &exp->exp_obd->u.lov;
1183         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1184         if (rc)
1185                 RETURN(rc);
1186
1187         list_for_each (pos, &set->set_list) {
1188                 req = list_entry(pos, struct lov_request, rq_link);
1189
1190                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1191                               NULL, req->rq_extent.start, req->rq_extent.end);
1192                 err = lov_update_common_set(set, req, rc);
1193                 if (err) {
1194                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1195                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1196                                req->rq_oa->o_id, req->rq_idx, rc);
1197                         if (!rc)
1198                                 rc = err;
1199                 }
1200         }
1201         err = lov_fini_sync_set(set);
1202         if (!rc)
1203                 rc = err;
1204         RETURN(rc);
1205 }
1206
1207 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1208                          struct lov_stripe_md *lsm,
1209                          obd_count oa_bufs, struct brw_page *pga)
1210 {
1211         int i, rc = 0;
1212         ENTRY;
1213
1214         /* The caller just wants to know if there's a chance that this
1215          * I/O can succeed */
1216         for (i = 0; i < oa_bufs; i++) {
1217                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
1218                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1219                 obd_off start, end;
1220
1221                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
1222                                            pga[i].disk_offset + pga[i].count,
1223                                            &start, &end))
1224                         continue;
1225
1226                 if (lov->tgts[ost].active == 0) {
1227                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1228                         RETURN(-EIO);
1229                 }
1230                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1231                              NULL, 1, &pga[i], NULL);
1232                 if (rc)
1233                         break;
1234         }
1235         RETURN(rc);
1236 }
1237
1238 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1239                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1240                    struct brw_page *pga, struct obd_trans_info *oti)
1241 {
1242         struct lov_request_set *set;
1243         struct lov_request *req;
1244         struct list_head *pos;
1245         struct lov_obd *lov = &exp->exp_obd->u.lov;
1246         int err, rc = 0;
1247         ENTRY;
1248
1249         if (lsm_bad_magic(lsm))
1250                 RETURN(-EINVAL);
1251
1252         if (cmd == OBD_BRW_CHECK) {
1253                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1254                 RETURN(rc);
1255         }
1256
1257         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1258         if (rc)
1259                 RETURN(rc);
1260
1261         list_for_each (pos, &set->set_list) {
1262                 struct obd_export *sub_exp;
1263                 struct brw_page *sub_pga;
1264                 req = list_entry(pos, struct lov_request, rq_link);
1265                 
1266                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1267                 sub_pga = set->set_pga + req->rq_pgaidx;
1268                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
1269                              req->rq_oabufs, sub_pga, oti);
1270                 if (rc)
1271                         break;
1272                 lov_update_common_set(set, req, rc);
1273         }
1274
1275         err = lov_fini_brw_set(set);
1276         if (!rc)
1277                 rc = err;
1278         RETURN(rc);
1279 }
1280
1281 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1282                              int rc)
1283 {
1284         struct lov_request_set *lovset = (struct lov_request_set *)data;
1285         ENTRY;
1286         
1287         if (rc) {
1288                 lovset->set_completes = 0;
1289                 lov_fini_brw_set(lovset);
1290         } else {
1291                 rc = lov_fini_brw_set(lovset);
1292         }
1293                 
1294         RETURN(rc);
1295 }
1296
1297 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1298                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1299                          struct brw_page *pga, struct ptlrpc_request_set *set,
1300                          struct obd_trans_info *oti)
1301 {
1302         struct lov_request_set *lovset;
1303         struct lov_request *req;
1304         struct list_head *pos;
1305         struct lov_obd *lov = &exp->exp_obd->u.lov;
1306         int rc = 0;
1307         ENTRY;
1308
1309         if (lsm_bad_magic(lsm))
1310                 RETURN(-EINVAL);
1311
1312         if (cmd == OBD_BRW_CHECK) {
1313                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1314                 RETURN(rc);
1315         }
1316
1317         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1318         if (rc)
1319                 RETURN(rc);
1320
1321         list_for_each (pos, &lovset->set_list) {
1322                 struct obd_export *sub_exp;
1323                 struct brw_page *sub_pga;
1324                 req = list_entry(pos, struct lov_request, rq_link);
1325                 
1326                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1327                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1328                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1329                                    req->rq_oabufs, sub_pga, set, oti);
1330                 if (rc)
1331                         GOTO(out, rc);
1332                 lov_update_common_set(lovset, req, rc);
1333         }
1334         LASSERT(rc == 0);
1335         LASSERT(set->set_interpret == NULL);
1336         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1337         set->set_arg = (void *)lovset;
1338         
1339         RETURN(rc);
1340 out:
1341         lov_fini_brw_set(lovset);
1342         RETURN(rc);
1343 }
1344
1345 static int lov_ap_make_ready(void *data, int cmd)
1346 {
1347         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1348
1349         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1350 }
1351 static int lov_ap_refresh_count(void *data, int cmd)
1352 {
1353         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1354
1355         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1356                                                      cmd);
1357 }
1358 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1359 {
1360         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1361
1362         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1363         /* XXX woah, shouldn't we be altering more here?  size? */
1364         oa->o_id = lap->lap_loi_id;
1365 }
1366
1367 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1368 {
1369         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1370
1371         /* in a raid1 regime this would down a count of many ios
1372          * in flight, onl calling the caller_ops completion when all
1373          * the raid1 ios are complete */
1374         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1375 }
1376
1377 static struct obd_async_page_ops lov_async_page_ops = {
1378         .ap_make_ready =        lov_ap_make_ready,
1379         .ap_refresh_count =     lov_ap_refresh_count,
1380         .ap_fill_obdo =         lov_ap_fill_obdo,
1381         .ap_completion =        lov_ap_completion,
1382 };
1383
1384 static int lov_prep_async_page(struct obd_export *exp,
1385                                struct lov_stripe_md *lsm,
1386                                struct lov_oinfo *loi, struct page *page,
1387                                obd_off offset, struct obd_async_page_ops *ops,
1388                                void *data, void **res)
1389 {
1390         struct lov_obd *lov = &exp->exp_obd->u.lov;
1391         struct lov_async_page *lap;
1392         int rc, stripe;
1393         ENTRY;
1394
1395         if (lsm_bad_magic(lsm))
1396                 RETURN(-EINVAL);
1397         LASSERT(loi == NULL);
1398
1399         stripe = lov_stripe_number(lsm, offset);
1400         loi = &lsm->lsm_oinfo[stripe];
1401
1402         if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1403                 RETURN(-EIO);
1404         if (lov->tgts[loi->loi_ost_idx].active == 0)
1405                 RETURN(-EIO);
1406         if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
1407                 CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
1408                        "deleted or inactive.\n", loi->loi_ost_idx);
1409                 RETURN(-EIO);
1410         }
1411
1412         OBD_ALLOC(lap, sizeof(*lap));
1413         if (lap == NULL)
1414                 RETURN(-ENOMEM);
1415
1416         lap->lap_magic = LAP_MAGIC;
1417         lap->lap_caller_ops = ops;
1418         lap->lap_caller_data = data;
1419
1420         /* FIXME handle multiple oscs after landing b_raid1 */
1421         lap->lap_stripe = stripe;
1422         switch (lsm->lsm_pattern) {
1423                 case LOV_PATTERN_RAID0:
1424                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
1425                                           &lap->lap_sub_offset);
1426                         break;
1427                 case LOV_PATTERN_CMOBD:
1428                         lap->lap_sub_offset = offset;
1429                         break;
1430                 default:
1431                         LBUG();
1432         }
1433
1434         /* so the callback doesn't need the lsm */
1435         lap->lap_loi_id = loi->loi_id;
1436
1437         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1438                                  lsm, loi, page, lap->lap_sub_offset,
1439                                  &lov_async_page_ops, lap,
1440                                  &lap->lap_sub_cookie);
1441         if (rc) {
1442                 OBD_FREE(lap, sizeof(*lap));
1443                 RETURN(rc);
1444         }
1445         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1446                lap->lap_sub_cookie, offset);
1447         *res = lap;
1448         RETURN(0);
1449 }
1450
1451 static int lov_queue_async_io(struct obd_export *exp,
1452                               struct lov_stripe_md *lsm,
1453                               struct lov_oinfo *loi, void *cookie,
1454                               int cmd, obd_off off, int count,
1455                               obd_flags brw_flags, obd_flags async_flags)
1456 {
1457         struct lov_obd *lov = &exp->exp_obd->u.lov;
1458         struct lov_async_page *lap;
1459         int rc;
1460
1461         LASSERT(loi == NULL);
1462
1463         if (lsm_bad_magic(lsm))
1464                 RETURN(-EINVAL);
1465
1466         lap = LAP_FROM_COOKIE(cookie);
1467
1468         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1469
1470         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1471                                 loi, lap->lap_sub_cookie, cmd, off, count,
1472                                 brw_flags, async_flags);
1473         RETURN(rc);
1474 }
1475
1476 static int lov_set_async_flags(struct obd_export *exp,
1477                                struct lov_stripe_md *lsm,
1478                                struct lov_oinfo *loi, void *cookie,
1479                                obd_flags async_flags)
1480 {
1481         struct lov_obd *lov = &exp->exp_obd->u.lov;
1482         struct lov_async_page *lap;
1483         int rc;
1484
1485         LASSERT(loi == NULL);
1486
1487         if (lsm_bad_magic(lsm))
1488                 RETURN(-EINVAL);
1489
1490         lap = LAP_FROM_COOKIE(cookie);
1491
1492         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1493
1494         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1495                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1496         RETURN(rc);
1497 }
1498
1499 static int lov_queue_group_io(struct obd_export *exp,
1500                               struct lov_stripe_md *lsm,
1501                               struct lov_oinfo *loi,
1502                               struct obd_io_group *oig, void *cookie,
1503                               int cmd, obd_off off, int count,
1504                               obd_flags brw_flags, obd_flags async_flags)
1505 {
1506         struct lov_obd *lov = &exp->exp_obd->u.lov;
1507         struct lov_async_page *lap;
1508         int rc;
1509
1510         LASSERT(loi == NULL);
1511
1512         if (lsm_bad_magic(lsm))
1513                 RETURN(-EINVAL);
1514
1515         lap = LAP_FROM_COOKIE(cookie);
1516
1517         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1518
1519         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1520                                 oig, lap->lap_sub_cookie, cmd, off, count,
1521                                 brw_flags, async_flags);
1522         RETURN(rc);
1523 }
1524
1525 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1526  * all stripes, but we don't record that fact at queue time.  so we
1527  * trigger sync io on all stripes. */
1528 static int lov_trigger_group_io(struct obd_export *exp,
1529                                 struct lov_stripe_md *lsm,
1530                                 struct lov_oinfo *loi,
1531                                 struct obd_io_group *oig)
1532 {
1533         struct lov_obd *lov = &exp->exp_obd->u.lov;
1534         int rc = 0, i, err;
1535
1536         LASSERT(loi == NULL);
1537
1538         if (lsm_bad_magic(lsm))
1539                 RETURN(-EINVAL);
1540
1541         loi = lsm->lsm_oinfo;
1542         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1543                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1544                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1545                         continue;
1546                 }
1547
1548                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1549                                            lsm, loi, oig);
1550                 if (rc == 0 && err != 0)
1551                         rc = err;
1552         };
1553         RETURN(rc);
1554 }
1555
1556 static int lov_teardown_async_page(struct obd_export *exp,
1557                                    struct lov_stripe_md *lsm,
1558                                    struct lov_oinfo *loi, void *cookie)
1559 {
1560         struct lov_obd *lov = &exp->exp_obd->u.lov;
1561         struct lov_async_page *lap;
1562         int rc;
1563
1564         LASSERT(loi == NULL);
1565
1566         if (lsm_bad_magic(lsm))
1567                 RETURN(-EINVAL);
1568
1569         lap = LAP_FROM_COOKIE(cookie);
1570
1571         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1572
1573         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1574                                      lsm, loi, lap->lap_sub_cookie);
1575         if (rc) {
1576                 CERROR("unable to teardown sub cookie %p: %d\n",
1577                        lap->lap_sub_cookie, rc);
1578                 RETURN(rc);
1579         }
1580         OBD_FREE(lap, sizeof(*lap));
1581         RETURN(rc);
1582 }
1583
1584 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1585                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1586                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1587                        void *data,__u32 lvb_len, void *lvb_swabber,
1588                        struct lustre_handle *lockh)
1589 {
1590         struct lov_request_set *set;
1591         struct lov_request *req;
1592         struct list_head *pos;
1593         struct lustre_handle *lov_lockhp;
1594         struct lov_obd *lov;
1595         ldlm_error_t rc;
1596         int save_flags = *flags;
1597         ENTRY;
1598
1599         if (lsm_bad_magic(lsm))
1600                 RETURN(-EINVAL);
1601
1602         /* we should never be asked to replay a lock this way. */
1603         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1604
1605         if (!exp || !exp->exp_obd)
1606                 RETURN(-ENODEV);
1607
1608         lov = &exp->exp_obd->u.lov;
1609         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1610         if (rc)
1611                 RETURN(rc);
1612
1613         list_for_each (pos, &set->set_list) {
1614                 ldlm_policy_data_t sub_policy;
1615                 req = list_entry(pos, struct lov_request, rq_link);
1616                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1617                 LASSERT(lov_lockhp);
1618
1619                 *flags = save_flags;
1620                 sub_policy.l_extent.start = req->rq_extent.start;
1621                 sub_policy.l_extent.end = req->rq_extent.end;
1622
1623                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1624                                  type, &sub_policy, mode, flags, bl_cb,
1625                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1626                                  lov_lockhp);
1627                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1628                 if (rc != ELDLM_OK)
1629                         break;
1630         }
1631
1632         lov_fini_enqueue_set(set, mode);
1633         RETURN(rc);
1634 }
1635
1636 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1637                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1638                      int *flags, void *data, struct lustre_handle *lockh)
1639 {
1640         struct lov_request_set *set;
1641         struct lov_request *req;
1642         struct list_head *pos;
1643         struct lov_obd *lov = &exp->exp_obd->u.lov;
1644         struct lustre_handle *lov_lockhp;
1645         int lov_flags, rc = 0;
1646         ENTRY;
1647
1648         if (lsm_bad_magic(lsm))
1649                 RETURN(-EINVAL);
1650
1651         if (!exp || !exp->exp_obd)
1652                 RETURN(-ENODEV);
1653
1654         lov = &exp->exp_obd->u.lov;
1655         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1656         if (rc)
1657                 RETURN(rc);
1658
1659         list_for_each (pos, &set->set_list) {
1660                 ldlm_policy_data_t sub_policy;
1661                 req = list_entry(pos, struct lov_request, rq_link);
1662                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1663                 LASSERT(lov_lockhp);
1664
1665                 sub_policy.l_extent.start = req->rq_extent.start;
1666                 sub_policy.l_extent.end = req->rq_extent.end;
1667                 lov_flags = *flags;
1668
1669                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1670                                type, &sub_policy, mode, &lov_flags, data,
1671                                lov_lockhp);
1672                 rc = lov_update_match_set(set, req, rc);
1673                 if (rc != 1)
1674                         break;
1675         }
1676         lov_fini_match_set(set, mode, *flags);
1677         RETURN(rc);
1678 }
1679
1680 static int lov_change_cbdata(struct obd_export *exp,
1681                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1682                              void *data)
1683 {
1684         struct lov_obd *lov;
1685         struct lov_oinfo *loi;
1686         int rc = 0, i;
1687         ENTRY;
1688
1689         if (lsm_bad_magic(lsm))
1690                 RETURN(-EINVAL);
1691
1692         if (!exp || !exp->exp_obd)
1693                 RETURN(-ENODEV);
1694
1695         LASSERT(lsm->lsm_object_gr > 0);
1696
1697         lov = &exp->exp_obd->u.lov;
1698         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1699                 struct lov_stripe_md submd;
1700                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1701                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1702                         continue;
1703                 }
1704
1705                 submd.lsm_object_id = loi->loi_id;
1706                 submd.lsm_object_gr = lsm->lsm_object_gr;
1707                 submd.lsm_stripe_count = 0;
1708                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1709                                        &submd, it, data);
1710         }
1711         RETURN(rc);
1712 }
1713
1714 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1715                       __u32 mode, struct lustre_handle *lockh)
1716 {
1717         struct lov_request_set *set;
1718         struct lov_request *req;
1719         struct list_head *pos;
1720         struct lov_obd *lov = &exp->exp_obd->u.lov;
1721         struct lustre_handle *lov_lockhp;
1722         int err = 0, rc = 0;
1723         ENTRY;
1724
1725         if (lsm_bad_magic(lsm))
1726                 RETURN(-EINVAL);
1727
1728         if (!exp || !exp->exp_obd)
1729                 RETURN(-ENODEV);
1730
1731         LASSERT(lsm->lsm_object_gr > 0);
1732
1733         LASSERT(lockh);
1734         lov = &exp->exp_obd->u.lov;
1735         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1736         if (rc)
1737                 RETURN(rc);
1738
1739         list_for_each (pos, &set->set_list) {
1740                 req = list_entry(pos, struct lov_request, rq_link);
1741                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1742
1743                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1744                                 mode, lov_lockhp);
1745                 rc = lov_update_common_set(set, req, rc);
1746                 if (rc) {
1747                         CERROR("error: cancel objid "LPX64" subobj "
1748                                LPX64" on OST idx %d: rc = %d\n",
1749                                lsm->lsm_object_id,
1750                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1751                         err = rc;
1752                 }
1753  
1754         }
1755         lov_fini_cancel_set(set);
1756         RETURN(err);
1757 }
1758
1759 static int lov_cancel_unused(struct obd_export *exp,
1760                              struct lov_stripe_md *lsm, 
1761                              int flags, void *opaque)
1762 {
1763         struct lov_obd *lov;
1764         struct lov_oinfo *loi;
1765         int rc = 0, i;
1766         ENTRY;
1767
1768         lov = &exp->exp_obd->u.lov;
1769         if (lsm == NULL) {
1770                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1771                         int err = obd_cancel_unused(lov->tgts[i].ltd_exp, NULL,
1772                                                     flags, opaque);
1773                         if (!rc)
1774                                 rc = err;
1775                 }
1776                 RETURN(rc);
1777         }
1778
1779         if (lsm_bad_magic(lsm))
1780                 RETURN(-EINVAL);
1781
1782         if (!exp || !exp->exp_obd)
1783                 RETURN(-ENODEV);
1784
1785         LASSERT(lsm->lsm_object_gr > 0);
1786
1787         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1788                 struct lov_stripe_md submd;
1789                 int err;
1790
1791                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1792                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1793
1794                 submd.lsm_object_id = loi->loi_id;
1795                 submd.lsm_object_gr = lsm->lsm_object_gr;
1796                 submd.lsm_stripe_count = 0;
1797                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1798                                         &submd, flags, opaque);
1799                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1800                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1801                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1802                                loi->loi_id, loi->loi_ost_idx, err);
1803                         if (!rc)
1804                                 rc = err;
1805                 }
1806         }
1807         RETURN(rc);
1808 }
1809
/* All-ones __u64 value; LOV_SUM_MAX() saturates at this ceiling. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating accumulate: tot += add, except that tot is pinned at
 * LOV_U64_MAX when the unsigned sum wraps around.  Both arguments are
 * expanded more than once, so they must not have side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ? LOV_U64_MAX :         \
                                                  (tot) + (add);        \
        } while (0)
1818
1819 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1820                       unsigned long max_age)
1821 {
1822         struct lov_obd *lov = &obd->u.lov;
1823         struct obd_statfs lov_sfs;
1824         int set = 0;
1825         int rc = 0;
1826         int i;
1827         ENTRY;
1828
1829
1830         /* We only get block data from the OBD */
1831         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1832                 int err;
1833                 if (!lov->tgts[i].active) {
1834                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1835                         continue;
1836                 }
1837
1838                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1839                                  max_age);
1840                 if (err) {
1841                         if (lov->tgts[i].active && !rc)
1842                                 rc = err;
1843                         continue;
1844                 }
1845
1846                 if (!set) {
1847                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1848                         set = 1;
1849                 } else {
1850                         osfs->os_bfree += lov_sfs.os_bfree;
1851                         osfs->os_bavail += lov_sfs.os_bavail;
1852                         osfs->os_blocks += lov_sfs.os_blocks;
1853                         /* XXX not sure about this one - depends on policy.
1854                          *   - could be minimum if we always stripe on all OBDs
1855                          *     (but that would be wrong for any other policy,
1856                          *     if one of the OBDs has no more objects left)
1857                          *   - could be sum if we stripe whole objects
1858                          *   - could be average, just to give a nice number
1859                          *
1860                          * To give a "reasonable" (if not wholly accurate)
1861                          * number, we divide the total number of free objects
1862                          * by expected stripe count (watch out for overflow).
1863                          */
1864                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1865                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1866                 }
1867         }
1868
1869         if (set) {
1870                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1871                                          lov->desc.ld_default_stripe_count :
1872                                          lov->desc.ld_active_tgt_count;
1873
1874                 if (osfs->os_files != LOV_U64_MAX)
1875                         do_div(osfs->os_files, expected_stripes);
1876                 if (osfs->os_ffree != LOV_U64_MAX)
1877                         do_div(osfs->os_ffree, expected_stripes);
1878         } else if (!rc)
1879                 rc = -EIO;
1880
1881         RETURN(rc);
1882 }
1883
1884 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1885                          void *karg, void *uarg)
1886 {
1887         struct obd_device *obddev = class_exp2obd(exp);
1888         struct lov_obd *lov = &obddev->u.lov;
1889         int i, rc, count = lov->desc.ld_tgt_count;
1890         struct obd_uuid *uuidp;
1891         ENTRY;
1892
1893         switch (cmd) {
1894         case OBD_IOC_LOV_GET_CONFIG: {
1895                 struct obd_ioctl_data *data = karg;
1896                 struct lov_tgt_desc *tgtdesc;
1897                 struct lov_desc *desc;
1898                 char *buf = NULL;
1899                 __u32 *genp;
1900
1901                 buf = NULL;
1902                 len = 0;
1903                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1904                         RETURN(-EINVAL);
1905
1906                 data = (struct obd_ioctl_data *)buf;
1907
1908                 if (sizeof(*desc) > data->ioc_inllen1) {
1909                         obd_ioctl_freedata(buf, len);
1910                         RETURN(-EINVAL);
1911                 }
1912
1913                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1914                         obd_ioctl_freedata(buf, len);
1915                         RETURN(-EINVAL);
1916                 }
1917
1918                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1919                         obd_ioctl_freedata(buf, len);
1920                         RETURN(-EINVAL);
1921                 }
1922
1923                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1924                 memcpy(desc, &(lov->desc), sizeof(*desc));
1925
1926                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1927                 genp = (__u32 *)data->ioc_inlbuf3;
1928                 tgtdesc = lov->tgts;
1929                 /* the uuid will be empty for deleted OSTs */
1930                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1931                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
1932                         *genp = tgtdesc->ltd_gen;
1933                 }
1934
1935                 rc = copy_to_user((void *)uarg, buf, len);
1936                 if (rc)
1937                         rc = -EFAULT;
1938                 obd_ioctl_freedata(buf, len);
1939                 break;
1940         }
1941         case LL_IOC_LOV_SETSTRIPE:
1942                 rc = lov_setstripe(exp, karg, uarg);
1943                 break;
1944         case LL_IOC_LOV_GETSTRIPE:
1945                 rc = lov_getstripe(exp, karg, uarg);
1946                 break;
1947         case LL_IOC_LOV_SETEA:
1948                 rc = lov_setea(exp, karg, uarg);
1949                 break;
1950         default: {
1951                 int set = 0;
1952                 if (count == 0)
1953                         RETURN(-ENOTTY);
1954                 rc = 0;
1955                 for (i = 0; i < count; i++) {
1956                         int err;
1957
1958                         /* OST was deleted */
1959                         if (obd_uuid_empty(&lov->tgts[i].uuid))
1960                                 continue;
1961
1962                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1963                                             len, karg, uarg);
1964                         if (err) {
1965                                 if (lov->tgts[i].active) {
1966                                         CERROR("error: iocontrol OSC %s on OST "
1967                                                "idx %d cmd %x: err = %d\n",
1968                                                lov->tgts[i].uuid.uuid, i,
1969                                                cmd, err);
1970                                         if (!rc)
1971                                                 rc = err;
1972                                 }
1973                         } else
1974                                 set = 1;
1975                 }
1976                 if (!set && !rc)
1977                         rc = -EIO;
1978         }
1979         }
1980
1981         RETURN(rc);
1982 }
1983
1984 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1985                         void *key, __u32 *vallen, void *val)
1986 {
1987         struct obd_device *obddev = class_exp2obd(exp);
1988         struct lov_obd *lov = &obddev->u.lov;
1989         int i;
1990         ENTRY;
1991
1992         if (!vallen || !val)
1993                 RETURN(-EFAULT);
1994
1995         if (keylen > strlen("lock_to_stripe") &&
1996             strcmp(key, "lock_to_stripe") == 0) {
1997                 struct {
1998                         char name[16];
1999                         struct ldlm_lock *lock;
2000                         struct lov_stripe_md *lsm;
2001                 } *data = key;
2002                 struct lov_oinfo *loi;
2003                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2004                 __u32 *stripe = val;
2005
2006                 if (*vallen < sizeof(*stripe))
2007                         RETURN(-EFAULT);
2008                 *vallen = sizeof(*stripe);
2009
2010                 /* XXX This is another one of those bits that will need to
2011                  * change if we ever actually support nested LOVs.  It uses
2012                  * the lock's export to find out which stripe it is. */
2013                 /* XXX - it's assumed all the locks for deleted OSTs have
2014                  * been cancelled. Also, the export for deleted OSTs will
2015                  * be NULL and won't match the lock's export. */
2016                 for (i = 0, loi = data->lsm->lsm_oinfo;
2017                      i < data->lsm->lsm_stripe_count;
2018                      i++, loi++) {
2019                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
2020                                         data->lock->l_conn_export &&
2021                             loi->loi_id == res_id->name[0] &&
2022                             loi->loi_gr == res_id->name[2]) {
2023                                 *stripe = i;
2024                                 RETURN(0);
2025                         }
2026                 }
2027                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2028                 dump_lsm(D_ERROR, data->lsm);
2029                 RETURN(-ENXIO);
2030         } else if (keylen >= strlen("size_to_stripe") &&
2031                    strcmp(key, "size_to_stripe") == 0) {
2032                 struct {
2033                         int stripe_number;
2034                         __u64 size;
2035                         struct lov_stripe_md *lsm;
2036                 } *data = val;
2037
2038                 if (*vallen < sizeof(*data))
2039                         RETURN(-EFAULT);
2040
2041                 data->size = lov_size_to_stripe(data->lsm, data->size,
2042                                                 data->stripe_number);
2043                 RETURN(0);
2044         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2045                 obd_id *ids = val;
2046                 int rc, size = sizeof(obd_id);
2047                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2048                         if (!lov->tgts[i].active)
2049                                 continue;
2050                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2051                                           keylen, key, &size, &(ids[i]));
2052                         if (rc != 0)
2053                                 RETURN(rc);
2054                 }
2055                 RETURN(0);
2056         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2057                 struct lov_desc *desc_ret = val;
2058                 *desc_ret = lov->desc;
2059
2060                 RETURN(0);
2061         }
2062
2063         RETURN(-EINVAL);
2064 }
2065
2066 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2067                         void *key, obd_count vallen, void *val)
2068 {
2069         struct obd_device *obddev = class_exp2obd(exp);
2070         struct lov_obd *lov = &obddev->u.lov;
2071         int i, rc = 0, err;
2072         ENTRY;
2073
2074 #define KEY_IS(str) \
2075         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2076
2077         if (KEY_IS("next_id")) {
2078                 if (vallen != lov->desc.ld_tgt_count)
2079                         RETURN(-EINVAL);
2080                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2081                         /* initialize all OSCs, even inactive ones */
2082
2083                         err = obd_set_info(lov->tgts[i].ltd_exp,
2084                                           keylen, key, sizeof(obd_id),
2085                                           ((obd_id*)val) + i);
2086                         if (!rc)
2087                                 rc = err;
2088                 }
2089                 RETURN(rc);
2090         }
2091         if (KEY_IS("async")) {
2092                 struct lov_desc *desc = &lov->desc;
2093                 struct lov_tgt_desc *tgts = lov->tgts;
2094
2095                 if (vallen != sizeof(int))
2096                         RETURN(-EINVAL);
2097                 lov->async = *((int*) val);
2098
2099                 for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
2100                         struct obd_uuid *tgt_uuid = &tgts->uuid;
2101                         struct obd_device *tgt_obd;
2102
2103                         tgt_obd = class_find_client_obd(tgt_uuid,
2104                                                         LUSTRE_OSC_NAME,
2105                                                         &obddev->obd_uuid);
2106                         if (!tgt_obd) {
2107                                 CERROR("Target %s not attached\n",
2108                                         tgt_uuid->uuid);
2109                                 if (!rc)
2110                                         rc = -EINVAL;
2111                                 continue;
2112                         }
2113
2114                         err = obd_set_info(tgt_obd->obd_self_export,
2115                                            keylen, key, vallen, val);
2116                         if (err) {
2117                                 CERROR("Failed to set async on target %s\n",
2118                                         tgt_obd->obd_name);
2119                                 if (!rc)
2120                                         rc = err;
2121                         }
2122                 }
2123                 RETURN(rc);
2124         }
2125
2126         if (KEY_IS("growth_count")) {
2127                 if (vallen != sizeof(int))
2128                         RETURN(-EINVAL);
2129         } else if (KEY_IS("mds_conn")) {
2130                 if (vallen != sizeof(__u32))
2131                         RETURN(-EINVAL);
2132         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
2133                 if (vallen != 0)
2134                         RETURN(-EINVAL);
2135         } else if (KEY_IS("sec")) {
2136                 struct lov_tgt_desc *tgt;
2137                 struct obd_export *exp;
2138                 int rc = 0, err, i;
2139
2140                 spin_lock(&lov->lov_lock);
2141                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2142                      i++, tgt++) {
2143                         exp = tgt->ltd_exp;
2144                         /* during setup time the connections to osc might
2145                          * haven't been established.
2146                          */
2147                         if (exp == NULL) {
2148                                 struct obd_device *tgt_obd;
2149
2150                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2151                                                                 LUSTRE_OSC_NAME,
2152                                                                 &obddev->obd_uuid);
2153                                 if (!tgt_obd) {
2154                                         CERROR("can't set security flavor, "
2155                                                "device %s not attached?\n",
2156                                                 tgt->uuid.uuid);
2157                                         rc = -EINVAL;
2158                                         continue;
2159                                 }
2160                                 exp = tgt_obd->obd_self_export;
2161                         }
2162
2163                         err = obd_set_info(exp, keylen, key, vallen, val);
2164                         if (!rc)
2165                                 rc = err;
2166                 }
2167                 spin_unlock(&lov->lov_lock);
2168
2169                 RETURN(rc);
2170         } else {
2171                 RETURN(-EINVAL);
2172         }
2173
2174         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2175                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2176                         continue;
2177
2178                 if (!val && !lov->tgts[i].active)
2179                         continue;
2180
2181                 err = obd_set_info(lov->tgts[i].ltd_exp,
2182                                   keylen, key, vallen, val);
2183                 if (!rc)
2184                         rc = err;
2185         }
2186         RETURN(rc);
2187 #undef KEY_IS
2188 }
2189
/*
 * NOTE: everything from this "#if 0" down to the matching "#endif" is
 * disabled and is never compiled.
 */
2190 #if 0
/*
 * Per-stripe wait bookkeeping used by lov_complete_many() below:
 *   lock       - lock obtained from the stripe's handle
 *   wait       - wait-queue entry added to that lock's l_waitq
 *   completed  - set to 1 when the handle yields no lock
 *   generation - copy of the import's imp_generation taken under imp_lock
 */
2191 struct lov_multi_wait {
2192         struct ldlm_lock *lock;
2193         wait_queue_t      wait;
2194         int               completed;
2195         int               generation;
2196 };
2197
/*
 * Appears intended to wait until the per-stripe locks behind @lockh have
 * all completed: one lov_multi_wait entry is set up per stripe, then
 * l_wait_event_added() is called with check_multi_complete() as the
 * condition.
 *
 * NOTE(review): this would not build as written if it were enabled -
 * `imp`, `lwi` and `lwd` are used without a declaration in this function,
 * `lock` is referenced after the loop that declares it, the
 * -EINTR/-ETIMEDOUT branch is empty, and `queues` is not released on any
 * path shown here.  Treat it as an unfinished sketch.
 */
2198 int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
2199                       struct lustre_handle *lockh)
2200 {
2201         struct lov_lock_handles *lov_lockh = NULL;
2202         struct lustre_handle *lov_lockhp;
2203         struct lov_obd *lov;
2204         struct lov_oinfo *loi;
2205         struct lov_multi_wait *queues;
2206         int rc = 0, i;
2207         ENTRY;
2208
2209         if (lsm_bad_magic(lsm))
2210                 RETURN(-EINVAL);
2211
2212         if (!exp || !exp->exp_obd)
2213                 RETURN(-ENODEV);
2214
2215         LASSERT(lockh != NULL);
             /* more than one stripe: @lockh is resolved to a handle array;
              * otherwise @lockh itself is used as the single handle */
2216         if (lsm->lsm_stripe_count > 1) {
2217                 lov_lockh = lov_handle2llh(lockh);
2218                 if (lov_lockh == NULL) {
2219                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
2220                         RETURN(-EINVAL);
2221                 }
2222
2223                 lov_lockhp = lov_lockh->llh_handles;
2224         } else {
2225                 lov_lockhp = lockh;
2226         }
2227
2228         OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
2229         if (queues == NULL)
2230                 GOTO(out, rc = -ENOMEM);
2231
2232         lov = &exp->exp_obd->u.lov;
             /* walk stripes and their handles in lock step */
2233         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2234              i++, loi++, lov_lockhp++) {
2235                 struct ldlm_lock *lock;
2236                 struct obd_device *obd;
2237                 unsigned long irqflags;
2238
2239                 lock = ldlm_handle2lock(lov_lockhp);
2240                 if (lock == NULL) {
2241                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
2242                                loi->loi_ost_idx, loi->loi_id);
                             /* nothing to wait for on this stripe */
2243                         queues[i].completed = 1;
2244                         continue;
2245                 }
2246
2247                 queues[i].lock = lock;
2248                 init_waitqueue_entry(&(queues[i].wait), current);
2249                 add_wait_queue(lock->l_waitq, &(queues[i].wait));
2250
                     /* NOTE(review): `imp` has no declaration in this function */
2251                 obd = class_exp2obd(lock->l_conn_export);
2252                 if (obd != NULL)
2253                         imp = obd->u.cli.cl_import;
2254                 if (imp != NULL) {
2255                         spin_lock_irqsave(&imp->imp_lock, irqflags);
2256                         queues[i].generation = imp->imp_generation;
2257                         spin_unlock_irqrestore(&imp->imp_lock, irqflags);
2258                 }
2259         }
2260
             /* NOTE(review): `lwi` and `lwd` have no declaration in this function */
2261         lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
2262                                interrupted_completion_wait, &lwd);
2263         rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
2264
             /* NOTE(review): `lock` was declared inside the loop above and is
              * out of scope here */
2265         for (i = 0; i < lsm->lsm_stripe_count; i++)
2266                 remove_wait_queue(lock->l_waitq, &(queues[i].wait));
2267
2268         if (rc == -EINTR || rc == -ETIMEDOUT) {
2269
2270
2271         }
2272
2273  out:
2274         if (lov_lockh != NULL)
2275                 lov_llh_put(lov_lockh);
2276         RETURN(rc);
2277 }
2278 #endif
2279
2280 struct obd_ops lov_obd_ops = {
2281         .o_owner               = THIS_MODULE,
2282         .o_attach              = lov_attach,
2283         .o_detach              = lov_detach,
2284         .o_setup               = lov_setup,
2285         .o_cleanup             = lov_cleanup,
2286         .o_process_config      = lov_process_config,
2287         .o_connect             = lov_connect,
2288         .o_disconnect          = lov_disconnect,
2289         .o_statfs              = lov_statfs,
2290         .o_packmd              = lov_packmd,
2291         .o_unpackmd            = lov_unpackmd,
2292         .o_revalidate_md       = lov_revalidate_md,
2293         .o_create              = lov_create,
2294         .o_destroy             = lov_destroy,
2295         .o_getattr             = lov_getattr,
2296         .o_getattr_async       = lov_getattr_async,
2297         .o_setattr             = lov_setattr,
2298         .o_brw                 = lov_brw,
2299         .o_brw_async           = lov_brw_async,
2300         .o_prep_async_page     = lov_prep_async_page,
2301         .o_queue_async_io      = lov_queue_async_io,
2302         .o_set_async_flags     = lov_set_async_flags,
2303         .o_queue_group_io      = lov_queue_group_io,
2304         .o_trigger_group_io    = lov_trigger_group_io,
2305         .o_teardown_async_page = lov_teardown_async_page,
2306         .o_adjust_kms          = lov_adjust_kms,
2307         .o_punch               = lov_punch,
2308         .o_sync                = lov_sync,
2309         .o_enqueue             = lov_enqueue,
2310         .o_match               = lov_match,
2311         .o_change_cbdata       = lov_change_cbdata,
2312         .o_cancel              = lov_cancel,
2313         .o_cancel_unused       = lov_cancel_unused,
2314         .o_iocontrol           = lov_iocontrol,
2315         .o_get_info            = lov_get_info,
2316         .o_set_info            = lov_set_info,
2317         .o_llog_init           = lov_llog_init,
2318         .o_llog_finish         = lov_llog_finish,
2319         .o_notify              = lov_notify,
2320 };
2321
2322 int __init lov_init(void)
2323 {
2324         struct lprocfs_static_vars lvars;
2325         int rc;
2326         ENTRY;
2327
2328         lprocfs_init_vars(lov, &lvars);
2329         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2330                                  OBD_LOV_DEVICENAME);
2331         RETURN(rc);
2332 }
2333
#ifdef __KERNEL__
/*
 * Module teardown: unregister the type that lov_init() registered under
 * OBD_LOV_DEVICENAME.  The __exit annotation is left commented out.
 */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

/* Module metadata and entry points; kernel builds only. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lov_init);
module_exit(lov_exit);
#endif