Whamcloud - gitweb
mgs_llog_read_header
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002-2004 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  */
27
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_LOV
32 #ifdef __KERNEL__
33 #include <linux/slab.h>
34 #include <linux/module.h>
35 #include <linux/init.h>
36 #include <linux/slab.h>
37 #include <linux/pagemap.h>
38 #include <linux/seq_file.h>
39 #include <asm/div64.h>
40 #else
41 #include <liblustre.h>
42 #endif
43
44 #include <linux/obd_support.h>
45 #include <linux/lustre_lib.h>
46 #include <linux/lustre_net.h>
47 #include <linux/lustre_idl.h>
48 #include <linux/lustre_dlm.h>
49 #include <linux/lustre_mds.h>
50 #include <linux/lustre_debug.h>
51 #include <linux/obd_class.h>
52 #include <linux/obd_lov.h>
53 #include <linux/obd_ost.h>
54 #include <linux/lprocfs_status.h>
55
56 #include "lov_internal.h"
57
58 /* obd methods */
59 #define MAX_STRING_SIZE 128
60 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
61                            int activate, struct obd_connect_data *data)
62 {
63         struct lov_obd *lov = &obd->u.lov;
64         struct obd_uuid *tgt_uuid = &tgt->uuid;
65         struct obd_device *tgt_obd;
66         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
67         struct lustre_handle conn = {0, };
68         struct obd_import *imp;
69 #ifdef __KERNEL__
70         struct proc_dir_entry *lov_proc_dir;
71 #endif
72         int rc;
73         ENTRY;
74
75         tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
76                                         &obd->obd_uuid);
77
78         if (!tgt_obd) {
79                 CERROR("Target %s not attached\n", tgt_uuid->uuid);
80                 RETURN(-EINVAL);
81         }
82
83         if (!tgt_obd->obd_set_up) {
84                 CERROR("Target %s not set up\n", tgt_uuid->uuid);
85                 RETURN(-EINVAL);
86         }
87
88         if (activate) {
89                 tgt_obd->obd_no_recov = 0;
90                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
91         }
92
93         /*
94          * Divine LOV knows that OBDs under it are OSCs.
95          */
96         imp = tgt_obd->u.cli.cl_import;
97
98         if (imp->imp_invalid) {
99                 CERROR("not connecting OSC %s; administratively "
100                        "disabled\n", tgt_uuid->uuid);
101                 rc = obd_register_observer(tgt_obd, obd);
102                 if (rc) {
103                         CERROR("Target %s register_observer error %d; "
104                                "will not be able to reactivate\n",
105                                tgt_uuid->uuid, rc);
106                 }
107                 RETURN(0);
108         }
109
110         rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, data);
111         if (rc) {
112                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
113                 RETURN(rc);
114         }
115         tgt->ltd_exp = class_conn2export(&conn);
116
117         rc = obd_register_observer(tgt_obd, obd);
118         if (rc) {
119                 CERROR("Target %s register_observer error %d\n",
120                        tgt_uuid->uuid, rc);
121                 obd_disconnect(tgt->ltd_exp);
122                 tgt->ltd_exp = NULL;
123                 RETURN(rc);
124         }
125
126         tgt->active = 1;
127         lov->desc.ld_active_tgt_count++;
128
129 #ifdef __KERNEL__
130         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
131         if (lov_proc_dir) {
132                 struct obd_device *osc_obd = class_conn2obd(&conn);
133                 struct proc_dir_entry *osc_symlink;
134                 char name[MAX_STRING_SIZE];
135
136                 LASSERT(osc_obd != NULL);
137                 LASSERT(osc_obd->obd_type != NULL);
138                 LASSERT(osc_obd->obd_type->typ_name != NULL);
139                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
140                          osc_obd->obd_type->typ_name,
141                          osc_obd->obd_name);
142                 osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
143                                            name);
144                 if (osc_symlink == NULL) {
145                         CERROR("could not register LOV target "
146                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
147                                obd->obd_type->typ_name, obd->obd_name,
148                                osc_obd->obd_name);
149                         lprocfs_remove(lov_proc_dir);
150                         lov_proc_dir = NULL;
151                 }
152         }
153 #endif
154
155         RETURN(0);
156 }
157
158 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
159                        struct obd_uuid *cluuid, struct obd_connect_data *data)
160 {
161         struct lov_obd *lov = &obd->u.lov;
162         struct lov_tgt_desc *tgt;
163         struct obd_export *exp;
164         __u64 connect_flags = data ? data->ocd_connect_flags : 0;
165         int rc, rc2, i;
166         ENTRY;
167
168         rc = class_connect(conn, obd, cluuid);
169         if (rc)
170                 RETURN(rc);
171
172         exp = class_conn2export(conn);
173
174         /* We don't want to actually do the underlying connections more than
175          * once, so keep track. */
176         lov->refcount++;
177         if (lov->refcount > 1) {
178                 class_export_put(exp);
179                 RETURN(0);
180         }
181
182         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
183                 if (obd_uuid_empty(&tgt->uuid))
184                         continue;
185                 rc = lov_connect_obd(obd, tgt, 0, data);
186                 if (rc)
187                         GOTO(out_disc, rc);
188                 if (data)
189                         connect_flags &= data->ocd_connect_flags;
190         }
191
192         if (data)
193                 data->ocd_connect_flags = connect_flags;
194
195         class_export_put(exp);
196         RETURN (0);
197
198  out_disc:
199         while (i-- > 0) {
200                 struct obd_uuid uuid;
201                 --tgt;
202                 --lov->desc.ld_active_tgt_count;
203                 tgt->active = 0;
204                 /* save for CERROR below; (we know it's terminated) */
205                 uuid = tgt->uuid;
206                 rc2 = obd_disconnect(tgt->ltd_exp);
207                 if (rc2)
208                         CERROR("error: LOV target %s disconnect on OST idx %d: "
209                                "rc = %d\n", uuid.uuid, i, rc2);
210         }
211         class_disconnect(exp);
212         RETURN (rc);
213 }
214
215 static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
216 {
217         struct proc_dir_entry *lov_proc_dir;
218         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
219         struct lov_obd *lov = &obd->u.lov;
220         int rc;
221         ENTRY;
222
223         CDEBUG(D_CONFIG, "Disconnecting lov target %s\n", obd->obd_uuid.uuid);
224
225         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
226         if (lov_proc_dir) {
227                 struct proc_dir_entry *osc_symlink;
228
229                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
230                 if (osc_symlink) {
231                         lprocfs_remove(osc_symlink);
232                 } else {
233                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing.",
234                                obd->obd_type->typ_name, obd->obd_name,
235                                osc_obd->obd_name);
236                 }
237         }
238
239         if (obd->obd_no_recov) {
240                 /* Pass it on to our clients.
241                  * XXX This should be an argument to disconnect,
242                  * XXX not a back-door flag on the OBD.  Ah well.
243                  */
244                 if (osc_obd)
245                         osc_obd->obd_no_recov = 1;
246         }
247
248         obd_register_observer(osc_obd, NULL);
249
250         rc = obd_disconnect(tgt->ltd_exp);
251         if (rc) {
252                 if (tgt->active) {
253                         CERROR("Target %s disconnect error %d\n",
254                                tgt->uuid.uuid, rc);
255                 }
256                 rc = 0;
257         }
258
259         if (tgt->active) {
260                 tgt->active = 0;
261                 lov->desc.ld_active_tgt_count--;
262         }
263
264         tgt->ltd_exp = NULL;
265         RETURN(0);
266 }
267
268 static int
269 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen);
270
271 static int lov_disconnect(struct obd_export *exp)
272 {
273         struct obd_device *obd = class_exp2obd(exp);
274         struct obd_device *osc_obd;
275         struct lov_obd *lov = &obd->u.lov;
276         struct lov_tgt_desc *tgt;
277         int rc, i;
278         ENTRY;
279
280         rc = class_disconnect(exp);
281
282         if (!lov->tgts)
283                 RETURN(rc);
284
285         /* Only disconnect the underlying layers on the final disconnect. */
286         lov->refcount--;
287         if (lov->refcount != 0)
288                 RETURN(rc);
289
290         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
291                 if (tgt->ltd_exp) {
292                         osc_obd = class_exp2obd(tgt->ltd_exp);
293                         /* Disconnect and delete from list */
294                         lov_del_obd(obd, &tgt->uuid, i, tgt->ltd_gen);
295                         /* Cleanup the osc now - can't do it from 
296                            lov_cleanup because we just lost our only reference
297                            to it. */ 
298                         /* Use lov's force/fail flags. */
299                         osc_obd->obd_force = obd->obd_force;
300                         osc_obd->obd_fail = obd->obd_fail;
301                         class_manual_cleanup(osc_obd);
302                 }
303         }
304
305         RETURN(rc);
306 }
307
308 /* Error codes:
309  *
310  *  -EINVAL  : UUID can't be found in the LOV's target list
311  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
312  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
313  */
314 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
315                               int activate)
316 {
317         struct lov_tgt_desc *tgt;
318         int i, rc = 0;
319         ENTRY;
320
321         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
322                lov, uuid->uuid, activate);
323
324         spin_lock(&lov->lov_lock);
325         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
326                 if (tgt->ltd_exp == NULL)
327                         continue;
328
329                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
330                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
331                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
332                         break;
333         }
334
335         if (i == lov->desc.ld_tgt_count)
336                 GOTO(out, rc = -EINVAL);
337
338         if (tgt->active == activate) {
339                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,
340                        activate ? "" : "in");
341                 GOTO(out, rc);
342         }
343
344         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
345                activate ? "" : "in");
346
347         tgt->active = activate;
348         if (activate)
349                 lov->desc.ld_active_tgt_count++;
350         else
351                 lov->desc.ld_active_tgt_count--;
352
353         EXIT;
354  out:
355         spin_unlock(&lov->lov_lock);
356         return rc;
357 }
358
359 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
360                        int active)
361 {
362         int rc;
363         struct obd_uuid *uuid;
364
365         if (strcmp(watched->obd_type->typ_name, "osc")) {
366                 CERROR("unexpected notification of %s %s!\n",
367                        watched->obd_type->typ_name,
368                        watched->obd_name);
369                 return -EINVAL;
370         }
371         uuid = &watched->u.cli.cl_import->imp_target_uuid;
372
373         /* Set OSC as active before notifying the observer, so the
374          * observer can use the OSC normally.
375          */
376         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
377         if (rc) {
378                 CERROR("%sactivation of %s failed: %d\n",
379                        active ? "" : "de", uuid->uuid, rc);
380                 RETURN(rc);
381         }
382
383         if (obd->obd_observer)
384                 /* Pass the notification up the chain. */
385                 rc = obd_notify(obd->obd_observer, watched, active);
386
387         RETURN(rc);
388 }
389
390 static int
391 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
392 {
393         struct lov_obd *lov = &obd->u.lov;
394         struct lov_tgt_desc *tgt;
395         struct obd_export *exp_observer;
396         __u32 bufsize;
397         __u32 size = 2;
398         obd_id params[2];
399         int rc, old_count;
400         ENTRY;
401
402         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
403                uuidp->uuid, index, gen);
404
405         if (index < 0) {
406                 CERROR("request to add OBD %s at invalid index: %d\n",
407                        uuidp->uuid, index);
408                 RETURN(-EINVAL);
409         }
410
411         if (gen <= 0) {
412                 CERROR("request to add OBD %s with invalid generation: %d\n",
413                        uuidp->uuid, gen);
414                 RETURN(-EINVAL);
415         }
416
417         bufsize = sizeof(struct lov_tgt_desc) * (index + 1);
418         if (bufsize > lov->bufsize) {
419                 OBD_ALLOC(tgt, bufsize);
420                 if (tgt == NULL) {
421                         CERROR("couldn't allocate %d bytes for new table.\n",
422                                bufsize);
423                         RETURN(-ENOMEM);
424                 }
425
426                 memset(tgt, 0, bufsize);
427                 if (lov->tgts) {
428                         memcpy(tgt, lov->tgts, lov->bufsize);
429                         OBD_FREE(lov->tgts, lov->bufsize);
430                 }
431
432                 lov->tgts = tgt;
433                 lov->bufsize = bufsize;
434                 CDEBUG(D_CONFIG, "tgts: %p bufsize: %d\n",
435                        lov->tgts, lov->bufsize);
436         }
437
438         tgt = &lov->tgts[index];
439         if (!obd_uuid_empty(&tgt->uuid)) {
440                 CERROR("OBD already assigned at LOV target index %d\n",
441                        index);
442                 RETURN(-EEXIST);
443         }
444
445         tgt->uuid = *uuidp;
446         /* XXX - add a sanity check on the generation number. */
447         tgt->ltd_gen = gen;
448
449         old_count = lov->desc.ld_tgt_count;
450         if (index >= lov->desc.ld_tgt_count)
451                 lov->desc.ld_tgt_count = index + 1;
452
453         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
454                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
455
456         if (lov->refcount == 0)
457                 /* lov_connect hasn't been called yet. So we'll do the
458                    lov_connect_obd on this obd when that fn first runs. */
459                 RETURN(0);
460
461         if (tgt->ltd_exp) {
462                 struct obd_device *osc_obd;
463
464                 osc_obd = class_exp2obd(tgt->ltd_exp);
465                 if (osc_obd)
466                         osc_obd->obd_no_recov = 0;
467         }
468
469         /* NULL may need to change when we use flags for osc's */
470         rc = lov_connect_obd(obd, tgt, 1, NULL);
471         if (rc || !obd->obd_observer)
472                 RETURN(rc);
473
474         /* tell the mds_lov about the new target */
475         obd_llog_finish(obd->obd_observer, old_count);
476         llog_cat_initialize(obd->obd_observer, lov->desc.ld_tgt_count);
477
478         params[0] = index;
479         rc = obd_get_info(tgt->ltd_exp, strlen("last_id"), "last_id", &size,
480                           &params[1]);
481         if (rc)
482                 GOTO(out, rc);
483
484         exp_observer = obd->obd_observer->obd_self_export;
485         rc = obd_set_info(exp_observer, strlen("next_id"),"next_id", 2, params);
486         if (rc)
487                 GOTO(out, rc);
488
489         rc = lov_notify(obd, tgt->ltd_exp->exp_obd, 1);
490         GOTO(out, rc);
491  out:
492         if (rc && tgt->ltd_exp != NULL)
493                 lov_disconnect_obd(obd, tgt);
494         return rc;
495 }
496
497 static int
498 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
499 {
500         struct lov_obd *lov = &obd->u.lov;
501         struct lov_tgt_desc *tgt;
502         int count = lov->desc.ld_tgt_count;
503         int rc = 0;
504         ENTRY;
505
506         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
507                uuidp->uuid, index, gen);
508
509         if (index >= count) {
510                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
511                        index, count);
512                 RETURN(-EINVAL);
513         }
514
515         tgt = &lov->tgts[index];
516
517         if (obd_uuid_empty(&tgt->uuid)) {
518                 CERROR("LOV target at index %d is not setup.\n", index);
519                 RETURN(-EINVAL);
520         }
521
522         if (strncmp(uuidp->uuid, tgt->uuid.uuid, sizeof uuidp->uuid) != 0) {
523                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
524                        tgt->uuid.uuid, index, uuidp->uuid);
525                 RETURN(-EINVAL);
526         }
527
528         if (tgt->ltd_exp)
529                 lov_disconnect_obd(obd, tgt);
530
531         /* XXX - right now there is a dependency on ld_tgt_count being the
532          * maximum tgt index for computing the mds_max_easize. So we can't
533          * shrink it. */
534
535         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
536         memset(tgt, 0, sizeof(*tgt));
537
538         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
539                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
540
541         RETURN(rc);
542 }
543
544 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
545 {
546         struct lprocfs_static_vars lvars;
547         struct lustre_cfg *lcfg = buf;
548         struct lov_desc *desc;
549         struct lov_obd *lov = &obd->u.lov;
550         int count;
551         ENTRY;
552
553         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
554                 CERROR("LOV setup requires a descriptor\n");
555                 RETURN(-EINVAL);
556         }
557
558         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
559
560         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
561                 CERROR("descriptor size wrong: %d > %d\n",
562                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
563                 RETURN(-EINVAL);
564         }
565
566         if (desc->ld_magic != LOV_DESC_MAGIC) {
567                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
568                             CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
569                                    obd->obd_name, desc);
570                             lustre_swab_lov_desc(desc);
571                 } else {
572                         CERROR("%s: Bad lov desc magic: %#x\n",
573                                obd->obd_name, desc->ld_magic);
574                         RETURN(-EINVAL);
575                 }
576         }
577
578         if (desc->ld_default_stripe_size < PTLRPC_MAX_BRW_SIZE) {
579                 CWARN("Increasing default_stripe_size "LPU64" to %u\n",
580                       desc->ld_default_stripe_size, PTLRPC_MAX_BRW_SIZE);
581                 CWARN("Please update config and run --write-conf on MDS\n");
582
583                 desc->ld_default_stripe_size = PTLRPC_MAX_BRW_SIZE;
584         } else if (desc->ld_default_stripe_size & (LOV_MIN_STRIPE_SIZE - 1)) {
585                 CWARN("default_stripe_size "LPU64" isn't a multiple of %u\n",
586                       desc->ld_default_stripe_size, LOV_MIN_STRIPE_SIZE);
587                 CWARN("Please update config and run --write-conf on MDS\n");
588
589                 desc->ld_default_stripe_size &= ~(LOV_MIN_STRIPE_SIZE - 1);
590        }
591
592         if (desc->ld_default_stripe_count == 0)
593                 desc->ld_default_stripe_count = 1;
594
595         /* Because of 64-bit divide/mod operations only work with a 32-bit
596          * divisor in a 32-bit kernel, we cannot support a stripe width
597          * of 4GB or larger on 32-bit CPUs. */
598         count = desc->ld_default_stripe_count;
599         if ((count > 0 ? count : desc->ld_tgt_count) *
600             desc->ld_default_stripe_size > ~0UL) {
601                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
602                        desc->ld_default_stripe_size, count, ~0UL);
603                 RETURN(-EINVAL);
604         }
605
606         /* Allocate space for target list */
607         if (desc->ld_tgt_count)
608                 count = desc->ld_tgt_count;
609         lov->bufsize = sizeof(struct lov_tgt_desc) * max(count, 1);
610         OBD_ALLOC(lov->tgts, lov->bufsize);
611         if (lov->tgts == NULL) {
612                 CERROR("Out of memory\n");
613                 RETURN(-EINVAL);
614         }
615         memset(lov->tgts, 0, lov->bufsize);
616
617         desc->ld_active_tgt_count = 0;
618         lov->desc = *desc;
619         spin_lock_init(&lov->lov_lock);
620
621         lprocfs_init_vars(lov, &lvars);
622         lprocfs_obd_setup(obd, lvars.obd_vars);
623 #ifdef LPROCFS
624         {
625                 struct proc_dir_entry *entry;
626
627                 entry = create_proc_entry("target_obd", 0444,
628                                           obd->obd_proc_entry);
629                 if (entry != NULL) {
630                         entry->proc_fops = &lov_proc_target_fops;
631                         entry->data = obd;
632                 }
633         }
634 #endif
635
636         RETURN(0);
637 }
638
639 static int lov_precleanup(struct obd_device *obd, int stage)
640 {
641         int rc = 0;
642         ENTRY;
643
644         if (stage < 2)
645                 RETURN(0);
646
647         rc = obd_llog_finish(obd, 0);
648         if (rc != 0)
649                 CERROR("failed to cleanup llogging subsystems\n");
650
651         RETURN(rc);
652 }
653
654 static int lov_cleanup(struct obd_device *obd)
655 {
656         struct lov_obd *lov = &obd->u.lov;
657
658         lprocfs_obd_cleanup(obd);
659         if (lov->tgts) {
660                 int i;
661                 struct lov_tgt_desc *tgt;
662                 for (i = 0, tgt = lov->tgts;
663                       i < lov->desc.ld_tgt_count; i++, tgt++) {
664                         if (!obd_uuid_empty(&tgt->uuid))
665                                 lov_del_obd(obd, &tgt->uuid, i, 0);
666                 }
667                 OBD_FREE(lov->tgts, lov->bufsize);
668         }
669         RETURN(0);
670 }
671
672 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
673 {
674         struct lustre_cfg *lcfg = buf;
675         struct obd_uuid obd_uuid;
676         int cmd;
677         int index;
678         int gen;
679         int rc = 0;
680         ENTRY;
681
682         switch(cmd = lcfg->lcfg_command) {
683         case LCFG_LOV_ADD_OBD:
684         case LCFG_LOV_DEL_OBD: {
685                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
686                         GOTO(out, rc = -EINVAL);
687
688                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
689
690                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
691                         GOTO(out, rc = -EINVAL);
692                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
693                         GOTO(out, rc = -EINVAL);
694                 if (cmd == LCFG_LOV_ADD_OBD)
695                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
696                 else
697                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
698                 GOTO(out, rc);
699         }
700         default: {
701                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
702                 GOTO(out, rc = -EINVAL);
703
704         }
705         }
706 out:
707         RETURN(rc);
708 }
709
/* Fallback log2(): defined in terms of ffz() unless one already exists. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
713
714 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
715                              struct lov_stripe_md **ea,
716                              struct obd_trans_info *oti)
717 {
718         struct lov_obd *lov;
719         struct obdo *tmp_oa;
720         struct obd_uuid *ost_uuid = NULL;
721         int rc = 0, i;
722         ENTRY;
723
724         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
725                 src_oa->o_flags == OBD_FL_DELORPHAN);
726
727         lov = &export->exp_obd->u.lov;
728
729         tmp_oa = obdo_alloc();
730         if (tmp_oa == NULL)
731                 RETURN(-ENOMEM);
732
733         if (src_oa->o_valid & OBD_MD_FLINLINE) {
734                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
735                 CDEBUG(D_HA, "clearing orphans only for %s\n",
736                        ost_uuid->uuid);
737         }
738
739         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
740                 struct lov_stripe_md obj_md;
741                 struct lov_stripe_md *obj_mdp = &obj_md;
742                 int err;
743
744                 /* if called for a specific target, we don't
745                    care if it is not active. */
746                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
747                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
748                         continue;
749                 }
750
751                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
752                         continue;
753
754                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
755
756                 LASSERT(lov->tgts[i].ltd_exp);
757                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
758                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, &obj_mdp, oti);
759                 if (err)
760                         /* This export will be disabled until it is recovered,
761                            and then orphan recovery will be completed. */
762                         CERROR("error in orphan recovery on OST idx %d/%d: "
763                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
764
765                 if (ost_uuid)
766                         break;
767         }
768         obdo_free(tmp_oa);
769         RETURN(rc);
770 }
771
772 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
773                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
774 {
775         struct lov_stripe_md *obj_mdp, *lsm;
776         struct lov_obd *lov = &exp->exp_obd->u.lov;
777         unsigned ost_idx;
778         int rc, i;
779         ENTRY;
780
781         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
782                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
783
784         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
785         if (obj_mdp == NULL)
786                 RETURN(-ENOMEM);
787
788         ost_idx = src_oa->o_nlink;
789         lsm = *ea;
790         if (lsm == NULL)
791                 GOTO(out, rc = -EINVAL);
792         if (ost_idx >= lov->desc.ld_tgt_count)
793                 GOTO(out, rc = -EINVAL);
794
795         for (i = 0; i < lsm->lsm_stripe_count; i++) {
796                 if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
797                         if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
798                                 GOTO(out, rc = -EINVAL);
799                         break;
800                 }
801         }
802         if (i == lsm->lsm_stripe_count)
803                 GOTO(out, rc = -EINVAL);
804
805         rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti);
806 out:
807         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
808         RETURN(rc);
809 }
810
811 /* the LOV expects oa->o_id to be set to the LOV object id */
812 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
813                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
814 {
815         struct lov_obd *lov;
816         struct lov_request_set *set = NULL;
817         struct list_head *pos;
818         int rc = 0;
819         ENTRY;
820
821         LASSERT(ea != NULL);
822         if (exp == NULL)
823                 RETURN(-EINVAL);
824
825         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
826             src_oa->o_flags == OBD_FL_DELORPHAN) {
827                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
828                 RETURN(rc);
829         }
830
831         lov = &exp->exp_obd->u.lov;
832         if (!lov->desc.ld_active_tgt_count)
833                 RETURN(-EIO);
834
835         /* Recreate a specific object id at the given OST index */
836         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
837             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
838                  rc = lov_recreate(exp, src_oa, ea, oti);
839                  RETURN(rc);
840         }
841
842         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
843         if (rc)
844                 RETURN(rc);
845
846         list_for_each (pos, &set->set_list) {
847                 struct lov_request *req =
848                         list_entry(pos, struct lov_request, rq_link);
849
850                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
851                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp,
852                                 req->rq_oa, &req->rq_md, oti);
853                 lov_update_create_set(set, req, rc);
854         }
855         rc = lov_fini_create_set(set, ea);
856         RETURN(rc);
857 }
858
/* Assert that lsmp is a non-NULL stripe descriptor whose lsm_magic is
 * LOV_MAGIC; the failure message reports the pointer and the magic found. */
