Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 /* obd methods */
55 #define MAX_STRING_SIZE 128
56 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
57                            int activate, struct obd_connect_data *conn_data,
58                            unsigned long connect_flags)
59 {
60         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
61         struct obd_uuid *tgt_uuid = &tgt->uuid;
62
63 #ifdef __KERNEL__
64         struct proc_dir_entry *lov_proc_dir;
65 #endif
66         struct lov_obd *lov = &obd->u.lov;
67         struct lustre_handle conn = {0, };
68         struct obd_device *tgt_obd;
69         int rc;
70         ENTRY;
71
72         tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
73                                         &obd->obd_uuid);
74
75         if (!tgt_obd) {
76                 CERROR("Target %s not attached\n", tgt_uuid->uuid);
77                 RETURN(-EINVAL);
78         }
79
80         if (!tgt_obd->obd_set_up) {
81                 CERROR("Target %s not set up\n", tgt_uuid->uuid);
82                 RETURN(-EINVAL);
83         }
84
85         if (activate) {
86                 tgt_obd->obd_no_recov = 0;
87                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
88         }
89
90         if (tgt_obd->u.cli.cl_import->imp_invalid) {
91                 CERROR("not connecting OSC %s; administratively "
92                        "disabled\n", tgt_uuid->uuid);
93                 rc = obd_register_observer(tgt_obd, obd);
94                 if (rc) {
95                         CERROR("Target %s register_observer error %d; "
96                                "will not be able to reactivate\n",
97                                tgt_uuid->uuid, rc);
98                 }
99                 RETURN(0);
100         }
101
102         rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
103                          connect_flags);
104         if (rc) {
105                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
106                 RETURN(rc);
107         }
108         tgt->ltd_exp = class_conn2export(&conn);
109
110         rc = obd_register_observer(tgt_obd, obd);
111         if (rc) {
112                 CERROR("Target %s register_observer error %d\n",
113                        tgt_uuid->uuid, rc);
114                 obd_disconnect(tgt->ltd_exp, 0);
115                 tgt->ltd_exp = NULL;
116                 RETURN(rc);
117         }
118
119         tgt->active = 1;
120         lov->desc.ld_active_tgt_count++;
121
122 #ifdef __KERNEL__
123         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
124         if (lov_proc_dir) {
125                 struct obd_device *osc_obd = class_conn2obd(&conn);
126                 struct proc_dir_entry *osc_symlink;
127                 char name[MAX_STRING_SIZE + 1];
128
129                 LASSERT(osc_obd != NULL);
130                 LASSERT(osc_obd->obd_type != NULL);
131                 LASSERT(osc_obd->obd_type->typ_name != NULL);
132                 name[MAX_STRING_SIZE] = '\0';
133                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
134                          osc_obd->obd_type->typ_name,
135                          osc_obd->obd_name);
136                 osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
137                                            name);
138                 if (osc_symlink == NULL) {
139                         CERROR("could not register LOV target "
140                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
141                                obd->obd_type->typ_name, obd->obd_name,
142                                osc_obd->obd_name);
143                         lprocfs_remove(lov_proc_dir);
144                         lov_proc_dir = NULL;
145                 }
146         }
147 #endif
148
149         RETURN(0);
150 }
151
152 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
153                        struct obd_uuid *cluuid, struct obd_connect_data *data,
154                        unsigned long flags)
155 {
156 #ifdef __KERNEL__
157         struct proc_dir_entry *lov_proc_dir;
158 #endif
159         struct lov_obd *lov = &obd->u.lov;
160         struct lov_tgt_desc *tgt;
161         struct obd_export *exp;
162         int rc, rc2, i;
163         ENTRY;
164
165         rc = class_connect(conn, obd, cluuid);
166         if (rc)
167                 RETURN(rc);
168
169         exp = class_conn2export(conn);
170
171         /* We don't want to actually do the underlying connections more than
172          * once, so keep track. */
173         lov->refcount++;
174         if (lov->refcount > 1) {
175                 class_export_put(exp);
176                 RETURN(0);
177         }
178
179 #ifdef __KERNEL__
180         lov_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
181                                         NULL, NULL);
182         if (IS_ERR(lov_proc_dir)) {
183                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
184                        obd->obd_type->typ_name, obd->obd_name);
185                 lov_proc_dir = NULL;
186         }
187 #endif
188
189         /* connect_flags is the MDS number, save for use in lov_add_obd */
190         lov->lov_connect_flags = flags;
191         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
192                 if (obd_uuid_empty(&tgt->uuid))
193                         continue;
194                 rc = lov_connect_obd(obd, tgt, 0, data, flags);
195                 if (rc)
196                         GOTO(out_disc, rc);
197         }
198
199         class_export_put(exp);
200         RETURN (0);
201
202  out_disc:
203 #ifdef __KERNEL__
204         if (lov_proc_dir)
205                 lprocfs_remove(lov_proc_dir);
206 #endif
207
208         while (i-- > 0) {
209                 struct obd_uuid uuid;
210                 --tgt;
211                 --lov->desc.ld_active_tgt_count;
212                 tgt->active = 0;
213                 /* save for CERROR below; (we know it's terminated) */
214                 uuid = tgt->uuid;
215                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
216                 if (rc2)
217                         CERROR("error: LOV target %s disconnect on OST idx %d: "
218                                "rc = %d\n", uuid.uuid, i, rc2);
219         }
220         class_disconnect(exp, 0);
221         RETURN (rc);
222 }
223
224 static int lov_disconnect_obd(struct obd_device *obd, 
225                               struct lov_tgt_desc *tgt,
226                               unsigned long flags)
227 {
228 #ifdef __KERNEL__
229         struct proc_dir_entry *lov_proc_dir;
230 #endif
231         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
232         struct lov_obd *lov = &obd->u.lov;
233         int rc;
234         ENTRY;
235
236 #ifdef __KERNEL__
237         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
238         if (lov_proc_dir) {
239                 struct proc_dir_entry *osc_symlink;
240
241                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
242                 if (osc_symlink) {
243                         lprocfs_remove(osc_symlink);
244                 } else {
245                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
246                                obd->obd_type->typ_name, obd->obd_name,
247                                osc_obd->obd_name);
248                 }
249         }
250 #endif
251         if (obd->obd_no_recov) {
252                 /* Pass it on to our clients.
253                  * XXX This should be an argument to disconnect,
254                  * XXX not a back-door flag on the OBD.  Ah well.
255                  */
256                 if (osc_obd)
257                         osc_obd->obd_no_recov = 1;
258         }
259
260         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
261         rc = obd_disconnect(tgt->ltd_exp, flags);
262         if (rc) {
263                 if (tgt->active) {
264                         CERROR("Target %s disconnect error %d\n",
265                                tgt->uuid.uuid, rc);
266                 }
267                 rc = 0;
268         }
269
270         if (tgt->active) {
271                 tgt->active = 0;
272                 lov->desc.ld_active_tgt_count--;
273         }
274         tgt->ltd_exp = NULL;
275         RETURN(0);
276 }
277
278 static int lov_disconnect(struct obd_export *exp, unsigned long flags)
279 {
280         struct obd_device *obd = class_exp2obd(exp);
281 #ifdef __KERNEL__
282         struct proc_dir_entry *lov_proc_dir;
283 #endif
284         struct lov_obd *lov = &obd->u.lov;
285         struct lov_tgt_desc *tgt;
286         int rc, i;
287         ENTRY;
288
289         if (!lov->tgts)
290                 goto out_local;
291
292         /* Only disconnect the underlying layers on the final disconnect. */
293         lov->refcount--;
294         if (lov->refcount != 0)
295                 goto out_local;
296
297         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
298                 if (tgt->ltd_exp)
299                         lov_disconnect_obd(obd, tgt, flags);
300         }
301
302 #ifdef __KERNEL__
303         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
304         if (lov_proc_dir) {
305                 lprocfs_remove(lov_proc_dir);
306         } else {
307                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing.",
308                        obd->obd_type->typ_name, obd->obd_name);
309         }
310 #endif
311         
312  out_local:
313         rc = class_disconnect(exp, 0);
314         RETURN(rc);
315 }
316
317 /* Error codes:
318  *
319  *  -EINVAL  : UUID can't be found in the LOV's target list
320  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
321  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
322  */
323 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
324                               int activate)
325 {
326         struct lov_tgt_desc *tgt;
327         int i, rc = 0;
328         ENTRY;
329
330         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
331                lov, uuid->uuid, activate);
332
333         spin_lock(&lov->lov_lock);
334         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
335                 if (tgt->ltd_exp == NULL)
336                         continue;
337
338                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
339                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
340                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
341                         break;
342         }
343
344         if (i == lov->desc.ld_tgt_count)
345                 GOTO(out, rc = -EINVAL);
346
347
348         if (tgt->active == activate) {
349                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
350                         activate ? "" : "in");
351                 GOTO(out, rc);
352         }
353
354         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
355                activate ? "" : "in");
356
357         tgt->active = activate;
358         if (activate)
359                 lov->desc.ld_active_tgt_count++;
360         else
361                 lov->desc.ld_active_tgt_count--;
362
363         EXIT;
364  out:
365         spin_unlock(&lov->lov_lock);
366         return rc;
367 }
368
369 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
370                       int active, void *data)
371 {
372         struct obd_uuid *uuid;
373         int rc;
374         ENTRY;
375
376         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
377                 CERROR("unexpected notification of %s %s!\n",
378                        watched->obd_type->typ_name,
379                        watched->obd_name);
380                 return -EINVAL;
381         }
382         uuid = &watched->u.cli.cl_import->imp_target_uuid;
383
384         /* Set OSC as active before notifying the observer, so the
385          * observer can use the OSC normally.  
386          */
387         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
388         if (rc) {
389                 CERROR("%sactivation of %s failed: %d\n",
390                        active ? "" : "de", uuid->uuid, rc);
391                 RETURN(rc);
392         }
393
394         if (obd->obd_observer)
395                 /* Pass the notification up the chain. */
396                 rc = obd_notify(obd->obd_observer, watched, active, data);
397
398         RETURN(rc);
399 }
400
401 int lov_attach(struct obd_device *dev, obd_count len, void *data)
402 {
403         struct lprocfs_static_vars lvars;
404         int rc;
405
406         lprocfs_init_vars(lov, &lvars);
407         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
408         if (rc == 0) {
409 #ifdef __KERNEL__
410                 struct proc_dir_entry *entry;
411
412                 entry = create_proc_entry("target_obd_status", 0444, 
413                                           dev->obd_proc_entry);
414                 if (entry == NULL) {
415                         rc = -ENOMEM;
416                 } else {
417                         entry->proc_fops = &lov_proc_target_fops;
418                         entry->data = dev;
419                 }
420 #endif
421         }
422         return rc;
423 }
424
/*
 * Detach-time teardown: hand the device to lprocfs_obd_detach() and report
 * its result.
 */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
