Whamcloud - gitweb
b=10555
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/lov/lov_obd.c
37  *
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Peter Braam <braam@clusterfs.com>
40  * Author: Mike Shaver <shaver@clusterfs.com>
41  * Author: Nathan Rutman <nathan@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_LOV
48 #ifdef __KERNEL__
49 #include <libcfs/libcfs.h>
50 #else
51 #include <liblustre.h>
52 #endif
53
54 #include <obd_support.h>
55 #include <lustre_lib.h>
56 #include <lustre_net.h>
57 #include <lustre/lustre_idl.h>
58 #include <lustre_dlm.h>
59 #include <lustre_mds.h>
60 #include <lustre_debug.h>
61 #include <obd_class.h>
62 #include <obd_lov.h>
63 #include <obd_ost.h>
64 #include <lprocfs_status.h>
65 #include <lustre_param.h>
66 #include <lustre_cache.h>
67 #include <lustre/ll_fiemap.h>
68
69 #include "lov_internal.h"
70
71
72 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
73    Any function that expects lov_tgts to remain stationary must take a ref. */
74 void lov_getref(struct obd_device *obd)
75 {
76         struct lov_obd *lov = &obd->u.lov;
77
78         /* nobody gets through here until lov_putref is done */
79         mutex_down(&lov->lov_lock);
80         atomic_inc(&lov->lov_refcount);
81         mutex_up(&lov->lov_lock);
82         return;
83 }
84
85 static void __lov_del_obd(struct obd_device *obd, __u32 index);
86
87 void lov_putref(struct obd_device *obd)
88 {
89         struct lov_obd *lov = &obd->u.lov;
90         mutex_down(&lov->lov_lock);
91         /* ok to dec to 0 more than once -- ltd_exp's will be null */
92         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
93                 int i;
94                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
95                        lov->lov_death_row);
96                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
97                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_reap)
98                                 continue;
99                         /* Disconnect and delete from list */
100                         __lov_del_obd(obd, i);
101                         lov->lov_death_row--;
102                 }
103         }
104         mutex_up(&lov->lov_lock);
105 }
106
107 static int lov_register_page_removal_cb(struct obd_export *exp,
108                                         obd_page_removal_cb_t func,
109                                         obd_pin_extent_cb pin_cb)
110 {
111         struct lov_obd *lov = &exp->exp_obd->u.lov;
112         int i, rc = 0;
113
114         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
115                 return -EBUSY;
116
117         if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
118                 return -EBUSY;
119
120         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
121                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
122                         continue;
123                 rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
124                                                    func, pin_cb);
125         }
126
127         lov->lov_page_removal_cb = func;
128         lov->lov_page_pin_cb = pin_cb;
129
130         return rc;
131 }
132
133 static int lov_unregister_page_removal_cb(struct obd_export *exp,
134                                         obd_page_removal_cb_t func)
135 {
136         struct lov_obd *lov = &exp->exp_obd->u.lov;
137         int i, rc = 0;
138
139         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
140                 return -EINVAL;
141
142         lov->lov_page_removal_cb = NULL;
143         lov->lov_page_pin_cb = NULL;
144
145         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
146                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
147                         continue;
148                 rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
149                                                      func);
150         }
151
152         return rc;
153 }
154
155 static int lov_register_lock_cancel_cb(struct obd_export *exp,
156                                          obd_lock_cancel_cb func)
157 {
158         struct lov_obd *lov = &exp->exp_obd->u.lov;
159         int i, rc = 0;
160
161         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
162                 return -EBUSY;
163
164         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
165                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
166                         continue;
167                 rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
168                                                   func);
169         }
170
171         lov->lov_lock_cancel_cb = func;
172
173         return rc;
174 }
175
176 static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
177                                          obd_lock_cancel_cb func)
178 {
179         struct lov_obd *lov = &exp->exp_obd->u.lov;
180         int i, rc = 0;
181
182         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
183                 return -EINVAL;
184
185         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
186                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
187                         continue;
188                 rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
189                                                     func);
190         }
191         lov->lov_lock_cancel_cb = NULL;
192         return rc;
193 }
194
195 #define MAX_STRING_SIZE 128
196 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
197                            struct obd_connect_data *data)
198 {
199         struct lov_obd *lov = &obd->u.lov;
200         struct obd_uuid tgt_uuid;
201         struct obd_device *tgt_obd;
202         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
203         struct lustre_handle conn = {0, };
204         struct obd_import *imp;
205
206 #ifdef __KERNEL__
207         cfs_proc_dir_entry_t *lov_proc_dir;
208 #endif
209         int rc;
210         ENTRY;
211
212         if (!lov->lov_tgts[index])
213                 RETURN(-EINVAL);
214
215         tgt_uuid = lov->lov_tgts[index]->ltd_uuid;
216
217         tgt_obd = class_find_client_obd(&tgt_uuid, LUSTRE_OSC_NAME,
218                                         &obd->obd_uuid);
219
220         if (!tgt_obd) {
221                 CERROR("Target %s not attached\n", obd_uuid2str(&tgt_uuid));
222                 RETURN(-EINVAL);
223         }
224         if (!tgt_obd->obd_set_up) {
225                 CERROR("Target %s not set up\n", obd_uuid2str(&tgt_uuid));
226                 RETURN(-EINVAL);
227         }
228
229         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
230                 data->ocd_index = index;
231
232         /*
233          * Divine LOV knows that OBDs under it are OSCs.
234          */
235         imp = tgt_obd->u.cli.cl_import;
236
237         if (activate) {
238                 tgt_obd->obd_no_recov = 0;
239                 /* FIXME this is probably supposed to be 
240                    ptlrpc_set_import_active.  Horrible naming. */
241                 ptlrpc_activate_import(imp);
242         }
243
244         if (imp->imp_invalid) {
245                 CERROR("not connecting OSC %s; administratively "
246                        "disabled\n", obd_uuid2str(&tgt_uuid));
247                 rc = obd_register_observer(tgt_obd, obd);
248                 if (rc) {
249                         CERROR("Target %s register_observer error %d; "
250                                "will not be able to reactivate\n",
251                                obd_uuid2str(&tgt_uuid), rc);
252                 }
253                 RETURN(0);
254         }
255
256         rc = obd_connect(NULL, &conn, tgt_obd, &lov_osc_uuid, data, NULL);
257         if (rc) {
258                 CERROR("Target %s connect error %d\n",
259                        obd_uuid2str(&tgt_uuid), rc);
260                 RETURN(rc);
261         }
262         lov->lov_tgts[index]->ltd_exp = class_conn2export(&conn);
263         if (!lov->lov_tgts[index]->ltd_exp) {
264                 CERROR("Target %s: null export!\n", obd_uuid2str(&tgt_uuid));
265                 RETURN(-ENODEV);
266         }
267
268         rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
269                                           lov->lov_page_removal_cb,
270                                           lov->lov_page_pin_cb);
271         if (rc) {
272                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
273                 lov->lov_tgts[index]->ltd_exp = NULL;
274                 RETURN(rc);
275         }
276
277         rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
278                                          lov->lov_lock_cancel_cb);
279         if (rc) {
280                 obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
281                                                lov->lov_page_removal_cb);
282                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
283                 lov->lov_tgts[index]->ltd_exp = NULL;
284                 RETURN(rc);
285         }
286
287         rc = obd_register_observer(tgt_obd, obd);
288         if (rc) {
289                 CERROR("Target %s register_observer error %d\n",
290                        obd_uuid2str(&tgt_uuid), rc);
291                 obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
292                                               lov->lov_lock_cancel_cb);
293                 obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
294                                                lov->lov_page_removal_cb);
295                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
296                 lov->lov_tgts[index]->ltd_exp = NULL;
297                 RETURN(rc);
298         }
299
300         lov->lov_tgts[index]->ltd_reap = 0;
301         if (activate) {
302                 lov->lov_tgts[index]->ltd_active = 1;
303                 lov->desc.ld_active_tgt_count++;
304                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
305         }
306         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
307                obd_uuid2str(&tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
308
309 #ifdef __KERNEL__
310         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
311         if (lov_proc_dir) {
312                 struct obd_device *osc_obd = class_conn2obd(&conn);
313                 cfs_proc_dir_entry_t *osc_symlink;
314                 char name[MAX_STRING_SIZE];
315
316                 LASSERT(osc_obd != NULL);
317                 LASSERT(osc_obd->obd_magic == OBD_DEVICE_MAGIC);
318                 LASSERT(osc_obd->obd_type->typ_name != NULL);
319                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
320                          osc_obd->obd_type->typ_name,
321                          osc_obd->obd_name);
322                 osc_symlink = lprocfs_add_symlink(osc_obd->obd_name, lov_proc_dir,
323                                                   name);
324                 if (osc_symlink == NULL) {
325                         CERROR("could not register LOV target "
326                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
327                                obd->obd_type->typ_name, obd->obd_name,
328                                osc_obd->obd_name);
329                         lprocfs_remove(&lov_proc_dir);
330                 }
331         }
332 #endif
333
334         rc = qos_add_tgt(obd, index);
335         if (rc)
336                 CERROR("qos_add_tgt failed %d\n", rc);
337
338         RETURN(0);
339 }
340
341 static int lov_connect(const struct lu_env *env,
342                        struct lustre_handle *conn, struct obd_device *obd,
343                        struct obd_uuid *cluuid, struct obd_connect_data *data,
344                        void *localdata)
345 {
346         struct lov_obd *lov = &obd->u.lov;
347         struct lov_tgt_desc *tgt;
348         int i, rc;
349         ENTRY;
350
351         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
352
353         rc = class_connect(conn, obd, cluuid);
354         if (rc)
355                 RETURN(rc);
356
357         /* Why should there ever be more than 1 connect? */
358         lov->lov_connects++;
359         LASSERT(lov->lov_connects == 1);
360
361         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
362         if (data)
363                 lov->lov_ocd = *data;
364
365         lov_getref(obd);
366         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
367                 tgt = lov->lov_tgts[i];
368                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
369                         continue;
370                 /* Flags will be lowest common denominator */
371                 rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
372                 if (rc) {
373                         CERROR("%s: lov connect tgt %d failed: %d\n",
374                                obd->obd_name, i, rc);
375                         continue;
376                 }
377         }
378         lov_putref(obd);
379
380         RETURN(0);
381 }
382
383 static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
384 {
385         cfs_proc_dir_entry_t *lov_proc_dir;
386         struct lov_obd *lov = &obd->u.lov;
387         struct obd_device *osc_obd;
388         int rc;
389
390         ENTRY;
391
392         if (lov->lov_tgts[index] == NULL)
393                 RETURN(-EINVAL);
394
395         osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
396         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
397                obd->obd_name, osc_obd->obd_name);
398
399         obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
400                                       lov->lov_lock_cancel_cb);
401         obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
402                                        lov->lov_page_removal_cb);
403
404         if (lov->lov_tgts[index]->ltd_active) {
405                 lov->lov_tgts[index]->ltd_active = 0;
406                 lov->desc.ld_active_tgt_count--;
407                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
408         }
409
410         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
411         if (lov_proc_dir) {
412                 cfs_proc_dir_entry_t *osc_symlink;
413
414                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
415                 if (osc_symlink) {
416                         lprocfs_remove(&osc_symlink);
417                 } else {
418                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing.",
419                                obd->obd_type->typ_name, obd->obd_name,
420                                osc_obd->obd_name);
421                 }
422         }
423
424         if (osc_obd) {
425                 /* Pass it on to our clients.
426                  * XXX This should be an argument to disconnect,
427                  * XXX not a back-door flag on the OBD.  Ah well.
428                  */
429                 osc_obd->obd_force = obd->obd_force;
430                 osc_obd->obd_fail = obd->obd_fail;
431                 osc_obd->obd_no_recov = obd->obd_no_recov;
432         }
433
434         obd_register_observer(osc_obd, NULL);
435
436         rc = obd_disconnect(lov->lov_tgts[index]->ltd_exp);
437         if (rc) {
438                 CERROR("Target %s disconnect error %d\n",
439                        lov_uuid2str(lov, index), rc);
440                 rc = 0;
441         }
442
443         qos_del_tgt(obd, index);
444
445         lov->lov_tgts[index]->ltd_exp = NULL;
446         RETURN(0);
447 }
448
449 static int lov_del_target(struct obd_device *obd, __u32 index,
450                           struct obd_uuid *uuidp, int gen);
451
452 static int lov_disconnect(struct obd_export *exp)
453 {
454         struct obd_device *obd = class_exp2obd(exp);
455         struct lov_obd *lov = &obd->u.lov;
456         int i, rc;
457         ENTRY;
458
459         if (!lov->lov_tgts)
460                 goto out;
461
462         /* Only disconnect the underlying layers on the final disconnect. */
463         lov->lov_connects--;
464         if (lov->lov_connects != 0) {
465                 /* why should there be more than 1 connect? */
466                 CERROR("disconnect #%d\n", lov->lov_connects);
467                 goto out;
468         }
469
470         /* Let's hold another reference so lov_del_obd doesn't spin through
471            putref every time */
472         lov_getref(obd);
473         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
474                 if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
475                         /* Disconnection is the last we know about an obd */
476                         lov_del_target(obd, i, 0, lov->lov_tgts[i]->ltd_gen);
477                 }
478         }
479         lov_putref(obd);
480
481 out:
482         rc = class_disconnect(exp); /* bz 9811 */
483         RETURN(rc);
484 }
485
486 /* Error codes:
487  *
488  *  -EINVAL  : UUID can't be found in the LOV's target list
489  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
490  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
491  */
492 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
493                               int activate)
494 {
495         struct lov_obd *lov = &obd->u.lov;
496         struct lov_tgt_desc *tgt;
497         int i, rc = 0;
498         ENTRY;
499
500         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
501                lov, uuid->uuid, activate);
502
503         lov_getref(obd);
504         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
505                 tgt = lov->lov_tgts[i];
506                 if (!tgt || !tgt->ltd_exp)
507                         continue;
508
509                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
510                        i, obd_uuid2str(&tgt->ltd_uuid),
511                        tgt->ltd_exp->exp_handle.h_cookie);
512                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
513                         break;
514         }
515
516         if (i == lov->desc.ld_tgt_count)
517                 GOTO(out, rc = -EINVAL);
518
519         if (lov->lov_tgts[i]->ltd_active == activate) {
520                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,
521                        activate ? "" : "in");
522                 GOTO(out, rc);
523         }
524
525         CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n", obd_uuid2str(uuid),
526                activate ? "" : "in");
527
528         lov->lov_tgts[i]->ltd_active = activate;
529
530         if (activate) {
531                 lov->desc.ld_active_tgt_count++;
532                 lov->lov_tgts[i]->ltd_exp->exp_obd->obd_inactive = 0;
533         } else {
534                 lov->desc.ld_active_tgt_count--;
535                 lov->lov_tgts[i]->ltd_exp->exp_obd->obd_inactive = 1;
536         }
537         /* remove any old qos penalty */
538         lov->lov_tgts[i]->ltd_qos.ltq_penalty = 0;
539
540  out:
541         lov_putref(obd);
542         RETURN(rc);
543 }
544
545 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
546                       enum obd_notify_event ev, void *data)
547 {
548         int rc = 0;
549         ENTRY;
550
551         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
552                 struct obd_uuid *uuid;
553
554                 LASSERT(watched);
555
556                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
557                         CERROR("unexpected notification of %s %s!\n",
558                                watched->obd_type->typ_name,
559                                watched->obd_name);
560                         RETURN(-EINVAL);
561                 }
562                 uuid = &watched->u.cli.cl_target_uuid;
563
564                 /* Set OSC as active before notifying the observer, so the
565                  * observer can use the OSC normally.
566                  */
567                 rc = lov_set_osc_active(obd, uuid, ev == OBD_NOTIFY_ACTIVE);
568                 if (rc) {
569                         CERROR("%sactivation of %s failed: %d\n",
570                                (ev == OBD_NOTIFY_ACTIVE) ? "" : "de",
571                                obd_uuid2str(uuid), rc);
572                         RETURN(rc);
573                 }
574         }
575
576         /* Pass the notification up the chain. */
577         if (watched) {
578                 rc = obd_notify_observer(obd, watched, ev, data);
579         } else {
580                 /* NULL watched means all osc's in the lov (only for syncs) */
581                 struct lov_obd *lov = &obd->u.lov;
582                 struct obd_device *tgt_obd;
583                 int i;
584                 lov_getref(obd);
585                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
586                         if (!lov->lov_tgts[i])
587                                 continue;
588                         tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
589                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
590                         if (rc) {
591                                 CERROR("%s: notify %s of %s failed %d\n",
592                                        obd->obd_name,
593                                        obd->obd_observer->obd_name,
594                                        tgt_obd->obd_name, rc);
595                                 break;
596                         }
597                 }
598                 lov_putref(obd);
599         }
600
601         RETURN(rc);
602 }
603
604 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
605                           __u32 index, int gen, int active)
606 {
607         struct lov_obd *lov = &obd->u.lov;
608         struct lov_tgt_desc *tgt;
609         int rc;
610         ENTRY;
611
612         CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
613                uuidp->uuid, index, gen, active);
614
615         if (gen <= 0) {
616                 CERROR("request to add OBD %s with invalid generation: %d\n",
617                        uuidp->uuid, gen);
618                 RETURN(-EINVAL);
619         }
620
621         mutex_down(&lov->lov_lock);
622
623         if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
624                 tgt = lov->lov_tgts[index];
625                 CERROR("UUID %s already assigned at LOV target index %d\n",
626                        obd_uuid2str(&tgt->ltd_uuid), index);
627                 mutex_up(&lov->lov_lock);
628                 RETURN(-EEXIST);
629         }
630
631         if (index >= lov->lov_tgt_size) {
632                 /* We need to reallocate the lov target array. */
633                 struct lov_tgt_desc **newtgts, **old = NULL;
634                 __u32 newsize, oldsize = 0;
635
636                 newsize = max(lov->lov_tgt_size, (__u32)2);
637                 while (newsize < index + 1)
638                         newsize = newsize << 1;
639                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
640                 if (newtgts == NULL) {
641                         mutex_up(&lov->lov_lock);
642                         RETURN(-ENOMEM);
643                 }
644
645                 if (lov->lov_tgt_size) {
646                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
647                                lov->lov_tgt_size);
648                         old = lov->lov_tgts;
649                         oldsize = lov->lov_tgt_size;
650                 }
651
652                 lov->lov_tgts = newtgts;
653                 lov->lov_tgt_size = newsize;
654 #ifdef __KERNEL__
655                 smp_rmb();
656 #endif
657                 if (old)
658                         OBD_FREE(old, sizeof(*old) * oldsize);
659
660                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
661                        lov->lov_tgts, lov->lov_tgt_size);
662         }
663
664
665         OBD_ALLOC_PTR(tgt);
666         if (!tgt) {
667                 mutex_up(&lov->lov_lock);
668                 RETURN(-ENOMEM);
669         }
670
671         memset(tgt, 0, sizeof(*tgt));
672         tgt->ltd_uuid = *uuidp;
673         /* XXX - add a sanity check on the generation number. */
674         tgt->ltd_gen = gen;
675         tgt->ltd_index = index;
676         tgt->ltd_activate = active;
677         lov->lov_tgts[index] = tgt;
678         if (index >= lov->desc.ld_tgt_count)
679                 lov->desc.ld_tgt_count = index + 1;
680         mutex_up(&lov->lov_lock);
681
682         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
683                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
684
685         if (lov->lov_connects == 0) {
686                 /* lov_connect hasn't been called yet. We'll do the
687                    lov_connect_obd on this target when that fn first runs,
688                    because we don't know the connect flags yet. */
689                 RETURN(0);
690         }
691
692         lov_getref(obd);
693
694         rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
695         if (rc)
696                 GOTO(out, rc);
697
698         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
699                         active ? OBD_NOTIFY_ACTIVE : OBD_NOTIFY_INACTIVE,
700                         (void *)&index);
701
702 out:
703         if (rc) {
704                 CERROR("add failed (%d), deleting %s\n", rc,
705                        obd_uuid2str(&tgt->ltd_uuid));
706                 lov_del_target(obd, index, 0, 0);
707         }
708         lov_putref(obd);
709         RETURN(rc);
710 }
711
712 /* Schedule a target for deletion */
713 static int lov_del_target(struct obd_device *obd, __u32 index,
714                           struct obd_uuid *uuidp, int gen)
715 {
716         struct lov_obd *lov = &obd->u.lov;
717         int count = lov->desc.ld_tgt_count;
718         int rc = 0;
719         ENTRY;
720
721         if (index >= count) {
722                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
723                        index, count);
724                 RETURN(-EINVAL);
725         }
726
727         lov_getref(obd);
728
729         if (!lov->lov_tgts[index]) {
730                 CERROR("LOV target at index %d is not setup.\n", index);
731                 GOTO(out, rc = -EINVAL);
732         }
733
734         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
735                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
736                        lov_uuid2str(lov, index), index,
737                        obd_uuid2str(uuidp));
738                 GOTO(out, rc = -EINVAL);
739         }
740
741         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
742                lov_uuid2str(lov, index), index,
743                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
744                lov->lov_tgts[index]->ltd_active);
745
746         lov->lov_tgts[index]->ltd_reap = 1;
747         lov->lov_death_row++;
748         /* we really delete it from lov_putref */
749 out:
750         lov_putref(obd);
751
752         RETURN(rc);
753 }
754
755 /* We are holding lov_lock */
756 static void __lov_del_obd(struct obd_device *obd, __u32 index)
757 {
758         struct lov_obd *lov = &obd->u.lov;
759         struct obd_device *osc_obd;
760         struct lov_tgt_desc *tgt = lov->lov_tgts[index];
761
762         LASSERT(tgt);
763         LASSERT(tgt->ltd_reap);
764
765         osc_obd = class_exp2obd(tgt->ltd_exp);
766
767         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
768                lov_uuid2str(lov, index),
769                osc_obd ? osc_obd->obd_name : "<no obd>");
770
771         if (tgt->ltd_exp)
772                 lov_disconnect_obd(obd, index);
773
774         /* XXX - right now there is a dependency on ld_tgt_count being the
775          * maximum tgt index for computing the mds_max_easize. So we can't
776          * shrink it. */
777
778         lov->lov_tgts[index] = NULL;
779         OBD_FREE_PTR(tgt);
780
781         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
782            do it ourselves. And we can't do it from lov_cleanup,
783            because we just lost our only reference to it. */
784         if (osc_obd)
785                 class_manual_cleanup(osc_obd);
786 }
787
788 void lov_fix_desc_stripe_size(__u64 *val)
789 {
790         if (*val < PTLRPC_MAX_BRW_SIZE) {
791                 LCONSOLE_WARN("Increasing default stripe size to min %u\n",
792                               PTLRPC_MAX_BRW_SIZE);
793                 *val = PTLRPC_MAX_BRW_SIZE;
794         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
795                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
796                 LCONSOLE_WARN("Changing default stripe size to "LPU64" (a "
797                               "multiple of %u)\n",
798                               *val, LOV_MIN_STRIPE_SIZE);
799         }
800 }
801
802 void lov_fix_desc_stripe_count(__u32 *val)
803 {
804         if (*val == 0)
805                 *val = 1;
806 }
807
808 void lov_fix_desc_pattern(__u32 *val)
809 {
810         /* from lov_setstripe */
811         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
812                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
813                 *val = 0;
814         }
815 }
816
817 void lov_fix_desc_qos_maxage(__u32 *val)
818 {
819         /* fix qos_maxage */
820         if (*val == 0)
821                 *val = QOS_DEFAULT_MAXAGE;
822 }
823
824 void lov_fix_desc(struct lov_desc *desc)
825 {
826         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
827         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
828         lov_fix_desc_pattern(&desc->ld_pattern);
829         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
830 }
831
832 static int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
833 {
834         struct lprocfs_static_vars lvars = { 0 };
835         struct lov_desc *desc;
836         struct lov_obd *lov = &obd->u.lov;
837         int count;
838         ENTRY;
839
840         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
841                 CERROR("LOV setup requires a descriptor\n");
842                 RETURN(-EINVAL);
843         }
844
845         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
846
847         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
848                 CERROR("descriptor size wrong: %d > %d\n",
849                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
850                 RETURN(-EINVAL);
851         }
852
853         if (desc->ld_magic != LOV_DESC_MAGIC) {
854                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
855                             CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
856                                    obd->obd_name, desc);
857                             lustre_swab_lov_desc(desc);
858                 } else {
859                         CERROR("%s: Bad lov desc magic: %#x\n",
860                                obd->obd_name, desc->ld_magic);
861                         RETURN(-EINVAL);
862                 }
863         }
864
865         lov_fix_desc(desc);
866
867         /* Because of 64-bit divide/mod operations only work with a 32-bit
868          * divisor in a 32-bit kernel, we cannot support a stripe width
869          * of 4GB or larger on 32-bit CPUs. */
870         count = desc->ld_default_stripe_count;
871         if ((count > 0 ? count : desc->ld_tgt_count) *
872             desc->ld_default_stripe_size > 0xffffffff) {
873                 CERROR("LOV: stripe width "LPU64"x%u > 4294967295 bytes\n",
874                        desc->ld_default_stripe_size, count);
875                 RETURN(-EINVAL);
876         }
877
878         desc->ld_active_tgt_count = 0;
879         lov->desc = *desc;
880         lov->lov_tgt_size = 0;
881         sema_init(&lov->lov_lock, 1);
882         atomic_set(&lov->lov_refcount, 0);
883         CFS_INIT_LIST_HEAD(&lov->lov_qos.lq_oss_list);
884         init_rwsem(&lov->lov_qos.lq_rw_sem);
885         lov->lov_qos.lq_dirty = 1;
886         lov->lov_qos.lq_dirty_rr = 1;
887         lov->lov_qos.lq_reset = 1;
888         /* Default priority is toward free space balance */
889         lov->lov_qos.lq_prio_free = 232;
890
891         lprocfs_lov_init_vars(&lvars);
892         lprocfs_obd_setup(obd, lvars.obd_vars);
893 #ifdef LPROCFS
894         {
895                 int rc;
896
897                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
898                                         0444, &lov_proc_target_fops, obd);
899                 if (rc)
900                         CWARN("Error adding the target_obd file\n");
901         }
902 #endif
903
904         RETURN(0);
905 }
906
907 static int lov_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
908 {
909         int rc = 0;
910         ENTRY;
911
912         switch (stage) {
913         case OBD_CLEANUP_EARLY: {
914                 struct lov_obd *lov = &obd->u.lov;
915                 int i;
916                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
917                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
918                                 continue;
919                         obd_precleanup(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
920                                        OBD_CLEANUP_EARLY);
921                 }
922                 break;
923         }
924         case OBD_CLEANUP_EXPORTS:
925                 rc = obd_llog_finish(obd, 0);
926                 if (rc != 0)
927                         CERROR("failed to cleanup llogging subsystems\n");
928                 break;
929         }
930         RETURN(rc);
931 }
932
933 static int lov_cleanup(struct obd_device *obd)
934 {
935         struct lov_obd *lov = &obd->u.lov;
936
937         lprocfs_obd_cleanup(obd);
938         if (lov->lov_tgts) {
939                 int i;
940                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
941                         if (!lov->lov_tgts[i])
942                                 continue;
943
944                         /* Inactive targets may never have connected */
945                         if (lov->lov_tgts[i]->ltd_active ||
946                             atomic_read(&lov->lov_refcount))
947                             /* We should never get here - these
948                                should have been removed in the
949                              disconnect. */
950                                 CERROR("lov tgt %d not cleaned!"
951                                        " deathrow=%d, lovrc=%d\n",
952                                        i, lov->lov_death_row,
953                                        atomic_read(&lov->lov_refcount));
954                         lov_del_target(obd, i, 0, 0);
955                 }
956                 OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
957                          lov->lov_tgt_size);
958                 lov->lov_tgt_size = 0;
959         }
960
961         if (lov->lov_qos.lq_rr_size)
962                 OBD_FREE(lov->lov_qos.lq_rr_array, lov->lov_qos.lq_rr_size);
963
964         RETURN(0);
965 }
966
967 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
968 {
969         struct lustre_cfg *lcfg = buf;
970         struct obd_uuid obd_uuid;
971         int cmd;
972         int rc = 0;
973         ENTRY;
974
975         switch(cmd = lcfg->lcfg_command) {
976         case LCFG_LOV_ADD_OBD:
977         case LCFG_LOV_ADD_INA:
978         case LCFG_LOV_DEL_OBD: {
979                 __u32 index;
980                 int gen;
981                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
982                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
983                         GOTO(out, rc = -EINVAL);
984
985                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
986
987                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
988                         GOTO(out, rc = -EINVAL);
989                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
990                         GOTO(out, rc = -EINVAL);
991                 if (cmd == LCFG_LOV_ADD_OBD)
992                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
993                 else if (cmd == LCFG_LOV_ADD_INA)
994                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
995                 else
996                         rc = lov_del_target(obd, index, &obd_uuid, gen);
997                 GOTO(out, rc);
998         }
999         case LCFG_PARAM: {
1000                 struct lprocfs_static_vars lvars = { 0 };
1001                 struct lov_desc *desc = &(obd->u.lov.desc);
1002
1003                 if (!desc)
1004                         GOTO(out, rc = -EINVAL);
1005
1006                 lprocfs_lov_init_vars(&lvars);
1007
1008                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
1009                                               lcfg, obd);
1010                 GOTO(out, rc);
1011         }
1012         default: {
1013                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1014                 GOTO(out, rc = -EINVAL);
1015
1016         }
1017         }
1018 out:
1019         RETURN(rc);
1020 }
1021
/* Fallback log2(), used only when none is defined yet: defers to ffz()
 * applied to the complement of the argument. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
1025
1026 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
1027                              struct lov_stripe_md **ea,
1028                              struct obd_trans_info *oti)
1029 {
1030         struct lov_obd *lov;
1031         struct obdo *tmp_oa;
1032         struct obd_uuid *ost_uuid = NULL;
1033         int rc = 0, i;
1034         ENTRY;
1035
1036         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1037                 src_oa->o_flags == OBD_FL_DELORPHAN);
1038
1039         lov = &export->exp_obd->u.lov;
1040
1041         OBDO_ALLOC(tmp_oa);
1042         if (tmp_oa == NULL)
1043                 RETURN(-ENOMEM);
1044
1045         if (src_oa->o_valid & OBD_MD_FLINLINE) {
1046                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
1047                 CDEBUG(D_HA, "clearing orphans only for %s\n",
1048                        ost_uuid->uuid);
1049         }
1050
1051         lov_getref(export->exp_obd);
1052         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1053                 struct lov_stripe_md obj_md;
1054                 struct lov_stripe_md *obj_mdp = &obj_md;
1055                 struct lov_tgt_desc *tgt;
1056                 int err;
1057
1058                 tgt = lov->lov_tgts[i];
1059                 if (!tgt)
1060                         continue;
1061
1062                 /* if called for a specific target, we don't
1063                    care if it is not active. */
1064                 if (!lov->lov_tgts[i]->ltd_active && ost_uuid == NULL) {
1065                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1066                         continue;
1067                 }
1068
1069                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
1070                         continue;
1071
1072                 CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
1073                        obd_uuid2str(ost_uuid));
1074
1075                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1076
1077                 LASSERT(lov->lov_tgts[i]->ltd_exp);
1078                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1079                 err = obd_create(lov->lov_tgts[i]->ltd_exp,
1080                                  tmp_oa, &obj_mdp, oti);
1081                 if (err)
1082                         /* This export will be disabled until it is recovered,
1083                            and then orphan recovery will be completed. */
1084                         CERROR("error in orphan recovery on OST idx %d/%d: "
1085                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
1086
1087                 if (ost_uuid)
1088                         break;
1089         }
1090         lov_putref(export->exp_obd);
1091
1092         OBDO_FREE(tmp_oa);
1093         RETURN(rc);
1094 }
1095
1096 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
1097                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
1098 {
1099         struct lov_stripe_md *obj_mdp, *lsm;
1100         struct lov_obd *lov = &exp->exp_obd->u.lov;
1101         unsigned ost_idx;
1102         int rc, i;
1103         ENTRY;
1104
1105         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1106                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
1107
1108         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
1109         if (obj_mdp == NULL)
1110                 RETURN(-ENOMEM);
1111
1112         ost_idx = src_oa->o_nlink;
1113         lsm = *ea;
1114         if (lsm == NULL)
1115                 GOTO(out, rc = -EINVAL);
1116         if (ost_idx >= lov->desc.ld_tgt_count ||
1117             !lov->lov_tgts[ost_idx])
1118                 GOTO(out, rc = -EINVAL);
1119
1120         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1121                 if (lsm->lsm_oinfo[i]->loi_ost_idx == ost_idx) {
1122                         if (lsm->lsm_oinfo[i]->loi_id != src_oa->o_id)
1123                                 GOTO(out, rc = -EINVAL);
1124                         break;
1125                 }
1126         }
1127         if (i == lsm->lsm_stripe_count)
1128                 GOTO(out, rc = -EINVAL);
1129
1130         rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp, src_oa, &obj_mdp, oti);
1131 out:
1132         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
1133         RETURN(rc);
1134 }
1135
1136 /* the LOV expects oa->o_id to be set to the LOV object id */
1137 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
1138                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
1139 {
1140         struct lov_obd *lov;
1141         struct obd_info oinfo;
1142         struct lov_request_set *set = NULL;
1143         struct lov_request *req;
1144         struct obd_statfs osfs;
1145         __u64 maxage;
1146         int rc = 0;
1147         ENTRY;
1148
1149         LASSERT(ea != NULL);
1150         if (exp == NULL)
1151                 RETURN(-EINVAL);
1152
1153         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1154             src_oa->o_flags == OBD_FL_DELORPHAN) {
1155                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
1156                 RETURN(rc);
1157         }
1158
1159         lov = &exp->exp_obd->u.lov;
1160         if (!lov->desc.ld_active_tgt_count)
1161                 RETURN(-EIO);
1162
1163         /* Recreate a specific object id at the given OST index */
1164         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1165             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1166                  rc = lov_recreate(exp, src_oa, ea, oti);
1167                  RETURN(rc);
1168         }
1169
1170         maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);
1171         obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);
1172
1173         rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);
1174         if (rc)
1175                 RETURN(rc);
1176
1177         list_for_each_entry(req, &set->set_list, rq_link) {
1178                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1179                 rc = obd_create(lov->lov_tgts[req->rq_idx]->ltd_exp,
1180                                 req->rq_oi.oi_oa, &req->rq_oi.oi_md, oti);
1181                 lov_update_create_set(set, req, rc);
1182         }
1183         rc = lov_fini_create_set(set, ea);
1184         RETURN(rc);
1185 }
1186
/* Assert that a stripe md pointer is non-NULL and that its lsm_magic is
 * either LOV_MAGIC or LOV_MAGIC_JOIN. */