#define ASSERT_LSM_MAGIC(lsmp)                                          \
do {                                                                    \
        LASSERT((lsmp) != NULL);                                        \
        LASSERTF((lsmp)->lsm_magic == LOV_MAGIC,                        \
                 "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic);      \
} while (0)
865
866 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
867                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
868 {
869         struct lov_request_set *set;
870         struct lov_request *req;
871         struct list_head *pos;
872         struct lov_obd *lov;
873         int rc = 0;
874         ENTRY;
875
876         ASSERT_LSM_MAGIC(lsm);
877
878         if (!exp || !exp->exp_obd)
879                 RETURN(-ENODEV);
880
881         lov = &exp->exp_obd->u.lov;
882         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
883         if (rc)
884                 RETURN(rc);
885
886         list_for_each (pos, &set->set_list) {
887                 int err;
888                 req = list_entry(pos, struct lov_request, rq_link);
889
890                 /* XXX update the cookie position */
891                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
892                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
893                                  NULL, oti);
894                 err = lov_update_common_set(set, req, rc);
895                 if (rc) {
896                         CERROR("error: destroying objid "LPX64" subobj "
897                                LPX64" on OST idx %d: rc = %d\n",
898                                set->set_oa->o_id, req->rq_oa->o_id,
899                                req->rq_idx, rc);
900                         if (!rc)
901                                 rc = err;
902                 }
903         }
904         lov_fini_destroy_set(set);
905         RETURN(rc);
906 }
907
908 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
909                        struct lov_stripe_md *lsm)
910 {
911         struct lov_request_set *set;
912         struct lov_request *req;
913         struct list_head *pos;
914         struct lov_obd *lov;
915         int err = 0, rc = 0;
916         ENTRY;
917
918         ASSERT_LSM_MAGIC(lsm);
919
920         if (!exp || !exp->exp_obd)
921                 RETURN(-ENODEV);
922
923         lov = &exp->exp_obd->u.lov;
924
925         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
926         if (rc)
927                 RETURN(rc);
928
929         list_for_each (pos, &set->set_list) {
930                 req = list_entry(pos, struct lov_request, rq_link);
931
932                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
933                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id,
934                        req->rq_idx);
935
936                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp,
937                                  req->rq_oa, NULL);
938                 err = lov_update_common_set(set, req, rc);
939                 if (err) {
940                         CERROR("error: getattr objid "LPX64" subobj "
941                                LPX64" on OST idx %d: rc = %d\n",
942                                set->set_oa->o_id, req->rq_oa->o_id,
943                                req->rq_idx, err);
944                         break;
945                 }
946         }
947
948         rc = lov_fini_getattr_set(set);
949         if (err)
950                 rc = err;
951         RETURN(rc);
952 }
953
954 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
955                                  int rc)
956 {
957         struct lov_request_set *lovset = (struct lov_request_set *)data;
958         ENTRY;
959
960         /* don't do attribute merge if this aysnc op failed */
961         if (rc) {
962                 lovset->set_completes = 0;
963                 lov_fini_getattr_set(lovset);
964         } else {
965                 rc = lov_fini_getattr_set(lovset);
966         }
967         RETURN (rc);
968 }
969
970 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
971                               struct lov_stripe_md *lsm,
972                               struct ptlrpc_request_set *rqset)
973 {
974         struct lov_request_set *lovset;
975         struct lov_obd *lov;
976         struct list_head *pos;
977         struct lov_request *req;
978         int rc = 0;
979         ENTRY;
980
981         ASSERT_LSM_MAGIC(lsm);
982
983         if (!exp || !exp->exp_obd)
984                 RETURN(-ENODEV);
985
986         lov = &exp->exp_obd->u.lov;
987
988         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
989         if (rc)
990                 RETURN(rc);
991
992         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
993                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
994
995         list_for_each (pos, &lovset->set_list) {
996                 req = list_entry(pos, struct lov_request, rq_link);
997
998                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
999                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id,
1000                        req->rq_idx);
1001                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
1002                                        req->rq_oa, NULL, rqset);
1003                 if (rc) {
1004                         CERROR("error: getattr objid "LPX64" subobj "
1005                                LPX64" on OST idx %d: rc = %d\n",
1006                                lovset->set_oa->o_id, req->rq_oa->o_id,
1007                                req->rq_idx, rc);
1008                         GOTO(out, rc);
1009                 }
1010                 lov_update_common_set(lovset, req, rc);
1011         }
1012
1013         LASSERT(rc == 0);
1014         LASSERT (rqset->set_interpret == NULL);
1015         rqset->set_interpret = lov_getattr_interpret;
1016         rqset->set_arg = (void *)lovset;
1017         RETURN(rc);
1018 out:
1019         LASSERT(rc);
1020         lov_fini_getattr_set(lovset);
1021         RETURN(rc);
1022 }
1023
1024 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
1025                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1026 {
1027         struct lov_request_set *set;
1028         struct lov_obd *lov;
1029         struct list_head *pos;
1030         struct lov_request *req;
1031         int err = 0, rc = 0;
1032         ENTRY;
1033
1034         ASSERT_LSM_MAGIC(lsm);
1035
1036         if (!exp || !exp->exp_obd)
1037                 RETURN(-ENODEV);
1038
1039         /* for now, we only expect time updates here */
1040         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
1041                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
1042                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
1043                                       OBD_MD_FLSIZE)));
1044         lov = &exp->exp_obd->u.lov;
1045         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
1046         if (rc)
1047                 RETURN(rc);
1048
1049         list_for_each (pos, &set->set_list) {
1050                 req = list_entry(pos, struct lov_request, rq_link);
1051
1052                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
1053                                  NULL, NULL);
1054                 err = lov_update_setattr_set(set, req, rc);
1055                 if (err) {
1056                         CERROR("error: setattr objid "LPX64" subobj "
1057                                LPX64" on OST idx %d: rc = %d\n",
1058                                set->set_oa->o_id, req->rq_oa->o_id,
1059                                req->rq_idx, err);
1060                         if (!rc)
1061                                 rc = err;
1062                 }
1063         }
1064         err = lov_fini_setattr_set(set);
1065         if (!rc)
1066                 rc = err;
1067         RETURN(rc);
1068 }
1069
1070 static int lov_setattr_async(struct obd_export *exp, struct obdo *src_oa,
1071                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1072 {
1073         struct lov_obd *lov;
1074         struct lov_oinfo *loi = NULL;
1075         int rc = 0, err;
1076         obd_id objid = src_oa->o_id;
1077         int i;
1078         ENTRY;
1079
1080         ASSERT_LSM_MAGIC(lsm);
1081         LASSERT(oti);
1082         if (src_oa->o_valid & OBD_MD_FLCOOKIE)
1083                 LASSERT(oti->oti_logcookies);
1084
1085         if (!exp || !exp->exp_obd)
1086                 RETURN(-ENODEV);
1087
1088         /* support OBD_MD_FLUID, OBD_MD_FLGID and OBD_MD_FLCOOKIE now */
1089         LASSERT(!(src_oa->o_valid &  ~(OBD_MD_FLID | OBD_MD_FLUID |
1090                                        OBD_MD_FLGID| OBD_MD_FLCOOKIE)));
1091         lov = &exp->exp_obd->u.lov;
1092
1093         loi = lsm->lsm_oinfo;
1094         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1096                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1097                         goto next;
1098                 }
1099
1100                 src_oa->o_id = loi->loi_id;
1101                 /* do chown/chgrp on OST asynchronously */
1102                 err = obd_setattr_async(lov->tgts[loi->loi_ost_idx].ltd_exp,
1103                                         src_oa, NULL, oti);
1104                 if (err) {
1105                         CERROR("error: setattr objid "LPX64" subobj "
1106                                LPX64" on OST idx %d: rc = %d\n",
1107                                objid, src_oa->o_id, i, err);
1108                         if (!rc)
1109                                 rc = err;
1110                 }
1111         next:
1112                 if (src_oa->o_valid & OBD_MD_FLCOOKIE)
1113                         oti->oti_logcookies++;
1114         }
1115
1116         RETURN(rc);
1117 }
1118
1119 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1120  * we can send this 'punch' to just the authoritative node and the nodes
1121  * that the punch will affect. */
1122 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1123                      struct lov_stripe_md *lsm,
1124                      obd_off start, obd_off end, struct obd_trans_info *oti)
1125 {
1126         struct lov_request_set *set;
1127         struct lov_obd *lov;
1128         struct list_head *pos;
1129         struct lov_request *req;
1130         int err = 0, rc = 0;
1131         ENTRY;
1132
1133         ASSERT_LSM_MAGIC(lsm);
1134
1135         if (!exp || !exp->exp_obd)
1136                 RETURN(-ENODEV);
1137
1138         lov = &exp->exp_obd->u.lov;
1139         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1140         if (rc)
1141                 RETURN(rc);
1142
1143         list_for_each (pos, &set->set_list) {
1144                 req = list_entry(pos, struct lov_request, rq_link);
1145
1146                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
1147                                NULL, req->rq_extent.start,
1148                                req->rq_extent.end, NULL);
1149                 err = lov_update_punch_set(set, req, rc);
1150                 if (err) {
1151                         CERROR("error: punch objid "LPX64" subobj "LPX64
1152                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1153                                req->rq_oa->o_id, req->rq_idx, rc);
1154                         if (!rc)
1155                                 rc = err;
1156                 }
1157         }
1158         err = lov_fini_punch_set(set);
1159         if (!rc)
1160                 rc = err;
1161         RETURN(rc);
1162 }
1163
1164 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1165                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1166 {
1167         struct lov_request_set *set;
1168         struct lov_obd *lov;
1169         struct list_head *pos;
1170         struct lov_request *req;
1171         int err = 0, rc = 0;
1172         ENTRY;
1173
1174         ASSERT_LSM_MAGIC(lsm);
1175
1176         if (!exp->exp_obd)
1177                 RETURN(-ENODEV);
1178
1179         lov = &exp->exp_obd->u.lov;
1180         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1181         if (rc)
1182                 RETURN(rc);
1183
1184         list_for_each (pos, &set->set_list) {
1185                 req = list_entry(pos, struct lov_request, rq_link);
1186
1187                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
1188                               NULL, req->rq_extent.start, req->rq_extent.end);
1189                 err = lov_update_common_set(set, req, rc);
1190                 if (err) {
1191                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1192                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1193                                req->rq_oa->o_id, req->rq_idx, rc);
1194                         if (!rc)
1195                                 rc = err;
1196                 }
1197         }
1198         err = lov_fini_sync_set(set);
1199         if (!rc)
1200                 rc = err;
1201         RETURN(rc);
1202 }
1203
1204 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1205                          struct lov_stripe_md *lsm,
1206                          obd_count oa_bufs, struct brw_page *pga)
1207 {
1208         int i, rc = 0;
1209
1210         /* The caller just wants to know if there's a chance that this
1211          * I/O can succeed */
1212         for (i = 0; i < oa_bufs; i++) {
1213                 int stripe = lov_stripe_number(lsm, pga[i].off);
1214                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1215                 obd_off start, end;
1216
1217                 if (!lov_stripe_intersects(lsm, i, pga[i].off,
1218                                            pga[i].off + pga[i].count,
1219                                            &start, &end))
1220                         continue;
1221
1222                 if (lov->tgts[ost].active == 0) {
1223                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1224                         return -EIO;
1225                 }
1226                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1227                              NULL, 1, &pga[i], NULL);
1228                 if (rc)
1229                         break;
1230         }
1231         return rc;
1232 }
1233
1234 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1235                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1236                    struct brw_page *pga, struct obd_trans_info *oti)
1237 {
1238         struct lov_request_set *set;
1239         struct lov_request *req;
1240         struct list_head *pos;
1241         struct lov_obd *lov = &exp->exp_obd->u.lov;
1242         int err, rc = 0;
1243         ENTRY;
1244
1245         ASSERT_LSM_MAGIC(lsm);
1246
1247         if (cmd == OBD_BRW_CHECK) {
1248                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1249                 RETURN(rc);
1250         }
1251
1252         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1253         if (rc)
1254                 RETURN(rc);
1255
1256         list_for_each (pos, &set->set_list) {
1257                 struct obd_export *sub_exp;
1258                 struct brw_page *sub_pga;
1259                 req = list_entry(pos, struct lov_request, rq_link);
1260
1261                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1262                 sub_pga = set->set_pga + req->rq_pgaidx;
1263                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md,
1264                              req->rq_oabufs, sub_pga, oti);
1265                 if (rc)
1266                         break;
1267                 lov_update_common_set(set, req, rc);
1268         }
1269
1270         err = lov_fini_brw_set(set);
1271         if (!rc)
1272                 rc = err;
1273         RETURN(rc);
1274 }
1275
1276 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1277                              int rc)
1278 {
1279         struct lov_request_set *lovset = (struct lov_request_set *)data;
1280         ENTRY;
1281
1282         if (rc) {
1283                 lovset->set_completes = 0;
1284                 lov_fini_brw_set(lovset);
1285         } else {
1286                 rc = lov_fini_brw_set(lovset);
1287         }
1288
1289         RETURN(rc);
1290 }
1291
1292 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1293                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1294                          struct brw_page *pga, struct ptlrpc_request_set *set,
1295                          struct obd_trans_info *oti)
1296 {
1297         struct lov_request_set *lovset;
1298         struct lov_request *req;
1299         struct list_head *pos;
1300         struct lov_obd *lov = &exp->exp_obd->u.lov;
1301         int rc = 0;
1302         ENTRY;
1303
1304         ASSERT_LSM_MAGIC(lsm);
1305
1306         if (cmd == OBD_BRW_CHECK) {
1307                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1308                 RETURN(rc);
1309         }
1310
1311         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1312         if (rc)
1313                 RETURN(rc);
1314
1315         list_for_each (pos, &lovset->set_list) {
1316                 struct obd_export *sub_exp;
1317                 struct brw_page *sub_pga;
1318                 req = list_entry(pos, struct lov_request, rq_link);
1319
1320                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1321                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1322                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1323                                    req->rq_oabufs, sub_pga, set, oti);
1324                 if (rc)
1325                         GOTO(out, rc);
1326                 lov_update_common_set(lovset, req, rc);
1327         }
1328         LASSERT(rc == 0);
1329         LASSERT(set->set_interpret == NULL);
1330         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1331         set->set_arg = (void *)lovset;
1332
1333         RETURN(rc);
1334 out:
1335         lov_fini_brw_set(lovset);
1336         RETURN(rc);
1337 }
1338
1339 static int lov_ap_make_ready(void *data, int cmd)
1340 {
1341         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1342
1343         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1344 }
1345 static int lov_ap_refresh_count(void *data, int cmd)
1346 {
1347         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1348
1349         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1350                                                      cmd);
1351 }
1352 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1353 {
1354         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1355
1356         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1357         /* XXX woah, shouldn't we be altering more here?  size? */
1358         oa->o_id = lap->lap_loi_id;
1359 }
1360
1361 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1362 {
1363         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1364
1365         /* in a raid1 regime this would down a count of many ios
1366          * in flight, onl calling the caller_ops completion when all
1367          * the raid1 ios are complete */
1368         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1369 }
1370
1371 static struct obd_async_page_ops lov_async_page_ops = {
1372         .ap_make_ready =        lov_ap_make_ready,
1373         .ap_refresh_count =     lov_ap_refresh_count,
1374         .ap_fill_obdo =         lov_ap_fill_obdo,
1375         .ap_completion =        lov_ap_completion,
1376 };
1377
1378 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1379                            struct lov_oinfo *loi, struct page *page,
1380                            obd_off offset, struct obd_async_page_ops *ops,
1381                            void *data, void **res)
1382 {
1383         struct lov_obd *lov = &exp->exp_obd->u.lov;
1384         struct lov_async_page *lap;
1385         int rc;
1386         ENTRY;
1387
1388         if (!page)
1389                 return size_round(sizeof(*lap)) +
1390                        obd_prep_async_page(lov->tgts[0].ltd_exp, NULL, NULL,
1391                                            NULL, 0, NULL, NULL, NULL);
1392
1393         ASSERT_LSM_MAGIC(lsm);
1394         LASSERT(loi == NULL);
1395
1396         lap = *res;
1397         lap->lap_magic = LAP_MAGIC;
1398         lap->lap_caller_ops = ops;
1399         lap->lap_caller_data = data;
1400
1401         /* for now only raid 0 which passes through */
1402         lap->lap_stripe = lov_stripe_number(lsm, offset);
1403         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
1404         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1405
1406         /* so the callback doesn't need the lsm */
1407         lap->lap_loi_id = loi->loi_id;
1408
1409         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
1410
1411         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1412                                  lsm, loi, page, lap->lap_sub_offset,
1413                                  &lov_async_page_ops, lap,
1414                                  &lap->lap_sub_cookie);
1415         if (rc)
1416                 RETURN(rc);
1417         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1418                lap->lap_sub_cookie, offset);
1419         RETURN(0);
1420 }
1421
1422 static int lov_queue_async_io(struct obd_export *exp,
1423                               struct lov_stripe_md *lsm,
1424                               struct lov_oinfo *loi, void *cookie,
1425                               int cmd, obd_off off, int count,
1426                               obd_flag brw_flags, obd_flag async_flags)
1427 {
1428         struct lov_obd *lov = &exp->exp_obd->u.lov;
1429         struct lov_async_page *lap;
1430         int rc;
1431
1432         LASSERT(loi == NULL);
1433
1434         ASSERT_LSM_MAGIC(lsm);
1435
1436         lap = LAP_FROM_COOKIE(cookie);
1437
1438         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1439
1440         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1441                                 loi, lap->lap_sub_cookie, cmd, off, count,
1442                                 brw_flags, async_flags);
1443         RETURN(rc);
1444 }
1445
1446 static int lov_set_async_flags(struct obd_export *exp,
1447                                struct lov_stripe_md *lsm,
1448                                struct lov_oinfo *loi, void *cookie,
1449                                obd_flag async_flags)
1450 {
1451         struct lov_obd *lov = &exp->exp_obd->u.lov;
1452         struct lov_async_page *lap;
1453         int rc;
1454
1455         LASSERT(loi == NULL);
1456
1457         ASSERT_LSM_MAGIC(lsm);
1458
1459         lap = LAP_FROM_COOKIE(cookie);
1460
1461         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1462
1463         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1464                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1465         RETURN(rc);
1466 }
1467
1468 static int lov_queue_group_io(struct obd_export *exp,
1469                               struct lov_stripe_md *lsm,
1470                               struct lov_oinfo *loi,
1471                               struct obd_io_group *oig, void *cookie,
1472                               int cmd, obd_off off, int count,
1473                               obd_flag brw_flags, obd_flag async_flags)
1474 {
1475         struct lov_obd *lov = &exp->exp_obd->u.lov;
1476         struct lov_async_page *lap;
1477         int rc;
1478
1479         LASSERT(loi == NULL);
1480
1481         ASSERT_LSM_MAGIC(lsm);
1482
1483         lap = LAP_FROM_COOKIE(cookie);
1484
1485         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1486
1487         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1488                                 oig, lap->lap_sub_cookie, cmd, off, count,
1489                                 brw_flags, async_flags);
1490         RETURN(rc);
1491 }
1492
1493 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1494  * all stripes, but we don't record that fact at queue time.  so we
1495  * trigger sync io on all stripes. */
1496 static int lov_trigger_group_io(struct obd_export *exp,
1497                                 struct lov_stripe_md *lsm,
1498                                 struct lov_oinfo *loi,
1499                                 struct obd_io_group *oig)
1500 {
1501         struct lov_obd *lov = &exp->exp_obd->u.lov;
1502         int rc = 0, i, err;
1503
1504         LASSERT(loi == NULL);
1505
1506         ASSERT_LSM_MAGIC(lsm);
1507
1508         loi = lsm->lsm_oinfo;
1509         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1510                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1511                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1512                         continue;
1513                 }
1514
1515                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1516                                            lsm, loi, oig);
1517                 if (rc == 0 && err != 0)
1518                         rc = err;
1519         };
1520         RETURN(rc);
1521 }
1522
1523 static int lov_teardown_async_page(struct obd_export *exp,
1524                                    struct lov_stripe_md *lsm,
1525                                    struct lov_oinfo *loi, void *cookie)
1526 {
1527         struct lov_obd *lov = &exp->exp_obd->u.lov;
1528         struct lov_async_page *lap;
1529         int rc;
1530
1531         LASSERT(loi == NULL);
1532
1533         ASSERT_LSM_MAGIC(lsm);
1534
1535         lap = LAP_FROM_COOKIE(cookie);
1536
1537         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1538
1539         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1540                                      lsm, loi, lap->lap_sub_cookie);
1541         if (rc) {
1542                 CERROR("unable to teardown sub cookie %p: %d\n",
1543                        lap->lap_sub_cookie, rc);
1544                 RETURN(rc);
1545         }
1546         RETURN(rc);
1547 }
1548
1549 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1550                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1551                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1552                        void *data,__u32 lvb_len, void *lvb_swabber,
1553                        struct lustre_handle *lockh)
1554 {
1555         struct lov_request_set *set;
1556         struct lov_request *req;
1557         struct list_head *pos;
1558         struct lustre_handle *lov_lockhp;
1559         struct lov_obd *lov;
1560         ldlm_error_t rc;
1561         int save_flags = *flags;
1562         ENTRY;
1563
1564         ASSERT_LSM_MAGIC(lsm);
1565
1566         /* we should never be asked to replay a lock this way. */
1567         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1568
1569         if (!exp || !exp->exp_obd)
1570                 RETURN(-ENODEV);
1571
1572         lov = &exp->exp_obd->u.lov;
1573         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1574         if (rc)
1575                 RETURN(rc);
1576
1577         list_for_each (pos, &set->set_list) {
1578                 ldlm_policy_data_t sub_policy;
1579                 req = list_entry(pos, struct lov_request, rq_link);
1580                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1581                 LASSERT(lov_lockhp);
1582
1583                 *flags = save_flags;
1584                 sub_policy.l_extent = req->rq_extent;
1585
1586                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1587                                  type, &sub_policy, mode, flags, bl_cb,
1588                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1589                                  lov_lockhp);
1590                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1591                 if (rc != ELDLM_OK)
1592                         break;
1593         }
1594
1595         lov_fini_enqueue_set(set, mode);
1596         RETURN(rc);
1597 }
1598
1599 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1600                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1601                      int *flags, void *data, struct lustre_handle *lockh)
1602 {
1603         struct lov_request_set *set;
1604         struct lov_request *req;
1605         struct list_head *pos;
1606         struct lov_obd *lov = &exp->exp_obd->u.lov;
1607         struct lustre_handle *lov_lockhp;
1608         int lov_flags, rc = 0;
1609         ENTRY;
1610
1611         ASSERT_LSM_MAGIC(lsm);
1612
1613         if (!exp || !exp->exp_obd)
1614                 RETURN(-ENODEV);
1615
1616         lov = &exp->exp_obd->u.lov;
1617         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1618         if (rc)
1619                 RETURN(rc);
1620
1621         list_for_each (pos, &set->set_list) {
1622                 ldlm_policy_data_t sub_policy;
1623                 req = list_entry(pos, struct lov_request, rq_link);
1624                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1625                 LASSERT(lov_lockhp);
1626
1627                 lov_flags = *flags;
1628                 sub_policy.l_extent = req->rq_extent;
1629
1630                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1631                                type, &sub_policy, mode, &lov_flags, data,
1632                                lov_lockhp);
1633                 rc = lov_update_match_set(set, req, rc);
1634                 if (rc != 1)
1635                         break;
1636         }
1637         lov_fini_match_set(set, mode, *flags);
1638         RETURN(rc);
1639 }
1640
1641 static int lov_change_cbdata(struct obd_export *exp,
1642                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1643                              void *data)
1644 {
1645         struct lov_obd *lov;
1646         struct lov_oinfo *loi;
1647         int rc = 0, i;
1648         ENTRY;
1649
1650         ASSERT_LSM_MAGIC(lsm);
1651
1652         if (!exp || !exp->exp_obd)
1653                 RETURN(-ENODEV);
1654
1655         lov = &exp->exp_obd->u.lov;
1656         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1657                 struct lov_stripe_md submd;
1658
1659                 submd.lsm_object_id = loi->loi_id;
1660                 submd.lsm_stripe_count = 0;
1661                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1662                                        &submd, it, data);
1663         }
1664         RETURN(rc);
1665 }
1666
1667 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1668                       __u32 mode, struct lustre_handle *lockh)
1669 {
1670         struct lov_request_set *set;
1671         struct lov_request *req;
1672         struct list_head *pos;
1673         struct lov_obd *lov = &exp->exp_obd->u.lov;
1674         struct lustre_handle *lov_lockhp;
1675         int err = 0, rc = 0;
1676         ENTRY;
1677
1678         ASSERT_LSM_MAGIC(lsm);
1679
1680         if (!exp || !exp->exp_obd)
1681                 RETURN(-ENODEV);
1682
1683         LASSERT(lockh);
1684         lov = &exp->exp_obd->u.lov;
1685         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1686         if (rc)
1687                 RETURN(rc);
1688
1689         list_for_each (pos, &set->set_list) {
1690                 req = list_entry(pos, struct lov_request, rq_link);
1691                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1692
1693                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1694                                 mode, lov_lockhp);
1695                 rc = lov_update_common_set(set, req, rc);
1696                 if (rc) {
1697                         CERROR("error: cancel objid "LPX64" subobj "
1698                                LPX64" on OST idx %d: rc = %d\n",
1699                                lsm->lsm_object_id,
1700                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1701                         err = rc;
1702                 }
1703
1704         }
1705         lov_fini_cancel_set(set);
1706         RETURN(err);
1707 }
1708
1709 static int lov_cancel_unused(struct obd_export *exp,
1710                              struct lov_stripe_md *lsm, int flags, void *opaque)
1711 {
1712         struct lov_obd *lov;
1713         struct lov_oinfo *loi;
1714         int rc = 0, i;
1715         ENTRY;
1716
1717         lov = &exp->exp_obd->u.lov;
1718         if (lsm == NULL) {
1719                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1720                         int err;
1721                         if (!lov->tgts[i].ltd_exp)
1722                                 continue;
1723
1724                         err = obd_cancel_unused(lov->tgts[i].ltd_exp, NULL,
1725                                                 flags, opaque);
1726                         if (!rc)
1727                                 rc = err;
1728                 }
1729                 RETURN(rc);
1730         }
1731
1732         ASSERT_LSM_MAGIC(lsm);
1733
1734         if (!exp || !exp->exp_obd)
1735                 RETURN(-ENODEV);
1736
1737         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1738                 struct lov_stripe_md submd;
1739                 int err;
1740
1741                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1742                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1743
1744                 submd.lsm_object_id = loi->loi_id;
1745                 submd.lsm_stripe_count = 0;
1746                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1747                                         &submd, flags, opaque);
1748                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1749                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1750                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1751                                loi->loi_id, loi->loi_ost_idx, err);
1752                         if (!rc)
1753                                 rc = err;
1754                 }
1755         }
1756         RETURN(rc);
1757 }
1758
1759 static int lov_join_lru(struct obd_export *exp,
1760                         struct lov_stripe_md *lsm, int join)
1761 {
1762         struct lov_obd *lov;
1763         struct lov_oinfo *loi;
1764         int i, count = 0;
1765         ENTRY;
1766
1767         ASSERT_LSM_MAGIC(lsm);
1768         if (!exp || !exp->exp_obd)
1769                 RETURN(-ENODEV);
1770
1771         lov = &exp->exp_obd->u.lov;
1772         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1773                 struct lov_stripe_md submd;
1774                 int rc = 0;
1775
1776                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1777                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1778
1779                 submd.lsm_object_id = loi->loi_id;
1780                 submd.lsm_stripe_count = 0;
1781                 rc = obd_join_lru(lov->tgts[loi->loi_ost_idx].ltd_exp,
1782                                   &submd, join);
1783                 if (rc < 0) {
1784                         CERROR("join lru failed. objid: "LPX64" subobj: "LPX64
1785                                " ostidx: %d rc: %d\n", lsm->lsm_object_id,
1786                                loi->loi_id, loi->loi_ost_idx, rc);
1787                         return rc;
1788                 } else {
1789                         count += rc;
1790                 }
1791         }
1792         RETURN(count);
1793 }
1794
/* All-ones value of a __u64. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating accumulate: add @add to @tot, or set @tot to LOV_U64_MAX when
 * the unsigned sum wraps around.  Both arguments are evaluated more than
 * once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) >= (tot)) ?                      \
                        (tot) + (add) : LOV_U64_MAX;                    \
        } while (0)
1803
1804 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1805                       unsigned long max_age)
1806 {
1807         struct lov_obd *lov = &obd->u.lov;
1808         struct obd_statfs lov_sfs;
1809         int set = 0;
1810         int rc = 0;
1811         int i;
1812         ENTRY;
1813
1814
1815         /* We only get block data from the OBD */
1816         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1817                 int err;
1818                 if (!lov->tgts[i].active) {
1819                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1820                         continue;
1821                 }
1822
1823                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1824                                  max_age);
1825                 if (err) {
1826                         if (lov->tgts[i].active && !rc)
1827                                 rc = err;
1828                         continue;
1829                 }
1830
1831                 if (!set) {
1832                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1833                         set = 1;
1834                 } else {
1835 #ifdef MIN_DF
1836                         /* Sandia requested that df (and so, statfs) only
1837                            returned minimal available space on 
1838                            a single OST, so people would be able to
1839                            write this much data guaranteed. */
1840                         if (osfs->os_bavail > lov_sfs.os_bavail) {
1841                                 /* Presumably if new bavail is smaller,
1842                                    new bfree is bigger as well */
1843                                 osfs->os_bfree = lov_sfs.os_bfree;
1844                                 osfs->os_bavail = lov_sfs.os_bavail;
1845                         }
1846 #else
1847                         osfs->os_bfree += lov_sfs.os_bfree;
1848                         osfs->os_bavail += lov_sfs.os_bavail;
1849 #endif
1850                         osfs->os_blocks += lov_sfs.os_blocks;
1851                         /* XXX not sure about this one - depends on policy.
1852                          *   - could be minimum if we always stripe on all OBDs
1853                          *     (but that would be wrong for any other policy,
1854                          *     if one of the OBDs has no more objects left)
1855                          *   - could be sum if we stripe whole objects
1856                          *   - could be average, just to give a nice number
1857                          *
1858                          * To give a "reasonable" (if not wholly accurate)
1859                          * number, we divide the total number of free objects
1860                          * by expected stripe count (watch out for overflow).
1861                          */
1862                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1863                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1864                 }
1865         }
1866
1867         if (set) {
1868                 __u32 expected_stripes = lov_get_stripecnt(lov, 0);
1869
1870                 if (osfs->os_files != LOV_U64_MAX)
1871                         do_div(osfs->os_files, expected_stripes);
1872                 if (osfs->os_ffree != LOV_U64_MAX)
1873                         do_div(osfs->os_ffree, expected_stripes);
1874         } else if (!rc)
1875                 rc = -EIO;
1876
1877         RETURN(rc);
1878 }
1879
1880 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1881                          void *karg, void *uarg)
1882 {
1883         struct obd_device *obddev = class_exp2obd(exp);
1884         struct lov_obd *lov = &obddev->u.lov;
1885         int i, rc, count = lov->desc.ld_tgt_count;
1886         struct obd_uuid *uuidp;
1887         ENTRY;
1888
1889         switch (cmd) {
1890         case OBD_IOC_LOV_GET_CONFIG: {
1891                 struct obd_ioctl_data *data = karg;
1892                 struct lov_tgt_desc *tgtdesc;
1893                 struct lov_desc *desc;
1894                 char *buf = NULL;
1895                 __u32 *genp;
1896
1897                 buf = NULL;
1898                 len = 0;
1899                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1900                         RETURN(-EINVAL);
1901
1902                 data = (struct obd_ioctl_data *)buf;
1903
1904                 if (sizeof(*desc) > data->ioc_inllen1) {
1905                         obd_ioctl_freedata(buf, len);
1906                         RETURN(-EINVAL);
1907                 }
1908
1909                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1910                         obd_ioctl_freedata(buf, len);
1911                         RETURN(-EINVAL);
1912                 }
1913
1914                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1915                         obd_ioctl_freedata(buf, len);
1916                         RETURN(-EINVAL);
1917                 }
1918
1919                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1920                 memcpy(desc, &(lov->desc), sizeof(*desc));
1921
1922                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1923                 genp = (__u32 *)data->ioc_inlbuf3;
1924                 tgtdesc = lov->tgts;
1925                 /* the uuid will be empty for deleted OSTs */
1926                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1927                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
1928                         *genp = tgtdesc->ltd_gen;
1929                 }
1930
1931                 rc = copy_to_user((void *)uarg, buf, len);
1932                 if (rc)
1933                         rc = -EFAULT;
1934                 obd_ioctl_freedata(buf, len);
1935                 break;
1936         }
1937         case LL_IOC_LOV_SETSTRIPE:
1938                 rc = lov_setstripe(exp, karg, uarg);
1939                 break;
1940         case LL_IOC_LOV_GETSTRIPE:
1941                 rc = lov_getstripe(exp, karg, uarg);
1942                 break;
1943         case LL_IOC_LOV_SETEA:
1944                 rc = lov_setea(exp, karg, uarg);
1945                 break;
1946         default: {
1947                 int set = 0;
1948                 if (count == 0)
1949                         RETURN(-ENOTTY);
1950                 rc = 0;
1951                 for (i = 0; i < count; i++) {
1952                         int err;
1953
1954                         /* OST was disconnected */
1955                         if (!lov->tgts[i].ltd_exp)
1956                                 continue;
1957
1958                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1959                                             len, karg, uarg);
1960                         if (err) {
1961                                 if (lov->tgts[i].active) {
1962                                         CERROR("error: iocontrol OSC %s on OST "
1963                                                "idx %d cmd %x: err = %d\n",
1964                                                lov->tgts[i].uuid.uuid, i,
1965                                                cmd, err);
1966                                         if (!rc)
1967                                                 rc = err;
1968                                 }
1969                         } else
1970                                 set = 1;
1971                 }
1972                 if (!set && !rc)
1973                         rc = -EIO;
1974         }
1975         }
1976
1977         RETURN(rc);
1978 }
1979
1980 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1981                         void *key, __u32 *vallen, void *val)
1982 {
1983         struct obd_device *obddev = class_exp2obd(exp);
1984         struct lov_obd *lov = &obddev->u.lov;
1985         int i;
1986         ENTRY;
1987
1988         if (!vallen || !val)
1989                 RETURN(-EFAULT);
1990
1991         if (keylen > strlen("lock_to_stripe") &&
1992             strcmp(key, "lock_to_stripe") == 0) {
1993                 struct {
1994                         char name[16];
1995                         struct ldlm_lock *lock;
1996                         struct lov_stripe_md *lsm;
1997                 } *data = key;
1998                 struct lov_oinfo *loi;
1999                 __u32 *stripe = val;
2000
2001                 if (*vallen < sizeof(*stripe))
2002                         RETURN(-EFAULT);
2003                 *vallen = sizeof(*stripe);
2004
2005                 /* XXX This is another one of those bits that will need to
2006                  * change if we ever actually support nested LOVs.  It uses
2007                  * the lock's export to find out which stripe it is. */
2008                 /* XXX - it's assumed all the locks for deleted OSTs have
2009                  * been cancelled. Also, the export for deleted OSTs will
2010                  * be NULL and won't match the lock's export. */
2011                 for (i = 0, loi = data->lsm->lsm_oinfo;
2012                      i < data->lsm->lsm_stripe_count;
2013                      i++, loi++) {
2014                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
2015                             data->lock->l_conn_export) {
2016                                 *stripe = i;
2017                                 RETURN(0);
2018                         }
2019                 }
2020                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2021                 dump_lsm(D_ERROR, data->lsm);
2022                 RETURN(-ENXIO);
2023         } else if (keylen >= strlen("size_to_stripe") &&
2024                    strcmp(key, "size_to_stripe") == 0) {
2025                 struct {
2026                         int stripe_number;
2027                         __u64 size;
2028                         struct lov_stripe_md *lsm;
2029                 } *data = val;
2030
2031                 if (*vallen < sizeof(*data))
2032                         RETURN(-EFAULT);
2033
2034                 data->size = lov_size_to_stripe(data->lsm, data->size,
2035                                                 data->stripe_number);
2036                 RETURN(0);
2037         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2038                 obd_id *ids = val;
2039                 int rc, size = sizeof(obd_id);
2040                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2041                         if (!lov->tgts[i].active)
2042                                 continue;
2043                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2044                                           keylen, key, &size, &(ids[i]));
2045                         if (rc != 0)
2046                                 RETURN(rc);
2047                 }
2048                 RETURN(0);
2049         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2050                 struct lov_desc *desc_ret = val;
2051                 *desc_ret = lov->desc;
2052
2053                 RETURN(0);
2054         }
2055
2056         RETURN(-EINVAL);
2057 }
2058
2059 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2060                         void *key, obd_count vallen, void *val)
2061 {
2062         struct obd_device *obddev = class_exp2obd(exp);
2063         struct lov_obd *lov = &obddev->u.lov;
2064         int i, rc = 0, err;
2065         ENTRY;
2066
2067         if (KEY_IS("next_id")) {
2068                 if (vallen != lov->desc.ld_tgt_count)
2069                         RETURN(-EINVAL);
2070                 vallen = sizeof(obd_id);
2071         }
2072
2073         if (KEY_IS("next_id") || KEY_IS("checksum")) {
2074                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2075                         /* OST was disconnected */
2076                         if (!lov->tgts[i].ltd_exp)
2077                                 continue;
2078
2079                         /* hit all OSCs, even inactive ones */
2080                         err = obd_set_info(lov->tgts[i].ltd_exp, keylen, key,
2081                                            vallen, ((obd_id*)val) + i);
2082                         if (!rc)
2083                                 rc = err;
2084                 }
2085                 RETURN(rc);
2086         }
2087
2088         if (KEY_IS("evict_by_nid")) {
2089                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2090                         /* OST was disconnected or is inactive */
2091                         if (!lov->tgts[i].ltd_exp || !lov->tgts[i].active)
2092                                 continue;
2093
2094                         err = obd_set_info(lov->tgts[i].ltd_exp, keylen, key,
2095                                            vallen, val);
2096                         if (!rc)
2097                                 rc = err;
2098                 }
2099                 RETURN(rc);
2100         }
2101
2102         if (KEY_IS("mds_conn") || KEY_IS("unlinked")) {
2103                 if (vallen != 0)
2104                         RETURN(-EINVAL);
2105         } else {
2106                 RETURN(-EINVAL);
2107         }
2108
2109         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2110                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2111                         continue;
2112
2113                 /* OST was disconnected */
2114                 if (!lov->tgts[i].ltd_exp)
2115                         continue;
2116
2117                 if (!val && !lov->tgts[i].active)
2118                         continue;
2119
2120                 err = obd_set_info(lov->tgts[i].ltd_exp,
2121                                   keylen, key, vallen, val);
2122                 if (!rc)
2123                         rc = err;
2124         }
2125         RETURN(rc);
2126 }
2127
2128 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
2129 {
2130         struct lov_oinfo *loi;
2131         int i, rc = 0;
2132         ENTRY;
2133
2134         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2135              i++, loi++) {
2136                 if (loi->loi_ar.ar_rc && !rc)
2137                         rc = loi->loi_ar.ar_rc;
2138                 loi->loi_ar.ar_rc = 0;
2139         }
2140         RETURN(rc);
2141 }
2142 EXPORT_SYMBOL(lov_test_and_clear_async_rc);
2143
/*
 * NOTE: everything between the #if 0 / #endif pair below is compiled out.
 * It sketches lov_complete_many(), which queues the caller on the wait
 * queue of each per-stripe lock behind @lockh and then waits, but it is
 * unfinished as written:
 *   - imp, lwi and lwd are used without a declaration in this function;
 *   - the remove_wait_queue() loop refers to "lock", which is declared
 *     only inside the body of the preceding for loop;
 *   - the -EINTR / -ETIMEDOUT branch is empty;
 *   - "queues" is allocated with OBD_ALLOC() but no matching free is
 *     visible on any path.
 * These need to be resolved before the block can be enabled.
 */
