Whamcloud - gitweb
land b_hd_sec on HEAD. various security fixes.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 /* obd methods */
55 #define MAX_STRING_SIZE 128
56 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
57                            int activate, struct obd_connect_data *conn_data,
58                            unsigned long connect_flags)
59 {
60         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
61         struct obd_uuid *tgt_uuid = &tgt->uuid;
62
63 #ifdef __KERNEL__
64         struct proc_dir_entry *lov_proc_dir;
65 #endif
66         struct lov_obd *lov = &obd->u.lov;
67         struct lustre_handle conn = {0, };
68         struct obd_device *tgt_obd;
69         int rc;
70         ENTRY;
71
72         tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
73                                         &obd->obd_uuid);
74
75         if (!tgt_obd) {
76                 CERROR("Target %s not attached\n", tgt_uuid->uuid);
77                 RETURN(-EINVAL);
78         }
79
80         if (!tgt_obd->obd_set_up) {
81                 CERROR("Target %s not set up\n", tgt_uuid->uuid);
82                 RETURN(-EINVAL);
83         }
84
85         if (activate) {
86                 tgt_obd->obd_no_recov = 0;
87                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
88         }
89
90         if (tgt_obd->u.cli.cl_import->imp_invalid) {
91                 CERROR("not connecting OSC %s; administratively "
92                        "disabled\n", tgt_uuid->uuid);
93                 rc = obd_register_observer(tgt_obd, obd);
94                 if (rc) {
95                         CERROR("Target %s register_observer error %d; "
96                                "will not be able to reactivate\n",
97                                tgt_uuid->uuid, rc);
98                 }
99                 RETURN(0);
100         }
101
102         rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
103                          connect_flags);
104         if (rc) {
105                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
106                 RETURN(rc);
107         }
108         tgt->ltd_exp = class_conn2export(&conn);
109
110         rc = obd_register_observer(tgt_obd, obd);
111         if (rc) {
112                 CERROR("Target %s register_observer error %d\n",
113                        tgt_uuid->uuid, rc);
114                 obd_disconnect(tgt->ltd_exp, 0);
115                 tgt->ltd_exp = NULL;
116                 RETURN(rc);
117         }
118
119         tgt->active = 1;
120         lov->desc.ld_active_tgt_count++;
121
122 #ifdef __KERNEL__
123         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
124         if (lov_proc_dir) {
125                 struct obd_device *osc_obd = class_conn2obd(&conn);
126                 struct proc_dir_entry *osc_symlink;
127                 char name[MAX_STRING_SIZE + 1];
128
129                 LASSERT(osc_obd != NULL);
130                 LASSERT(osc_obd->obd_type != NULL);
131                 LASSERT(osc_obd->obd_type->typ_name != NULL);
132                 name[MAX_STRING_SIZE] = '\0';
133                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
134                          osc_obd->obd_type->typ_name,
135                          osc_obd->obd_name);
136                 osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
137                                            name);
138                 if (osc_symlink == NULL) {
139                         CERROR("could not register LOV target "
140                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
141                                obd->obd_type->typ_name, obd->obd_name,
142                                osc_obd->obd_name);
143                         lprocfs_remove(lov_proc_dir);
144                         lov_proc_dir = NULL;
145                 }
146         }
147 #endif
148
149         RETURN(0);
150 }
151
152 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
153                        struct obd_uuid *cluuid, struct obd_connect_data *data,
154                        unsigned long flags)
155 {
156 #ifdef __KERNEL__
157         struct proc_dir_entry *lov_proc_dir;
158 #endif
159         struct lov_obd *lov = &obd->u.lov;
160         struct lov_tgt_desc *tgt;
161         struct obd_export *exp;
162         int rc, rc2, i;
163         ENTRY;
164
165         rc = class_connect(conn, obd, cluuid);
166         if (rc)
167                 RETURN(rc);
168
169         exp = class_conn2export(conn);
170
171         /* We don't want to actually do the underlying connections more than
172          * once, so keep track. */
173         lov->refcount++;
174         if (lov->refcount > 1) {
175                 class_export_put(exp);
176                 RETURN(0);
177         }
178
179 #ifdef __KERNEL__
180         lov_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
181                                         NULL, NULL);
182         if (IS_ERR(lov_proc_dir)) {
183                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
184                        obd->obd_type->typ_name, obd->obd_name);
185                 lov_proc_dir = NULL;
186         }
187 #endif
188
189         /* connect_flags is the MDS number, save for use in lov_add_obd */
190         lov->lov_connect_flags = flags;
191         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
192                 if (obd_uuid_empty(&tgt->uuid))
193                         continue;
194                 rc = lov_connect_obd(obd, tgt, 0, data, flags);
195                 if (rc)
196                         GOTO(out_disc, rc);
197         }
198
199         class_export_put(exp);
200         RETURN (0);
201
202  out_disc:
203 #ifdef __KERNEL__
204         if (lov_proc_dir)
205                 lprocfs_remove(lov_proc_dir);
206 #endif
207
208         while (i-- > 0) {
209                 struct obd_uuid uuid;
210                 --tgt;
211                 --lov->desc.ld_active_tgt_count;
212                 tgt->active = 0;
213                 /* save for CERROR below; (we know it's terminated) */
214                 uuid = tgt->uuid;
215                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
216                 if (rc2)
217                         CERROR("error: LOV target %s disconnect on OST idx %d: "
218                                "rc = %d\n", uuid.uuid, i, rc2);
219         }
220         class_disconnect(exp, 0);
221         RETURN (rc);
222 }
223
224 static int lov_disconnect_obd(struct obd_device *obd, 
225                               struct lov_tgt_desc *tgt,
226                               unsigned long flags)
227 {
228 #ifdef __KERNEL__
229         struct proc_dir_entry *lov_proc_dir;
230 #endif
231         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
232         struct lov_obd *lov = &obd->u.lov;
233         int rc;
234         ENTRY;
235
236 #ifdef __KERNEL__
237         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
238         if (lov_proc_dir) {
239                 struct proc_dir_entry *osc_symlink;
240
241                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
242                 if (osc_symlink) {
243                         lprocfs_remove(osc_symlink);
244                 } else {
245                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
246                                obd->obd_type->typ_name, obd->obd_name,
247                                osc_obd->obd_name);
248                 }
249         }
250 #endif
251         if (obd->obd_no_recov) {
252                 /* Pass it on to our clients.
253                  * XXX This should be an argument to disconnect,
254                  * XXX not a back-door flag on the OBD.  Ah well.
255                  */
256                 if (osc_obd)
257                         osc_obd->obd_no_recov = 1;
258         }
259
260         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
261         rc = obd_disconnect(tgt->ltd_exp, flags);
262         if (rc) {
263                 if (tgt->active) {
264                         CERROR("Target %s disconnect error %d\n",
265                                tgt->uuid.uuid, rc);
266                 }
267                 rc = 0;
268         }
269
270         if (tgt->active) {
271                 tgt->active = 0;
272                 lov->desc.ld_active_tgt_count--;
273         }
274         tgt->ltd_exp = NULL;
275         RETURN(0);
276 }
277
278 static int lov_disconnect(struct obd_export *exp, unsigned long flags)
279 {
280         struct obd_device *obd = class_exp2obd(exp);
281 #ifdef __KERNEL__
282         struct proc_dir_entry *lov_proc_dir;
283 #endif
284         struct lov_obd *lov = &obd->u.lov;
285         struct lov_tgt_desc *tgt;
286         int rc, i;
287         ENTRY;
288
289         if (!lov->tgts)
290                 goto out_local;
291
292         /* Only disconnect the underlying layers on the final disconnect. */
293         lov->refcount--;
294         if (lov->refcount != 0)
295                 goto out_local;
296
297         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
298                 if (tgt->ltd_exp)
299                         lov_disconnect_obd(obd, tgt, flags);
300         }
301
302 #ifdef __KERNEL__
303         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
304         if (lov_proc_dir) {
305                 lprocfs_remove(lov_proc_dir);
306         } else {
307                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing.",
308                        obd->obd_type->typ_name, obd->obd_name);
309         }
310 #endif
311         
312  out_local:
313         rc = class_disconnect(exp, 0);
314         RETURN(rc);
315 }
316
317 /* Error codes:
318  *
319  *  -EINVAL  : UUID can't be found in the LOV's target list
320  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
321  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
322  */
323 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
324                               int activate)
325 {
326         struct lov_tgt_desc *tgt;
327         int i, rc = 0;
328         ENTRY;
329
330         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
331                lov, uuid->uuid, activate);
332
333         spin_lock(&lov->lov_lock);
334         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
335                 if (tgt->ltd_exp == NULL)
336                         continue;
337
338                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
339                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
340                 
341                 if (obd_uuid_equals(uuid, &tgt->uuid))
342                         break;
343         }
344
345         if (i == lov->desc.ld_tgt_count)
346                 GOTO(out, rc = -EINVAL);
347
348
349         if (tgt->active == activate) {
350                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
351                         activate ? "" : "in");
352                 GOTO(out, rc);
353         }
354
355         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
356                activate ? "" : "in");
357
358         tgt->active = activate;
359         if (activate)
360                 lov->desc.ld_active_tgt_count++;
361         else
362                 lov->desc.ld_active_tgt_count--;
363
364         EXIT;
365  out:
366         spin_unlock(&lov->lov_lock);
367         return rc;
368 }
369
370 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
371                       int active, void *data)
372 {
373         struct obd_uuid *uuid;
374         int rc;
375         ENTRY;
376
377         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
378                 CERROR("unexpected notification of %s %s!\n",
379                        watched->obd_type->typ_name,
380                        watched->obd_name);
381                 return -EINVAL;
382         }
383         uuid = &watched->u.cli.cl_import->imp_target_uuid;
384
385         /* Set OSC as active before notifying the observer, so the
386          * observer can use the OSC normally.  
387          */
388         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
389         if (rc) {
390                 CERROR("%sactivation of %s failed: %d\n",
391                        active ? "" : "de", uuid->uuid, rc);
392                 RETURN(rc);
393         }
394
395         if (obd->obd_observer)
396                 /* Pass the notification up the chain. */
397                 rc = obd_notify(obd->obd_observer, watched, active, data);
398
399         RETURN(rc);
400 }
401
402 int lov_attach(struct obd_device *dev, obd_count len, void *data)
403 {
404         struct lprocfs_static_vars lvars;
405         int rc;
406
407         lprocfs_init_vars(lov, &lvars);
408         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
409         if (rc == 0) {
410 #ifdef __KERNEL__
411                 struct proc_dir_entry *entry;
412
413                 entry = create_proc_entry("target_obd_status", 0444, 
414                                           dev->obd_proc_entry);
415                 if (entry == NULL) {
416                         rc = -ENOMEM;
417                 } else {
418                         entry->proc_fops = &lov_proc_target_fops;
419                         entry->data = dev;
420                 }
421 #endif
422         }
423         return rc;
424 }
425
/*
 * Detach-time teardown: hand the device to lprocfs_obd_detach() and report
 * its result unchanged.
 */