429
430 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
431 {
432         struct lov_obd *lov = &obd->u.lov;
433         struct lustre_cfg *lcfg = buf;
434         struct lov_desc *desc;
435         int count;
436         ENTRY;
437
438         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
439                 CERROR("LOV setup requires a descriptor\n");
440                 RETURN(-EINVAL);
441         }
442
443         desc = (struct lov_desc *)lustre_cfg_string(lcfg, 1);
444         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
445                 CERROR("descriptor size wrong: %d > %d\n",
446                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
447                 RETURN(-EINVAL);
448         }
449  
450         /* Because of 64-bit divide/mod operations only work with a 32-bit
451          * divisor in a 32-bit kernel, we cannot support a stripe width
452          * of 4GB or larger on 32-bit CPUs.
453          */
454        
455         count = desc->ld_default_stripe_count;
456         if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
457                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
458                        desc->ld_default_stripe_size, count, ~0UL);
459                 RETURN(-EINVAL);
460         }
461         if (desc->ld_tgt_count > 0) {
462                 lov->bufsize= sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
463         } else {
464                 lov->bufsize = sizeof(struct lov_tgt_desc) * LOV_MAX_TGT_COUNT;  
465         }
466         OBD_ALLOC(lov->tgts, lov->bufsize);
467         if (lov->tgts == NULL) {
468                 lov->bufsize = 0;
469                 CERROR("couldn't allocate %d bytes for target table.\n",
470                        lov->bufsize);
471                 RETURN(-EINVAL);
472         }
473
474         desc->ld_tgt_count = 0;
475         desc->ld_active_tgt_count = 0;
476         lov->desc = *desc;
477         spin_lock_init(&lov->lov_lock);
478         sema_init(&lov->lov_llog_sem, 1);
479
480         RETURN(0);
481 }
482
483 static int lov_cleanup(struct obd_device *obd, int flags)
484 {
485         struct lov_obd *lov = &obd->u.lov;
486
487         OBD_FREE(lov->tgts, lov->bufsize);
488         RETURN(0);
489 }
490
491 static int
492 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
493 {
494         struct lov_obd *lov = &obd->u.lov;
495         struct lov_tgt_desc *tgt;
496         int rc;
497         ENTRY;
498
499         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
500                uuidp->uuid, index, gen);
501
502         if ((index < 0) || (index >= LOV_MAX_TGT_COUNT)) {
503                 CERROR("request to add OBD %s at invalid index: %d\n",
504                        uuidp->uuid, index);
505                 RETURN(-EINVAL);
506         }
507
508         if (gen <= 0) {
509                 CERROR("request to add OBD %s with invalid generation: %d\n",
510                        uuidp->uuid, gen);
511                 RETURN(-EINVAL);
512         }
513
514         tgt = lov->tgts + index;
515         if (!obd_uuid_empty(&tgt->uuid)) {
516                 CERROR("OBD already assigned at LOV target index %d\n",
517                        index);
518                 RETURN(-EEXIST);
519         }
520
521         tgt->uuid = *uuidp;
522         /* XXX - add a sanity check on the generation number. */
523         tgt->ltd_gen = gen;
524
525         if (index >= lov->desc.ld_tgt_count)
526                 lov->desc.ld_tgt_count = index + 1;
527
528         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
529                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
530
531         if (lov->refcount == 0)
532                 RETURN(0);
533
534         if (tgt->ltd_exp) {
535                 struct obd_device *osc_obd;
536
537                 osc_obd = class_exp2obd(tgt->ltd_exp);
538                 if (osc_obd)
539                         osc_obd->obd_no_recov = 0;
540         }
541
542         rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
543         if (rc)
544                 GOTO(out, rc);
545
546         if (obd->obd_observer) {
547                 /* tell the mds_lov about the new target */
548                 rc = obd_notify(obd->obd_observer, tgt->ltd_exp->exp_obd, 1,
549                                 (void *)index);
550         }
551
552         GOTO(out, rc);
553  out:
554         if (rc && tgt->ltd_exp != NULL)
555                 lov_disconnect_obd(obd, tgt, 0);
556         return rc;
557 }
558
559 static int
560 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
561 {
562         struct lov_obd *lov = &obd->u.lov;
563         struct lov_tgt_desc *tgt;
564         int count = lov->desc.ld_tgt_count;
565         int rc = 0;
566         ENTRY;
567
568         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
569                uuidp->uuid, index, gen);
570
571         if (index >= count) {
572                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
573                        index, count);
574                 RETURN(-EINVAL);
575         }
576
577         tgt = lov->tgts + index;
578
579         if (obd_uuid_empty(&tgt->uuid)) {
580                 CERROR("LOV target at index %d is not setup.\n", index);
581                 RETURN(-EINVAL);
582         }
583
584         if (strncmp(uuidp->uuid, tgt->uuid.uuid, sizeof uuidp->uuid) != 0) {
585                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
586                        tgt->uuid.uuid, index, uuidp->uuid);
587                 RETURN(-EINVAL);
588         }
589
590         if (tgt->ltd_exp) {
591                 struct obd_device *osc_obd;
592
593                 osc_obd = class_exp2obd(tgt->ltd_exp);
594                 if (osc_obd) {
595                         osc_obd->obd_no_recov = 1;
596                         rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
597                         if (rc)
598                                 CERROR("osc_llog_finish error: %d\n", rc);
599                 }
600                 lov_disconnect_obd(obd, tgt, 0);
601         }
602
603         /* XXX - right now there is a dependency on ld_tgt_count being the
604          * maximum tgt index for computing the mds_max_easize. So we can't
605          * shrink it. */
606
607         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
608         memset(tgt, 0, sizeof(*tgt));
609
610         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
611                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
612
613         RETURN(rc);
614 }
615
616 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
617 {
618         struct lustre_cfg *lcfg = buf;
619         struct obd_uuid obd_uuid;
620         int cmd;
621         int index;
622         int gen;
623         int rc = 0;
624         ENTRY;
625
626         switch(cmd = lcfg->lcfg_command) {
627         case LCFG_LOV_ADD_OBD:
628         case LCFG_LOV_DEL_OBD: {
629                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
630                         GOTO(out, rc = -EINVAL);
631
632                 obd_str2uuid(&obd_uuid, lustre_cfg_string(lcfg, 1));
633
634                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
635                         GOTO(out, rc = -EINVAL);
636                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
637                         GOTO(out, rc = -EINVAL);
638                 if (cmd == LCFG_LOV_ADD_OBD)
639                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
640                 else
641                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
642                 GOTO(out, rc);
643         }
644         default: {
645                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
646                 GOTO(out, rc = -EINVAL);
647
648         }
649         }
650 out:
651         RETURN(rc);
652 }
653
/* Fallback used only when the platform does not already provide log2():
 * expands to ffz() applied to the bitwise complement of n.
 * NOTE(review): presumably this yields the position of the lowest set bit
 * of n -- confirm against ffz(). */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