2144 #if 0
/* Per-stripe wait state used by lov_complete_many(). */
2145 struct lov_multi_wait {
2146         struct ldlm_lock *lock;
2147         wait_queue_t      wait;
2148         int               completed;
2149         int               generation;
2150 };
2151
2152 int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
2153                       struct lustre_handle *lockh)
2154 {
2155         struct lov_lock_handles *lov_lockh = NULL;
2156         struct lustre_handle *lov_lockhp;
2157         struct lov_obd *lov;
2158         struct lov_oinfo *loi;
2159         struct lov_multi_wait *queues;
2160         int rc = 0, i;
2161         ENTRY;
2162
2163         ASSERT_LSM_MAGIC(lsm);
2164
2165         if (!exp || !exp->exp_obd)
2166                 RETURN(-ENODEV);
2167
2168         LASSERT(lockh != NULL);
        /* with more than one stripe @lockh refers to a set of per-stripe
         * handles; with a single stripe it is used directly */
2169         if (lsm->lsm_stripe_count > 1) {
2170                 lov_lockh = lov_handle2llh(lockh);
2171                 if (lov_lockh == NULL) {
2172                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
2173                         RETURN(-EINVAL);
2174                 }
2175
2176                 lov_lockhp = lov_lockh->llh_handles;
2177         } else {
2178                 lov_lockhp = lockh;
2179         }
2180
2181         OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
2182         if (queues == NULL)
2183                 GOTO(out, rc = -ENOMEM);
2184
2185         lov = &exp->exp_obd->u.lov;
2186         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2187              i++, loi++, lov_lockhp++) {
2188                 struct ldlm_lock *lock;
2189                 struct obd_device *obd;
2190                 unsigned long irqflags;
2191
2192                 lock = ldlm_handle2lock(lov_lockhp);
2193                 if (lock == NULL) {
2194                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
2195                                loi->loi_ost_idx, loi->loi_id);
                        /* no lock for this stripe: nothing to wait for */
2196                         queues[i].completed = 1;
2197                         continue;
2198                 }
2199
2200                 queues[i].lock = lock;
2201                 init_waitqueue_entry(&(queues[i].wait), current);
2202                 add_wait_queue(lock->l_waitq, &(queues[i].wait));
2203
                /* imp is not declared anywhere in this function */