#define ASSERT_LSM_MAGIC(lsmp)                                                  \
do {                                                                            \
        LASSERT((lsmp) != NULL);                                                \
        LASSERTF((lsmp)->lsm_magic == LOV_MAGIC ||                              \
                 (lsmp)->lsm_magic == LOV_MAGIC_JOIN,                           \
                 "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic);              \
} while (0)
1194
1195 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
1196                        struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1197                        struct obd_export *md_exp)
1198 {
1199         struct lov_request_set *set;
1200         struct obd_info oinfo;
1201         struct lov_request *req;
1202         struct list_head *pos;
1203         struct lov_obd *lov;
1204         int rc = 0, err;
1205         ENTRY;
1206
1207         ASSERT_LSM_MAGIC(lsm);
1208
1209         if (!exp || !exp->exp_obd)
1210                 RETURN(-ENODEV);
1211
1212         if (oa->o_valid & OBD_MD_FLCOOKIE) {
1213                 LASSERT(oti);
1214                 LASSERT(oti->oti_logcookies);
1215         }
1216
1217         lov = &exp->exp_obd->u.lov;
1218         rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);
1219         if (rc)
1220                 RETURN(rc);
1221
1222         list_for_each (pos, &set->set_list) {
1223                 int err;
1224                 req = list_entry(pos, struct lov_request, rq_link);
1225
1226                 if (oa->o_valid & OBD_MD_FLCOOKIE)
1227                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1228
1229                 err = obd_destroy(lov->lov_tgts[req->rq_idx]->ltd_exp,
1230                                   req->rq_oi.oi_oa, NULL, oti, NULL);
1231                 err = lov_update_common_set(set, req, err);
1232                 if (err) {
1233                         CERROR("error: destroying objid "LPX64" subobj "
1234                                LPX64" on OST idx %d: rc = %d\n",
1235                                oa->o_id, req->rq_oi.oi_oa->o_id,
1236                                req->rq_idx, err);
1237                         if (!rc)
1238                                 rc = err;
1239                 }
1240         }
1241
1242         if (rc == 0) {
1243                 LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
1244                 rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);
1245         }
1246         err = lov_fini_destroy_set(set);
1247         RETURN(rc ? rc : err);
1248 }
1249
1250 static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
1251 {
1252         struct lov_request_set *set;
1253         struct lov_request *req;
1254         struct list_head *pos;
1255         struct lov_obd *lov;
1256         int err = 0, rc = 0;
1257         ENTRY;
1258
1259         LASSERT(oinfo);
1260         ASSERT_LSM_MAGIC(oinfo->oi_md);
1261
1262         if (!exp || !exp->exp_obd)
1263                 RETURN(-ENODEV);
1264
1265         lov = &exp->exp_obd->u.lov;
1266
1267         rc = lov_prep_getattr_set(exp, oinfo, &set);
1268         if (rc)
1269                 RETURN(rc);
1270
1271         list_for_each (pos, &set->set_list) {
1272                 req = list_entry(pos, struct lov_request, rq_link);
1273
1274                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1275                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1276                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1277
1278                 rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
1279                                  &req->rq_oi);
1280                 err = lov_update_common_set(set, req, rc);
1281                 if (err) {
1282                         CERROR("error: getattr objid "LPX64" subobj "
1283                                LPX64" on OST idx %d: rc = %d\n",
1284                                oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,
1285                                req->rq_idx, err);
1286                         break;
1287                 }
1288         }
1289
1290         rc = lov_fini_getattr_set(set);
1291         if (err)
1292                 rc = err;
1293         RETURN(rc);
1294 }
1295
1296 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
1297                                  void *data, int rc)
1298 {
1299         struct lov_request_set *lovset = (struct lov_request_set *)data;
1300         int err;
1301         ENTRY;
1302
1303         /* don't do attribute merge if this aysnc op failed */
1304         if (rc)
1305                 lovset->set_completes = 0;
1306         err = lov_fini_getattr_set(lovset);
1307         RETURN(rc ? rc : err);
1308 }
1309
1310 static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
1311                               struct ptlrpc_request_set *rqset)
1312 {
1313         struct lov_request_set *lovset;
1314         struct lov_obd *lov;
1315         struct list_head *pos;
1316         struct lov_request *req;
1317         int rc = 0, err;
1318         ENTRY;
1319
1320         LASSERT(oinfo);
1321         ASSERT_LSM_MAGIC(oinfo->oi_md);
1322
1323         if (!exp || !exp->exp_obd)
1324                 RETURN(-ENODEV);
1325
1326         lov = &exp->exp_obd->u.lov;
1327
1328         rc = lov_prep_getattr_set(exp, oinfo, &lovset);
1329         if (rc)
1330                 RETURN(rc);
1331
1332         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1333                oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
1334                oinfo->oi_md->lsm_stripe_size);
1335
1336         list_for_each (pos, &lovset->set_list) {
1337                 req = list_entry(pos, struct lov_request, rq_link);
1338
1339                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1340                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1341                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1342                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1343                                        &req->rq_oi, rqset);
1344                 if (rc) {
1345                         CERROR("error: getattr objid "LPX64" subobj "
1346                                LPX64" on OST idx %d: rc = %d\n",
1347                                oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,
1348                                req->rq_idx, rc);
1349                         GOTO(out, rc);
1350                 }
1351         }
1352
1353         if (!list_empty(&rqset->set_requests)) {
1354                 LASSERT(rc == 0);
1355                 LASSERT (rqset->set_interpret == NULL);
1356                 rqset->set_interpret = lov_getattr_interpret;
1357                 rqset->set_arg = (void *)lovset;
1358                 RETURN(rc);
1359         }
1360 out:
1361         if (rc)
1362                 lovset->set_completes = 0;
1363         err = lov_fini_getattr_set(lovset);
1364         RETURN(rc ? rc : err);
1365 }
1366
1367 static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
1368                        struct obd_trans_info *oti)
1369 {
1370         struct lov_request_set *set;
1371         struct lov_obd *lov;
1372         struct list_head *pos;
1373         struct lov_request *req;
1374         int err = 0, rc = 0;
1375         ENTRY;
1376
1377         LASSERT(oinfo);
1378         ASSERT_LSM_MAGIC(oinfo->oi_md);
1379
1380         if (!exp || !exp->exp_obd)
1381                 RETURN(-ENODEV);
1382
1383         /* for now, we only expect the following updates here */
1384         LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
1385                                             OBD_MD_FLMODE | OBD_MD_FLATIME |
1386                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1387                                             OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
1388                                             OBD_MD_FLGROUP | OBD_MD_FLUID |
1389                                             OBD_MD_FLGID | OBD_MD_FLINLINE |
1390                                             OBD_MD_FLFID | OBD_MD_FLGENER)));
1391         lov = &exp->exp_obd->u.lov;
1392         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1393         if (rc)
1394                 RETURN(rc);
1395
1396         list_for_each (pos, &set->set_list) {
1397                 req = list_entry(pos, struct lov_request, rq_link);
1398
1399                 rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
1400                                  &req->rq_oi, NULL);
1401                 err = lov_update_setattr_set(set, req, rc);
1402                 if (err) {
1403                         CERROR("error: setattr objid "LPX64" subobj "
1404                                LPX64" on OST idx %d: rc = %d\n",
1405                                set->set_oi->oi_oa->o_id,
1406                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);
1407                         if (!rc)
1408                                 rc = err;
1409                 }
1410         }
1411         err = lov_fini_setattr_set(set);
1412         if (!rc)
1413                 rc = err;
1414         RETURN(rc);
1415 }
1416
1417 static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
1418                                  void *data, int rc)
1419 {
1420         struct lov_request_set *lovset = (struct lov_request_set *)data;
1421         int err;
1422         ENTRY;
1423
1424         if (rc)
1425                 lovset->set_completes = 0;
1426         err = lov_fini_setattr_set(lovset);
1427         RETURN(rc ? rc : err);
1428 }
1429
1430 /* If @oti is given, the request goes from MDS and responses from OSTs are not
1431    needed. Otherwise, a client is waiting for responses. */
1432 static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
1433                              struct obd_trans_info *oti,
1434                              struct ptlrpc_request_set *rqset)
1435 {
1436         struct lov_request_set *set;
1437         struct lov_request *req;
1438         struct list_head *pos;
1439         struct lov_obd *lov;
1440         int rc = 0;
1441         ENTRY;
1442
1443         LASSERT(oinfo);
1444         ASSERT_LSM_MAGIC(oinfo->oi_md);
1445         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
1446                 LASSERT(oti);
1447                 LASSERT(oti->oti_logcookies);
1448         }
1449
1450         if (!exp || !exp->exp_obd)
1451                 RETURN(-ENODEV);
1452
1453         lov = &exp->exp_obd->u.lov;
1454         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1455         if (rc)
1456                 RETURN(rc);
1457
1458         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1459                oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
1460                oinfo->oi_md->lsm_stripe_size);
1461
1462         list_for_each (pos, &set->set_list) {
1463                 req = list_entry(pos, struct lov_request, rq_link);
1464
1465                 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1466                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1467
1468                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1469                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1470                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1471
1472                 rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1473                                        &req->rq_oi, oti, rqset);
1474                 if (rc) {
1475                         CERROR("error: setattr objid "LPX64" subobj "
1476                                LPX64" on OST idx %d: rc = %d\n",
1477                                set->set_oi->oi_oa->o_id,
1478                                req->rq_oi.oi_oa->o_id,
1479                                req->rq_idx, rc);
1480                         break;
1481                 }
1482         }
1483
1484         /* If we are not waiting for responses on async requests, return. */
1485         if (rc || !rqset || list_empty(&rqset->set_requests)) {
1486                 int err;
1487                 if (rc)
1488                         set->set_completes = 0;
1489                 err = lov_fini_setattr_set(set);
1490                 RETURN(rc ? rc : err);
1491         }
1492
1493         LASSERT(rqset->set_interpret == NULL);
1494         rqset->set_interpret = lov_setattr_interpret;
1495         rqset->set_arg = (void *)set;
1496
1497         RETURN(0);
1498 }
1499
1500 static int lov_punch_interpret(struct ptlrpc_request_set *rqset,
1501                                void *data, int rc)
1502 {
1503         struct lov_request_set *lovset = (struct lov_request_set *)data;
1504         int err;
1505         ENTRY;
1506
1507         if (rc)
1508                 lovset->set_completes = 0;
1509         err = lov_fini_punch_set(lovset);
1510         RETURN(rc ? rc : err);
1511 }
1512
1513 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1514  * we can send this 'punch' to just the authoritative node and the nodes
1515  * that the punch will affect. */
1516 static int lov_punch(struct obd_export *exp, struct obd_info *oinfo,
1517                      struct obd_trans_info *oti,
1518                      struct ptlrpc_request_set *rqset)
1519 {
1520         struct lov_request_set *set;
1521         struct lov_obd *lov;
1522         struct list_head *pos;
1523         struct lov_request *req;
1524         int rc = 0;
1525         ENTRY;
1526
1527         LASSERT(oinfo);
1528         ASSERT_LSM_MAGIC(oinfo->oi_md);
1529
1530         if (!exp || !exp->exp_obd)
1531                 RETURN(-ENODEV);
1532
1533         lov = &exp->exp_obd->u.lov;
1534         rc = lov_prep_punch_set(exp, oinfo, oti, &set);
1535         if (rc)
1536                 RETURN(rc);
1537
1538         list_for_each (pos, &set->set_list) {
1539                 req = list_entry(pos, struct lov_request, rq_link);
1540
1541                 rc = obd_punch(lov->lov_tgts[req->rq_idx]->ltd_exp,
1542                                &req->rq_oi, NULL, rqset);
1543                 if (rc) {
1544                         CERROR("error: punch objid "LPX64" subobj "LPX64
1545                                " on OST idx %d: rc = %d\n",
1546                                set->set_oi->oi_oa->o_id,
1547                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
1548                         break;
1549                 }
1550         }
1551
1552         if (rc || list_empty(&rqset->set_requests)) {
1553                 int err;
1554                 err = lov_fini_punch_set(set);
1555                 RETURN(rc ? rc : err);
1556         }
1557
1558         LASSERT(rqset->set_interpret == NULL);
1559         rqset->set_interpret = lov_punch_interpret;
1560         rqset->set_arg = (void *)set;
1561
1562         RETURN(0);
1563 }
1564
1565 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1566                     struct lov_stripe_md *lsm, obd_off start, obd_off end,
1567                     void *capa)
1568 {
1569         struct lov_request_set *set;
1570         struct obd_info oinfo;
1571         struct lov_obd *lov;
1572         struct list_head *pos;
1573         struct lov_request *req;
1574         int err = 0, rc = 0;
1575         ENTRY;
1576
1577         ASSERT_LSM_MAGIC(lsm);
1578
1579         if (!exp->exp_obd)
1580                 RETURN(-ENODEV);
1581
1582         lov = &exp->exp_obd->u.lov;
1583         rc = lov_prep_sync_set(exp, &oinfo, oa, lsm, start, end, &set);
1584         if (rc)
1585                 RETURN(rc);
1586
1587         list_for_each (pos, &set->set_list) {
1588                 req = list_entry(pos, struct lov_request, rq_link);
1589
1590                 rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
1591                               req->rq_oi.oi_oa, NULL,
1592                               req->rq_oi.oi_policy.l_extent.start,
1593                               req->rq_oi.oi_policy.l_extent.end, capa);
1594                 err = lov_update_common_set(set, req, rc);
1595                 if (err) {
1596                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1597                                " on OST idx %d: rc = %d\n",
1598                                set->set_oi->oi_oa->o_id,
1599                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
1600                         if (!rc)
1601                                 rc = err;
1602                 }
1603         }
1604         err = lov_fini_sync_set(set);
1605         if (!rc)
1606                 rc = err;
1607         RETURN(rc);
1608 }
1609
1610 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
1611                          obd_count oa_bufs, struct brw_page *pga)
1612 {
1613         struct obd_info oinfo = { { { 0 } } };
1614         int i, rc = 0;
1615
1616         oinfo.oi_oa = lov_oinfo->oi_oa;
1617
1618         /* The caller just wants to know if there's a chance that this
1619          * I/O can succeed */
1620         for (i = 0; i < oa_bufs; i++) {
1621                 int stripe = lov_stripe_number(lov_oinfo->oi_md, pga[i].off);
1622                 int ost = lov_oinfo->oi_md->lsm_oinfo[stripe]->loi_ost_idx;
1623                 obd_off start, end;
1624
1625                 if (!lov_stripe_intersects(lov_oinfo->oi_md, i, pga[i].off,
1626                                            pga[i].off + pga[i].count,
1627                                            &start, &end))
1628                         continue;
1629
1630                 if (!lov->lov_tgts[ost] || !lov->lov_tgts[ost]->ltd_active) {
1631                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1632                         return -EIO;
1633                 }
1634
1635                 rc = obd_brw(OBD_BRW_CHECK, lov->lov_tgts[ost]->ltd_exp, &oinfo,
1636                              1, &pga[i], NULL);
1637                 if (rc)
1638                         break;
1639         }
1640         return rc;
1641 }
1642
1643 static int lov_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1644                    obd_count oa_bufs, struct brw_page *pga,
1645                    struct obd_trans_info *oti)
1646 {
1647         struct lov_request_set *set;
1648         struct lov_request *req;
1649         struct list_head *pos;
1650         struct lov_obd *lov = &exp->exp_obd->u.lov;
1651         int err, rc = 0;
1652         ENTRY;
1653
1654         ASSERT_LSM_MAGIC(oinfo->oi_md);
1655
1656         if (cmd == OBD_BRW_CHECK) {
1657                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1658                 RETURN(rc);
1659         }
1660
1661         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &set);
1662         if (rc)
1663                 RETURN(rc);
1664
1665         list_for_each (pos, &set->set_list) {
1666                 struct obd_export *sub_exp;
1667                 struct brw_page *sub_pga;
1668                 req = list_entry(pos, struct lov_request, rq_link);
1669
1670                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1671                 sub_pga = set->set_pga + req->rq_pgaidx;
1672                 rc = obd_brw(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1673                              sub_pga, oti);
1674                 if (rc)
1675                         break;
1676                 lov_update_common_set(set, req, rc);
1677         }
1678
1679         err = lov_fini_brw_set(set);
1680         if (!rc)
1681                 rc = err;
1682         RETURN(rc);
1683 }
1684
1685 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1686                              int rc)
1687 {
1688         struct lov_request_set *lovset = (struct lov_request_set *)data;
1689         ENTRY;
1690
1691         if (rc) {
1692                 lovset->set_completes = 0;
1693                 lov_fini_brw_set(lovset);
1694         } else {
1695                 rc = lov_fini_brw_set(lovset);
1696         }
1697
1698         RETURN(rc);
1699 }
1700
1701 static int lov_brw_async(int cmd, struct obd_export *exp,
1702                          struct obd_info *oinfo, obd_count oa_bufs,
1703                          struct brw_page *pga, struct obd_trans_info *oti,
1704                          struct ptlrpc_request_set *set)
1705 {
1706         struct lov_request_set *lovset;
1707         struct lov_request *req;
1708         struct list_head *pos;
1709         struct lov_obd *lov = &exp->exp_obd->u.lov;
1710         int rc = 0;
1711         ENTRY;
1712
1713         LASSERT(oinfo);
1714         ASSERT_LSM_MAGIC(oinfo->oi_md);
1715
1716         if (cmd == OBD_BRW_CHECK) {
1717                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1718                 RETURN(rc);
1719         }
1720
1721         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &lovset);
1722         if (rc)
1723                 RETURN(rc);
1724
1725         list_for_each (pos, &lovset->set_list) {
1726                 struct obd_export *sub_exp;
1727                 struct brw_page *sub_pga;
1728                 req = list_entry(pos, struct lov_request, rq_link);
1729
1730                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1731                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1732                 rc = obd_brw_async(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1733                                    sub_pga, oti, set);
1734                 if (rc)
1735                         GOTO(out, rc);
1736                 lov_update_common_set(lovset, req, rc);
1737         }
1738         LASSERT(rc == 0);
1739         LASSERT(set->set_interpret == NULL);
1740         LASSERT(set->set_arg == NULL);
1741         rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
1742         if (rc)
1743                 GOTO(out, rc);
1744
1745         RETURN(rc);
1746 out:
1747         lov_fini_brw_set(lovset);
1748         RETURN(rc);
1749 }
1750
1751 static int lov_ap_make_ready(void *data, int cmd)
1752 {
1753         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1754
1755         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1756 }
1757
1758 static int lov_ap_refresh_count(void *data, int cmd)
1759 {
1760         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1761
1762         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1763                                                      cmd);
1764 }
1765
1766 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1767 {
1768         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1769
1770         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1771         /* XXX woah, shouldn't we be altering more here?  size? */
1772         oa->o_id = lap->lap_loi_id;
1773         oa->o_gr = lap->lap_loi_gr;
1774         oa->o_valid |= OBD_MD_FLGROUP;
1775         oa->o_stripe_idx = lap->lap_stripe;
1776 }
1777
1778 static void lov_ap_update_obdo(void *data, int cmd, struct obdo *oa,
1779                                obd_valid valid)
1780 {
1781         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1782
1783         lap->lap_caller_ops->ap_update_obdo(lap->lap_caller_data, cmd,oa,valid);
1784 }
1785
1786 static int lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1787 {
1788         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1789
1790         /* in a raid1 regime this would down a count of many ios
1791          * in flight, onl calling the caller_ops completion when all
1792          * the raid1 ios are complete */
1793         rc = lap->lap_caller_ops->ap_completion(lap->lap_caller_data,cmd,oa,rc);
1794         return rc;
1795 }
1796
1797 static struct obd_capa *lov_ap_lookup_capa(void *data, int cmd)
1798 {
1799         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1800         return lap->lap_caller_ops->ap_lookup_capa(lap->lap_caller_data, cmd);
1801 }
1802
1803 static struct obd_async_page_ops lov_async_page_ops = {
1804         .ap_make_ready =        lov_ap_make_ready,
1805         .ap_refresh_count =     lov_ap_refresh_count,
1806         .ap_fill_obdo =         lov_ap_fill_obdo,
1807         .ap_update_obdo =       lov_ap_update_obdo,
1808         .ap_completion =        lov_ap_completion,
1809         .ap_lookup_capa =       lov_ap_lookup_capa,
1810 };
1811
1812 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1813                            struct lov_oinfo *loi, cfs_page_t *page,
1814                            obd_off offset, struct obd_async_page_ops *ops,
1815                            void *data, void **res, int nocache,
1816                            struct lustre_handle *lockh)
1817 {
1818         struct lov_obd *lov = &exp->exp_obd->u.lov;
1819         struct lov_async_page *lap;
1820         struct lov_lock_handles *lov_lockh = NULL;
1821         int rc = 0;
1822         ENTRY;
1823
1824         if (!page) {
1825                 int i = 0;
1826                 /* Find an existing osc so we can get it's stupid sizeof(*oap).
1827                    Only because of this layering limitation will a client
1828                    mount with no osts fail */
1829                 while (!lov->lov_tgts || !lov->lov_tgts[i] ||
1830                        !lov->lov_tgts[i]->ltd_exp) {
1831                         i++;
1832                         if (i >= lov->desc.ld_tgt_count)
1833                                 RETURN(-ENOMEDIUM);
1834                 }
1835                 rc = size_round(sizeof(*lap)) +
1836                         obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
1837                                             NULL, NULL, 0, NULL, NULL, NULL, 0,
1838                                             NULL);
1839                 RETURN(rc);
1840         }
1841         ASSERT_LSM_MAGIC(lsm);
1842         LASSERT(loi == NULL);
1843
1844         lap = *res;
1845         lap->lap_magic = LOV_AP_MAGIC;
1846         lap->lap_caller_ops = ops;
1847         lap->lap_caller_data = data;
1848
1849         /* for now only raid 0 which passes through */
1850         lap->lap_stripe = lov_stripe_number(lsm, offset);
1851         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
1852         loi = lsm->lsm_oinfo[lap->lap_stripe];
1853
1854         /* so the callback doesn't need the lsm */
1855         lap->lap_loi_id = loi->loi_id;
1856         lap->lap_loi_gr = lsm->lsm_object_gr;
1857         LASSERT(lsm->lsm_object_gr > 0);
1858         
1859         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
1860
1861         if (lockh) {
1862                 lov_lockh = lov_handle2llh(lockh);
1863                 if (lov_lockh) {
1864                         lockh = lov_lockh->llh_handles + lap->lap_stripe;
1865                 }
1866         }
1867
1868         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1869                                  lsm, loi, page, lap->lap_sub_offset,
1870                                  &lov_async_page_ops, lap,
1871                                  &lap->lap_sub_cookie, nocache, lockh);
1872         if (lov_lockh)
1873                 lov_llh_put(lov_lockh);
1874         if (rc)
1875                 RETURN(rc);
1876         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1877                lap->lap_sub_cookie, offset);
1878         RETURN(0);
1879 }
1880
1881 static int lov_queue_async_io(struct obd_export *exp,
1882                               struct lov_stripe_md *lsm,
1883                               struct lov_oinfo *loi, void *cookie,
1884                               int cmd, obd_off off, int count,
1885                               obd_flag brw_flags, obd_flag async_flags)
1886 {
1887         struct lov_obd *lov = &exp->exp_obd->u.lov;
1888         struct lov_async_page *lap;
1889         int rc;
1890
1891         LASSERT(loi == NULL);
1892
1893         ASSERT_LSM_MAGIC(lsm);
1894
1895         lap = LAP_FROM_COOKIE(cookie);
1896
1897         loi = lsm->lsm_oinfo[lap->lap_stripe];
1898
1899         rc = obd_queue_async_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
1900                                 loi, lap->lap_sub_cookie, cmd, off, count,
1901                                 brw_flags, async_flags);
1902         RETURN(rc);
1903 }
1904
1905 static int lov_set_async_flags(struct obd_export *exp,
1906                                struct lov_stripe_md *lsm,
1907                                struct lov_oinfo *loi, void *cookie,
1908                                obd_flag async_flags)
1909 {
1910         struct lov_obd *lov = &exp->exp_obd->u.lov;
1911         struct lov_async_page *lap;
1912         int rc;
1913
1914         LASSERT(loi == NULL);
1915
1916         ASSERT_LSM_MAGIC(lsm);
1917
1918         lap = LAP_FROM_COOKIE(cookie);
1919
1920         loi = lsm->lsm_oinfo[lap->lap_stripe];
1921
1922         rc = obd_set_async_flags(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1923                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1924         RETURN(rc);
1925 }
1926
1927 static int lov_queue_group_io(struct obd_export *exp,
1928                               struct lov_stripe_md *lsm,
1929                               struct lov_oinfo *loi,
1930                               struct obd_io_group *oig, void *cookie,
1931                               int cmd, obd_off off, int count,
1932                               obd_flag brw_flags, obd_flag async_flags)
1933 {
1934         struct lov_obd *lov = &exp->exp_obd->u.lov;
1935         struct lov_async_page *lap;
1936         int rc;
1937
1938         LASSERT(loi == NULL);
1939
1940         ASSERT_LSM_MAGIC(lsm);
1941
1942         lap = LAP_FROM_COOKIE(cookie);
1943
1944         loi = lsm->lsm_oinfo[lap->lap_stripe];
1945
1946         rc = obd_queue_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
1947                                 loi, oig, lap->lap_sub_cookie, cmd, off, count,
1948                                 brw_flags, async_flags);
1949         RETURN(rc);
1950 }
1951
1952 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1953  * all stripes, but we don't record that fact at queue time.  so we
1954  * trigger sync io on all stripes. */
1955 static int lov_trigger_group_io(struct obd_export *exp,
1956                                 struct lov_stripe_md *lsm,
1957                                 struct lov_oinfo *loi,
1958                                 struct obd_io_group *oig)
1959 {
1960         struct lov_obd *lov = &exp->exp_obd->u.lov;
1961         int rc = 0, i, err;
1962
1963         LASSERT(loi == NULL);
1964
1965         ASSERT_LSM_MAGIC(lsm);
1966
1967         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1968                 loi = lsm->lsm_oinfo[i];
1969                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1970                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1971                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1972                         continue;
1973                 }
1974
1975                 err = obd_trigger_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1976                                            lsm, loi, oig);
1977                 if (rc == 0 && err != 0)
1978                         rc = err;
1979         };
1980         RETURN(rc);
1981 }
1982
1983 static int lov_teardown_async_page(struct obd_export *exp,
1984                                    struct lov_stripe_md *lsm,
1985                                    struct lov_oinfo *loi, void *cookie)
1986 {
1987         struct lov_obd *lov = &exp->exp_obd->u.lov;
1988         struct lov_async_page *lap;
1989         int rc;
1990
1991         LASSERT(loi == NULL);
1992
1993         ASSERT_LSM_MAGIC(lsm);
1994
1995         lap = LAP_FROM_COOKIE(cookie);
1996
1997         loi = lsm->lsm_oinfo[lap->lap_stripe];
1998
1999         rc = obd_teardown_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2000                                      lsm, loi, lap->lap_sub_cookie);
2001         if (rc) {
2002                 CERROR("unable to teardown sub cookie %p: %d\n",
2003                        lap->lap_sub_cookie, rc);
2004                 RETURN(rc);
2005         }
2006         RETURN(rc);
2007 }
2008
2009 static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
2010                                  void *data, int rc)
2011 {
2012         struct lov_request_set *lovset = (struct lov_request_set *)data;
2013         ENTRY;
2014         rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
2015         RETURN(rc);
2016 }
2017
2018 static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2019                        struct ldlm_enqueue_info *einfo,
2020                        struct ptlrpc_request_set *rqset)
2021 {
2022         ldlm_mode_t mode = einfo->ei_mode;
2023         struct lov_request_set *set;
2024         struct lov_request *req;
2025         struct list_head *pos;
2026         struct lov_obd *lov;
2027         ldlm_error_t rc;
2028         ENTRY;
2029
2030         LASSERT(oinfo);
2031         ASSERT_LSM_MAGIC(oinfo->oi_md);
2032         LASSERT(mode == (mode & -mode));
2033
2034         /* we should never be asked to replay a lock this way. */
2035         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
2036
2037         if (!exp || !exp->exp_obd)
2038                 RETURN(-ENODEV);
2039
2040         lov = &exp->exp_obd->u.lov;
2041         rc = lov_prep_enqueue_set(exp, oinfo, einfo, &set);
2042         if (rc)
2043                 RETURN(rc);
2044
2045         list_for_each (pos, &set->set_list) {
2046                 req = list_entry(pos, struct lov_request, rq_link);
2047
2048                 rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
2049                                  &req->rq_oi, einfo, rqset);
2050                 if (rc != ELDLM_OK)
2051                         GOTO(out, rc);
2052         }
2053
2054         if (rqset && !list_empty(&rqset->set_requests)) {
2055                 LASSERT(rc == 0);
2056                 LASSERT(rqset->set_interpret == NULL);
2057                 rqset->set_interpret = lov_enqueue_interpret;
2058                 rqset->set_arg = (void *)set;
2059                 RETURN(rc);
2060         }
2061 out:
2062         rc = lov_fini_enqueue_set(set, mode, rc, rqset);
2063         RETURN(rc);
2064 }
2065
2066 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2067                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2068                      int *flags, void *data, struct lustre_handle *lockh)
2069 {
2070         struct lov_request_set *set;
2071         struct obd_info oinfo;
2072         struct lov_request *req;
2073         struct list_head *pos;
2074         struct lov_obd *lov = &exp->exp_obd->u.lov;
2075         struct lustre_handle *lov_lockhp;
2076         int lov_flags, rc = 0;
2077         ENTRY;
2078
2079         ASSERT_LSM_MAGIC(lsm);
2080         LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
2081
2082         if (!exp || !exp->exp_obd)
2083                 RETURN(-ENODEV);
2084
2085         lov = &exp->exp_obd->u.lov;
2086         rc = lov_prep_match_set(exp, &oinfo, lsm, policy, mode, lockh, &set);
2087         if (rc)
2088                 RETURN(rc);
2089
2090         list_for_each (pos, &set->set_list) {
2091                 ldlm_policy_data_t sub_policy;
2092                 req = list_entry(pos, struct lov_request, rq_link);
2093                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
2094                 LASSERT(lov_lockhp);
2095
2096                 lov_flags = *flags;
2097                 sub_policy.l_extent = req->rq_oi.oi_policy.l_extent;
2098
2099                 rc = obd_match(lov->lov_tgts[req->rq_idx]->ltd_exp,
2100                                req->rq_oi.oi_md, type, &sub_policy,
2101                                mode, &lov_flags, data, lov_lockhp);
2102                 rc = lov_update_match_set(set, req, rc);
2103                 if (rc <= 0)
2104                         break;
2105         }
2106         lov_fini_match_set(set, mode, *flags);
2107         RETURN(rc);
2108 }
2109
2110 static int lov_change_cbdata(struct obd_export *exp,
2111                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
2112                              void *data)
2113 {
2114         struct lov_obd *lov;
2115         int rc = 0, i;
2116         ENTRY;
2117
2118         ASSERT_LSM_MAGIC(lsm);
2119
2120         if (!exp || !exp->exp_obd)
2121                 RETURN(-ENODEV);
2122
2123         LASSERT(lsm->lsm_object_gr > 0);
2124
2125         lov = &exp->exp_obd->u.lov;
2126         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2127                 struct lov_stripe_md submd;
2128                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2129
2130                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2131                         CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
2132                         continue;
2133                 }
2134                 
2135                 submd.lsm_object_id = loi->loi_id;
2136                 submd.lsm_object_gr = lsm->lsm_object_gr;
2137                 submd.lsm_stripe_count = 0;
2138                 rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2139                                        &submd, it, data);
2140         }
2141         RETURN(rc);
2142 }
2143
2144 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
2145                       __u32 mode, struct lustre_handle *lockh)
2146 {
2147         struct lov_request_set *set;
2148         struct obd_info oinfo;
2149         struct lov_request *req;
2150         struct list_head *pos;
2151         struct lov_obd *lov = &exp->exp_obd->u.lov;
2152         struct lustre_handle *lov_lockhp;
2153         int err = 0, rc = 0;
2154         ENTRY;
2155
2156         ASSERT_LSM_MAGIC(lsm);
2157
2158         if (!exp || !exp->exp_obd)
2159                 RETURN(-ENODEV);
2160
2161         LASSERT(lsm->lsm_object_gr > 0);
2162         LASSERT(lockh);
2163         lov = &exp->exp_obd->u.lov;
2164         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
2165         if (rc)
2166                 RETURN(rc);
2167
2168         list_for_each (pos, &set->set_list) {
2169                 req = list_entry(pos, struct lov_request, rq_link);
2170                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
2171
2172                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
2173                                 req->rq_oi.oi_md, mode, lov_lockhp);
2174                 rc = lov_update_common_set(set, req, rc);
2175                 if (rc) {
2176                         CERROR("error: cancel objid "LPX64" subobj "
2177                                LPX64" on OST idx %d: rc = %d\n",
2178                                lsm->lsm_object_id,
2179                                req->rq_oi.oi_md->lsm_object_id,
2180                                req->rq_idx, rc);
2181                         err = rc;
2182                 }
2183
2184         }
2185         lov_fini_cancel_set(set);
2186         RETURN(err);
2187 }
2188
2189 static int lov_cancel_unused(struct obd_export *exp,
2190                              struct lov_stripe_md *lsm,
2191                              int flags, void *opaque)
2192 {
2193         struct lov_obd *lov;
2194         int rc = 0, i;
2195         ENTRY;
2196
2197         if (!exp || !exp->exp_obd)
2198                 RETURN(-ENODEV);
2199
2200         lov = &exp->exp_obd->u.lov;
2201         if (lsm == NULL) {
2202                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2203                         int err;
2204                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2205                                 continue;
2206
2207                         err = obd_cancel_unused(lov->lov_tgts[i]->ltd_exp, NULL,
2208                                                 flags, opaque);
2209                         if (!rc)
2210                                 rc = err;
2211                 }
2212                 RETURN(rc);
2213         }
2214
2215         ASSERT_LSM_MAGIC(lsm);
2216
2217         LASSERT(lsm->lsm_object_gr > 0);
2218         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2219                 struct lov_stripe_md submd;
2220                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2221                 int err;
2222
2223                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2224                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
2225                         continue;
2226                 }
2227
2228                 if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
2229                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2230
2231                 submd.lsm_object_id = loi->loi_id;
2232                 submd.lsm_object_gr = lsm->lsm_object_gr;
2233                 submd.lsm_stripe_count = 0;
2234                 err = obd_cancel_unused(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2235                                         &submd, flags, opaque);
2236                 if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
2237                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2238                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2239                                loi->loi_id, loi->loi_ost_idx, err);
2240                         if (!rc)
2241                                 rc = err;
2242                 }
2243         }
2244         RETURN(rc);
2245 }
2246
2247 static int lov_join_lru(struct obd_export *exp,
2248                         struct lov_stripe_md *lsm, int join)
2249 {
2250         struct lov_obd *lov;
2251         int i, count = 0;
2252         ENTRY;
2253
2254         ASSERT_LSM_MAGIC(lsm);
2255         if (!exp || !exp->exp_obd)
2256                 RETURN(-ENODEV);
2257
2258         lov = &exp->exp_obd->u.lov;
2259         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2260                 struct lov_stripe_md submd;
2261                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2262                 int rc = 0;
2263
2264                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2265                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
2266                         continue;
2267                 }
2268
2269                 if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
2270                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2271
2272                 submd.lsm_object_id = loi->loi_id;
2273                 submd.lsm_object_gr = lsm->lsm_object_gr;
2274                 submd.lsm_stripe_count = 0;
2275                 rc = obd_join_lru(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2276                                   &submd, join);
2277                 if (rc < 0) {
2278                         CERROR("join lru failed. objid: "LPX64" subobj: "LPX64
2279                                " ostidx: %d rc: %d\n", lsm->lsm_object_id,
2280                                loi->loi_id, loi->loi_ost_idx, rc);
2281                         return rc;
2282                 } else {
2283                         count += rc;
2284                 }
2285         }
2286         RETURN(count);
2287 }
2288
2289 static int lov_statfs_interpret(struct ptlrpc_request_set *rqset,
2290                                 void *data, int rc)
2291 {
2292         struct lov_request_set *lovset = (struct lov_request_set *)data;
2293         int err;
2294         ENTRY;
2295
2296         if (rc)
2297                 lovset->set_completes = 0;
2298
2299         err = lov_fini_statfs_set(lovset);
2300         RETURN(rc ? rc : err);
2301 }
2302
2303 static int lov_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2304                             __u64 max_age, struct ptlrpc_request_set *rqset)
2305 {
2306         struct lov_request_set *set;
2307         struct lov_request *req;
2308         struct list_head *pos;
2309         struct lov_obd *lov;
2310         int rc = 0;
2311         ENTRY;
2312
2313         LASSERT(oinfo != NULL);
2314         LASSERT(oinfo->oi_osfs != NULL);
2315
2316         lov = &obd->u.lov;
2317         rc = lov_prep_statfs_set(obd, oinfo, &set);
2318         if (rc)
2319                 RETURN(rc);
2320
2321         list_for_each (pos, &set->set_list) {
2322                 struct obd_device *osc_obd;
2323
2324                 req = list_entry(pos, struct lov_request, rq_link);
2325
2326                 osc_obd = class_exp2obd(lov->lov_tgts[req->rq_idx]->ltd_exp);
2327                 rc = obd_statfs_async(osc_obd, &req->rq_oi, max_age, rqset);
2328                 if (rc)
2329                         break;
2330         }
2331
2332         if (rc || list_empty(&rqset->set_requests)) {
2333                 int err;
2334                 if (rc)
2335                         set->set_completes = 0;
2336                 err = lov_fini_statfs_set(set);
2337                 RETURN(rc ? rc : err);
2338         }
2339
2340         LASSERT(rqset->set_interpret == NULL);
2341         rqset->set_interpret = lov_statfs_interpret;
2342         rqset->set_arg = (void *)set;
2343         RETURN(0);
2344 }
2345
2346 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2347                       __u64 max_age, __u32 flags)
2348 {
2349         struct ptlrpc_request_set *set = NULL;
2350         struct obd_info oinfo = { { { 0 } } };
2351         int rc = 0;
2352         ENTRY;
2353
2354
2355         /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
2356          * statfs requests */
2357         set = ptlrpc_prep_set();
2358         if (set == NULL)
2359                 RETURN(-ENOMEM);
2360
2361         oinfo.oi_osfs = osfs;
2362         oinfo.oi_flags = flags;
2363         rc = lov_statfs_async(obd, &oinfo, max_age, set);
2364         if (rc == 0)
2365                 rc = ptlrpc_set_wait(set);
2366         ptlrpc_set_destroy(set);
2367
2368         RETURN(rc);
2369 }
2370
2371 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2372                          void *karg, void *uarg)
2373 {
2374         struct obd_device *obddev = class_exp2obd(exp);
2375         struct lov_obd *lov = &obddev->u.lov;
2376         int i, rc, count = lov->desc.ld_tgt_count;
2377         struct obd_uuid *uuidp;
2378         ENTRY;
2379
2380         switch (cmd) {
2381         case IOC_OBD_STATFS: {
2382                 struct obd_ioctl_data *data = karg;
2383                 struct obd_device *osc_obd;
2384                 struct obd_statfs stat_buf = {0};
2385                 __u32 index;
2386
2387                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
2388                 LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
2389
2390                 if ((index >= count))
2391                         RETURN(-ENODEV);
2392
2393                 if (!lov->lov_tgts[index])
2394                         /* Try again with the next index */
2395                         RETURN(-EAGAIN);
2396                 if (!lov->lov_tgts[index]->ltd_active)
2397                         RETURN(-ENODATA);
2398
2399                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
2400                 if (!osc_obd)
2401                         RETURN(-EINVAL);
2402
2403                 /* got statfs data */
2404                 rc = obd_statfs(osc_obd, &stat_buf,
2405                                 cfs_time_current_64() - HZ, 0);
2406                 if (rc)
2407                         RETURN(rc);
2408                 if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
2409                         RETURN(rc);
2410                 /* copy UUID */
2411                 rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
2412                                   data->ioc_plen2);
2413                 break;
2414         }
2415         case OBD_IOC_LOV_GET_CONFIG: {
2416                 struct obd_ioctl_data *data;
2417                 struct lov_desc *desc;
2418                 char *buf = NULL;
2419                 __u32 *genp;
2420
2421                 len = 0;
2422                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2423                         RETURN(-EINVAL);
2424
2425                 data = (struct obd_ioctl_data *)buf;
2426
2427                 if (sizeof(*desc) > data->ioc_inllen1) {
2428                         obd_ioctl_freedata(buf, len);
2429                         RETURN(-EINVAL);
2430                 }
2431
2432                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2433                         obd_ioctl_freedata(buf, len);
2434                         RETURN(-EINVAL);
2435                 }
2436
2437                 if (sizeof(__u32) * count > data->ioc_inllen3) {
2438                         obd_ioctl_freedata(buf, len);
2439                         RETURN(-EINVAL);
2440                 }
2441
2442                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2443                 memcpy(desc, &(lov->desc), sizeof(*desc));
2444
2445                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2446                 genp = (__u32 *)data->ioc_inlbuf3;
2447                 /* the uuid will be empty for deleted OSTs */
2448                 for (i = 0; i < count; i++, uuidp++, genp++) {
2449                         if (!lov->lov_tgts[i])
2450                                 continue;
2451                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
2452                         *genp = lov->lov_tgts[i]->ltd_gen;
2453                 }
2454
2455                 rc = copy_to_user((void *)uarg, buf, len);
2456                 if (rc)
2457                         rc = -EFAULT;
2458                 obd_ioctl_freedata(buf, len);
2459                 break;
2460         }
2461         case LL_IOC_LOV_SETSTRIPE:
2462                 rc = lov_setstripe(exp, karg, uarg);
2463                 break;
2464         case LL_IOC_LOV_GETSTRIPE:
2465                 rc = lov_getstripe(exp, karg, uarg);
2466                 break;
2467         case LL_IOC_LOV_SETEA:
2468                 rc = lov_setea(exp, karg, uarg);
2469                 break;
2470         default: {
2471                 int set = 0;
2472
2473                 if (count == 0)
2474                         RETURN(-ENOTTY);
2475
2476                 rc = 0;
2477                 for (i = 0; i < count; i++) {
2478                         int err;
2479
2480                         /* OST was disconnected */
2481                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2482                                 continue;
2483
2484                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
2485                                             len, karg, uarg);
2486                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
2487                                 RETURN(err);
2488                         } else if (err) {
2489                                 if (lov->lov_tgts[i]->ltd_active) {
2490                                         CDEBUG(err == -ENOTTY ?
2491                                                D_IOCTL : D_WARNING,
2492                                                "iocontrol OSC %s on OST "
2493                                                "idx %d cmd %x: err = %d\n",
2494                                                lov_uuid2str(lov, i),
2495                                                i, cmd, err);
2496                                         if (!rc)
2497                                                 rc = err;
2498                                 }
2499                         } else {
2500                                 set = 1;
2501                         }
2502                 }
2503                 if (!set && !rc)
2504                         rc = -EIO;
2505         }
2506         }
2507
2508         RETURN(rc);
2509 }
2510
/* Upper bound, in bytes, on the local buffer that lov_fiemap() allocates to
 * receive the extents of a single per-OST request; smaller requests get a
 * correspondingly smaller buffer. */