657
658 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
659                              struct lov_stripe_md **ea,
660                              struct obd_trans_info *oti)
661 {
662         struct lov_obd *lov;
663         struct obdo *tmp_oa;
664         struct obd_uuid *ost_uuid = NULL;
665         int rc = 0, i;
666         ENTRY;
667
668         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
669                 src_oa->o_flags == OBD_FL_DELORPHAN);
670
671         lov = &export->exp_obd->u.lov;
672
673         tmp_oa = obdo_alloc();
674         if (tmp_oa == NULL)
675                 RETURN(-ENOMEM);
676
677         if (src_oa->o_valid & OBD_MD_FLINLINE) {
678                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
679                 CDEBUG(D_HA, "clearing orphans only for %s\n",
680                        ost_uuid->uuid);
681         }
682
683         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
684                 struct lov_stripe_md obj_md;
685                 struct lov_stripe_md *obj_mdp = &obj_md;
686                 int err;
687
688                 /* if called for a specific target, we don't
689                    care if it is not active. */
690                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
691                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
692                         continue;
693                 }
694
695                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
696                         continue;
697
698                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
699
700                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
701                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
702                                  &obj_mdp, oti);
703                 if (err)
704                         /* This export will be disabled until it is recovered,
705                            and then orphan recovery will be completed. */
706                         CERROR("error in orphan recovery on OST idx %d/%d: "
707                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
708
709                 if (ost_uuid)
710                         break;
711         }
712         obdo_free(tmp_oa);
713         RETURN(rc);
714 }
715
716 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
717                         void *acl, int acl_size,
718                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
719 {
720         struct lov_stripe_md *obj_mdp, *lsm;
721         struct lov_obd *lov = &exp->exp_obd->u.lov;
722         unsigned ost_idx;
723         int rc, i;
724         ENTRY;
725
726         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
727                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
728
729         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
730         if (obj_mdp == NULL)
731                 RETURN(-ENOMEM);
732
733         ost_idx = src_oa->o_nlink;
734         lsm = *ea;
735         if (lsm == NULL)
736                 GOTO(out, rc = -EINVAL);
737         if (ost_idx >= lov->desc.ld_tgt_count)
738                 GOTO(out, rc = -EINVAL);
739
740         for (i = 0; i < lsm->lsm_stripe_count; i++) {
741                 if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
742                         if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
743                                 GOTO(out, rc = -EINVAL);
744                         break;
745                 }
746         }
747         if (i == lsm->lsm_stripe_count)
748                 GOTO(out, rc = -EINVAL);
749
750         rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size,
751                         &obj_mdp, oti);
752 out:
753         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
754         RETURN(rc);
755 }
756
757 /* the LOV expects oa->o_id to be set to the LOV object id */
758 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
759                       void *acl, int acl_size,
760                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
761 {
762         struct lov_request_set *set = NULL;
763         struct list_head *pos;
764         struct lov_obd *lov;
765         int rc = 0;
766         ENTRY;
767
768         LASSERT(ea != NULL);
769         if (exp == NULL)
770                 RETURN(-EINVAL);
771
772         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
773             src_oa->o_flags == OBD_FL_DELORPHAN) {
774                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
775                 RETURN(rc);
776         }
777
778         lov = &exp->exp_obd->u.lov;
779         if (!lov->desc.ld_active_tgt_count)
780                 RETURN(-EIO);
781
782         /* Recreate a specific object id at the given OST index */
783         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
784             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
785                  rc = lov_recreate(exp, src_oa, acl, acl_size, ea, oti);
786                  RETURN(rc);
787         }
788
789         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
790         if (rc)
791                 RETURN(rc);
792
793         list_for_each (pos, &set->set_list) {
794                 struct lov_request *req = 
795                         list_entry(pos, struct lov_request, rq_link);
796
797                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
798                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
799                                 acl, acl_size, &req->rq_md, oti);
800                 lov_update_create_set(set, req, rc);
801         }
802         rc = lov_fini_create_set(set, ea);
803         RETURN(rc);
804 }
805
/*
 * Sanity check for a striping descriptor.  Evaluates to 1 (after logging an
 * error) when LSMP is NULL or its lsm_magic differs from LOV_MAGIC, and to
 * 0 otherwise.  LSMP is evaluated exactly once.
 */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_lsm__ = (LSMP);                  \
        int _ret__ = 1;                                         \
        if (_lsm__ == NULL) {                                   \
                CERROR("LOV requires striping ea\n");           \
        } else if (_lsm__->lsm_magic != LOV_MAGIC) {            \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _lsm__->lsm_magic, LOV_MAGIC);           \
        } else {                                                \
                _ret__ = 0;                                     \
        }                                                       \
        _ret__;                                                 \
})
820
821 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
822                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
823 {
824         struct lov_request_set *set;
825         struct lov_request *req;
826         struct list_head *pos;
827         struct lov_obd *lov;
828         int rc = 0;
829         ENTRY;
830
831         if (lsm_bad_magic(lsm))
832                 RETURN(-EINVAL);
833
834         if (!exp || !exp->exp_obd)
835                 RETURN(-ENODEV);
836
837         lov = &exp->exp_obd->u.lov;
838         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
839         if (rc)
840                 RETURN(rc);
841
842         list_for_each (pos, &set->set_list) {
843                 int err;
844                 req = list_entry(pos, struct lov_request, rq_link);
845
846                 /* XXX update the cookie position */
847                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
848                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
849                                  NULL, oti);
850                 err = lov_update_common_set(set, req, rc);
851                 if (rc) {
852                         CERROR("error: destroying objid "LPX64" subobj "
853                                LPX64" on OST idx %d: rc = %d\n", 
854                                set->set_oa->o_id, req->rq_oa->o_id, 
855                                req->rq_idx, rc);
856                         if (!rc)
857                                 rc = err;
858                 }
859         }
860         lov_fini_destroy_set(set);
861         RETURN(rc);
862 }
863
864 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
865                        struct lov_stripe_md *lsm)
866 {
867         struct lov_request_set *set;
868         struct lov_request *req;
869         struct list_head *pos;
870         struct lov_obd *lov;
871         int err = 0, rc = 0;
872         ENTRY;
873
874         if (lsm_bad_magic(lsm))
875                 RETURN(-EINVAL);
876
877         if (!exp || !exp->exp_obd)
878                 RETURN(-ENODEV);
879
880         lov = &exp->exp_obd->u.lov;
881         
882         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
883         if (rc)
884                 RETURN(rc);
885
886         list_for_each (pos, &set->set_list) {
887                 req = list_entry(pos, struct lov_request, rq_link);
888                 
889                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
890                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
891                        req->rq_idx);
892
893                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
894                                  req->rq_oa, NULL);
895                 err = lov_update_common_set(set, req, rc);
896                 if (err) {
897                         CERROR("error: getattr objid "LPX64" subobj "
898                                LPX64" on OST idx %d: rc = %d\n",
899                                set->set_oa->o_id, req->rq_oa->o_id, 
900                                req->rq_idx, err);
901                         break;
902                 }
903         }
904         
905         rc = lov_fini_getattr_set(set);
906         if (err)
907                 rc = err;
908         RETURN(rc);
909 }
910
911 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
912                                  int rc)
913 {
914         struct lov_request_set *lovset = (struct lov_request_set *)data;
915         ENTRY;
916
917         /* don't do attribute merge if this aysnc op failed */
918         if (rc) {
919                 lovset->set_completes = 0;
920                 lov_fini_getattr_set(lovset);
921         } else {
922                 rc = lov_fini_getattr_set(lovset);
923         }
924         RETURN (rc);
925 }
926
927 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
928                               struct lov_stripe_md *lsm,
929                               struct ptlrpc_request_set *rqset)
930 {
931         struct lov_request_set *lovset;
932         struct lov_obd *lov;
933         struct list_head *pos;
934         struct lov_request *req;
935         int rc = 0;
936         ENTRY;
937
938         if (lsm_bad_magic(lsm))
939                 RETURN(-EINVAL);
940
941         if (!exp || !exp->exp_obd)
942                 RETURN(-ENODEV);
943
944         lov = &exp->exp_obd->u.lov;
945
946         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
947         if (rc)
948                 RETURN(rc);
949
950         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
951                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
952
953         list_for_each (pos, &lovset->set_list) {
954                 req = list_entry(pos, struct lov_request, rq_link);
955                 
956                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
957                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
958                        req->rq_idx);
959                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
960                                        req->rq_oa, NULL, rqset);
961                 if (rc) {
962                         CERROR("error: getattr objid "LPX64" subobj "
963                                LPX64" on OST idx %d: rc = %d\n",
964                                lovset->set_oa->o_id, req->rq_oa->o_id, 
965                                req->rq_idx, rc);
966                         GOTO(out, rc);
967                 }
968                 lov_update_common_set(lovset, req, rc);
969         }
970         
971         LASSERT(rc == 0);
972         LASSERT (rqset->set_interpret == NULL);
973         rqset->set_interpret = lov_getattr_interpret;
974         rqset->set_arg = (void *)lovset;
975         RETURN(rc);
976 out:
977         LASSERT(rc);
978         lov_fini_getattr_set(lovset);
979         RETURN(rc);
980 }
981
982 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
983                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
984 {
985         struct lov_request_set *set;
986         struct lov_obd *lov;
987         struct list_head *pos;
988         struct lov_request *req;
989         int err = 0, rc = 0;
990         ENTRY;
991
992         if (lsm_bad_magic(lsm))
993                 RETURN(-EINVAL);
994
995         if (!exp || !exp->exp_obd)
996                 RETURN(-ENODEV);
997
998         /* for now, we only expect time updates here */
999         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
1000                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
1001                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
1002                                       OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
1003
1004         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
1005
1006         lov = &exp->exp_obd->u.lov;
1007         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
1008         if (rc)
1009                 RETURN(rc);
1010
1011         list_for_each (pos, &set->set_list) {
1012                 req = list_entry(pos, struct lov_request, rq_link);
1013                 
1014                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
1015                                  NULL, NULL);
1016                 err = lov_update_common_set(set, req, rc);
1017                 if (err) {
1018                         CERROR("error: setattr objid "LPX64" subobj "
1019                                LPX64" on OST idx %d: rc = %d\n",
1020                                set->set_oa->o_id, req->rq_oa->o_id,
1021                                req->rq_idx, err);
1022                         if (!rc)
1023                                 rc = err;
1024                 }
1025         }
1026         err = lov_fini_setattr_set(set);
1027         if (!rc)
1028                 rc = err;
1029         RETURN(rc);
1030 }
1031
1032 static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
1033 {
1034         static int next_idx = 0;
1035         struct lov_tgt_desc *tgt;
1036         int i, count;
1037
1038         /* XXX - we should do something clever and take lsm
1039          * into account but just do round robin for now. */
1040
1041         /* last_idx must always be less that count because
1042          * ld_tgt_count currently cannot shrink. */
1043         count = lov->desc.ld_tgt_count;
1044
1045         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
1046                 if (tgt->active) {
1047                         next_idx = (i + 1) % count;
1048                         RETURN(i);
1049                 }
1050         }
1051
1052         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
1053                 if (tgt->active) {
1054                         next_idx = (i + 1) % count;
1055                         RETURN(i);
1056                 }
1057         }
1058
1059         RETURN(-EIO);
1060 }
1061
1062 static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
1063                              struct lov_stripe_md *ea,
1064                              struct obd_trans_info *oti)
1065 {
1066         struct obd_export *osc_exp;
1067         struct lov_obd *lov = &exp->exp_obd->u.lov;
1068         struct lov_stripe_md *lsm = ea;
1069         struct lov_stripe_md obj_md;
1070         struct lov_stripe_md *obj_mdp = &obj_md;
1071         struct lov_oinfo *loi;
1072         struct obdo *tmp_oa;
1073         int ost_idx, updates = 0, i;
1074         ENTRY;
1075
1076         tmp_oa = obdo_alloc();
1077         if (tmp_oa == NULL)
1078                 RETURN(-ENOMEM);
1079
1080         loi = lsm->lsm_oinfo;
1081         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1082                 int rc;
1083                 if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1084                         continue;
1085
1086                 ost_idx = lov_revalidate_policy(lov, lsm);
1087                 if (ost_idx < 0) {
1088                         /* FIXME: punt for now. */
1089                         CERROR("lov_revalidate_policy failed; no active "
1090                                "OSCs?\n");
1091                         continue;
1092                 }
1093
1094                 /* create a new object */
1095                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1096                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1097                 osc_exp = lov->tgts[ost_idx].ltd_exp;
1098                 rc = obd_create(osc_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
1099                 if (rc) {
1100                         CERROR("error creating new subobj at idx %d; "
1101                                "rc = %d\n", ost_idx, rc);
1102                         continue;
1103                 }
1104                 if (oti->oti_objid)
1105                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
1106                 loi->loi_id = tmp_oa->o_id;
1107                 loi->loi_gr = tmp_oa->o_gr;
1108                 loi->loi_ost_idx = ost_idx;
1109                 loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
1110                 CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
1111                        " with idx %d gen %d.\n", lsm->lsm_object_id,
1112                        loi->loi_id, ost_idx, loi->loi_ost_gen);
1113                 updates = 1;
1114         }
1115
1116         /* If we got an error revalidating an entry there's no need to
1117          * cleanup up objects we allocated here because the bad entry
1118          * still points to a deleted OST. */
1119
1120         obdo_free(tmp_oa);
1121         RETURN(updates);
1122 }
1123
1124 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1125  * we can send this 'punch' to just the authoritative node and the nodes
1126  * that the punch will affect. */
1127 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1128                      struct lov_stripe_md *lsm,
1129                      obd_off start, obd_off end, struct obd_trans_info *oti)
1130 {
1131         struct lov_request_set *set;
1132         struct lov_obd *lov;
1133         struct list_head *pos;
1134         struct lov_request *req;
1135         int err = 0, rc = 0;
1136         ENTRY;
1137
1138         if (lsm_bad_magic(lsm))
1139                 RETURN(-EINVAL);
1140
1141         if (!exp || !exp->exp_obd)
1142                 RETURN(-ENODEV);
1143
1144         lov = &exp->exp_obd->u.lov;
1145         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1146         if (rc)
1147                 RETURN(rc);
1148
1149         list_for_each (pos, &set->set_list) {
1150                 req = list_entry(pos, struct lov_request, rq_link);
1151
1152                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1153                                NULL, req->rq_extent.start, 
1154                                req->rq_extent.end, NULL);
1155                 err = lov_update_punch_set(set, req, rc);
1156                 if (err) {
1157                         CERROR("error: punch objid "LPX64" subobj "LPX64
1158                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1159                                req->rq_oa->o_id, req->rq_idx, rc);
1160                         if (!rc)
1161                                 rc = err;
1162                 }
1163         }
1164         err = lov_fini_punch_set(set);
1165         if (!rc)
1166                 rc = err;
1167         RETURN(rc);
1168 }
1169
1170 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1171                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1172 {
1173         struct lov_request_set *set;
1174         struct lov_obd *lov;
1175         struct list_head *pos;
1176         struct lov_request *req;
1177         int err = 0, rc = 0;
1178         ENTRY;
1179
1180         if (lsm_bad_magic(lsm))
1181                 RETURN(-EINVAL);
1182
1183         if (!exp->exp_obd)
1184                 RETURN(-ENODEV);
1185
1186         lov = &exp->exp_obd->u.lov;
1187         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1188         if (rc)
1189                 RETURN(rc);
1190
1191         list_for_each (pos, &set->set_list) {
1192                 req = list_entry(pos, struct lov_request, rq_link);
1193
1194                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1195                               NULL, req->rq_extent.start, req->rq_extent.end);
1196                 err = lov_update_common_set(set, req, rc);
1197                 if (err) {
1198                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1199                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1200                                req->rq_oa->o_id, req->rq_idx, rc);
1201                         if (!rc)
1202                                 rc = err;
1203                 }
1204         }
1205         err = lov_fini_sync_set(set);
1206         if (!rc)
1207                 rc = err;
1208         RETURN(rc);
1209 }
1210
1211 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1212                          struct lov_stripe_md *lsm,
1213                          obd_count oa_bufs, struct brw_page *pga)
1214 {
1215         int i, rc = 0;
1216         ENTRY;
1217
1218         /* The caller just wants to know if there's a chance that this
1219          * I/O can succeed */
1220         for (i = 0; i < oa_bufs; i++) {
1221                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
1222                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1223                 obd_off start, end;
1224
1225                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
1226                                            pga[i].disk_offset + pga[i].count,
1227                                            &start, &end))
1228                         continue;
1229
1230                 if (lov->tgts[ost].active == 0) {
1231                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1232                         RETURN(-EIO);
1233                 }
1234                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1235                              NULL, 1, &pga[i], NULL);
1236                 if (rc)
1237                         break;
1238         }
1239         RETURN(rc);
1240 }
1241
1242 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1243                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1244                    struct brw_page *pga, struct obd_trans_info *oti)
1245 {
1246         struct lov_request_set *set;
1247         struct lov_request *req;
1248         struct list_head *pos;
1249         struct lov_obd *lov = &exp->exp_obd->u.lov;
1250         int err, rc = 0;
1251         ENTRY;
1252
1253         if (lsm_bad_magic(lsm))
1254                 RETURN(-EINVAL);
1255
1256         if (cmd == OBD_BRW_CHECK) {
1257                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1258                 RETURN(rc);
1259         }
1260
1261         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1262         if (rc)
1263                 RETURN(rc);
1264
1265         list_for_each (pos, &set->set_list) {
1266                 struct obd_export *sub_exp;
1267                 struct brw_page *sub_pga;
1268                 req = list_entry(pos, struct lov_request, rq_link);
1269                 
1270                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1271                 sub_pga = set->set_pga + req->rq_pgaidx;
1272                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
1273                              req->rq_oabufs, sub_pga, oti);
1274                 if (rc)
1275                         break;
1276                 lov_update_common_set(set, req, rc);
1277         }
1278
1279         err = lov_fini_brw_set(set);
1280         if (!rc)
1281                 rc = err;
1282         RETURN(rc);
1283 }
1284
1285 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1286                              int rc)
1287 {
1288         struct lov_request_set *lovset = (struct lov_request_set *)data;
1289         ENTRY;
1290         
1291         if (rc) {
1292                 lovset->set_completes = 0;
1293                 lov_fini_brw_set(lovset);
1294         } else {
1295                 rc = lov_fini_brw_set(lovset);
1296         }
1297                 
1298         RETURN(rc);
1299 }
1300
1301 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1302                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1303                          struct brw_page *pga, struct ptlrpc_request_set *set,
1304                          struct obd_trans_info *oti)
1305 {
1306         struct lov_request_set *lovset;
1307         struct lov_request *req;
1308         struct list_head *pos;
1309         struct lov_obd *lov = &exp->exp_obd->u.lov;
1310         int rc = 0;
1311         ENTRY;
1312
1313         if (lsm_bad_magic(lsm))
1314                 RETURN(-EINVAL);
1315
1316         if (cmd == OBD_BRW_CHECK) {
1317                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1318                 RETURN(rc);
1319         }
1320
1321         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1322         if (rc)
1323                 RETURN(rc);
1324
1325         list_for_each (pos, &lovset->set_list) {
1326                 struct obd_export *sub_exp;
1327                 struct brw_page *sub_pga;
1328                 req = list_entry(pos, struct lov_request, rq_link);
1329                 
1330                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1331                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1332                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1333                                    req->rq_oabufs, sub_pga, set, oti);
1334                 if (rc)
1335                         GOTO(out, rc);
1336                 lov_update_common_set(lovset, req, rc);
1337         }
1338         LASSERT(rc == 0);
1339         LASSERT(set->set_interpret == NULL);
1340         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1341         set->set_arg = (void *)lovset;
1342         
1343         RETURN(rc);
1344 out:
1345         lov_fini_brw_set(lovset);
1346         RETURN(rc);
1347 }
1348
1349 static int lov_ap_make_ready(void *data, int cmd)
1350 {
1351         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1352
1353         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1354 }
1355 static int lov_ap_refresh_count(void *data, int cmd)
1356 {
1357         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1358
1359         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1360                                                      cmd);
1361 }
1362 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1363 {
1364         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1365
1366         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1367         /* XXX woah, shouldn't we be altering more here?  size? */
1368         oa->o_id = lap->lap_loi_id;
1369 }
1370
1371 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1372 {
1373         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1374
1375         /* in a raid1 regime this would down a count of many ios
1376          * in flight, onl calling the caller_ops completion when all
1377          * the raid1 ios are complete */
1378         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1379 }
1380
1381 static struct obd_async_page_ops lov_async_page_ops = {
1382         .ap_make_ready =        lov_ap_make_ready,
1383         .ap_refresh_count =     lov_ap_refresh_count,
1384         .ap_fill_obdo =         lov_ap_fill_obdo,
1385         .ap_completion =        lov_ap_completion,
1386 };
1387
1388 static int lov_prep_async_page(struct obd_export *exp,
1389                                struct lov_stripe_md *lsm,
1390                                struct lov_oinfo *loi, struct page *page,
1391                                obd_off offset, struct obd_async_page_ops *ops,
1392                                void *data, void **res)
1393 {
1394         struct lov_obd *lov = &exp->exp_obd->u.lov;
1395         struct lov_async_page *lap;
1396         int rc, stripe;
1397         ENTRY;
1398
1399         if (lsm_bad_magic(lsm))
1400                 RETURN(-EINVAL);
1401         LASSERT(loi == NULL);
1402
1403         stripe = lov_stripe_number(lsm, offset);
1404         loi = &lsm->lsm_oinfo[stripe];
1405
1406         if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1407                 RETURN(-EIO);
1408         if (lov->tgts[loi->loi_ost_idx].active == 0)
1409                 RETURN(-EIO);
1410         if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
1411                 CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
1412                        "deleted or inactive.\n", loi->loi_ost_idx);
1413                 RETURN(-EIO);
1414         }
1415
1416         OBD_ALLOC(lap, sizeof(*lap));
1417         if (lap == NULL)
1418                 RETURN(-ENOMEM);
1419
1420         lap->lap_magic = LAP_MAGIC;
1421         lap->lap_caller_ops = ops;
1422         lap->lap_caller_data = data;
1423
1424         /* FIXME handle multiple oscs after landing b_raid1 */
1425         lap->lap_stripe = stripe;
1426         switch (lsm->lsm_pattern) {
1427                 case LOV_PATTERN_RAID0:
1428                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
1429                                           &lap->lap_sub_offset);
1430                         break;
1431                 case LOV_PATTERN_CMOBD:
1432                         lap->lap_sub_offset = offset;
1433                         break;
1434                 default:
1435                         LBUG();
1436         }
1437
1438         /* so the callback doesn't need the lsm */
1439         lap->lap_loi_id = loi->loi_id;
1440
1441         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1442                                  lsm, loi, page, lap->lap_sub_offset,
1443                                  &lov_async_page_ops, lap,
1444                                  &lap->lap_sub_cookie);
1445         if (rc) {
1446                 OBD_FREE(lap, sizeof(*lap));
1447                 RETURN(rc);
1448         }
1449         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1450                lap->lap_sub_cookie, offset);
1451         *res = lap;
1452         RETURN(0);
1453 }
1454
1455 static int lov_queue_async_io(struct obd_export *exp,
1456                               struct lov_stripe_md *lsm,
1457                               struct lov_oinfo *loi, void *cookie,
1458                               int cmd, obd_off off, int count,
1459                               obd_flags brw_flags, obd_flags async_flags)
1460 {
1461         struct lov_obd *lov = &exp->exp_obd->u.lov;
1462         struct lov_async_page *lap;
1463         int rc;
1464
1465         LASSERT(loi == NULL);
1466
1467         if (lsm_bad_magic(lsm))
1468                 RETURN(-EINVAL);
1469
1470         lap = LAP_FROM_COOKIE(cookie);
1471
1472         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1473
1474         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1475                                 loi, lap->lap_sub_cookie, cmd, off, count,
1476                                 brw_flags, async_flags);
1477         RETURN(rc);
1478 }
1479
1480 static int lov_set_async_flags(struct obd_export *exp,
1481                                struct lov_stripe_md *lsm,
1482                                struct lov_oinfo *loi, void *cookie,
1483                                obd_flags async_flags)
1484 {
1485         struct lov_obd *lov = &exp->exp_obd->u.lov;
1486         struct lov_async_page *lap;
1487         int rc;
1488
1489         LASSERT(loi == NULL);
1490
1491         if (lsm_bad_magic(lsm))
1492                 RETURN(-EINVAL);
1493
1494         lap = LAP_FROM_COOKIE(cookie);
1495
1496         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1497
1498         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1499                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1500         RETURN(rc);
1501 }
1502
1503 static int lov_queue_group_io(struct obd_export *exp,
1504                               struct lov_stripe_md *lsm,
1505                               struct lov_oinfo *loi,
1506                               struct obd_io_group *oig, void *cookie,
1507                               int cmd, obd_off off, int count,
1508                               obd_flags brw_flags, obd_flags async_flags)
1509 {
1510         struct lov_obd *lov = &exp->exp_obd->u.lov;
1511         struct lov_async_page *lap;
1512         int rc;
1513
1514         LASSERT(loi == NULL);
1515
1516         if (lsm_bad_magic(lsm))
1517                 RETURN(-EINVAL);
1518
1519         lap = LAP_FROM_COOKIE(cookie);
1520
1521         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1522
1523         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1524                                 oig, lap->lap_sub_cookie, cmd, off, count,
1525                                 brw_flags, async_flags);
1526         RETURN(rc);
1527 }
1528
1529 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1530  * all stripes, but we don't record that fact at queue time.  so we
1531  * trigger sync io on all stripes. */
1532 static int lov_trigger_group_io(struct obd_export *exp,
1533                                 struct lov_stripe_md *lsm,
1534                                 struct lov_oinfo *loi,
1535                                 struct obd_io_group *oig)
1536 {
1537         struct lov_obd *lov = &exp->exp_obd->u.lov;
1538         int rc = 0, i, err;
1539
1540         LASSERT(loi == NULL);
1541
1542         if (lsm_bad_magic(lsm))
1543                 RETURN(-EINVAL);
1544
1545         loi = lsm->lsm_oinfo;
1546         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1547                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1548                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1549                         continue;
1550                 }
1551
1552                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1553                                            lsm, loi, oig);
1554                 if (rc == 0 && err != 0)
1555                         rc = err;
1556         };
1557         RETURN(rc);
1558 }
1559
1560 static int lov_teardown_async_page(struct obd_export *exp,
1561                                    struct lov_stripe_md *lsm,
1562                                    struct lov_oinfo *loi, void *cookie)
1563 {
1564         struct lov_obd *lov = &exp->exp_obd->u.lov;
1565         struct lov_async_page *lap;
1566         int rc;
1567
1568         LASSERT(loi == NULL);
1569
1570         if (lsm_bad_magic(lsm))
1571                 RETURN(-EINVAL);
1572
1573         lap = LAP_FROM_COOKIE(cookie);
1574
1575         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1576
1577         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1578                                      lsm, loi, lap->lap_sub_cookie);
1579         if (rc) {
1580                 CERROR("unable to teardown sub cookie %p: %d\n",
1581                        lap->lap_sub_cookie, rc);
1582                 RETURN(rc);
1583         }
1584         OBD_FREE(lap, sizeof(*lap));
1585         RETURN(rc);
1586 }
1587
1588 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1589                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1590                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1591                        void *data,__u32 lvb_len, void *lvb_swabber,
1592                        struct lustre_handle *lockh)
1593 {
1594         struct lov_request_set *set;
1595         struct lov_request *req;
1596         struct list_head *pos;
1597         struct lustre_handle *lov_lockhp;
1598         struct lov_obd *lov;
1599         ldlm_error_t rc;
1600         int save_flags = *flags;
1601         ENTRY;
1602
1603         if (lsm_bad_magic(lsm))
1604                 RETURN(-EINVAL);
1605
1606         /* we should never be asked to replay a lock this way. */
1607         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1608
1609         if (!exp || !exp->exp_obd)
1610                 RETURN(-ENODEV);
1611
1612         lov = &exp->exp_obd->u.lov;
1613         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1614         if (rc)
1615                 RETURN(rc);
1616
1617         list_for_each (pos, &set->set_list) {
1618                 ldlm_policy_data_t sub_policy;
1619                 req = list_entry(pos, struct lov_request, rq_link);
1620                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1621                 LASSERT(lov_lockhp);
1622
1623                 *flags = save_flags;
1624                 sub_policy.l_extent.start = req->rq_extent.start;
1625                 sub_policy.l_extent.end = req->rq_extent.end;
1626
1627                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1628                                  type, &sub_policy, mode, flags, bl_cb,
1629                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1630                                  lov_lockhp);
1631                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1632                 if (rc != ELDLM_OK)
1633                         break;
1634         }
1635
1636         lov_fini_enqueue_set(set, mode);
1637         RETURN(rc);
1638 }
1639
1640 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1641                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1642                      int *flags, void *data, struct lustre_handle *lockh)
1643 {
1644         struct lov_request_set *set;
1645         struct lov_request *req;
1646         struct list_head *pos;
1647         struct lov_obd *lov = &exp->exp_obd->u.lov;
1648         struct lustre_handle *lov_lockhp;
1649         int lov_flags, rc = 0;
1650         ENTRY;
1651
1652         if (lsm_bad_magic(lsm))
1653                 RETURN(-EINVAL);
1654
1655         if (!exp || !exp->exp_obd)
1656                 RETURN(-ENODEV);
1657
1658         lov = &exp->exp_obd->u.lov;
1659         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1660         if (rc)
1661                 RETURN(rc);
1662
1663         list_for_each (pos, &set->set_list) {
1664                 ldlm_policy_data_t sub_policy;
1665                 req = list_entry(pos, struct lov_request, rq_link);
1666                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1667                 LASSERT(lov_lockhp);
1668
1669                 sub_policy.l_extent.start = req->rq_extent.start;
1670                 sub_policy.l_extent.end = req->rq_extent.end;
1671                 lov_flags = *flags;
1672
1673                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1674                                type, &sub_policy, mode, &lov_flags, data,
1675                                lov_lockhp);
1676                 rc = lov_update_match_set(set, req, rc);
1677                 if (rc != 1)
1678                         break;
1679         }
1680         lov_fini_match_set(set, mode, *flags);
1681         RETURN(rc);
1682 }
1683
1684 static int lov_change_cbdata(struct obd_export *exp,
1685                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1686                              void *data)
1687 {
1688         struct lov_obd *lov;
1689         struct lov_oinfo *loi;
1690         int rc = 0, i;
1691         ENTRY;
1692
1693         if (lsm_bad_magic(lsm))
1694                 RETURN(-EINVAL);
1695
1696         if (!exp || !exp->exp_obd)
1697                 RETURN(-ENODEV);
1698
1699         LASSERT(lsm->lsm_object_gr > 0);
1700
1701         lov = &exp->exp_obd->u.lov;
1702         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1703                 struct lov_stripe_md submd;
1704                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1705                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1706                         continue;
1707                 }
1708
1709                 submd.lsm_object_id = loi->loi_id;
1710                 submd.lsm_object_gr = lsm->lsm_object_gr;
1711                 submd.lsm_stripe_count = 0;
1712                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1713                                        &submd, it, data);
1714         }
1715         RETURN(rc);
1716 }
1717
1718 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1719                       __u32 mode, struct lustre_handle *lockh)
1720 {
1721         struct lov_request_set *set;
1722         struct lov_request *req;
1723         struct list_head *pos;
1724         struct lov_obd *lov = &exp->exp_obd->u.lov;
1725         struct lustre_handle *lov_lockhp;
1726         int err = 0, rc = 0;
1727         ENTRY;
1728
1729         if (lsm_bad_magic(lsm))
1730                 RETURN(-EINVAL);
1731
1732         if (!exp || !exp->exp_obd)
1733                 RETURN(-ENODEV);
1734
1735         LASSERT(lsm->lsm_object_gr > 0);
1736
1737         LASSERT(lockh);
1738         lov = &exp->exp_obd->u.lov;
1739         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1740         if (rc)
1741                 RETURN(rc);
1742
1743         list_for_each (pos, &set->set_list) {
1744                 req = list_entry(pos, struct lov_request, rq_link);
1745                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1746
1747                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1748                                 mode, lov_lockhp);
1749                 rc = lov_update_common_set(set, req, rc);
1750                 if (rc) {
1751                         CERROR("error: cancel objid "LPX64" subobj "
1752                                LPX64" on OST idx %d: rc = %d\n",
1753                                lsm->lsm_object_id,
1754                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1755                         err = rc;
1756                 }
1757  
1758         }
1759         lov_fini_cancel_set(set);
1760         RETURN(err);
1761 }
1762
1763 static int lov_cancel_unused(struct obd_export *exp,
1764                              struct lov_stripe_md *lsm, 
1765                              int flags, void *opaque)
1766 {
1767         struct lov_obd *lov;
1768         struct lov_oinfo *loi;
1769         int rc = 0, i;
1770         ENTRY;
1771
1772         lov = &exp->exp_obd->u.lov;
1773         if (lsm == NULL) {
1774                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1775                         int err = obd_cancel_unused(lov->tgts[i].ltd_exp, NULL,
1776                                                     flags, opaque);
1777                         if (!rc)
1778                                 rc = err;
1779                 }
1780                 RETURN(rc);
1781         }
1782
1783         if (lsm_bad_magic(lsm))
1784                 RETURN(-EINVAL);
1785
1786         if (!exp || !exp->exp_obd)
1787                 RETURN(-ENODEV);
1788
1789         LASSERT(lsm->lsm_object_gr > 0);
1790
1791         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1792                 struct lov_stripe_md submd;
1793                 int err;
1794
1795                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1796                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1797
1798                 submd.lsm_object_id = loi->loi_id;
1799                 submd.lsm_object_gr = lsm->lsm_object_gr;
1800                 submd.lsm_stripe_count = 0;
1801                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1802                                         &submd, flags, opaque);
1803                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1804                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1805                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1806                                loi->loi_id, loi->loi_ost_idx, err);
1807                         if (!rc)
1808                                 rc = err;
1809                 }
1810         }
1811         RETURN(rc);
1812 }
1813
/* All-ones __u64; doubles as the "sum overflowed" marker below. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating add: tot += add, except that a wrap-around leaves tot pinned
 * at LOV_U64_MAX.  Both arguments are evaluated more than once, so they
 * must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1822
1823 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1824                       unsigned long max_age)
1825 {
1826         struct lov_obd *lov = &obd->u.lov;
1827         struct obd_statfs lov_sfs;
1828         int set = 0;
1829         int rc = 0;
1830         int i;
1831         ENTRY;
1832
1833
1834         /* We only get block data from the OBD */
1835         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1836                 int err;
1837                 if (!lov->tgts[i].active) {
1838                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1839                         continue;
1840                 }
1841
1842                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1843                                  max_age);
1844                 if (err) {
1845                         if (lov->tgts[i].active && !rc)
1846                                 rc = err;
1847                         continue;
1848                 }
1849
1850                 if (!set) {
1851                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1852                         set = 1;
1853                 } else {
1854                         osfs->os_bfree += lov_sfs.os_bfree;
1855                         osfs->os_bavail += lov_sfs.os_bavail;
1856                         osfs->os_blocks += lov_sfs.os_blocks;
1857                         /* XXX not sure about this one - depends on policy.
1858                          *   - could be minimum if we always stripe on all OBDs
1859                          *     (but that would be wrong for any other policy,
1860                          *     if one of the OBDs has no more objects left)
1861                          *   - could be sum if we stripe whole objects
1862                          *   - could be average, just to give a nice number
1863                          *
1864                          * To give a "reasonable" (if not wholly accurate)
1865                          * number, we divide the total number of free objects
1866                          * by expected stripe count (watch out for overflow).
1867                          */
1868                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1869                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1870                 }
1871         }
1872
1873         if (set) {
1874                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1875                                          lov->desc.ld_default_stripe_count :
1876                                          lov->desc.ld_active_tgt_count;
1877
1878                 if (osfs->os_files != LOV_U64_MAX)
1879                         do_div(osfs->os_files, expected_stripes);
1880                 if (osfs->os_ffree != LOV_U64_MAX)
1881                         do_div(osfs->os_ffree, expected_stripes);
1882         } else if (!rc)
1883                 rc = -EIO;
1884
1885         RETURN(rc);
1886 }
1887
1888 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1889                          void *karg, void *uarg)
1890 {
1891         struct obd_device *obddev = class_exp2obd(exp);
1892         struct lov_obd *lov = &obddev->u.lov;
1893         int i, rc, count = lov->desc.ld_tgt_count;
1894         struct obd_uuid *uuidp;
1895         ENTRY;
1896
1897         switch (cmd) {
1898         case OBD_IOC_LOV_GET_CONFIG: {
1899                 struct obd_ioctl_data *data = karg;
1900                 struct lov_tgt_desc *tgtdesc;
1901                 struct lov_desc *desc;
1902                 char *buf = NULL;
1903                 __u32 *genp;
1904
1905                 buf = NULL;
1906                 len = 0;
1907                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1908                         RETURN(-EINVAL);
1909
1910                 data = (struct obd_ioctl_data *)buf;
1911
1912                 if (sizeof(*desc) > data->ioc_inllen1) {
1913                         obd_ioctl_freedata(buf, len);
1914                         RETURN(-EINVAL);
1915                 }
1916
1917                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1918                         obd_ioctl_freedata(buf, len);
1919                         RETURN(-EINVAL);
1920                 }
1921
1922                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1923                         obd_ioctl_freedata(buf, len);
1924                         RETURN(-EINVAL);
1925                 }
1926
1927                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1928                 memcpy(desc, &(lov->desc), sizeof(*desc));
1929
1930                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1931                 genp = (__u32 *)data->ioc_inlbuf3;
1932                 tgtdesc = lov->tgts;
1933                 /* the uuid will be empty for deleted OSTs */
1934                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1935                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
1936                         *genp = tgtdesc->ltd_gen;
1937                 }
1938
1939                 rc = copy_to_user((void *)uarg, buf, len);
1940                 if (rc)
1941                         rc = -EFAULT;
1942                 obd_ioctl_freedata(buf, len);
1943                 break;
1944         }
1945         case LL_IOC_LOV_SETSTRIPE:
1946                 rc = lov_setstripe(exp, karg, uarg);
1947                 break;
1948         case LL_IOC_LOV_GETSTRIPE:
1949                 rc = lov_getstripe(exp, karg, uarg);
1950                 break;
1951         case LL_IOC_LOV_SETEA:
1952                 rc = lov_setea(exp, karg, uarg);
1953                 break;
1954         default: {
1955                 int set = 0;
1956                 if (count == 0)
1957                         RETURN(-ENOTTY);
1958                 rc = 0;
1959                 for (i = 0; i < count; i++) {
1960                         int err;
1961
1962                         /* OST was deleted */
1963                         if (obd_uuid_empty(&lov->tgts[i].uuid))
1964                                 continue;
1965
1966                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1967                                             len, karg, uarg);
1968                         if (err) {
1969                                 if (lov->tgts[i].active) {
1970                                         CERROR("error: iocontrol OSC %s on OST "
1971                                                "idx %d cmd %x: err = %d\n",
1972                                                lov->tgts[i].uuid.uuid, i,
1973                                                cmd, err);
1974                                         if (!rc)
1975                                                 rc = err;
1976                                 }
1977                         } else
1978                                 set = 1;
1979                 }
1980                 if (!set && !rc)
1981                         rc = -EIO;
1982         }
1983         }
1984
1985         RETURN(rc);
1986 }
1987
1988 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1989                         void *key, __u32 *vallen, void *val)
1990 {
1991         struct obd_device *obddev = class_exp2obd(exp);
1992         struct lov_obd *lov = &obddev->u.lov;
1993         int i;
1994         ENTRY;
1995
1996         if (!vallen || !val)
1997                 RETURN(-EFAULT);
1998
1999         if (keylen > strlen("lock_to_stripe") &&
2000             strcmp(key, "lock_to_stripe") == 0) {
2001                 struct {
2002                         char name[16];
2003                         struct ldlm_lock *lock;
2004                         struct lov_stripe_md *lsm;
2005                 } *data = key;
2006                 struct lov_oinfo *loi;
2007                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2008                 __u32 *stripe = val;
2009
2010                 if (*vallen < sizeof(*stripe))
2011                         RETURN(-EFAULT);
2012                 *vallen = sizeof(*stripe);
2013
2014                 /* XXX This is another one of those bits that will need to
2015                  * change if we ever actually support nested LOVs.  It uses
2016                  * the lock's export to find out which stripe it is. */
2017                 /* XXX - it's assumed all the locks for deleted OSTs have
2018                  * been cancelled. Also, the export for deleted OSTs will
2019                  * be NULL and won't match the lock's export. */
2020                 for (i = 0, loi = data->lsm->lsm_oinfo;
2021                      i < data->lsm->lsm_stripe_count;
2022                      i++, loi++) {
2023                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
2024                                         data->lock->l_conn_export &&
2025                             loi->loi_id == res_id->name[0] &&
2026                             loi->loi_gr == res_id->name[2]) {
2027                                 *stripe = i;
2028                                 RETURN(0);
2029                         }
2030                 }
2031                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2032                 dump_lsm(D_ERROR, data->lsm);
2033                 RETURN(-ENXIO);
2034         } else if (keylen >= strlen("size_to_stripe") &&
2035                    strcmp(key, "size_to_stripe") == 0) {
2036                 struct {
2037                         int stripe_number;
2038                         __u64 size;
2039                         struct lov_stripe_md *lsm;
2040                 } *data = val;
2041
2042                 if (*vallen < sizeof(*data))
2043                         RETURN(-EFAULT);
2044
2045                 data->size = lov_size_to_stripe(data->lsm, data->size,
2046                                                 data->stripe_number);
2047                 RETURN(0);
2048         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2049                 obd_id *ids = val;
2050                 int rc, size = sizeof(obd_id);
2051                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2052                         if (!lov->tgts[i].active)
2053                                 continue;
2054                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2055                                           keylen, key, &size, &(ids[i]));
2056                         if (rc != 0)
2057                                 RETURN(rc);
2058                 }
2059                 RETURN(0);
2060         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2061                 struct lov_desc *desc_ret = val;
2062                 *desc_ret = lov->desc;
2063
2064                 RETURN(0);
2065         }
2066
2067         RETURN(-EINVAL);
2068 }
2069
2070 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2071                         void *key, obd_count vallen, void *val)
2072 {
2073         struct obd_device *obddev = class_exp2obd(exp);
2074         struct lov_obd *lov = &obddev->u.lov;
2075         int i, rc = 0, err;
2076         ENTRY;
2077
2078 #define KEY_IS(str) \
2079         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2080
2081         if (KEY_IS("next_id")) {
2082                 if (vallen != lov->desc.ld_tgt_count)
2083                         RETURN(-EINVAL);
2084                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2085                         /* initialize all OSCs, even inactive ones */
2086            
2087                         err = obd_set_info(lov->tgts[i].ltd_exp,
2088                                           keylen, key, sizeof(obd_id),
2089                                           ((obd_id*)val) + i);
2090                         if (!rc)
2091                                 rc = err;
2092                 }
2093                 RETURN(rc);
2094         }
2095         if (KEY_IS("async")) {
2096                 struct lov_desc *desc = &lov->desc;
2097                 struct lov_tgt_desc *tgts = lov->tgts;
2098
2099                 if (vallen != sizeof(int))
2100                         RETURN(-EINVAL);
2101                 lov->async = *((int*) val);
2102
2103                 for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
2104                         struct obd_uuid *tgt_uuid = &tgts->uuid;
2105                         struct obd_device *tgt_obd;
2106
2107                         tgt_obd = class_find_client_obd(tgt_uuid,
2108                                                         LUSTRE_OSC_NAME,
2109                                                         &obddev->obd_uuid);
2110                         if (!tgt_obd) {
2111                                 CERROR("Target %s not attached\n",
2112                                         tgt_uuid->uuid);
2113                                 if (!rc)
2114                                         rc = -EINVAL;
2115                                 continue;
2116                         }
2117
2118                         err = obd_set_info(tgt_obd->obd_self_export,
2119                                            keylen, key, vallen, val);
2120                         if (err) {
2121                                 CERROR("Failed to set async on target %s\n",
2122                                         tgt_obd->obd_name);
2123                                 if (!rc)
2124                                         rc = err;
2125                         }
2126                 }
2127                 RETURN(rc);
2128         }
2129
2130         if (KEY_IS("growth_count")) {
2131                 if (vallen != sizeof(int))
2132                         RETURN(-EINVAL);
2133         } else if (KEY_IS("mds_conn")) {
2134                 if (vallen != sizeof(__u32))
2135                         RETURN(-EINVAL);
2136         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
2137                 if (vallen != 0)
2138                         RETURN(-EINVAL);
2139         } else if (KEY_IS("sec")) {
2140                 struct lov_tgt_desc *tgt;
2141                 struct obd_export *exp;
2142                 int rc = 0, err, i;
2143
2144                 spin_lock(&lov->lov_lock);
2145                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2146                      i++, tgt++) {
2147                         exp = tgt->ltd_exp;
2148                         /* during setup time the connections to osc might
2149                          * haven't been established.
2150                          */
2151                         if (exp == NULL) {
2152                                 struct obd_device *tgt_obd;
2153
2154                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2155                                                                 LUSTRE_OSC_NAME,
2156                                                                 &obddev->obd_uuid);
2157                                 if (!tgt_obd) {
2158                                         CERROR("can't set security flavor, "
2159                                                "device %s not attached?\n",
2160                                                 tgt->uuid.uuid);
2161                                         rc = -EINVAL;
2162                                         continue;
2163                                 }
2164                                 exp = tgt_obd->obd_self_export;
2165                         }
2166
2167                         err = obd_set_info(exp, keylen, key, vallen, val);
2168                         if (!rc)
2169                                 rc = err;
2170                 }
2171                 spin_unlock(&lov->lov_lock);
2172
2173                 RETURN(rc);
2174         } else {
2175                 RETURN(-EINVAL);
2176         }
2177
2178         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2179                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2180                         continue;
2181
2182                 if (!val && !lov->tgts[i].active)
2183                         continue;
2184
2185                 err = obd_set_info(lov->tgts[i].ltd_exp,
2186                                   keylen, key, vallen, val);
2187                 if (!rc)
2188                         rc = err;
2189         }
2190         RETURN(rc);
2191 #undef KEY_IS
2192 }
2193
#if 0
/*
 * Disabled, unfinished code: wait for completion of the per-stripe locks
 * behind a LOV lock handle.  Kept verbatim for reference only.
 *
 * NOTE(review): as written this would not build if enabled - "imp", "lwi"
 * and "lwd" are not declared in this function, "lock" is used after the
 * loop that declares it, the -EINTR/-ETIMEDOUT branch is empty and
 * "queues" is allocated but never freed here.
 */