2204                 obd = class_exp2obd(lock->l_conn_export);
2205                 if (obd != NULL)
2206                         imp = obd->u.cli.cl_import;
2207                 if (imp != NULL) {
2208                         spin_lock_irqsave(&imp->imp_lock, irqflags);
2209                         queues[i].generation = imp->imp_generation;
2210                         spin_unlock_irqrestore(&imp->imp_lock, irqflags);
2211                 }
2212         }
2213
        /* lwi and lwd are not declared anywhere in this function */
2214         lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
2215                                interrupted_completion_wait, &lwd);
2216         rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
2217
        /* "lock" is out of scope here; it only exists inside the loop
         * body above */
2218         for (i = 0; i < lsm->lsm_stripe_count; i++)
2219                 remove_wait_queue(lock->l_waitq, &(queues[i].wait));
2220
        /* placeholder: interrupted/timed-out waits are not handled yet */
2221         if (rc == -EINTR || rc == -ETIMEDOUT) {
2222
2223
2224         }
2225
2226  out:
2227         if (lov_lockh != NULL)
2228                 lov_llh_put(lov_lockh);
2229         RETURN(rc);
2230 }
2231 #endif
2232
2233
/*
 * Take the spinlock md->lsm_lock and record the calling task as its owner
 * in md->lsm_lock_owner.
 *
 * Asserts that the caller is not already the recorded owner before
 * locking, and that no owner is recorded once the lock is held.
 */