#define FIEMAP_BUFFER_SIZE 4096
2512
2513 /**
2514  * Non-zero fe_logical indicates that this is a continuation FIEMAP
2515  * call. The local end offset and the device are sent in the first
2516  * fm_extent. This function calculates the stripe number from the index.
2517  * This function returns a stripe_no on which mapping is to be restarted.
2518  *
2519  * This function returns fm_end_offset which is the in-OST offset at which
2520  * mapping should be restarted. If fm_end_offset=0 is returned then caller
2521  * will re-calculate proper offset in next stripe.
2522  * Note that the first extent is passed to lov_get_info via the value field.
2523  *
2524  * \param fiemap fiemap request header
2525  * \param lsm striping information for the file
2526  * \param fm_start logical start of mapping
2527  * \param fm_end logical end of mapping
2528  * \param start_stripe starting stripe will be returned in this
2529  */
2530 obd_size fiemap_calc_fm_end_offset(struct ll_user_fiemap *fiemap,
2531                                    struct lov_stripe_md *lsm, obd_size fm_start,
2532                                    obd_size fm_end, int *start_stripe)
2533 {
2534         obd_size local_end = fiemap->fm_extents[0].fe_logical;
2535         obd_off lun_start, lun_end;
2536         obd_size fm_end_offset;
2537         int stripe_no = -1, i;
2538
2539         if (fiemap->fm_extent_count == 0 ||
2540             fiemap->fm_extents[0].fe_logical == 0)
2541                 return 0;
2542
2543         /* Find out stripe_no from ost_index saved in the fe_device */
2544         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2545                 if (lsm->lsm_oinfo[i]->loi_ost_idx ==
2546                                         fiemap->fm_extents[0].fe_device) {
2547                         stripe_no = i;
2548                         break;
2549                 }
2550         }
2551
2552         /* If we have finished mapping on previous device, shift logical
2553          * offset to start of next device */
2554         if ((lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
2555                                    &lun_start, &lun_end)) != 0 &&
2556                                    local_end < lun_end) {
2557                 fm_end_offset = local_end;
2558                 *start_stripe = stripe_no;
2559         } else {
2560                 /* This is a special value to indicate that caller should
2561                  * calculate offset in next stripe. */
2562                 fm_end_offset = 0;
2563                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
2564         }
2565
2566         return fm_end_offset;
2567 }
2568
2569 /**
2570  * We calculate on which OST the mapping will end. If the length of mapping
2571  * is greater than (stripe_size * stripe_count) then the last_stripe will
2572  * will be one just before start_stripe. Else we check if the mapping
2573  * intersects each OST and find last_stripe.
2574  * This function returns the last_stripe and also sets the stripe_count
2575  * over which the mapping is spread
2576  *
2577  * \param lsm striping information for the file
2578  * \param fm_start logical start of mapping
2579  * \param fm_end logical end of mapping
2580  * \param start_stripe starting stripe of the mapping
2581  * \param stripe_count the number of stripes across which to map is returned
2582  *
2583  * \retval last_stripe return the last stripe of the mapping
2584  */
2585 int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, obd_size fm_start,
2586                             obd_size fm_end, int start_stripe,
2587                             int *stripe_count)
2588 {
2589         int last_stripe;
2590         obd_off obd_start, obd_end;
2591         int i, j;
2592
2593         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
2594                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
2595                                                               start_stripe - 1);
2596                 *stripe_count = lsm->lsm_stripe_count;
2597         } else {
2598                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
2599                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
2600                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
2601                                                    &obd_start, &obd_end)) == 0)
2602                                 break;
2603                 }
2604                 *stripe_count = j;
2605                 last_stripe = (start_stripe + j - 1) %lsm->lsm_stripe_count;
2606         }
2607
2608         return last_stripe;
2609 }
2610
2611 /**
2612  * Set fe_device and copy extents from local buffer into main return buffer.
2613  *
2614  * \param fiemap fiemap request header
2615  * \param lcl_fm_ext array of local fiemap extents to be copied
2616  * \param ost_index OST index to be written into the fm_device field for each
2617                     extent
2618  * \param ext_count number of extents to be copied
2619  * \param current_extent where to start copying in main extent array
2620  */
2621 void fiemap_prepare_and_copy_exts(struct ll_user_fiemap *fiemap,
2622                                   struct ll_fiemap_extent *lcl_fm_ext,
2623                                   int ost_index, unsigned int ext_count,
2624                                   int current_extent)
2625 {
2626         char *to;
2627         int ext;
2628
2629         for (ext = 0; ext < ext_count; ext++) {
2630                 lcl_fm_ext[ext].fe_device = ost_index;
2631                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
2632         }
2633
2634         /* Copy fm_extent's from fm_local to return buffer */
2635         to = (char *)fiemap + fiemap_count_to_size(current_extent);
2636         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent));
2637 }
2638
2639 /**
2640  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
2641  * This also handles the restarting of FIEMAP calls in case mapping overflows
2642  * the available number of extents in single call.
2643  */
2644 static int lov_fiemap(struct lov_obd *lov, __u32 keylen, void *key,
2645                       __u32 *vallen, void *val, struct lov_stripe_md *lsm)
2646 {
2647         struct ll_fiemap_info_key *fm_key = key;
2648         struct ll_user_fiemap *fiemap = val;
2649         struct ll_user_fiemap *fm_local = NULL;
2650         struct ll_fiemap_extent *lcl_fm_ext;
2651         int count_local;
2652         unsigned int get_num_extents = 0;
2653         int ost_index = 0, actual_start_stripe, start_stripe;
2654         obd_size fm_start, fm_end, fm_length, fm_end_offset = 0;
2655         obd_size curr_loc;
2656         int current_extent = 0, rc = 0, i;
2657         int ost_eof = 0; /* EOF for object */
2658         int ost_done = 0; /* done with required mapping for this OST? */
2659         int last_stripe;
2660         int cur_stripe = 0, cur_stripe_wrap = 0, stripe_count;
2661         unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
2662
2663         if (lsm == NULL)
2664                 GOTO(out, rc = 0);
2665
2666         if (fiemap_count_to_size(fm_key->fiemap.fm_extent_count) < buffer_size)
2667                 buffer_size = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
2668
2669         OBD_ALLOC(fm_local, buffer_size);
2670         if (fm_local == NULL)
2671                 GOTO(out, rc = -ENOMEM);
2672         lcl_fm_ext = &fm_local->fm_extents[0];
2673
2674         count_local = fiemap_size_to_count(buffer_size);
2675
2676         memcpy(fiemap, &fm_key->fiemap, sizeof(*fiemap));
2677         fm_start = fiemap->fm_start;
2678         fm_length = fiemap->fm_length;
2679         /* Calculate start stripe, last stripe and length of mapping */
2680         actual_start_stripe = start_stripe = lov_stripe_number(lsm, fm_start);
2681         fm_end = (fm_length == ~0ULL ? fm_key->oa.o_size :
2682                                                 fm_start + fm_length - 1);
2683         /* If fm_length != ~0ULL but fm_start+fm_length-1 exceeds file size */
2684         if (fm_end > fm_key->oa.o_size)
2685                 fm_end = fm_key->oa.o_size;
2686
2687         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
2688                                             actual_start_stripe, &stripe_count);
2689
2690         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
2691                                                   &start_stripe);
2692
2693         if (fiemap->fm_extent_count == 0) {
2694                 get_num_extents = 1;
2695                 count_local = 0;
2696         }
2697
2698         /* Check each stripe */
2699         for (cur_stripe = start_stripe, i = 0; i < stripe_count;
2700              i++, cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
2701                 obd_size req_fm_len; /* Stores length of required mapping */
2702                 obd_size len_mapped_single_call;
2703                 obd_off lun_start, lun_end, obd_object_end;
2704                 unsigned int ext_count;
2705
2706                 cur_stripe_wrap = cur_stripe;
2707
2708                 /* Find out range of mapping on this stripe */
2709                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
2710                                            &lun_start, &obd_object_end)) == 0)
2711                         continue;
2712
2713                 /* If this is a continuation FIEMAP call and we are on
2714                  * starting stripe then lun_start needs to be set to
2715                  * fm_end_offset */
2716                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
2717                         lun_start = fm_end_offset;
2718
2719                 if (fm_length != ~0ULL) {
2720                         /* Handle fm_start + fm_length overflow */
2721                         if (fm_start + fm_length < fm_start)
2722                                 fm_length = ~0ULL - fm_start;
2723                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
2724                                                      cur_stripe);
2725                 } else {
2726                         lun_end = ~0ULL;
2727                 }
2728
2729                 if (lun_start == lun_end)
2730                         continue;
2731
2732                 req_fm_len = obd_object_end - lun_start;
2733                 fm_local->fm_length = 0;
2734                 len_mapped_single_call = 0;
2735
2736                 /* If the output buffer is very large and the objects have many
2737                  * extents we may need to loop on a single OST repeatedly */
2738                 ost_eof = 0;
2739                 ost_done = 0;
2740                 do {
2741                         if (get_num_extents == 0) {
2742                                 /* Don't get too many extents. */
2743                                 if (current_extent + count_local >
2744                                     fiemap->fm_extent_count)
2745                                         count_local = fiemap->fm_extent_count -
2746                                                                  current_extent;
2747                         }
2748
2749                         lun_start += len_mapped_single_call;
2750                         fm_local->fm_length = req_fm_len - len_mapped_single_call;
2751                         req_fm_len = fm_local->fm_length;
2752                         fm_local->fm_extent_count = count_local;
2753                         fm_local->fm_mapped_extents = 0;
2754                         fm_local->fm_flags = fiemap->fm_flags;
2755
2756                         fm_key->oa.o_id = lsm->lsm_oinfo[cur_stripe]->loi_id;
2757                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
2758
2759                         if (ost_index < 0 || ost_index >=lov->desc.ld_tgt_count)
2760                                 GOTO(out, rc = -EINVAL);
2761
2762                         /* If OST is inactive, return extent with UNKNOWN flag */
2763                         if (lov && !lov->lov_tgts[ost_index]->ltd_active) {
2764                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
2765                                 fm_local->fm_mapped_extents = 1;
2766
2767                                 lcl_fm_ext[0].fe_logical = lun_start;
2768                                 lcl_fm_ext[0].fe_length = obd_object_end -
2769                                                                       lun_start;
2770                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
2771
2772                                 goto inactive_tgt;
2773                         }
2774
2775                         fm_local->fm_start = lun_start;
2776                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
2777                         memcpy(&fm_key->fiemap, fm_local, sizeof(*fm_local));
2778                         *vallen=fiemap_count_to_size(fm_local->fm_extent_count);
2779                         rc = obd_get_info(lov->lov_tgts[ost_index]->ltd_exp,
2780                                           keylen, key, vallen, fm_local, lsm);
2781                         if (rc != 0)
2782                                 GOTO(out, rc);
2783
2784 inactive_tgt:
2785                         ext_count = fm_local->fm_mapped_extents;
2786                         if (ext_count == 0) {
2787                                 ost_done = 1;
2788                                 /* If last stripe has hole at the end,
2789                                  * then we need to return */
2790                                 if (cur_stripe_wrap == last_stripe) {
2791                                         fiemap->fm_mapped_extents = 0;
2792                                         goto finish;
2793                                 }
2794                                 break;
2795                         }
2796
2797                         /* If we just need num of extents then go to next device */
2798                         if (get_num_extents) {
2799                                 current_extent += ext_count;
2800                                 break;
2801                         }
2802
2803                         len_mapped_single_call = lcl_fm_ext[ext_count-1].fe_logical -
2804                                   lun_start + lcl_fm_ext[ext_count - 1].fe_length;
2805
2806                         /* Have we finished mapping on this device? */
2807                         if (req_fm_len <= len_mapped_single_call)
2808                                 ost_done = 1;
2809
2810                         /* Clear the EXTENT_LAST flag which can be present on
2811                          * last extent */
2812                         if (lcl_fm_ext[ext_count-1].fe_flags & FIEMAP_EXTENT_LAST)
2813                                 lcl_fm_ext[ext_count - 1].fe_flags &=
2814                                                             ~FIEMAP_EXTENT_LAST;
2815
2816                         curr_loc = lov_stripe_size(lsm,
2817                                            lcl_fm_ext[ext_count - 1].fe_logical+
2818                                            lcl_fm_ext[ext_count - 1].fe_length,
2819                                            cur_stripe);
2820                         if (curr_loc >= fm_key->oa.o_size)
2821                                 ost_eof = 1;
2822
2823                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
2824                                                      ost_index, ext_count,
2825                                                      current_extent);
2826
2827                         current_extent += ext_count;
2828
2829                         /* Ran out of available extents? */
2830                         if (current_extent >= fiemap->fm_extent_count)
2831                                 goto finish;
2832                 } while (ost_done == 0 && ost_eof == 0);
2833
2834                 if (cur_stripe_wrap == last_stripe)
2835                         goto finish;
2836         }
2837
2838 finish:
2839         /* Indicate that we are returning device offsets unless file just has
2840          * single stripe */
2841         if (lsm->lsm_stripe_count > 1)
2842                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
2843
2844         if (get_num_extents)
2845                 goto skip_last_device_calc;
2846
2847         /* Check if we have reached the last stripe and whether mapping for that
2848          * stripe is done. */
2849         if (cur_stripe_wrap == last_stripe) {
2850                 if (ost_done || ost_eof)
2851                         fiemap->fm_extents[current_extent - 1].fe_flags |=
2852                                                              FIEMAP_EXTENT_LAST;
2853         }
2854
2855 skip_last_device_calc:
2856         fiemap->fm_mapped_extents = current_extent;
2857
2858 out:
2859         OBD_FREE(fm_local, buffer_size);
2860         return rc;
2861 }
2862
2863 static int lov_get_info(struct obd_export *exp, __u32 keylen,
2864                         void *key, __u32 *vallen, void *val,
2865                         struct lov_stripe_md *lsm)
2866 {
2867         struct obd_device *obddev = class_exp2obd(exp);
2868         struct lov_obd *lov = &obddev->u.lov;
2869         int i, rc;
2870         ENTRY;
2871
2872         if (!vallen || !val)
2873                 RETURN(-EFAULT);
2874
2875         lov_getref(obddev);
2876
2877         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2878                 struct {
2879                         char name[16];
2880                         struct ldlm_lock *lock;
2881                 } *data = key;
2882                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2883                 struct lov_oinfo *loi;
2884                 __u32 *stripe = val;
2885
2886                 if (*vallen < sizeof(*stripe))
2887                         GOTO(out, rc = -EFAULT);
2888                 *vallen = sizeof(*stripe);
2889
2890                 /* XXX This is another one of those bits that will need to
2891                  * change if we ever actually support nested LOVs.  It uses
2892                  * the lock's export to find out which stripe it is. */
2893                 /* XXX - it's assumed all the locks for deleted OSTs have
2894                  * been cancelled. Also, the export for deleted OSTs will
2895                  * be NULL and won't match the lock's export. */
2896                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2897                         loi = lsm->lsm_oinfo[i];
2898                         if (!lov->lov_tgts[loi->loi_ost_idx])
2899                                 continue;
2900                         if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
2901                             data->lock->l_conn_export &&
2902                             osc_res_name_eq(loi->loi_id, loi->loi_gr, res_id)) {
2903                                 *stripe = i;
2904                                 GOTO(out, rc = 0);
2905                         }
2906                 }
2907                 LDLM_ERROR(data->lock, "lock on inode without such object");
2908                 dump_lsm(D_ERROR, lsm);
2909                 GOTO(out, rc = -ENXIO);
2910         } else if (KEY_IS(KEY_LAST_ID)) {
2911                 struct obd_id_info *info = val;
2912                 __u32 size = sizeof(obd_id);
2913                 struct lov_tgt_desc *tgt;
2914
2915                 LASSERT(*vallen == sizeof(struct obd_id_info));
2916                 tgt = lov->lov_tgts[info->idx];
2917
2918                 if (!tgt || !tgt->ltd_active)
2919                         GOTO(out, rc = -ESRCH);
2920
2921                 rc = obd_get_info(tgt->ltd_exp, keylen, key, &size, info->data, NULL);
2922                 GOTO(out, rc = 0);
2923         } else if (KEY_IS(KEY_LOVDESC)) {
2924                 struct lov_desc *desc_ret = val;
2925                 *desc_ret = lov->desc;
2926
2927                 GOTO(out, rc = 0);
2928         } else if (KEY_IS(KEY_LOV_IDX)) {
2929                 struct lov_tgt_desc *tgt;
2930
2931                 for(i = 0; i < lov->desc.ld_tgt_count; i++) {
2932                         tgt = lov->lov_tgts[i];
2933                         if (tgt && obd_uuid_equals(val, &tgt->ltd_uuid))
2934                                 GOTO(out, rc = i);
2935                 }
2936         } else if (KEY_IS(KEY_FIEMAP)) {
2937                 rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
2938                 GOTO(out, rc);
2939         }
2940
2941         rc = -EINVAL;
2942
2943 out:
2944         lov_putref(obddev);
2945         RETURN(rc);
2946 }
2947
2948 static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
2949                               void *key, obd_count vallen, void *val,
2950                               struct ptlrpc_request_set *set)
2951 {
2952         struct obd_device *obddev = class_exp2obd(exp);
2953         struct lov_obd *lov = &obddev->u.lov;
2954         obd_count count;
2955         int i, rc = 0, err;
2956         struct lov_tgt_desc *tgt;
2957         unsigned incr, check_uuid,
2958                  do_inactive, no_set;
2959         unsigned next_id = 0,  mds_con = 0;
2960         ENTRY;
2961
2962         incr = check_uuid = do_inactive = no_set = 0;
2963         if (set == NULL) {
2964                 no_set = 1;
2965                 set = ptlrpc_prep_set();
2966                 if (!set)
2967                         RETURN(-ENOMEM);
2968         }
2969
2970         lov_getref(obddev);
2971         count = lov->desc.ld_tgt_count;
2972
2973         if (KEY_IS(KEY_NEXT_ID)) {
2974                 count = vallen / sizeof(struct obd_id_info);
2975                 vallen = sizeof(obd_id);
2976                 incr = sizeof(struct obd_id_info);
2977                 do_inactive = 1;
2978                 next_id = 1;
2979         } else if (KEY_IS(KEY_CHECKSUM)) {
2980                 do_inactive = 1;
2981         } else if (KEY_IS(KEY_UNLINKED)) {
2982                 check_uuid = val ? 1 : 0;
2983         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2984                 /* use defaults:  do_inactive = incr = 0; */
2985         } else if (KEY_IS(KEY_MDS_CONN)) {
2986                 mds_con = 1;
2987         }
2988
2989         for (i = 0; i < count; i++, val = (char *)val + incr) {
2990                 if (next_id) {
2991                         tgt = lov->lov_tgts[((struct obd_id_info*)val)->idx];
2992                 } else {
2993                         tgt = lov->lov_tgts[i];
2994                 }
2995                 /* OST was disconnected */
2996                 if (!tgt || !tgt->ltd_exp)
2997                         continue;
2998
2999                 /* OST is inactive and we don't want inactive OSCs */
3000                 if (!tgt->ltd_active && !do_inactive)
3001                         continue;
3002
3003                 if (mds_con) {
3004                         struct mds_group_info *mgi;
3005
3006                         LASSERT(vallen == sizeof(*mgi));
3007                         mgi = (struct mds_group_info *)val;
3008
3009                         /* Only want a specific OSC */
3010                         if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
3011                                                 &tgt->ltd_uuid))
3012                                 continue;
3013
3014                         err = obd_set_info_async(tgt->ltd_exp,
3015                                          keylen, key, sizeof(int),
3016                                          &mgi->group, set);
3017                 } else if (next_id) {
3018                         err = obd_set_info_async(tgt->ltd_exp,
3019                                          keylen, key, vallen,
3020                                          ((struct obd_id_info*)val)->data, set);
3021                 } else  {
3022                         /* Only want a specific OSC */
3023                         if (check_uuid &&
3024                             !obd_uuid_equals(val, &tgt->ltd_uuid))
3025                                 continue;
3026
3027                         err = obd_set_info_async(tgt->ltd_exp,
3028                                          keylen, key, vallen, val, set);
3029                 }
3030
3031                 if (!rc)
3032                         rc = err;
3033         }
3034
3035         lov_putref(obddev);
3036         if (no_set) {
3037                 err = ptlrpc_set_wait(set);
3038                 if (!rc)
3039                         rc = err;
3040                 ptlrpc_set_destroy(set);
3041         }
3042         RETURN(rc);
3043 }
3044
3045 static int lov_checkmd(struct obd_export *exp, struct obd_export *md_exp,
3046                        struct lov_stripe_md *lsm)
3047 {
3048         int rc;
3049         ENTRY;
3050
3051         if (!lsm)
3052                 RETURN(0);
3053         LASSERT(md_exp);
3054         LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
3055         rc = lsm_op_find(lsm->lsm_magic)->lsm_revalidate(lsm, md_exp->exp_obd);
3056
3057         RETURN(rc);
3058 }
3059
3060 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
3061 {
3062         int i, rc = 0;
3063         ENTRY;
3064
3065         for (i = 0; i < lsm->lsm_stripe_count; i++) {
3066                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
3067                 if (loi->loi_ar.ar_rc && !rc)
3068                         rc = loi->loi_ar.ar_rc;
3069                 loi->loi_ar.ar_rc = 0;
3070         }
3071         RETURN(rc);
3072 }
3073 EXPORT_SYMBOL(lov_test_and_clear_async_rc);
3074
3075
3076 static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
3077                            int cmd, __u64 *offset)
3078 {
3079         __u32 ssize = lsm->lsm_stripe_size;
3080         __u64 start;
3081
3082         start = *offset;
3083         do_div(start, ssize);
3084         start = start * ssize;
3085
3086         CDEBUG(D_DLMTRACE, "offset "LPU64", stripe %u, start "LPU64
3087                            ", end "LPU64"\n", *offset, ssize, start,
3088                            start + ssize - 1);
3089         if (cmd == OBD_CALC_STRIPE_END) {
3090                 *offset = start + ssize - 1;
3091         } else if (cmd == OBD_CALC_STRIPE_START) {
3092                 *offset = start;
3093         } else {
3094                 LBUG();
3095         }
3096
3097         RETURN(0);
3098 }
3099
3100
#if 0
/* NOTE: everything up to the matching #endif is compiled out.
 * lov_complete_many() is an unfinished sketch and would not build as it
 * stands: imp, lwi and lwd are used without a declaration in the function,
 * the final remove_wait_queue() loop refers to `lock', which is only
 * declared inside the first loop, the -EINTR/-ETIMEDOUT branch is empty and
 * `queues' is never freed. */