int lov_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
430
431 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
432 {
433         struct lov_obd *lov = &obd->u.lov;
434         struct lustre_cfg *lcfg = buf;
435         struct lov_desc *desc;
436         int count;
437         ENTRY;
438
439         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
440                 CERROR("LOV setup requires a descriptor\n");
441                 RETURN(-EINVAL);
442         }
443
444         desc = (struct lov_desc *)lustre_cfg_string(lcfg, 1);
445         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
446                 CERROR("descriptor size wrong: %d > %d\n",
447                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
448                 RETURN(-EINVAL);
449         }
450  
451         /* Because of 64-bit divide/mod operations only work with a 32-bit
452          * divisor in a 32-bit kernel, we cannot support a stripe width
453          * of 4GB or larger on 32-bit CPUs.
454          */
455        
456         count = desc->ld_default_stripe_count;
457         if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
458                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
459                        desc->ld_default_stripe_size, count, ~0UL);
460                 RETURN(-EINVAL);
461         }
462         if (desc->ld_tgt_count > 0) {
463                 lov->bufsize= sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
464         } else {
465                 lov->bufsize = sizeof(struct lov_tgt_desc) * LOV_MAX_TGT_COUNT;  
466         }
467         OBD_ALLOC(lov->tgts, lov->bufsize);
468         if (lov->tgts == NULL) {
469                 lov->bufsize = 0;
470                 CERROR("couldn't allocate %d bytes for target table.\n",
471                        lov->bufsize);
472                 RETURN(-EINVAL);
473         }
474
475         desc->ld_tgt_count = 0;
476         desc->ld_active_tgt_count = 0;
477         lov->desc = *desc;
478         spin_lock_init(&lov->lov_lock);
479         sema_init(&lov->lov_llog_sem, 1);
480
481         RETURN(0);
482 }
483
484 static int lov_cleanup(struct obd_device *obd, int flags)
485 {
486         struct lov_obd *lov = &obd->u.lov;
487
488         OBD_FREE(lov->tgts, lov->bufsize);
489         RETURN(0);
490 }
491
492 static int
493 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
494 {
495         struct lov_obd *lov = &obd->u.lov;
496         struct lov_tgt_desc *tgt;
497         int rc;
498         ENTRY;
499
500         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
501                uuidp->uuid, index, gen);
502
503         if ((index < 0) || (index >= LOV_MAX_TGT_COUNT)) {
504                 CERROR("request to add OBD %s at invalid index: %d\n",
505                        uuidp->uuid, index);
506                 RETURN(-EINVAL);
507         }
508
509         if (gen <= 0) {
510                 CERROR("request to add OBD %s with invalid generation: %d\n",
511                        uuidp->uuid, gen);
512                 RETURN(-EINVAL);
513         }
514
515         tgt = lov->tgts + index;
516         if (!obd_uuid_empty(&tgt->uuid)) {
517                 CERROR("OBD already assigned at LOV target index %d\n",
518                        index);
519                 RETURN(-EEXIST);
520         }
521
522         tgt->uuid = *uuidp;
523         /* XXX - add a sanity check on the generation number. */
524         tgt->ltd_gen = gen;
525
526         if (index >= lov->desc.ld_tgt_count)
527                 lov->desc.ld_tgt_count = index + 1;
528
529         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
530                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
531
532         if (lov->refcount == 0)
533                 RETURN(0);
534
535         if (tgt->ltd_exp) {
536                 struct obd_device *osc_obd;
537
538                 osc_obd = class_exp2obd(tgt->ltd_exp);
539                 if (osc_obd)
540                         osc_obd->obd_no_recov = 0;
541         }
542
543         rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
544         if (rc)
545                 GOTO(out, rc);
546
547         if (obd->obd_observer) {
548                 /* tell the mds_lov about the new target */
549                 rc = obd_notify(obd->obd_observer, tgt->ltd_exp->exp_obd, 1,
550                                 (void *)index);
551         }
552
553         GOTO(out, rc);
554  out:
555         if (rc && tgt->ltd_exp != NULL)
556                 lov_disconnect_obd(obd, tgt, 0);
557         return rc;
558 }
559
560 static int
561 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
562 {
563         struct lov_obd *lov = &obd->u.lov;
564         struct lov_tgt_desc *tgt;
565         int count = lov->desc.ld_tgt_count;
566         int rc = 0;
567         ENTRY;
568
569         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
570                uuidp->uuid, index, gen);
571
572         if (index >= count) {
573                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
574                        index, count);
575                 RETURN(-EINVAL);
576         }
577
578         tgt = lov->tgts + index;
579
580         if (obd_uuid_empty(&tgt->uuid)) {
581                 CERROR("LOV target at index %d is not setup.\n", index);
582                 RETURN(-EINVAL);
583         }
584
585         if (!obd_uuid_equals(uuidp, &tgt->uuid)) {
586                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
587                        tgt->uuid.uuid, index, uuidp->uuid);
588                 RETURN(-EINVAL);
589         }
590
591         if (tgt->ltd_exp) {
592                 struct obd_device *osc_obd;
593
594                 osc_obd = class_exp2obd(tgt->ltd_exp);
595                 if (osc_obd) {
596                         osc_obd->obd_no_recov = 1;
597                         rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
598                         if (rc)
599                                 CERROR("osc_llog_finish error: %d\n", rc);
600                 }
601                 lov_disconnect_obd(obd, tgt, 0);
602         }
603
604         /* XXX - right now there is a dependency on ld_tgt_count being the
605          * maximum tgt index for computing the mds_max_easize. So we can't
606          * shrink it. */
607
608         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
609         memset(tgt, 0, sizeof(*tgt));
610
611         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
612                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
613
614         RETURN(rc);
615 }
616
617 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
618 {
619         struct lustre_cfg *lcfg = buf;
620         struct obd_uuid obd_uuid;
621         int cmd;
622         int index;
623         int gen;
624         int rc = 0;
625         ENTRY;
626
627         switch(cmd = lcfg->lcfg_command) {
628         case LCFG_LOV_ADD_OBD:
629         case LCFG_LOV_DEL_OBD: {
630                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
631                         GOTO(out, rc = -EINVAL);
632
633                 obd_str2uuid(&obd_uuid, lustre_cfg_string(lcfg, 1));
634
635                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
636                         GOTO(out, rc = -EINVAL);
637                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
638                         GOTO(out, rc = -EINVAL);
639                 if (cmd == LCFG_LOV_ADD_OBD)
640                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
641                 else
642                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
643                 GOTO(out, rc);
644         }
645         default: {
646                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
647                 GOTO(out, rc = -EINVAL);
648
649         }
650         }
651 out:
652         RETURN(rc);
653 }
654
/* Fallback log2(): defined in terms of ffz() on the complemented argument,
 * and only when the environment does not already provide a log2. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
658
659 static int lov_clear_orphans(struct obd_export *export,
660                              struct obdo *src_oa,
661                              struct lov_stripe_md **ea,
662                              struct obd_trans_info *oti)
663 {
664         struct lov_obd *lov;
665         struct obdo *tmp_oa;
666         struct obd_uuid *ost_uuid = NULL;
667         int rc = 0, i;
668         ENTRY;
669
670         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
671                 src_oa->o_flags == OBD_FL_DELORPHAN);
672
673         lov = &export->exp_obd->u.lov;
674
675         tmp_oa = obdo_alloc();
676         if (tmp_oa == NULL)
677                 RETURN(-ENOMEM);
678
679         if (src_oa->o_valid & OBD_MD_FLINLINE) {
680                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
681                 CDEBUG(D_HA, "clearing orphans only for %s\n",
682                        ost_uuid->uuid);
683         }
684
685         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
686                 int err;
687                 struct lov_stripe_md obj_md;
688                 struct lov_stripe_md *obj_mdp = &obj_md;
689
690                 /*
691                  * if called for a specific target, we don't care if it is not
692                  * active.
693                  */
694                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
695                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
696                         continue;
697                 }
698
699                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
700                         continue;
701
702                 /* 
703                  * setting up objid OSS objects should be destroyed starting
704                  * from it.
705                  */
706                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
707                 tmp_oa->o_valid |= OBD_MD_FLID;
708                 tmp_oa->o_id = oti->oti_objid[i];
709
710                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
711                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
712                                  &obj_mdp, oti);
713                 if (err) {
714                         /*
715                          * this export will be disabled until it is recovered,
716                          * and then orphan recovery will be completed.
717                          */
718                         CERROR("error in orphan recovery on OST idx %d/%d: "
719                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
720                 }
721
722                 if (ost_uuid)
723                         break;
724         }
725         obdo_free(tmp_oa);
726         RETURN(rc);
727 }
728
729 /* the LOV expects oa->o_id to be set to the LOV object id */
730 static int
731 lov_create(struct obd_export *exp, struct obdo *src_oa,
732            void *acl, int acl_size, struct lov_stripe_md **ea,
733            struct obd_trans_info *oti)
734 {
735         struct lov_request_set *set = NULL;
736         struct list_head *pos;
737         struct lov_obd *lov;
738         int rc = 0;
739         ENTRY;
740
741         LASSERT(ea != NULL);
742         if (exp == NULL)
743                 RETURN(-EINVAL);
744
745         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
746             src_oa->o_flags == OBD_FL_DELORPHAN) {
747                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
748                 RETURN(rc);
749         }
750
751         lov = &exp->exp_obd->u.lov;
752         if (!lov->desc.ld_active_tgt_count)
753                 RETURN(-EIO);
754
755         LASSERT(oti->oti_flags & OBD_MODE_CROW);
756                 
757         /* main creation loop */
758         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
759         if (rc)
760                 RETURN(rc);
761
762         list_for_each (pos, &set->set_list) {
763                 struct lov_request *req = 
764                         list_entry(pos, struct lov_request, rq_link);
765
766                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
767                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp,
768                                 req->rq_oa, NULL, 0, &req->rq_md, oti);
769                 lov_update_create_set(set, req, rc);
770         }
771         rc = lov_fini_create_set(set, ea);
772         RETURN(rc);
773 }
774
/*
 * Evaluates to 1 (after logging why) when LSMP is NULL or its lsm_magic is
 * not LOV_MAGIC, and to 0 otherwise.  LSMP is evaluated exactly once.
 */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_md__ = (LSMP);                   \
        int _bad__ = 1;                                         \
        if (_md__ == NULL) {                                    \
                CERROR("LOV requires striping ea\n");           \
        } else if (_md__->lsm_magic != LOV_MAGIC) {             \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _md__->lsm_magic, LOV_MAGIC);            \
        } else {                                                \
                _bad__ = 0;                                     \
        }                                                       \
        _bad__;                                                 \
})
789
790 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
791                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
792 {
793         struct lov_request_set *set;
794         struct lov_request *req;
795         struct list_head *pos;
796         struct lov_obd *lov;
797         int rc = 0;
798         ENTRY;
799
800         if (lsm_bad_magic(lsm))
801                 RETURN(-EINVAL);
802
803         if (!exp || !exp->exp_obd)
804                 RETURN(-ENODEV);
805
806         lov = &exp->exp_obd->u.lov;
807         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
808         if (rc)
809                 RETURN(rc);
810
811         list_for_each (pos, &set->set_list) {
812                 int err;
813                 req = list_entry(pos, struct lov_request, rq_link);
814
815                 /* XXX update the cookie position */
816                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
817                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
818                                  NULL, oti);
819                 err = lov_update_common_set(set, req, rc);
820                 if (rc) {
821                         CERROR("error: destroying objid "LPX64" subobj "
822                                LPX64" on OST idx %d: rc = %d\n", 
823                                set->set_oa->o_id, req->rq_oa->o_id, 
824                                req->rq_idx, rc);
825                         if (!rc)
826                                 rc = err;
827                 }
828         }
829         lov_fini_destroy_set(set);
830         RETURN(rc);
831 }
832
833 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
834                        struct lov_stripe_md *lsm)
835 {
836         struct lov_request_set *set;
837         struct lov_request *req;
838         struct list_head *pos;
839         struct lov_obd *lov;
840         int err = 0, rc = 0;
841         ENTRY;
842
843         if (lsm_bad_magic(lsm))
844                 RETURN(-EINVAL);
845
846         if (!exp || !exp->exp_obd)
847                 RETURN(-ENODEV);
848
849         lov = &exp->exp_obd->u.lov;
850         
851         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
852         if (rc)
853                 RETURN(rc);
854
855         list_for_each (pos, &set->set_list) {
856                 req = list_entry(pos, struct lov_request, rq_link);
857                 
858                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
859                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
860                        req->rq_idx);
861
862                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
863                                  req->rq_oa, NULL);
864                 err = lov_update_common_set(set, req, rc);
865                 if (err) {
866                         CERROR("error: getattr objid "LPX64" subobj "
867                                LPX64" on OST idx %d: rc = %d\n",
868                                set->set_oa->o_id, req->rq_oa->o_id, 
869                                req->rq_idx, err);
870                         break;
871                 }
872         }
873         
874         rc = lov_fini_getattr_set(set);
875         if (err)
876                 rc = err;
877         RETURN(rc);
878 }
879
880 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
881                                  int rc)
882 {
883         struct lov_request_set *lovset = (struct lov_request_set *)data;
884         ENTRY;
885
886         /* don't do attribute merge if this aysnc op failed */
887         if (rc) {
888                 lovset->set_completes = 0;
889                 lov_fini_getattr_set(lovset);
890         } else {
891                 rc = lov_fini_getattr_set(lovset);
892         }
893         RETURN (rc);
894 }
895
896 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
897                               struct lov_stripe_md *lsm,
898                               struct ptlrpc_request_set *rqset)
899 {
900         struct lov_request_set *lovset;
901         struct lov_obd *lov;
902         struct list_head *pos;
903         struct lov_request *req;
904         int rc = 0;
905         ENTRY;
906
907         if (lsm_bad_magic(lsm))
908                 RETURN(-EINVAL);
909
910         if (!exp || !exp->exp_obd)
911                 RETURN(-ENODEV);
912
913         lov = &exp->exp_obd->u.lov;
914
915         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
916         if (rc)
917                 RETURN(rc);
918
919         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
920                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
921
922         list_for_each (pos, &lovset->set_list) {
923                 req = list_entry(pos, struct lov_request, rq_link);
924                 
925                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
926                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
927                        req->rq_idx);
928                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
929                                        req->rq_oa, NULL, rqset);
930                 if (rc) {
931                         CERROR("error: getattr objid "LPX64" subobj "
932                                LPX64" on OST idx %d: rc = %d\n",
933                                lovset->set_oa->o_id, req->rq_oa->o_id, 
934                                req->rq_idx, rc);
935                         GOTO(out, rc);
936                 }
937                 lov_update_common_set(lovset, req, rc);
938         }
939         
940         LASSERT(rc == 0);
941         LASSERT (rqset->set_interpret == NULL);
942         rqset->set_interpret = lov_getattr_interpret;
943         rqset->set_arg = (void *)lovset;
944         RETURN(rc);
945 out:
946         LASSERT(rc);
947         lov_fini_getattr_set(lovset);
948         RETURN(rc);
949 }
950
951 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
952                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
953 {
954         struct lov_request_set *set;
955         struct lov_obd *lov;
956         struct list_head *pos;
957         struct lov_request *req;
958         int err = 0, rc = 0;
959         ENTRY;
960
961         if (lsm_bad_magic(lsm))
962                 RETURN(-EINVAL);
963
964         if (!exp || !exp->exp_obd)
965                 RETURN(-ENODEV);
966
967         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
968                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
969                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
970                                       OBD_MD_FLSIZE | OBD_MD_FLGROUP |
971                                       OBD_MD_FLUID | OBD_MD_FLGID)));
972
973         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
974
975         lov = &exp->exp_obd->u.lov;
976         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
977         if (rc)
978                 RETURN(rc);
979
980         list_for_each (pos, &set->set_list) {
981                 req = list_entry(pos, struct lov_request, rq_link);
982                 
983                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
984                                  NULL, NULL);
985                 err = lov_update_common_set(set, req, rc);
986                 if (err) {
987                         CERROR("error: setattr objid "LPX64" subobj "
988                                LPX64" on OST idx %d: rc = %d\n",
989                                set->set_oa->o_id, req->rq_oa->o_id,
990                                req->rq_idx, err);
991                         if (!rc)
992                                 rc = err;
993                 }
994         }
995         err = lov_fini_setattr_set(set);
996         if (!rc)
997                 rc = err;
998         RETURN(rc);
999 }
1000
1001 static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
1002 {
1003         static int next_idx = 0;
1004         struct lov_tgt_desc *tgt;
1005         int i, count;
1006
1007         /* XXX - we should do something clever and take lsm
1008          * into account but just do round robin for now. */
1009
1010         /* last_idx must always be less that count because
1011          * ld_tgt_count currently cannot shrink. */
1012         count = lov->desc.ld_tgt_count;
1013
1014         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
1015                 if (tgt->active) {
1016                         next_idx = (i + 1) % count;
1017                         RETURN(i);
1018                 }
1019         }
1020
1021         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
1022                 if (tgt->active) {
1023                         next_idx = (i + 1) % count;
1024                         RETURN(i);
1025                 }
1026         }
1027
1028         RETURN(-EIO);
1029 }
1030
1031 static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
1032                              struct lov_stripe_md *ea,
1033                              struct obd_trans_info *oti)
1034 {
1035         struct obd_export *osc_exp;
1036         struct lov_obd *lov = &exp->exp_obd->u.lov;
1037         struct lov_stripe_md *lsm = ea;
1038         struct lov_stripe_md obj_md;
1039         struct lov_stripe_md *obj_mdp = &obj_md;
1040         struct lov_oinfo *loi;
1041         struct obdo *tmp_oa;
1042         int ost_idx, updates = 0, i;
1043         ENTRY;
1044
1045         tmp_oa = obdo_alloc();
1046         if (tmp_oa == NULL)
1047                 RETURN(-ENOMEM);
1048
1049         loi = lsm->lsm_oinfo;
1050         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1051                 int rc;
1052                 if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1053                         continue;
1054
1055                 ost_idx = lov_revalidate_policy(lov, lsm);
1056                 if (ost_idx < 0) {
1057                         /* FIXME: punt for now. */
1058                         CERROR("lov_revalidate_policy failed; no active "
1059                                "OSCs?\n");
1060                         continue;
1061                 }
1062
1063                 /* create a new object */
1064                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1065                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1066                 osc_exp = lov->tgts[ost_idx].ltd_exp;
1067                 rc = obd_create(osc_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
1068                 if (rc) {
1069                         CERROR("error creating new subobj at idx %d; "
1070                                "rc = %d\n", ost_idx, rc);
1071                         continue;
1072                 }
1073                 if (oti->oti_objid)
1074                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
1075                 loi->loi_id = tmp_oa->o_id;
1076                 loi->loi_gr = tmp_oa->o_gr;
1077                 loi->loi_ost_idx = ost_idx;
1078                 loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
1079                 CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
1080                        " with idx %d gen %d.\n", lsm->lsm_object_id,
1081                        loi->loi_id, ost_idx, loi->loi_ost_gen);
1082                 updates = 1;
1083         }
1084
1085         /* If we got an error revalidating an entry there's no need to
1086          * cleanup up objects we allocated here because the bad entry
1087          * still points to a deleted OST. */
1088
1089         obdo_free(tmp_oa);
1090         RETURN(updates);
1091 }
1092
1093 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1094  * we can send this 'punch' to just the authoritative node and the nodes
1095  * that the punch will affect. */
1096 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1097                      struct lov_stripe_md *lsm,
1098                      obd_off start, obd_off end, struct obd_trans_info *oti)
1099 {
1100         struct lov_request_set *set;
1101         struct lov_obd *lov;
1102         struct list_head *pos;
1103         struct lov_request *req;
1104         int err = 0, rc = 0;
1105         ENTRY;
1106
1107         if (lsm_bad_magic(lsm))
1108                 RETURN(-EINVAL);
1109
1110         if (!exp || !exp->exp_obd)
1111                 RETURN(-ENODEV);
1112
1113         lov = &exp->exp_obd->u.lov;
1114         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1115         if (rc)
1116                 RETURN(rc);
1117
1118         list_for_each (pos, &set->set_list) {
1119                 req = list_entry(pos, struct lov_request, rq_link);
1120
1121                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1122                                NULL, req->rq_extent.start, 
1123                                req->rq_extent.end, NULL);
1124                 err = lov_update_punch_set(set, req, rc);
1125                 if (err) {
1126                         CERROR("error: punch objid "LPX64" subobj "LPX64
1127                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1128                                req->rq_oa->o_id, req->rq_idx, rc);
1129                         if (!rc)
1130                                 rc = err;
1131                 }
1132         }
1133         err = lov_fini_punch_set(set);
1134         if (!rc)
1135                 rc = err;
1136         RETURN(rc);
1137 }
1138
1139 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1140                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1141 {
1142         struct lov_request_set *set;
1143         struct lov_obd *lov;
1144         struct list_head *pos;
1145         struct lov_request *req;
1146         int err = 0, rc = 0;
1147         ENTRY;
1148
1149         if (lsm_bad_magic(lsm))
1150                 RETURN(-EINVAL);
1151
1152         if (!exp->exp_obd)
1153                 RETURN(-ENODEV);
1154
1155         lov = &exp->exp_obd->u.lov;
1156         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1157         if (rc)
1158                 RETURN(rc);
1159
1160         list_for_each (pos, &set->set_list) {
1161                 req = list_entry(pos, struct lov_request, rq_link);
1162
1163                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1164                               NULL, req->rq_extent.start, req->rq_extent.end);
1165                 err = lov_update_common_set(set, req, rc);
1166                 if (err) {
1167                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1168                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1169                                req->rq_oa->o_id, req->rq_idx, rc);
1170                         if (!rc)
1171                                 rc = err;
1172                 }
1173         }
1174         err = lov_fini_sync_set(set);
1175         if (!rc)
1176                 rc = err;
1177         RETURN(rc);
1178 }
1179
1180 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1181                          struct lov_stripe_md *lsm,
1182                          obd_count oa_bufs, struct brw_page *pga)
1183 {
1184         int i, rc = 0;
1185         ENTRY;
1186
1187         /* The caller just wants to know if there's a chance that this
1188          * I/O can succeed */
1189         for (i = 0; i < oa_bufs; i++) {
1190                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
1191                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1192                 obd_off start, end;
1193
1194                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
1195                                            pga[i].disk_offset + pga[i].count,
1196                                            &start, &end))
1197                         continue;
1198
1199                 if (lov->tgts[ost].active == 0) {
1200                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1201                         RETURN(-EIO);
1202                 }
1203                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1204                              NULL, 1, &pga[i], NULL);
1205                 if (rc)
1206                         break;
1207         }
1208         RETURN(rc);
1209 }
1210
1211 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1212                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1213                    struct brw_page *pga, struct obd_trans_info *oti)
1214 {
1215         struct lov_request_set *set;
1216         struct lov_request *req;
1217         struct list_head *pos;
1218         struct lov_obd *lov = &exp->exp_obd->u.lov;
1219         int err, rc = 0;
1220         ENTRY;
1221
1222         if (lsm_bad_magic(lsm))
1223                 RETURN(-EINVAL);
1224
1225         if (cmd == OBD_BRW_CHECK) {
1226                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1227                 RETURN(rc);
1228         }
1229
1230         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1231         if (rc)
1232                 RETURN(rc);
1233
1234         list_for_each (pos, &set->set_list) {
1235                 struct obd_export *sub_exp;
1236                 struct brw_page *sub_pga;
1237                 req = list_entry(pos, struct lov_request, rq_link);
1238                 
1239                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1240                 sub_pga = set->set_pga + req->rq_pgaidx;
1241                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
1242                              req->rq_oabufs, sub_pga, oti);
1243                 if (rc)
1244                         break;
1245                 lov_update_common_set(set, req, rc);
1246         }
1247
1248         err = lov_fini_brw_set(set);
1249         if (!rc)
1250                 rc = err;
1251         RETURN(rc);
1252 }
1253
1254 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1255                              int rc)
1256 {
1257         struct lov_request_set *lovset = (struct lov_request_set *)data;
1258         ENTRY;
1259         
1260         if (rc) {
1261                 lovset->set_completes = 0;
1262                 lov_fini_brw_set(lovset);
1263         } else {
1264                 rc = lov_fini_brw_set(lovset);
1265         }
1266                 
1267         RETURN(rc);
1268 }
1269
1270 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1271                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1272                          struct brw_page *pga, struct ptlrpc_request_set *set,
1273                          struct obd_trans_info *oti)
1274 {
1275         struct lov_request_set *lovset;
1276         struct lov_request *req;
1277         struct list_head *pos;
1278         struct lov_obd *lov = &exp->exp_obd->u.lov;
1279         int rc = 0;
1280         ENTRY;
1281
1282         if (lsm_bad_magic(lsm))
1283                 RETURN(-EINVAL);
1284
1285         if (cmd == OBD_BRW_CHECK) {
1286                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1287                 RETURN(rc);
1288         }
1289
1290         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1291         if (rc)
1292                 RETURN(rc);
1293
1294         list_for_each (pos, &lovset->set_list) {
1295                 struct obd_export *sub_exp;
1296                 struct brw_page *sub_pga;
1297                 req = list_entry(pos, struct lov_request, rq_link);
1298                 
1299                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1300                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1301                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1302                                    req->rq_oabufs, sub_pga, set, oti);
1303                 if (rc)
1304                         GOTO(out, rc);
1305                 lov_update_common_set(lovset, req, rc);
1306         }
1307         LASSERT(rc == 0);
1308         LASSERT(set->set_interpret == NULL);
1309         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1310         set->set_arg = (void *)lovset;
1311         
1312         RETURN(rc);
1313 out:
1314         lov_fini_brw_set(lovset);
1315         RETURN(rc);
1316 }
1317
1318 static int lov_ap_make_ready(void *data, int cmd)
1319 {
1320         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1321
1322         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1323 }
1324 static int lov_ap_refresh_count(void *data, int cmd)
1325 {
1326         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1327
1328         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1329                                                      cmd);
1330 }
1331 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1332 {
1333         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1334
1335         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1336         /* XXX woah, shouldn't we be altering more here?  size? */
1337         oa->o_id = lap->lap_loi_id;
1338 }
1339
1340 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1341 {
1342         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1343
1344         /* in a raid1 regime this would down a count of many ios
1345          * in flight, onl calling the caller_ops completion when all
1346          * the raid1 ios are complete */
1347         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1348 }
1349
1350 static struct obd_async_page_ops lov_async_page_ops = {
1351         .ap_make_ready =        lov_ap_make_ready,
1352         .ap_refresh_count =     lov_ap_refresh_count,
1353         .ap_fill_obdo =         lov_ap_fill_obdo,
1354         .ap_completion =        lov_ap_completion,
1355 };
1356
1357 static int lov_prep_async_page(struct obd_export *exp,
1358                                struct lov_stripe_md *lsm,
1359                                struct lov_oinfo *loi, struct page *page,
1360                                obd_off offset, struct obd_async_page_ops *ops,
1361                                void *data, void **res)
1362 {
1363         struct lov_obd *lov = &exp->exp_obd->u.lov;
1364         struct lov_async_page *lap;
1365         int rc, stripe;
1366         ENTRY;
1367
1368         if (lsm_bad_magic(lsm))
1369                 RETURN(-EINVAL);
1370         LASSERT(loi == NULL);
1371
1372         stripe = lov_stripe_number(lsm, offset);
1373         loi = &lsm->lsm_oinfo[stripe];
1374
1375         if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1376                 RETURN(-EIO);
1377         if (lov->tgts[loi->loi_ost_idx].active == 0)
1378                 RETURN(-EIO);
1379         if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
1380                 CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
1381                        "deleted or inactive.\n", loi->loi_ost_idx);
1382                 RETURN(-EIO);
1383         }
1384
1385         OBD_ALLOC(lap, sizeof(*lap));
1386         if (lap == NULL)
1387                 RETURN(-ENOMEM);
1388
1389         lap->lap_magic = LAP_MAGIC;
1390         lap->lap_caller_ops = ops;
1391         lap->lap_caller_data = data;
1392
1393         /* FIXME handle multiple oscs after landing b_raid1 */
1394         lap->lap_stripe = stripe;
1395         switch (lsm->lsm_pattern) {
1396                 case LOV_PATTERN_RAID0:
1397                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
1398                                           &lap->lap_sub_offset);
1399                         break;
1400                 case LOV_PATTERN_CMOBD:
1401                         lap->lap_sub_offset = offset;
1402                         break;
1403                 default:
1404                         LBUG();
1405         }
1406
1407         /* so the callback doesn't need the lsm */
1408         lap->lap_loi_id = loi->loi_id;
1409
1410         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1411                                  lsm, loi, page, lap->lap_sub_offset,
1412                                  &lov_async_page_ops, lap,
1413                                  &lap->lap_sub_cookie);
1414         if (rc) {
1415                 OBD_FREE(lap, sizeof(*lap));
1416                 RETURN(rc);
1417         }
1418         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1419                lap->lap_sub_cookie, offset);
1420         *res = lap;
1421         RETURN(0);
1422 }
1423
1424 static int lov_queue_async_io(struct obd_export *exp,
1425                               struct lov_stripe_md *lsm,
1426                               struct lov_oinfo *loi, void *cookie,
1427                               int cmd, obd_off off, int count,
1428                               obd_flags brw_flags, obd_flags async_flags)
1429 {
1430         struct lov_obd *lov = &exp->exp_obd->u.lov;
1431         struct lov_async_page *lap;
1432         int rc;
1433
1434         LASSERT(loi == NULL);
1435
1436         if (lsm_bad_magic(lsm))
1437                 RETURN(-EINVAL);
1438
1439         lap = LAP_FROM_COOKIE(cookie);
1440
1441         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1442
1443         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1444                                 loi, lap->lap_sub_cookie, cmd, off, count,
1445                                 brw_flags, async_flags);
1446         RETURN(rc);
1447 }
1448
1449 static int lov_set_async_flags(struct obd_export *exp,
1450                                struct lov_stripe_md *lsm,
1451                                struct lov_oinfo *loi, void *cookie,
1452                                obd_flags async_flags)
1453 {
1454         struct lov_obd *lov = &exp->exp_obd->u.lov;
1455         struct lov_async_page *lap;
1456         int rc;
1457
1458         LASSERT(loi == NULL);
1459
1460         if (lsm_bad_magic(lsm))
1461                 RETURN(-EINVAL);
1462
1463         lap = LAP_FROM_COOKIE(cookie);
1464
1465         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1466
1467         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1468                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1469         RETURN(rc);
1470 }
1471
1472 static int lov_queue_group_io(struct obd_export *exp,
1473                               struct lov_stripe_md *lsm,
1474                               struct lov_oinfo *loi,
1475                               struct obd_io_group *oig, void *cookie,
1476                               int cmd, obd_off off, int count,
1477                               obd_flags brw_flags, obd_flags async_flags)
1478 {
1479         struct lov_obd *lov = &exp->exp_obd->u.lov;
1480         struct lov_async_page *lap;
1481         int rc;
1482
1483         LASSERT(loi == NULL);
1484
1485         if (lsm_bad_magic(lsm))
1486                 RETURN(-EINVAL);
1487
1488         lap = LAP_FROM_COOKIE(cookie);
1489
1490         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1491
1492         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1493                                 oig, lap->lap_sub_cookie, cmd, off, count,
1494                                 brw_flags, async_flags);
1495         RETURN(rc);
1496 }
1497
1498 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1499  * all stripes, but we don't record that fact at queue time.  so we
1500  * trigger sync io on all stripes. */
1501 static int lov_trigger_group_io(struct obd_export *exp,
1502                                 struct lov_stripe_md *lsm,
1503                                 struct lov_oinfo *loi,
1504                                 struct obd_io_group *oig)
1505 {
1506         struct lov_obd *lov = &exp->exp_obd->u.lov;
1507         int rc = 0, i, err;
1508
1509         LASSERT(loi == NULL);
1510
1511         if (lsm_bad_magic(lsm))
1512                 RETURN(-EINVAL);
1513
1514         loi = lsm->lsm_oinfo;
1515         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1516                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1517                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1518                         continue;
1519                 }
1520
1521                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1522                                            lsm, loi, oig);
1523                 if (rc == 0 && err != 0)
1524                         rc = err;
1525         };
1526         RETURN(rc);
1527 }
1528
1529 static int lov_teardown_async_page(struct obd_export *exp,
1530                                    struct lov_stripe_md *lsm,
1531                                    struct lov_oinfo *loi, void *cookie)
1532 {
1533         struct lov_obd *lov = &exp->exp_obd->u.lov;
1534         struct lov_async_page *lap;
1535         int rc;
1536
1537         LASSERT(loi == NULL);
1538
1539         if (lsm_bad_magic(lsm))
1540                 RETURN(-EINVAL);
1541
1542         lap = LAP_FROM_COOKIE(cookie);
1543
1544         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1545
1546         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1547                                      lsm, loi, lap->lap_sub_cookie);
1548         if (rc) {
1549                 CERROR("unable to teardown sub cookie %p: %d\n",
1550                        lap->lap_sub_cookie, rc);
1551                 RETURN(rc);
1552         }
1553         OBD_FREE(lap, sizeof(*lap));
1554         RETURN(rc);
1555 }
1556
1557 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1558                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1559                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1560                        void *data,__u32 lvb_len, void *lvb_swabber,
1561                        struct lustre_handle *lockh)
1562 {
1563         struct lov_request_set *set;
1564         struct lov_request *req;
1565         struct list_head *pos;
1566         struct lustre_handle *lov_lockhp;
1567         struct lov_obd *lov;
1568         ldlm_error_t rc;
1569         int save_flags = *flags;
1570         ENTRY;
1571
1572         if (lsm_bad_magic(lsm))
1573                 RETURN(-EINVAL);
1574
1575         /* we should never be asked to replay a lock this way. */
1576         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1577
1578         if (!exp || !exp->exp_obd)
1579                 RETURN(-ENODEV);
1580
1581         lov = &exp->exp_obd->u.lov;
1582         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1583         if (rc)
1584                 RETURN(rc);
1585
1586         list_for_each (pos, &set->set_list) {
1587                 ldlm_policy_data_t sub_policy;
1588                 req = list_entry(pos, struct lov_request, rq_link);
1589                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1590                 LASSERT(lov_lockhp);
1591
1592                 *flags = save_flags;
1593                 sub_policy.l_extent.start = req->rq_extent.start;
1594                 sub_policy.l_extent.end = req->rq_extent.end;
1595
1596                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1597                                  type, &sub_policy, mode, flags, bl_cb,
1598                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1599                                  lov_lockhp);
1600                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1601                 if (rc != ELDLM_OK)
1602                         break;
1603         }
1604
1605         lov_fini_enqueue_set(set, mode);
1606         RETURN(rc);
1607 }
1608
1609 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1610                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1611                      int *flags, void *data, struct lustre_handle *lockh)
1612 {
1613         struct lov_request_set *set;
1614         struct lov_request *req;
1615         struct list_head *pos;
1616         struct lov_obd *lov = &exp->exp_obd->u.lov;
1617         struct lustre_handle *lov_lockhp;
1618         int lov_flags, rc = 0;
1619         ENTRY;
1620
1621         if (lsm_bad_magic(lsm))
1622                 RETURN(-EINVAL);
1623
1624         if (!exp || !exp->exp_obd)
1625                 RETURN(-ENODEV);
1626
1627         lov = &exp->exp_obd->u.lov;
1628         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1629         if (rc)
1630                 RETURN(rc);
1631
1632         list_for_each (pos, &set->set_list) {
1633                 ldlm_policy_data_t sub_policy;
1634                 req = list_entry(pos, struct lov_request, rq_link);
1635                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1636                 LASSERT(lov_lockhp);
1637
1638                 sub_policy.l_extent.start = req->rq_extent.start;
1639                 sub_policy.l_extent.end = req->rq_extent.end;
1640                 lov_flags = *flags;
1641
1642                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1643                                type, &sub_policy, mode, &lov_flags, data,
1644                                lov_lockhp);
1645                 rc = lov_update_match_set(set, req, rc);
1646                 if (rc != 1)
1647                         break;
1648         }
1649         lov_fini_match_set(set, mode, *flags);
1650         RETURN(rc);
1651 }
1652
1653 static int lov_change_cbdata(struct obd_export *exp,
1654                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1655                              void *data)
1656 {
1657         struct lov_obd *lov;
1658         struct lov_oinfo *loi;
1659         int rc = 0, i;
1660         ENTRY;
1661
1662         if (lsm_bad_magic(lsm))
1663                 RETURN(-EINVAL);
1664
1665         if (!exp || !exp->exp_obd)
1666                 RETURN(-ENODEV);
1667
1668         LASSERT(lsm->lsm_object_gr > 0);
1669
1670         lov = &exp->exp_obd->u.lov;
1671         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1672                 struct lov_stripe_md submd;
1673                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1674                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1675                         continue;
1676                 }
1677
1678                 submd.lsm_object_id = loi->loi_id;
1679                 submd.lsm_object_gr = lsm->lsm_object_gr;
1680                 submd.lsm_stripe_count = 0;
1681                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1682                                        &submd, it, data);
1683         }
1684         RETURN(rc);
1685 }
1686
1687 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1688                       __u32 mode, struct lustre_handle *lockh)
1689 {
1690         struct lov_request_set *set;
1691         struct lov_request *req;
1692         struct list_head *pos;
1693         struct lov_obd *lov = &exp->exp_obd->u.lov;
1694         struct lustre_handle *lov_lockhp;
1695         int err = 0, rc = 0;
1696         ENTRY;
1697
1698         if (lsm_bad_magic(lsm))
1699                 RETURN(-EINVAL);
1700
1701         if (!exp || !exp->exp_obd)
1702                 RETURN(-ENODEV);
1703
1704         LASSERT(lsm->lsm_object_gr > 0);
1705
1706         LASSERT(lockh);
1707         lov = &exp->exp_obd->u.lov;
1708         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1709         if (rc)
1710                 RETURN(rc);
1711
1712         list_for_each (pos, &set->set_list) {
1713                 req = list_entry(pos, struct lov_request, rq_link);
1714                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1715
1716                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1717                                 mode, lov_lockhp);
1718                 rc = lov_update_common_set(set, req, rc);
1719                 if (rc) {
1720                         CERROR("error: cancel objid "LPX64" subobj "
1721                                LPX64" on OST idx %d: rc = %d\n",
1722                                lsm->lsm_object_id,
1723                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1724                         err = rc;
1725                 }
1726  
1727         }
1728         lov_fini_cancel_set(set);
1729         RETURN(err);
1730 }
1731
1732 static int lov_cancel_unused(struct obd_export *exp,
1733                              struct lov_stripe_md *lsm, 
1734                              int flags, void *opaque)
1735 {
1736         struct lov_obd *lov;
1737         struct lov_oinfo *loi;
1738         int rc = 0, i;
1739         ENTRY;
1740
1741         lov = &exp->exp_obd->u.lov;
1742         if (lsm == NULL) {
1743                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1744                         int err = obd_cancel_unused(lov->tgts[i].ltd_exp,
1745                                                     NULL, flags, opaque);
1746                         if (!rc)
1747                                 rc = err;
1748                 }
1749                 RETURN(rc);
1750         }
1751
1752         if (lsm_bad_magic(lsm))
1753                 RETURN(-EINVAL);
1754
1755         if (!exp || !exp->exp_obd)
1756                 RETURN(-ENODEV);
1757
1758         LASSERT(lsm->lsm_object_gr > 0);
1759
1760         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1761                 struct lov_stripe_md submd;
1762                 int err;
1763
1764                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1765                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1766
1767                 submd.lsm_object_id = loi->loi_id;
1768                 submd.lsm_object_gr = lsm->lsm_object_gr;
1769                 submd.lsm_stripe_count = 0;
1770                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1771                                         &submd, flags, opaque);
1772                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1773                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1774                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1775                                loi->loi_id, loi->loi_ost_idx, err);
1776                         if (!rc)
1777                                 rc = err;
1778                 }
1779         }
1780         RETURN(rc);
1781 }
1782
/* Largest value representable in a __u64; used as the saturation value. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating 64-bit accumulate: tot += add, except that tot is pinned to
 * LOV_U64_MAX when the unsigned addition would wrap around.
 *
 * @tot must be a __u64 lvalue (it is read and assigned).  @add is evaluated
 * exactly once, so an argument with side effects is safe.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                __u64 lov_sum_add__ = (add);                            \
                                                                        \
                if ((tot) + lov_sum_add__ < (tot))                      \
                        (tot) = LOV_U64_MAX;                            \
                else                                                    \
                        (tot) += lov_sum_add__;                         \
        } while (0)
1791
1792 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1793                       unsigned long max_age)
1794 {
1795         struct lov_obd *lov = &obd->u.lov;
1796         struct obd_statfs lov_sfs;
1797         int set = 0;
1798         int rc = 0;
1799         int i;
1800         ENTRY;
1801
1802
1803         /* We only get block data from the OBD */
1804         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1805                 int err;
1806                 if (!lov->tgts[i].active) {
1807                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1808                         continue;
1809                 }
1810
1811                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1812                                  max_age);
1813                 if (err) {
1814                         if (lov->tgts[i].active && !rc)
1815                                 rc = err;
1816                         continue;
1817                 }
1818
1819                 if (!set) {
1820                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1821                         set = 1;
1822                 } else {
1823                         osfs->os_bfree += lov_sfs.os_bfree;
1824                         osfs->os_bavail += lov_sfs.os_bavail;
1825                         osfs->os_blocks += lov_sfs.os_blocks;
1826                         /* XXX not sure about this one - depends on policy.
1827                          *   - could be minimum if we always stripe on all OBDs
1828                          *     (but that would be wrong for any other policy,
1829                          *     if one of the OBDs has no more objects left)
1830                          *   - could be sum if we stripe whole objects
1831                          *   - could be average, just to give a nice number
1832                          *
1833                          * To give a "reasonable" (if not wholly accurate)
1834                          * number, we divide the total number of free objects
1835                          * by expected stripe count (watch out for overflow).
1836                          */
1837                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1838                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1839                 }
1840         }
1841
1842         if (set) {
1843                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1844                                          lov->desc.ld_default_stripe_count :
1845                                          lov->desc.ld_active_tgt_count;
1846
1847                 if (osfs->os_files != LOV_U64_MAX)
1848                         do_div(osfs->os_files, expected_stripes);
1849                 if (osfs->os_ffree != LOV_U64_MAX)
1850                         do_div(osfs->os_ffree, expected_stripes);
1851         } else if (!rc)
1852                 rc = -EIO;
1853
1854         RETURN(rc);
1855 }
1856
1857 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1858                          void *karg, void *uarg)
1859 {
1860         struct obd_device *obddev = class_exp2obd(exp);
1861         struct lov_obd *lov = &obddev->u.lov;
1862         int i, rc, count = lov->desc.ld_tgt_count;
1863         struct obd_uuid *uuidp;
1864         ENTRY;
1865
1866         switch (cmd) {
1867         case OBD_IOC_LOV_GET_CONFIG: {
1868                 struct obd_ioctl_data *data = karg;
1869                 struct lov_tgt_desc *tgtdesc;
1870                 struct lov_desc *desc;
1871                 char *buf = NULL;
1872                 __u32 *genp;
1873
1874                 buf = NULL;
1875                 len = 0;
1876                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1877                         RETURN(-EINVAL);
1878
1879                 data = (struct obd_ioctl_data *)buf;
1880
1881                 if (sizeof(*desc) > data->ioc_inllen1) {
1882                         obd_ioctl_freedata(buf, len);
1883                         RETURN(-EINVAL);
1884                 }
1885
1886                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1887                         obd_ioctl_freedata(buf, len);
1888                         RETURN(-EINVAL);
1889                 }
1890
1891                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1892                         obd_ioctl_freedata(buf, len);
1893                         RETURN(-EINVAL);
1894                 }
1895
1896                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1897                 memcpy(desc, &(lov->desc), sizeof(*desc));
1898
1899                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1900                 genp = (__u32 *)data->ioc_inlbuf3;
1901                 tgtdesc = lov->tgts;
1902                 /* the uuid will be empty for deleted OSTs */
1903                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1904                         obd_str2uuid(uuidp, (char *)tgtdesc->uuid.uuid);
1905                         *genp = tgtdesc->ltd_gen;
1906                 }
1907
1908                 rc = copy_to_user((void *)uarg, buf, len);
1909                 if (rc)
1910                         rc = -EFAULT;
1911                 obd_ioctl_freedata(buf, len);
1912                 break;
1913         }
1914         case LL_IOC_LOV_SETSTRIPE:
1915                 rc = lov_setstripe(exp, karg, uarg);
1916                 break;
1917         case LL_IOC_LOV_GETSTRIPE:
1918                 rc = lov_getstripe(exp, karg, uarg);
1919                 break;
1920         case LL_IOC_LOV_SETEA:
1921                 rc = lov_setea(exp, karg, uarg);
1922                 break;
1923         default: {
1924                 int set = 0;
1925                 if (count == 0)
1926                         RETURN(-ENOTTY);
1927                 rc = 0;
1928                 for (i = 0; i < count; i++) {
1929                         int err;
1930
1931                         /* OST was deleted */
1932                         if (obd_uuid_empty(&lov->tgts[i].uuid))
1933                                 continue;
1934
1935                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1936                                             len, karg, uarg);
1937                         if (err) {
1938                                 if (lov->tgts[i].active) {
1939                                         CERROR("error: iocontrol OSC %s on OST "
1940                                                "idx %d cmd %x: err = %d\n",
1941                                                lov->tgts[i].uuid.uuid, i,
1942                                                cmd, err);
1943                                         if (!rc)
1944                                                 rc = err;
1945                                 }
1946                         } else
1947                                 set = 1;
1948                 }
1949                 if (!set && !rc)
1950                         rc = -EIO;
1951         }
1952         }
1953
1954         RETURN(rc);
1955 }
1956
1957 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1958                         void *key, __u32 *vallen, void *val)
1959 {
1960         struct obd_device *obddev = class_exp2obd(exp);
1961         struct lov_obd *lov = &obddev->u.lov;
1962         int i;
1963         ENTRY;
1964
1965         if (!vallen || !val)
1966                 RETURN(-EFAULT);
1967
1968         if (keylen > strlen("lock_to_stripe") &&
1969             strcmp(key, "lock_to_stripe") == 0) {
1970                 struct {
1971                         char name[16];
1972                         struct ldlm_lock *lock;
1973                         struct lov_stripe_md *lsm;
1974                 } *data = key;
1975                 struct lov_oinfo *loi;
1976                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
1977                 __u32 *stripe = val;
1978
1979                 if (*vallen < sizeof(*stripe))
1980                         RETURN(-EFAULT);
1981                 *vallen = sizeof(*stripe);
1982
1983                 /* XXX This is another one of those bits that will need to
1984                  * change if we ever actually support nested LOVs.  It uses
1985                  * the lock's export to find out which stripe it is. */
1986                 /* XXX - it's assumed all the locks for deleted OSTs have
1987                  * been cancelled. Also, the export for deleted OSTs will
1988                  * be NULL and won't match the lock's export. */
1989                 for (i = 0, loi = data->lsm->lsm_oinfo;
1990                      i < data->lsm->lsm_stripe_count;
1991                      i++, loi++) {
1992                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
1993                                         data->lock->l_conn_export &&
1994                             loi->loi_id == res_id->name[0] &&
1995                             loi->loi_gr == res_id->name[2]) {
1996                                 *stripe = i;
1997                                 RETURN(0);
1998                         }
1999                 }
2000                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2001                 dump_lsm(D_ERROR, data->lsm);
2002                 portals_debug_dumpstack(NULL);
2003                 RETURN(-ENXIO);
2004         } else if (keylen >= strlen("size_to_stripe") &&
2005                    strcmp(key, "size_to_stripe") == 0) {
2006                 struct {
2007                         int stripe_number;
2008                         __u64 size;
2009                         struct lov_stripe_md *lsm;
2010                 } *data = val;
2011
2012                 if (*vallen < sizeof(*data))
2013                         RETURN(-EFAULT);
2014
2015                 data->size = lov_size_to_stripe(data->lsm, data->size,
2016                                                 data->stripe_number);
2017                 RETURN(0);
2018         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2019                 __u32 size = sizeof(obd_id);
2020                 obd_id *ids = val;
2021                 int rc = 0;
2022
2023                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2024                         if (!lov->tgts[i].active)
2025                                 continue;
2026                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2027                                           keylen, key, &size, &(ids[i]));
2028                         if (rc != 0)
2029                                 RETURN(rc);
2030                 }
2031                 RETURN(0);
2032         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2033                 struct lov_desc *desc_ret = val;
2034                 *desc_ret = lov->desc;
2035
2036                 RETURN(0);
2037         }
2038
2039         RETURN(-EINVAL);
2040 }
2041
2042 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2043                         void *key, obd_count vallen, void *val)
2044 {
2045         struct obd_device *obddev = class_exp2obd(exp);
2046         struct lov_obd *lov = &obddev->u.lov;
2047         int i, rc = 0, err;
2048         ENTRY;
2049
2050 #define KEY_IS(str) \
2051         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2052
2053         if (KEY_IS("async")) {
2054                 struct lov_desc *desc = &lov->desc;
2055                 struct lov_tgt_desc *tgts = lov->tgts;
2056
2057                 if (vallen != sizeof(int))
2058                         RETURN(-EINVAL);
2059                 lov->async = *((int*) val);
2060
2061                 for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
2062                         struct obd_uuid *tgt_uuid = &tgts->uuid;
2063                         struct obd_device *tgt_obd;
2064
2065                         tgt_obd = class_find_client_obd(tgt_uuid,
2066                                                         LUSTRE_OSC_NAME,
2067                                                         &obddev->obd_uuid);
2068                         if (!tgt_obd) {
2069                                 CERROR("Target %s not attached\n",
2070                                         tgt_uuid->uuid);
2071                                 if (!rc)
2072                                         rc = -EINVAL;
2073                                 continue;
2074                         }
2075
2076                         err = obd_set_info(tgt_obd->obd_self_export,
2077                                            keylen, key, vallen, val);
2078                         if (err) {
2079                                 CERROR("Failed to set async on target %s\n",
2080                                         tgt_obd->obd_name);
2081                                 if (!rc)
2082                                         rc = err;
2083                         }
2084                 }
2085                 RETURN(rc);
2086         }
2087
2088         if (KEY_IS("mds_conn")) {
2089                 if (vallen != sizeof(__u32))
2090                         RETURN(-EINVAL);
2091         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
2092                 if (vallen != 0)
2093                         RETURN(-EINVAL);
2094         } else if (KEY_IS("sec") || KEY_IS("sec_flags")) {
2095                 struct lov_tgt_desc *tgt;
2096                 struct obd_export *exp;
2097                 int rc = 0, err, i;
2098
2099                 spin_lock(&lov->lov_lock);
2100                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2101                      i++, tgt++) {
2102                         exp = tgt->ltd_exp;
2103                         /* during setup time the connections to osc might
2104                          * haven't been established.
2105                          */
2106                         if (exp == NULL) {
2107                                 struct obd_device *tgt_obd;
2108
2109                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2110                                                                 LUSTRE_OSC_NAME,
2111                                                                 &obddev->obd_uuid);
2112                                 if (!tgt_obd) {
2113                                         CERROR("can't set security flavor, "
2114                                                "device %s not attached?\n",
2115                                                 tgt->uuid.uuid);
2116                                         rc = -EINVAL;
2117                                         continue;
2118                                 }
2119                                 exp = tgt_obd->obd_self_export;
2120                         }
2121
2122                         err = obd_set_info(exp, keylen, key, vallen, val);
2123                         if (!rc)
2124                                 rc = err;
2125                 }
2126                 spin_unlock(&lov->lov_lock);
2127
2128                 RETURN(rc);
2129         } else if (KEY_IS("flush_cred")) {
2130                 struct lov_tgt_desc *tgt;
2131                 int rc = 0, i;
2132
2133                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2134                      i++, tgt++) {
2135                         if (!tgt->ltd_exp)
2136                                 continue;
2137                         rc = obd_set_info(tgt->ltd_exp,
2138                                           keylen, key, vallen, val);
2139                         if (rc)
2140                                 RETURN(rc);
2141                 }
2142
2143                 RETURN(0);
2144         } else {
2145                 RETURN(-EINVAL);
2146         }
2147
2148         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2149                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2150                         continue;
2151
2152                 if (!val && !lov->tgts[i].active)
2153                         continue;
2154
2155                 err = obd_set_info(lov->tgts[i].ltd_exp,
2156                                   keylen, key, vallen, val);
2157                 if (!rc)
2158                         rc = err;
2159         }
2160         RETURN(rc);
2161 #undef KEY_IS
2162 }
2163
/*
 * Disabled code: everything up to the matching #endif is excluded from the
 * build by "#if 0".
 *
 * lov_complete_many() looks up the lock behind every stripe's handle, adds
 * a wait-queue entry for the current task to each lock, records the
 * generation of the lock's import and then waits for
 * check_multi_complete().
 *
 * NOTE(review): as written it would not compile if re-enabled - 'imp',
 * 'lwi' and 'lwd' are used without a declaration in this function, and
 * 'lock' is used in the remove_wait_queue() loop outside the loop body that
 * declares it.  The 'queues' array is allocated but never freed in the
 * visible code, and the -EINTR / -ETIMEDOUT branch is empty.
 */