/* Per-stripe wait state. */
struct lov_multi_wait {
        struct ldlm_lock *lock;
        wait_queue_t      wait;
        int               completed;
        int               generation;
};

int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
                      struct lustre_handle *lockh)
{
        struct lov_lock_handles *lov_lockh = NULL;
        struct lustre_handle *lov_lockhp;
        struct lov_obd *lov;
        struct lov_oinfo *loi;
        struct lov_multi_wait *queues;
        int rc = 0, i;
        ENTRY;

        if (lsm_bad_magic(lsm))
                RETURN(-EINVAL);

        if (!exp || !exp->exp_obd)
                RETURN(-ENODEV);

        LASSERT(lockh != NULL);
        /* A multi-stripe object carries an array of handles; a single
         * stripe uses the caller's handle directly. */
        if (lsm->lsm_stripe_count > 1) {
                lov_lockh = lov_handle2llh(lockh);
                if (lov_lockh == NULL) {
                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
                        RETURN(-EINVAL);
                }

                lov_lockhp = lov_lockh->llh_handles;
        } else {
                lov_lockhp = lockh;
        }

        OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
        if (queues == NULL)
                GOTO(out, rc = -ENOMEM);

        lov = &exp->exp_obd->u.lov;
        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
             i++, loi++, lov_lockhp++) {
                struct ldlm_lock *lock;
                struct obd_device *obd;
                unsigned long irqflags;

                lock = ldlm_handle2lock(lov_lockhp);
                if (lock == NULL) {
                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
                               loi->loi_ost_idx, loi->loi_id);
                        queues[i].completed = 1;
                        continue;
                }

                queues[i].lock = lock;
                init_waitqueue_entry(&(queues[i].wait), current);
                add_wait_queue(lock->l_waitq, &(queues[i].wait));

                obd = class_exp2obd(lock->l_conn_export);
                if (obd != NULL)
                        imp = obd->u.cli.cl_import;
                if (imp != NULL) {
                        spin_lock_irqsave(&imp->imp_lock, irqflags);
                        queues[i].generation = imp->imp_generation;
                        spin_unlock_irqrestore(&imp->imp_lock, irqflags);
                }
        }

        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
                               interrupted_completion_wait, &lwd);
        rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);

        for (i = 0; i < lsm->lsm_stripe_count; i++)
                remove_wait_queue(lock->l_waitq, &(queues[i].wait));

        if (rc == -EINTR || rc == -ETIMEDOUT) {


        }

 out:
        if (lov_lockh != NULL)
                lov_llh_put(lov_lockh);
        RETURN(rc);
}
#endif /* 0 */
2283
2284 struct obd_ops lov_obd_ops = {
2285         .o_owner               = THIS_MODULE,
2286         .o_attach              = lov_attach,
2287         .o_detach              = lov_detach,
2288         .o_setup               = lov_setup,
2289         .o_cleanup             = lov_cleanup,
2290         .o_process_config      = lov_process_config,
2291         .o_connect             = lov_connect,
2292         .o_disconnect          = lov_disconnect,
2293         .o_statfs              = lov_statfs,
2294         .o_packmd              = lov_packmd,
2295         .o_unpackmd            = lov_unpackmd,
2296         .o_revalidate_md       = lov_revalidate_md,
2297         .o_create              = lov_create,
2298         .o_destroy             = lov_destroy,
2299         .o_getattr             = lov_getattr,
2300         .o_getattr_async       = lov_getattr_async,
2301         .o_setattr             = lov_setattr,
2302         .o_brw                 = lov_brw,
2303         .o_brw_async           = lov_brw_async,
2304         .o_prep_async_page     = lov_prep_async_page,
2305         .o_queue_async_io      = lov_queue_async_io,
2306         .o_set_async_flags     = lov_set_async_flags,
2307         .o_queue_group_io      = lov_queue_group_io,
2308         .o_trigger_group_io    = lov_trigger_group_io,
2309         .o_teardown_async_page = lov_teardown_async_page,
2310         .o_adjust_kms          = lov_adjust_kms,
2311         .o_punch               = lov_punch,
2312         .o_sync                = lov_sync,
2313         .o_enqueue             = lov_enqueue,
2314         .o_match               = lov_match,
2315         .o_change_cbdata       = lov_change_cbdata,
2316         .o_cancel              = lov_cancel,
2317         .o_cancel_unused       = lov_cancel_unused,
2318         .o_iocontrol           = lov_iocontrol,
2319         .o_get_info            = lov_get_info,
2320         .o_set_info            = lov_set_info,
2321         .o_llog_init           = lov_llog_init,
2322         .o_llog_finish         = lov_llog_finish,
2323         .o_notify              = lov_notify,
2324 };
2325
2326 int __init lov_init(void)
2327 {
2328         struct lprocfs_static_vars lvars;
2329         int rc;
2330         ENTRY;
2331
2332         lprocfs_init_vars(lov, &lvars);
2333         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2334                                  OBD_LOV_DEVICENAME);
2335         RETURN(rc);
2336 }
2337
#ifdef __KERNEL__
/* Module exit point: unregisters the OBD type registered by lov_init(). */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

/* Kernel module metadata and entry/exit hooks. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lov_init);
module_exit(lov_exit);
#endif /* __KERNEL__ */