/* Per-stripe wait state used by lov_complete_many() */
struct lov_multi_wait {
        struct ldlm_lock *lock;
        wait_queue_t      wait;
        int               completed;
        int               generation;
};

int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
                      struct lustre_handle *lockh)
{
        struct lov_lock_handles *lov_lockh = NULL;
        struct lustre_handle *lov_lockhp;
        struct lov_obd *lov;
        struct lov_oinfo *loi;
        struct lov_multi_wait *queues;
        int rc = 0, i;
        ENTRY;

        ASSERT_LSM_MAGIC(lsm);

        if (!exp || !exp->exp_obd)
                RETURN(-ENODEV);

        LASSERT(lockh != NULL);
        /* With more than one stripe lockh stands for a set of per-stripe
         * handles, otherwise it is used directly */
        if (lsm->lsm_stripe_count > 1) {
                lov_lockh = lov_handle2llh(lockh);
                if (lov_lockh == NULL) {
                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
                        RETURN(-EINVAL);
                }

                lov_lockhp = lov_lockh->llh_handles;
        } else {
                lov_lockhp = lockh;
        }

        OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
        if (queues == NULL)
                GOTO(out, rc = -ENOMEM);

        lov = &exp->exp_obd->u.lov;
        /* Queue ourselves on the wait queue of every stripe lock */
        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
             i++, loi++, lov_lockhp++) {
                struct ldlm_lock *lock;
                struct obd_device *obd;

                lock = ldlm_handle2lock(lov_lockhp);
                if (lock == NULL) {
                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
                               loi->loi_ost_idx, loi->loi_id);
                        queues[i].completed = 1;
                        continue;
                }

                queues[i].lock = lock;
                init_waitqueue_entry(&(queues[i].wait), current);
                add_wait_queue(lock->l_waitq, &(queues[i].wait));

                obd = class_exp2obd(lock->l_conn_export);
                if (obd != NULL)
                        imp = obd->u.cli.cl_import;
                if (imp != NULL) {
                        spin_lock(&imp->imp_lock);
                        queues[i].generation = imp->imp_generation;
                        spin_unlock(&imp->imp_lock);
                }
        }

        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
                               interrupted_completion_wait, &lwd);
        rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);

        for (i = 0; i < lsm->lsm_stripe_count; i++)
                remove_wait_queue(lock->l_waitq, &(queues[i].wait));

        if (rc == -EINTR || rc == -ETIMEDOUT) {


        }

 out:
        if (lov_lockh != NULL)
                lov_llh_put(lov_lockh);
        RETURN(rc);
}
#endif
3188
/**
 * Takes the spinlock protecting a stripe metadata object and records the
 * calling process as its owner.
 *
 * The owner pid stored in \a md->lsm_lock_owner is used purely for sanity
 * checking: it lets this function catch a process trying to take the lock
 * it already holds, and lets lov_stripe_unlock() catch an unlock by a
 * process that is not the owner.
 *
 * \param md striping information whose lsm_lock is to be taken
 *
 * \post md->lsm_lock is held and md->lsm_lock_owner is the caller's pid
 */