2164 #if 0
2165 struct lov_multi_wait {
2166         struct ldlm_lock *lock;
2167         wait_queue_t      wait;
2168         int               completed;
2169         int               generation;
2170 };
2171
2172 int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
2173                       struct lustre_handle *lockh)
2174 {
2175         struct lov_lock_handles *lov_lockh = NULL;
2176         struct lustre_handle *lov_lockhp;
2177         struct lov_obd *lov;
2178         struct lov_oinfo *loi;
2179         struct lov_multi_wait *queues;
2180         int rc = 0, i;
2181         ENTRY;
2182
2183         if (lsm_bad_magic(lsm))
2184                 RETURN(-EINVAL);
2185
2186         if (!exp || !exp->exp_obd)
2187                 RETURN(-ENODEV);
2188
2189         LASSERT(lockh != NULL);
2190         if (lsm->lsm_stripe_count > 1) {
2191                 lov_lockh = lov_handle2llh(lockh);
2192                 if (lov_lockh == NULL) {
2193                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
2194                         RETURN(-EINVAL);
2195                 }
2196
2197                 lov_lockhp = lov_lockh->llh_handles;
2198         } else {
2199                 lov_lockhp = lockh;
2200         }
2201
2202         OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
2203         if (queues == NULL)
2204                 GOTO(out, rc = -ENOMEM);
2205
2206         lov = &exp->exp_obd->u.lov;
2207         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2208              i++, loi++, lov_lockhp++) {
2209                 struct ldlm_lock *lock;
2210                 struct obd_device *obd;
2211                 unsigned long irqflags;
2212
2213                 lock = ldlm_handle2lock(lov_lockhp);
2214                 if (lock == NULL) {
2215                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
2216                                loi->loi_ost_idx, loi->loi_id);
2217                         queues[i].completed = 1;
2218                         continue;
2219                 }
2220
2221                 queues[i].lock = lock;
2222                 init_waitqueue_entry(&(queues[i].wait), current);
2223                 add_wait_queue(lock->l_waitq, &(queues[i].wait));
2224
2225                 obd = class_exp2obd(lock->l_conn_export);
2226                 if (obd != NULL)
2227                         imp = obd->u.cli.cl_import;
2228                 if (imp != NULL) {
2229                         spin_lock_irqsave(&imp->imp_lock, irqflags);
2230                         queues[i].generation = imp->imp_generation;
2231                         spin_unlock_irqrestore(&imp->imp_lock, irqflags);
2232                 }
2233         }
2234
2235         lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
2236                                interrupted_completion_wait, &lwd);
2237         rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
2238
2239         for (i = 0; i < lsm->lsm_stripe_count; i++)
2240                 remove_wait_queue(lock->l_waitq, &(queues[i].wait));
2241
2242         if (rc == -EINTR || rc == -ETIMEDOUT) {
2243
2244
2245         }
2246
2247  out:
2248         if (lov_lockh != NULL)
2249                 lov_llh_put(lov_lockh);
2250         RETURN(rc);
2251 }
2252 #endif
2253
/*
 * Method table of the LOV obd type: binds each obd operation slot to the
 * lov_*() function implementing it.  lov_init() passes this table to
 * class_register_type().
 */