2234 void lov_stripe_lock(struct lov_stripe_md *md)
2235 {
        /* checked before spin_lock(): the caller must not already be the
         * recorded owner */
2236         LASSERT(md->lsm_lock_owner != current);
2237         spin_lock(&md->lsm_lock);
        /* with the lock held, nobody may still be recorded as owner */
2238         LASSERT(md->lsm_lock_owner == NULL);
2239         md->lsm_lock_owner = current;
2240 }
2241 EXPORT_SYMBOL(lov_stripe_lock);
2242
/*
 * Release the spinlock md->lsm_lock.
 *
 * Asserts that the calling task is the recorded owner, clears
 * md->lsm_lock_owner while the lock is still held, then unlocks.
 */
2243 void lov_stripe_unlock(struct lov_stripe_md *md)
2244 {
2245         LASSERT(md->lsm_lock_owner == current);
        /* owner is cleared before the spinlock is dropped */
2246         md->lsm_lock_owner = NULL;
2247         spin_unlock(&md->lsm_lock);
2248 }
2249 EXPORT_SYMBOL(lov_stripe_unlock);
2250
2251 struct obd_ops lov_obd_ops = {
2252         .o_owner               = THIS_MODULE,
2253         .o_setup               = lov_setup,
2254         .o_precleanup          = lov_precleanup,
2255         .o_cleanup             = lov_cleanup,
2256         .o_process_config      = lov_process_config,
2257         .o_connect             = lov_connect,
2258         .o_disconnect          = lov_disconnect,
2259         .o_statfs              = lov_statfs,
2260         .o_packmd              = lov_packmd,
2261         .o_unpackmd            = lov_unpackmd,
2262         .o_create              = lov_create,
2263         .o_destroy             = lov_destroy,
2264         .o_getattr             = lov_getattr,
2265         .o_getattr_async       = lov_getattr_async,
2266         .o_setattr             = lov_setattr,
2267         .o_setattr_async       = lov_setattr_async,
2268         .o_brw                 = lov_brw,
2269         .o_brw_async           = lov_brw_async,
2270         .o_prep_async_page     = lov_prep_async_page,
2271         .o_queue_async_io      = lov_queue_async_io,
2272         .o_set_async_flags     = lov_set_async_flags,
2273         .o_queue_group_io      = lov_queue_group_io,
2274         .o_trigger_group_io    = lov_trigger_group_io,
2275         .o_teardown_async_page = lov_teardown_async_page,
2276         .o_adjust_kms          = lov_adjust_kms,
2277         .o_punch               = lov_punch,
2278         .o_sync                = lov_sync,
2279         .o_enqueue             = lov_enqueue,
2280         .o_match               = lov_match,
2281         .o_change_cbdata       = lov_change_cbdata,
2282         .o_cancel              = lov_cancel,
2283         .o_cancel_unused       = lov_cancel_unused,
2284         .o_join_lru            = lov_join_lru,
2285         .o_iocontrol           = lov_iocontrol,
2286         .o_get_info            = lov_get_info,
2287         .o_set_info            = lov_set_info,
2288         .o_llog_init           = lov_llog_init,
2289         .o_llog_finish         = lov_llog_finish,
2290         .o_notify              = lov_notify,
2291 #ifdef HAVE_QUOTA_SUPPORT
2292         .o_quotacheck          = lov_quotacheck,
2293         .o_quotactl            = lov_quotactl,
2294 #endif
2295 };
2296
2297 int __init lov_init(void)
2298 {
2299         struct lprocfs_static_vars lvars;
2300         int rc;
2301         ENTRY;
2302
2303         lprocfs_init_vars(lov, &lvars);
2304         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2305                                  OBD_LOV_DEVICENAME);
2306         RETURN(rc);
2307 }
2308
#ifdef __KERNEL__
/* Module teardown: unregister the LOV device type registered by lov_init(). */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

/* kernel module entry and exit points */
module_init(lov_init);
module_exit(lov_exit);

/* kernel module metadata */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");
#endif