void lov_stripe_lock(struct lov_stripe_md *md)
{
        /* checked before spinning: the current owner must not be us */
        LASSERT(md->lsm_lock_owner != cfs_curproc_pid());
        spin_lock(&md->lsm_lock);
        /* with the lock held, no owner may still be recorded */
        LASSERT(md->lsm_lock_owner == 0);
        md->lsm_lock_owner = cfs_curproc_pid();
}
EXPORT_SYMBOL(lov_stripe_lock);
3197
/**
 * Releases the spinlock protecting a stripe metadata object.
 *
 * Asserts that the calling process is the one recorded in
 * \a md->lsm_lock_owner, clears the owner and then drops md->lsm_lock.
 *
 * \param md striping information whose lsm_lock is to be released
 *
 * \post md->lsm_lock_owner == 0 and md->lsm_lock is released
 */
void lov_stripe_unlock(struct lov_stripe_md *md)
{
        LASSERT(md->lsm_lock_owner == cfs_curproc_pid());
        /* clear the owner while the spinlock is still held */
        md->lsm_lock_owner = 0;
        spin_unlock(&md->lsm_lock);
}
EXPORT_SYMBOL(lov_stripe_unlock);
3205
3206 /**
3207  * Checks if requested extent lock is compatible with a lock under the page.
3208  *
3209  * Checks if the lock under \a page is compatible with a read or write lock
3210  * (specified by \a rw) for an extent [\a start , \a end].
3211  *
3212  * \param exp lov export
3213  * \param lsm striping information for the file
3214  * \param res lov_async_page placeholder
3215  * \param rw OBD_BRW_READ if requested for reading,
3216  *           OBD_BRW_WRITE if requested for writing
3217  * \param start start of the requested extent
3218  * \param end end of the requested extent
3219  * \param cookie transparent parameter for passing locking context
3220  *
3221  * \post result == 1, *cookie == context, appropriate lock is referenced or
3222  * \post result == 0
3223  *
3224  * \retval 1 owned lock is reused for the request
3225  * \retval 0 no lock reused for the request
3226  *
3227  * \see lov_release_short_lock
3228  */
3229 static int lov_reget_short_lock(struct obd_export *exp,
3230                                 struct lov_stripe_md *lsm,
3231                                 void **res, int rw,
3232                                 obd_off start, obd_off end,
3233                                 void **cookie)
3234 {
3235         struct lov_async_page *l = *res;
3236         obd_off stripe_start, stripe_end = start;
3237
3238         ENTRY;
3239
3240         /* ensure we don't cross stripe boundaries */
3241         lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
3242         if (stripe_end <= end)
3243                 RETURN(0);
3244
3245         /* map the region limits to the object limits */
3246         lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
3247         lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
3248
3249         RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
3250                                     lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
3251                                     ltd_exp, NULL, &l->lap_sub_cookie,
3252                                     rw, stripe_start, stripe_end, cookie));
3253 }
3254
3255 /**
3256  * Releases a reference to a lock taken in a "fast" way.
3257  *
3258  * Releases a read or a write (specified by \a rw) lock
3259  * referenced by \a cookie.
3260  *
3261  * \param exp lov export
3262  * \param lsm striping information for the file
3263  * \param end end of the locked extent
3264  * \param rw OBD_BRW_READ if requested for reading,
3265  *           OBD_BRW_WRITE if requested for writing
3266  * \param cookie transparent parameter for passing locking context
3267  *
3268  * \post appropriate lock is dereferenced
3269  *
3270  * \see lov_reget_short_lock
3271  */
3272 static int lov_release_short_lock(struct obd_export *exp,
3273                                   struct lov_stripe_md *lsm, obd_off end,
3274                                   void *cookie, int rw)
3275 {
3276         int stripe;
3277
3278         ENTRY;
3279
3280         stripe = lov_stripe_number(lsm, end);
3281
3282         RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
3283                                       lsm_oinfo[stripe]->loi_ost_idx]->
3284                                       ltd_exp, NULL, end, cookie, rw));
3285 }
3286
3287 struct obd_ops lov_obd_ops = {
3288         .o_owner               = THIS_MODULE,
3289         .o_setup               = lov_setup,
3290         .o_precleanup          = lov_precleanup,
3291         .o_cleanup             = lov_cleanup,
3292         .o_process_config      = lov_process_config,
3293         .o_connect             = lov_connect,
3294         .o_disconnect          = lov_disconnect,
3295         .o_statfs              = lov_statfs,
3296         .o_statfs_async        = lov_statfs_async,
3297         .o_packmd              = lov_packmd,
3298         .o_unpackmd            = lov_unpackmd,
3299         .o_checkmd             = lov_checkmd,
3300         .o_create              = lov_create,
3301         .o_destroy             = lov_destroy,
3302         .o_getattr             = lov_getattr,
3303         .o_getattr_async       = lov_getattr_async,
3304         .o_setattr             = lov_setattr,
3305         .o_setattr_async       = lov_setattr_async,
3306         .o_brw                 = lov_brw,
3307         .o_brw_async           = lov_brw_async,
3308         .o_prep_async_page     = lov_prep_async_page,
3309         .o_reget_short_lock    = lov_reget_short_lock,
3310         .o_release_short_lock  = lov_release_short_lock,
3311         .o_queue_async_io      = lov_queue_async_io,
3312         .o_set_async_flags     = lov_set_async_flags,
3313         .o_queue_group_io      = lov_queue_group_io,
3314         .o_trigger_group_io    = lov_trigger_group_io,
3315         .o_teardown_async_page = lov_teardown_async_page,
3316         .o_merge_lvb           = lov_merge_lvb,
3317         .o_adjust_kms          = lov_adjust_kms,
3318         .o_punch               = lov_punch,
3319         .o_sync                = lov_sync,
3320         .o_enqueue             = lov_enqueue,
3321         .o_match               = lov_match,
3322         .o_change_cbdata       = lov_change_cbdata,
3323         .o_cancel              = lov_cancel,
3324         .o_cancel_unused       = lov_cancel_unused,
3325         .o_join_lru            = lov_join_lru,
3326         .o_iocontrol           = lov_iocontrol,
3327         .o_get_info            = lov_get_info,
3328         .o_set_info_async      = lov_set_info_async,
3329         .o_extent_calc         = lov_extent_calc,
3330         .o_llog_init           = lov_llog_init,
3331         .o_llog_finish         = lov_llog_finish,
3332         .o_notify              = lov_notify,
3333         .o_register_page_removal_cb = lov_register_page_removal_cb,
3334         .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
3335         .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
3336         .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
3337 };
3338
/* Result of PORTAL_SYMBOL_GET(lov_quota_interface) in lov_init(); the
 * users in this file test it for NULL before dropping the reference. */