2254 struct obd_ops lov_obd_ops = {
2255         .o_owner               = THIS_MODULE,
2256         .o_attach              = lov_attach,
2257         .o_detach              = lov_detach,
2258         .o_setup               = lov_setup,
2259         .o_cleanup             = lov_cleanup,
2260         .o_process_config      = lov_process_config,
2261         .o_connect             = lov_connect,
2262         .o_disconnect          = lov_disconnect,
2263         .o_statfs              = lov_statfs,
2264         .o_packmd              = lov_packmd,
2265         .o_unpackmd            = lov_unpackmd,
2266         .o_revalidate_md       = lov_revalidate_md,
2267         .o_create              = lov_create,
2268         .o_destroy             = lov_destroy,
2269         .o_getattr             = lov_getattr,
2270         .o_getattr_async       = lov_getattr_async,
2271         .o_setattr             = lov_setattr,
2272         .o_brw                 = lov_brw,
2273         .o_brw_async           = lov_brw_async,
2274         .o_prep_async_page     = lov_prep_async_page,
2275         .o_queue_async_io      = lov_queue_async_io,
2276         .o_set_async_flags     = lov_set_async_flags,
2277         .o_queue_group_io      = lov_queue_group_io,
2278         .o_trigger_group_io    = lov_trigger_group_io,
2279         .o_teardown_async_page = lov_teardown_async_page,
2280         .o_adjust_kms          = lov_adjust_kms,
2281         .o_punch               = lov_punch,
2282         .o_sync                = lov_sync,
2283         .o_enqueue             = lov_enqueue,
2284         .o_match               = lov_match,
2285         .o_change_cbdata       = lov_change_cbdata,
2286         .o_cancel              = lov_cancel,
2287         .o_cancel_unused       = lov_cancel_unused,
2288         .o_iocontrol           = lov_iocontrol,
2289         .o_get_info            = lov_get_info,
2290         .o_set_info            = lov_set_info,
2291         .o_llog_init           = lov_llog_init,
2292         .o_llog_finish         = lov_llog_finish,
2293         .o_notify              = lov_notify,
2294 };
2295
2296 int __init lov_init(void)
2297 {
2298         struct lprocfs_static_vars lvars;
2299         int rc;
2300         ENTRY;
2301
2302         lprocfs_init_vars(lov, &lvars);
2303         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2304                                  OBD_LOV_DEVICENAME);
2305         RETURN(rc);
2306 }
2307
/* Kernel-module glue; compiled only when __KERNEL__ is defined. */
2308 #ifdef __KERNEL__
/*
 * Module exit hook: unregisters the obd type name OBD_LOV_DEVICENAME, the
 * same name lov_init() registers.  The __exit annotation is commented out
 * in the original source.
 */
2309 static void /*__exit*/ lov_exit(void)
2310 {
2311         class_unregister_type(OBD_LOV_DEVICENAME);
2312 }
2313
2314 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2315 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
2316 MODULE_LICENSE("GPL");
2317
/* Hook lov_init()/lov_exit() up as the module's load and unload handlers. */
2318 module_init(lov_init);
2319 module_exit(lov_exit);
2320 #endif