static quota_interface_t *quota_interface;
extern quota_interface_t lov_quota_interface;

/* Slab cache for struct lov_oinfo, created in lov_init(). */
cfs_mem_cache_t *lov_oinfo_slab;
3343
3344 int __init lov_init(void)
3345 {
3346         struct lprocfs_static_vars lvars = { 0 };
3347         int rc, rc2;
3348         ENTRY;
3349
3350         lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
3351                                               sizeof(struct lov_oinfo), 
3352                                               0, SLAB_HWCACHE_ALIGN);
3353         if (lov_oinfo_slab == NULL)
3354                 return -ENOMEM;
3355         lprocfs_lov_init_vars(&lvars);
3356
3357         request_module("lquota");
3358         quota_interface = PORTAL_SYMBOL_GET(lov_quota_interface);
3359         init_obd_quota_ops(quota_interface, &lov_obd_ops);
3360
3361         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
3362                                  LUSTRE_LOV_NAME, NULL);
3363         if (rc) {
3364                 if (quota_interface)
3365                         PORTAL_SYMBOL_PUT(lov_quota_interface);
3366                 rc2 = cfs_mem_cache_destroy(lov_oinfo_slab);
3367                 LASSERT(rc2 == 0);
3368         }
3369
3370         RETURN(rc);
3371 }
3372
3373 #ifdef __KERNEL__
/**
 * Module cleanup for the LOV driver.
 *
 * Drops the quota interface symbol reference if one is held, unregisters
 * the LOV obd type and destroys the lov_oinfo slab cache.  Destruction of
 * the cache is asserted to succeed.
 */
static void /*__exit*/ lov_exit(void)
{
        int rc;

        if (quota_interface)
                PORTAL_SYMBOL_PUT(lov_quota_interface);

        class_unregister_type(LUSTRE_LOV_NAME);
        rc = cfs_mem_cache_destroy(lov_oinfo_slab);
        LASSERT(rc == 0);
}
3385
/* Module metadata, and registration of lov_init()/lov_exit() as the
 * module's init and exit routines. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

cfs_module(lov, LUSTRE_VERSION_STRING, lov_init, lov_exit);
3391 #endif