Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/lov/lov_obd.c
37  *
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Peter Braam <braam@clusterfs.com>
40  * Author: Mike Shaver <shaver@clusterfs.com>
41  * Author: Nathan Rutman <nathan@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_LOV
48 #ifdef __KERNEL__
49 #include <libcfs/libcfs.h>
50 #else
51 #include <liblustre.h>
52 #endif
53
54 #include <obd_support.h>
55 #include <lustre_lib.h>
56 #include <lustre_net.h>
57 #include <lustre/lustre_idl.h>
58 #include <lustre_dlm.h>
59 #include <lustre_mds.h>
60 #include <lustre_debug.h>
61 #include <obd_class.h>
62 #include <obd_lov.h>
63 #include <obd_ost.h>
64 #include <lprocfs_status.h>
65 #include <lustre_param.h>
66 #include <lustre_cache.h>
67
68 #include "lov_internal.h"
69
70
71 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
72    Any function that expects lov_tgts to remain stationary must take a ref. */
73 void lov_getref(struct obd_device *obd)
74 {
75         struct lov_obd *lov = &obd->u.lov;
76
77         /* nobody gets through here until lov_putref is done */
78         mutex_down(&lov->lov_lock);
79         atomic_inc(&lov->lov_refcount);
80         mutex_up(&lov->lov_lock);
81         return;
82 }
83
84 static void __lov_del_obd(struct obd_device *obd, __u32 index);
85
86 void lov_putref(struct obd_device *obd)
87 {
88         struct lov_obd *lov = &obd->u.lov;
89         mutex_down(&lov->lov_lock);
90         /* ok to dec to 0 more than once -- ltd_exp's will be null */
91         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
92                 int i;
93                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
94                        lov->lov_death_row);
95                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
96                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_reap)
97                                 continue;
98                         /* Disconnect and delete from list */
99                         __lov_del_obd(obd, i);
100                         lov->lov_death_row--;
101                 }
102         }
103         mutex_up(&lov->lov_lock);
104 }
105
106 static int lov_register_page_removal_cb(struct obd_export *exp,
107                                         obd_page_removal_cb_t func,
108                                         obd_pin_extent_cb pin_cb)
109 {
110         struct lov_obd *lov = &exp->exp_obd->u.lov;
111         int i, rc = 0;
112
113         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
114                 return -EBUSY;
115
116         if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
117                 return -EBUSY;
118
119         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
120                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
121                         continue;
122                 rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
123                                                    func, pin_cb);
124         }
125
126         lov->lov_page_removal_cb = func;
127         lov->lov_page_pin_cb = pin_cb;
128
129         return rc;
130 }
131
132 static int lov_unregister_page_removal_cb(struct obd_export *exp,
133                                         obd_page_removal_cb_t func)
134 {
135         struct lov_obd *lov = &exp->exp_obd->u.lov;
136         int i, rc = 0;
137
138         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
139                 return -EINVAL;
140
141         lov->lov_page_removal_cb = NULL;
142         lov->lov_page_pin_cb = NULL;
143
144         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
145                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
146                         continue;
147                 rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
148                                                      func);
149         }
150
151         return rc;
152 }
153
154 static int lov_register_lock_cancel_cb(struct obd_export *exp,
155                                          obd_lock_cancel_cb func)
156 {
157         struct lov_obd *lov = &exp->exp_obd->u.lov;
158         int i, rc = 0;
159
160         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
161                 return -EBUSY;
162
163         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
164                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
165                         continue;
166                 rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
167                                                   func);
168         }
169
170         lov->lov_lock_cancel_cb = func;
171
172         return rc;
173 }
174
175 static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
176                                          obd_lock_cancel_cb func)
177 {
178         struct lov_obd *lov = &exp->exp_obd->u.lov;
179         int i, rc = 0;
180
181         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
182                 return -EINVAL;
183
184         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
185                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
186                         continue;
187                 rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
188                                                     func);
189         }
190         lov->lov_lock_cancel_cb = NULL;
191         return rc;
192 }
193
194 #define MAX_STRING_SIZE 128
195 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
196                            struct obd_connect_data *data)
197 {
198         struct lov_obd *lov = &obd->u.lov;
199         struct obd_uuid tgt_uuid;
200         struct obd_device *tgt_obd;
201         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
202         struct lustre_handle conn = {0, };
203         struct obd_import *imp;
204
205 #ifdef __KERNEL__
206         cfs_proc_dir_entry_t *lov_proc_dir;
207 #endif
208         int rc;
209         ENTRY;
210
211         if (!lov->lov_tgts[index])
212                 RETURN(-EINVAL);
213
214         tgt_uuid = lov->lov_tgts[index]->ltd_uuid;
215
216         tgt_obd = class_find_client_obd(&tgt_uuid, LUSTRE_OSC_NAME,
217                                         &obd->obd_uuid);
218
219         if (!tgt_obd) {
220                 CERROR("Target %s not attached\n", obd_uuid2str(&tgt_uuid));
221                 RETURN(-EINVAL);
222         }
223         if (!tgt_obd->obd_set_up) {
224                 CERROR("Target %s not set up\n", obd_uuid2str(&tgt_uuid));
225                 RETURN(-EINVAL);
226         }
227
228         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
229                 data->ocd_index = index;
230
231         /*
232          * Divine LOV knows that OBDs under it are OSCs.
233          */
234         imp = tgt_obd->u.cli.cl_import;
235
236         if (activate) {
237                 tgt_obd->obd_no_recov = 0;
238                 /* FIXME this is probably supposed to be 
239                    ptlrpc_set_import_active.  Horrible naming. */
240                 ptlrpc_activate_import(imp);
241         }
242
243         if (imp->imp_invalid) {
244                 CERROR("not connecting OSC %s; administratively "
245                        "disabled\n", obd_uuid2str(&tgt_uuid));
246                 rc = obd_register_observer(tgt_obd, obd);
247                 if (rc) {
248                         CERROR("Target %s register_observer error %d; "
249                                "will not be able to reactivate\n",
250                                obd_uuid2str(&tgt_uuid), rc);
251                 }
252                 RETURN(0);
253         }
254
255         rc = obd_connect(NULL, &conn, tgt_obd, &lov_osc_uuid, data, NULL);
256         if (rc) {
257                 CERROR("Target %s connect error %d\n",
258                        obd_uuid2str(&tgt_uuid), rc);
259                 RETURN(rc);
260         }
261         lov->lov_tgts[index]->ltd_exp = class_conn2export(&conn);
262         if (!lov->lov_tgts[index]->ltd_exp) {
263                 CERROR("Target %s: null export!\n", obd_uuid2str(&tgt_uuid));
264                 RETURN(-ENODEV);
265         }
266
267         rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
268                                           lov->lov_page_removal_cb,
269                                           lov->lov_page_pin_cb);
270         if (rc) {
271                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
272                 lov->lov_tgts[index]->ltd_exp = NULL;
273                 RETURN(rc);
274         }
275
276         rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
277                                          lov->lov_lock_cancel_cb);
278         if (rc) {
279                 obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
280                                                lov->lov_page_removal_cb);
281                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
282                 lov->lov_tgts[index]->ltd_exp = NULL;
283                 RETURN(rc);
284         }
285
286         rc = obd_register_observer(tgt_obd, obd);
287         if (rc) {
288                 CERROR("Target %s register_observer error %d\n",
289                        obd_uuid2str(&tgt_uuid), rc);
290                 obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
291                                               lov->lov_lock_cancel_cb);
292                 obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
293                                                lov->lov_page_removal_cb);
294                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
295                 lov->lov_tgts[index]->ltd_exp = NULL;
296                 RETURN(rc);
297         }
298
299         lov->lov_tgts[index]->ltd_reap = 0;
300         if (activate) {
301                 lov->lov_tgts[index]->ltd_active = 1;
302                 lov->desc.ld_active_tgt_count++;
303                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
304         }
305         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
306                obd_uuid2str(&tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
307
308 #ifdef __KERNEL__
309         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
310         if (lov_proc_dir) {
311                 struct obd_device *osc_obd = class_conn2obd(&conn);
312                 cfs_proc_dir_entry_t *osc_symlink;
313                 char name[MAX_STRING_SIZE];
314
315                 LASSERT(osc_obd != NULL);
316                 LASSERT(osc_obd->obd_magic == OBD_DEVICE_MAGIC);
317                 LASSERT(osc_obd->obd_type->typ_name != NULL);
318                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
319                          osc_obd->obd_type->typ_name,
320                          osc_obd->obd_name);
321                 osc_symlink = lprocfs_add_symlink(osc_obd->obd_name, lov_proc_dir,
322                                                   name);
323                 if (osc_symlink == NULL) {
324                         CERROR("could not register LOV target "
325                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
326                                obd->obd_type->typ_name, obd->obd_name,
327                                osc_obd->obd_name);
328                         lprocfs_remove(&lov_proc_dir);
329                 }
330         }
331 #endif
332
333         rc = qos_add_tgt(obd, index);
334         if (rc)
335                 CERROR("qos_add_tgt failed %d\n", rc);
336
337         RETURN(0);
338 }
339
340 static int lov_connect(const struct lu_env *env,
341                        struct lustre_handle *conn, struct obd_device *obd,
342                        struct obd_uuid *cluuid, struct obd_connect_data *data,
343                        void *localdata)
344 {
345         struct lov_obd *lov = &obd->u.lov;
346         struct lov_tgt_desc *tgt;
347         int i, rc;
348         ENTRY;
349
350         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
351
352         rc = class_connect(conn, obd, cluuid);
353         if (rc)
354                 RETURN(rc);
355
356         /* Why should there ever be more than 1 connect? */
357         lov->lov_connects++;
358         LASSERT(lov->lov_connects == 1);
359
360         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
361         if (data)
362                 lov->lov_ocd = *data;
363
364         lov_getref(obd);
365         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
366                 tgt = lov->lov_tgts[i];
367                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
368                         continue;
369                 /* Flags will be lowest common denominator */
370                 rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
371                 if (rc) {
372                         CERROR("%s: lov connect tgt %d failed: %d\n",
373                                obd->obd_name, i, rc);
374                         continue;
375                 }
376         }
377         lov_putref(obd);
378
379         RETURN(0);
380 }
381
382 static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
383 {
384         cfs_proc_dir_entry_t *lov_proc_dir;
385         struct lov_obd *lov = &obd->u.lov;
386         struct obd_device *osc_obd;
387         int rc;
388
389         ENTRY;
390
391         if (lov->lov_tgts[index] == NULL)
392                 RETURN(-EINVAL);
393
394         osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
395         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
396                obd->obd_name, osc_obd->obd_name);
397
398         obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
399                                       lov->lov_lock_cancel_cb);
400         obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
401                                        lov->lov_page_removal_cb);
402
403         if (lov->lov_tgts[index]->ltd_active) {
404                 lov->lov_tgts[index]->ltd_active = 0;
405                 lov->desc.ld_active_tgt_count--;
406                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
407         }
408
409         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
410         if (lov_proc_dir) {
411                 cfs_proc_dir_entry_t *osc_symlink;
412
413                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
414                 if (osc_symlink) {
415                         lprocfs_remove(&osc_symlink);
416                 } else {
417                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing.",
418                                obd->obd_type->typ_name, obd->obd_name,
419                                osc_obd->obd_name);
420                 }
421         }
422
423         if (osc_obd) {
424                 /* Pass it on to our clients.
425                  * XXX This should be an argument to disconnect,
426                  * XXX not a back-door flag on the OBD.  Ah well.
427                  */
428                 osc_obd->obd_force = obd->obd_force;
429                 osc_obd->obd_fail = obd->obd_fail;
430                 osc_obd->obd_no_recov = obd->obd_no_recov;
431         }
432
433         obd_register_observer(osc_obd, NULL);
434
435         rc = obd_disconnect(lov->lov_tgts[index]->ltd_exp);
436         if (rc) {
437                 CERROR("Target %s disconnect error %d\n",
438                        lov_uuid2str(lov, index), rc);
439                 rc = 0;
440         }
441
442         qos_del_tgt(obd, index);
443
444         lov->lov_tgts[index]->ltd_exp = NULL;
445         RETURN(0);
446 }
447
448 static int lov_del_target(struct obd_device *obd, __u32 index,
449                           struct obd_uuid *uuidp, int gen);
450
451 static int lov_disconnect(struct obd_export *exp)
452 {
453         struct obd_device *obd = class_exp2obd(exp);
454         struct lov_obd *lov = &obd->u.lov;
455         int i, rc;
456         ENTRY;
457
458         if (!lov->lov_tgts)
459                 goto out;
460
461         /* Only disconnect the underlying layers on the final disconnect. */
462         lov->lov_connects--;
463         if (lov->lov_connects != 0) {
464                 /* why should there be more than 1 connect? */
465                 CERROR("disconnect #%d\n", lov->lov_connects);
466                 goto out;
467         }
468
469         /* Let's hold another reference so lov_del_obd doesn't spin through
470            putref every time */
471         lov_getref(obd);
472         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
473                 if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
474                         /* Disconnection is the last we know about an obd */
475                         lov_del_target(obd, i, 0, lov->lov_tgts[i]->ltd_gen);
476                 }
477         }
478         lov_putref(obd);
479
480 out:
481         rc = class_disconnect(exp); /* bz 9811 */
482         RETURN(rc);
483 }
484
485 /* Error codes:
486  *
487  *  -EINVAL  : UUID can't be found in the LOV's target list
488  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
489  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
490  */
491 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
492                               int activate)
493 {
494         struct lov_obd *lov = &obd->u.lov;
495         struct lov_tgt_desc *tgt;
496         int i, rc = 0;
497         ENTRY;
498
499         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
500                lov, uuid->uuid, activate);
501
502         lov_getref(obd);
503         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
504                 tgt = lov->lov_tgts[i];
505                 if (!tgt || !tgt->ltd_exp)
506                         continue;
507
508                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
509                        i, obd_uuid2str(&tgt->ltd_uuid),
510                        tgt->ltd_exp->exp_handle.h_cookie);
511                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
512                         break;
513         }
514
515         if (i == lov->desc.ld_tgt_count)
516                 GOTO(out, rc = -EINVAL);
517
518         if (lov->lov_tgts[i]->ltd_active == activate) {
519                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,
520                        activate ? "" : "in");
521                 GOTO(out, rc);
522         }
523
524         CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n", obd_uuid2str(uuid),
525                activate ? "" : "in");
526
527         lov->lov_tgts[i]->ltd_active = activate;
528
529         if (activate) {
530                 lov->desc.ld_active_tgt_count++;
531                 lov->lov_tgts[i]->ltd_exp->exp_obd->obd_inactive = 0;
532         } else {
533                 lov->desc.ld_active_tgt_count--;
534                 lov->lov_tgts[i]->ltd_exp->exp_obd->obd_inactive = 1;
535         }
536         /* remove any old qos penalty */
537         lov->lov_tgts[i]->ltd_qos.ltq_penalty = 0;
538
539  out:
540         lov_putref(obd);
541         RETURN(rc);
542 }
543
544 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
545                       enum obd_notify_event ev, void *data)
546 {
547         int rc = 0;
548         ENTRY;
549
550         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
551                 struct obd_uuid *uuid;
552
553                 LASSERT(watched);
554
555                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
556                         CERROR("unexpected notification of %s %s!\n",
557                                watched->obd_type->typ_name,
558                                watched->obd_name);
559                         RETURN(-EINVAL);
560                 }
561                 uuid = &watched->u.cli.cl_target_uuid;
562
563                 /* Set OSC as active before notifying the observer, so the
564                  * observer can use the OSC normally.
565                  */
566                 rc = lov_set_osc_active(obd, uuid, ev == OBD_NOTIFY_ACTIVE);
567                 if (rc) {
568                         CERROR("%sactivation of %s failed: %d\n",
569                                (ev == OBD_NOTIFY_ACTIVE) ? "" : "de",
570                                obd_uuid2str(uuid), rc);
571                         RETURN(rc);
572                 }
573         }
574
575         /* Pass the notification up the chain. */
576         if (watched) {
577                 rc = obd_notify_observer(obd, watched, ev, data);
578         } else {
579                 /* NULL watched means all osc's in the lov (only for syncs) */
580                 struct lov_obd *lov = &obd->u.lov;
581                 struct obd_device *tgt_obd;
582                 int i;
583                 lov_getref(obd);
584                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
585                         if (!lov->lov_tgts[i])
586                                 continue;
587                         tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
588                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
589                         if (rc) {
590                                 CERROR("%s: notify %s of %s failed %d\n",
591                                        obd->obd_name,
592                                        obd->obd_observer->obd_name,
593                                        tgt_obd->obd_name, rc);
594                                 break;
595                         }
596                 }
597                 lov_putref(obd);
598         }
599
600         RETURN(rc);
601 }
602
603 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
604                           __u32 index, int gen, int active)
605 {
606         struct lov_obd *lov = &obd->u.lov;
607         struct lov_tgt_desc *tgt;
608         int rc;
609         ENTRY;
610
611         CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
612                uuidp->uuid, index, gen, active);
613
614         if (gen <= 0) {
615                 CERROR("request to add OBD %s with invalid generation: %d\n",
616                        uuidp->uuid, gen);
617                 RETURN(-EINVAL);
618         }
619
620         mutex_down(&lov->lov_lock);
621
622         if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
623                 tgt = lov->lov_tgts[index];
624                 CERROR("UUID %s already assigned at LOV target index %d\n",
625                        obd_uuid2str(&tgt->ltd_uuid), index);
626                 mutex_up(&lov->lov_lock);
627                 RETURN(-EEXIST);
628         }
629
630         if (index >= lov->lov_tgt_size) {
631                 /* We need to reallocate the lov target array. */
632                 struct lov_tgt_desc **newtgts, **old = NULL;
633                 __u32 newsize, oldsize = 0;
634
635                 newsize = max(lov->lov_tgt_size, (__u32)2);
636                 while (newsize < index + 1)
637                         newsize = newsize << 1;
638                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
639                 if (newtgts == NULL) {
640                         mutex_up(&lov->lov_lock);
641                         RETURN(-ENOMEM);
642                 }
643
644                 if (lov->lov_tgt_size) {
645                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
646                                lov->lov_tgt_size);
647                         old = lov->lov_tgts;
648                         oldsize = lov->lov_tgt_size;
649                 }
650
651                 lov->lov_tgts = newtgts;
652                 lov->lov_tgt_size = newsize;
653 #ifdef __KERNEL__
654                 smp_rmb();
655 #endif
656                 if (old)
657                         OBD_FREE(old, sizeof(*old) * oldsize);
658
659                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
660                        lov->lov_tgts, lov->lov_tgt_size);
661         }
662
663
664         OBD_ALLOC_PTR(tgt);
665         if (!tgt) {
666                 mutex_up(&lov->lov_lock);
667                 RETURN(-ENOMEM);
668         }
669
670         memset(tgt, 0, sizeof(*tgt));
671         tgt->ltd_uuid = *uuidp;
672         /* XXX - add a sanity check on the generation number. */
673         tgt->ltd_gen = gen;
674         tgt->ltd_index = index;
675         tgt->ltd_activate = active;
676         lov->lov_tgts[index] = tgt;
677         if (index >= lov->desc.ld_tgt_count)
678                 lov->desc.ld_tgt_count = index + 1;
679         mutex_up(&lov->lov_lock);
680
681         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
682                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
683
684         if (lov->lov_connects == 0) {
685                 /* lov_connect hasn't been called yet. We'll do the
686                    lov_connect_obd on this target when that fn first runs,
687                    because we don't know the connect flags yet. */
688                 RETURN(0);
689         }
690
691         lov_getref(obd);
692
693         rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
694         if (rc)
695                 GOTO(out, rc);
696
697         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
698                         active ? OBD_NOTIFY_ACTIVE : OBD_NOTIFY_INACTIVE,
699                         (void *)&index);
700
701 out:
702         if (rc) {
703                 CERROR("add failed (%d), deleting %s\n", rc,
704                        obd_uuid2str(&tgt->ltd_uuid));
705                 lov_del_target(obd, index, 0, 0);
706         }
707         lov_putref(obd);
708         RETURN(rc);
709 }
710
711 /* Schedule a target for deletion */
712 static int lov_del_target(struct obd_device *obd, __u32 index,
713                           struct obd_uuid *uuidp, int gen)
714 {
715         struct lov_obd *lov = &obd->u.lov;
716         int count = lov->desc.ld_tgt_count;
717         int rc = 0;
718         ENTRY;
719
720         if (index >= count) {
721                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
722                        index, count);
723                 RETURN(-EINVAL);
724         }
725
726         lov_getref(obd);
727
728         if (!lov->lov_tgts[index]) {
729                 CERROR("LOV target at index %d is not setup.\n", index);
730                 GOTO(out, rc = -EINVAL);
731         }
732
733         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
734                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
735                        lov_uuid2str(lov, index), index,
736                        obd_uuid2str(uuidp));
737                 GOTO(out, rc = -EINVAL);
738         }
739
740         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
741                lov_uuid2str(lov, index), index,
742                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
743                lov->lov_tgts[index]->ltd_active);
744
745         lov->lov_tgts[index]->ltd_reap = 1;
746         lov->lov_death_row++;
747         /* we really delete it from lov_putref */
748 out:
749         lov_putref(obd);
750
751         RETURN(rc);
752 }
753
754 /* We are holding lov_lock */
755 static void __lov_del_obd(struct obd_device *obd, __u32 index)
756 {
757         struct lov_obd *lov = &obd->u.lov;
758         struct obd_device *osc_obd;
759         struct lov_tgt_desc *tgt = lov->lov_tgts[index];
760
761         LASSERT(tgt);
762         LASSERT(tgt->ltd_reap);
763
764         osc_obd = class_exp2obd(tgt->ltd_exp);
765
766         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
767                lov_uuid2str(lov, index),
768                osc_obd ? osc_obd->obd_name : "<no obd>");
769
770         if (tgt->ltd_exp)
771                 lov_disconnect_obd(obd, index);
772
773         /* XXX - right now there is a dependency on ld_tgt_count being the
774          * maximum tgt index for computing the mds_max_easize. So we can't
775          * shrink it. */
776
777         lov->lov_tgts[index] = NULL;
778         OBD_FREE_PTR(tgt);
779
780         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
781            do it ourselves. And we can't do it from lov_cleanup,
782            because we just lost our only reference to it. */
783         if (osc_obd)
784                 class_manual_cleanup(osc_obd);
785 }
786
787 void lov_fix_desc_stripe_size(__u64 *val)
788 {
789         if (*val < PTLRPC_MAX_BRW_SIZE) {
790                 LCONSOLE_WARN("Increasing default stripe size to min %u\n",
791                               PTLRPC_MAX_BRW_SIZE);
792                 *val = PTLRPC_MAX_BRW_SIZE;
793         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
794                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
795                 LCONSOLE_WARN("Changing default stripe size to "LPU64" (a "
796                               "multiple of %u)\n",
797                               *val, LOV_MIN_STRIPE_SIZE);
798         }
799 }
800
801 void lov_fix_desc_stripe_count(__u32 *val)
802 {
803         if (*val == 0)
804                 *val = 1;
805 }
806
807 void lov_fix_desc_pattern(__u32 *val)
808 {
809         /* from lov_setstripe */
810         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
811                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
812                 *val = 0;
813         }
814 }
815
816 void lov_fix_desc_qos_maxage(__u32 *val)
817 {
818         /* fix qos_maxage */
819         if (*val == 0)
820                 *val = QOS_DEFAULT_MAXAGE;
821 }
822
823 void lov_fix_desc(struct lov_desc *desc)
824 {
825         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
826         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
827         lov_fix_desc_pattern(&desc->ld_pattern);
828         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
829 }
830
831 static int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
832 {
833         struct lprocfs_static_vars lvars = { 0 };
834         struct lov_desc *desc;
835         struct lov_obd *lov = &obd->u.lov;
836         int count;
837         ENTRY;
838
839         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
840                 CERROR("LOV setup requires a descriptor\n");
841                 RETURN(-EINVAL);
842         }
843
844         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
845
846         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
847                 CERROR("descriptor size wrong: %d > %d\n",
848                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
849                 RETURN(-EINVAL);
850         }
851
852         if (desc->ld_magic != LOV_DESC_MAGIC) {
853                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
854                             CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
855                                    obd->obd_name, desc);
856                             lustre_swab_lov_desc(desc);
857                 } else {
858                         CERROR("%s: Bad lov desc magic: %#x\n",
859                                obd->obd_name, desc->ld_magic);
860                         RETURN(-EINVAL);
861                 }
862         }
863
864         lov_fix_desc(desc);
865
866         /* Because of 64-bit divide/mod operations only work with a 32-bit
867          * divisor in a 32-bit kernel, we cannot support a stripe width
868          * of 4GB or larger on 32-bit CPUs. */
869         count = desc->ld_default_stripe_count;
870         if ((count > 0 ? count : desc->ld_tgt_count) *
871             desc->ld_default_stripe_size > 0xffffffff) {
872                 CERROR("LOV: stripe width "LPU64"x%u > 4294967295 bytes\n",
873                        desc->ld_default_stripe_size, count);
874                 RETURN(-EINVAL);
875         }
876
877         desc->ld_active_tgt_count = 0;
878         lov->desc = *desc;
879         lov->lov_tgt_size = 0;
880         sema_init(&lov->lov_lock, 1);
881         atomic_set(&lov->lov_refcount, 0);
882         CFS_INIT_LIST_HEAD(&lov->lov_qos.lq_oss_list);
883         init_rwsem(&lov->lov_qos.lq_rw_sem);
884         lov->lov_qos.lq_dirty = 1;
885         lov->lov_qos.lq_dirty_rr = 1;
886         lov->lov_qos.lq_reset = 1;
887         /* Default priority is toward free space balance */
888         lov->lov_qos.lq_prio_free = 232;
889
890         lprocfs_lov_init_vars(&lvars);
891         lprocfs_obd_setup(obd, lvars.obd_vars);
892 #ifdef LPROCFS
893         {
894                 int rc;
895
896                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
897                                         0444, &lov_proc_target_fops, obd);
898                 if (rc)
899                         CWARN("Error adding the target_obd file\n");
900         }
901 #endif
902
903         RETURN(0);
904 }
905
906 static int lov_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
907 {
908         int rc = 0;
909         ENTRY;
910
911         switch (stage) {
912         case OBD_CLEANUP_EARLY: {
913                 struct lov_obd *lov = &obd->u.lov;
914                 int i;
915                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
916                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
917                                 continue;
918                         obd_precleanup(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
919                                        OBD_CLEANUP_EARLY);
920                 }
921                 break;
922         }
923         case OBD_CLEANUP_EXPORTS:
924                 rc = obd_llog_finish(obd, 0);
925                 if (rc != 0)
926                         CERROR("failed to cleanup llogging subsystems\n");
927                 break;
928         }
929         RETURN(rc);
930 }
931
932 static int lov_cleanup(struct obd_device *obd)
933 {
934         struct lov_obd *lov = &obd->u.lov;
935
936         lprocfs_obd_cleanup(obd);
937         if (lov->lov_tgts) {
938                 int i;
939                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
940                         if (!lov->lov_tgts[i])
941                                 continue;
942
943                         /* Inactive targets may never have connected */
944                         if (lov->lov_tgts[i]->ltd_active ||
945                             atomic_read(&lov->lov_refcount))
946                             /* We should never get here - these
947                                should have been removed in the
948                              disconnect. */
949                                 CERROR("lov tgt %d not cleaned!"
950                                        " deathrow=%d, lovrc=%d\n",
951                                        i, lov->lov_death_row,
952                                        atomic_read(&lov->lov_refcount));
953                         lov_del_target(obd, i, 0, 0);
954                 }
955                 OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
956                          lov->lov_tgt_size);
957                 lov->lov_tgt_size = 0;
958         }
959
960         if (lov->lov_qos.lq_rr_size)
961                 OBD_FREE(lov->lov_qos.lq_rr_array, lov->lov_qos.lq_rr_size);
962
963         RETURN(0);
964 }
965
966 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
967 {
968         struct lustre_cfg *lcfg = buf;
969         struct obd_uuid obd_uuid;
970         int cmd;
971         int rc = 0;
972         ENTRY;
973
974         switch(cmd = lcfg->lcfg_command) {
975         case LCFG_LOV_ADD_OBD:
976         case LCFG_LOV_ADD_INA:
977         case LCFG_LOV_DEL_OBD: {
978                 __u32 index;
979                 int gen;
980                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
981                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
982                         GOTO(out, rc = -EINVAL);
983
984                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
985
986                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
987                         GOTO(out, rc = -EINVAL);
988                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
989                         GOTO(out, rc = -EINVAL);
990                 if (cmd == LCFG_LOV_ADD_OBD)
991                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
992                 else if (cmd == LCFG_LOV_ADD_INA)
993                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
994                 else
995                         rc = lov_del_target(obd, index, &obd_uuid, gen);
996                 GOTO(out, rc);
997         }
998         case LCFG_PARAM: {
999                 struct lprocfs_static_vars lvars = { 0 };
1000                 struct lov_desc *desc = &(obd->u.lov.desc);
1001
1002                 if (!desc)
1003                         GOTO(out, rc = -EINVAL);
1004
1005                 lprocfs_lov_init_vars(&lvars);
1006
1007                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
1008                                               lcfg, obd);
1009                 GOTO(out, rc);
1010         }
1011         default: {
1012                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1013                 GOTO(out, rc = -EINVAL);
1014
1015         }
1016         }
1017 out:
1018         RETURN(rc);
1019 }
1020
/* Fallback for environments that provide no log2(): defined in terms of
 * ffz() applied to the complement of the argument. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
1024
1025 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
1026                              struct lov_stripe_md **ea,
1027                              struct obd_trans_info *oti)
1028 {
1029         struct lov_obd *lov;
1030         struct obdo *tmp_oa;
1031         struct obd_uuid *ost_uuid = NULL;
1032         int rc = 0, i;
1033         ENTRY;
1034
1035         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1036                 src_oa->o_flags == OBD_FL_DELORPHAN);
1037
1038         lov = &export->exp_obd->u.lov;
1039
1040         OBDO_ALLOC(tmp_oa);
1041         if (tmp_oa == NULL)
1042                 RETURN(-ENOMEM);
1043
1044         if (src_oa->o_valid & OBD_MD_FLINLINE) {
1045                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
1046                 CDEBUG(D_HA, "clearing orphans only for %s\n",
1047                        ost_uuid->uuid);
1048         }
1049
1050         lov_getref(export->exp_obd);
1051         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1052                 struct lov_stripe_md obj_md;
1053                 struct lov_stripe_md *obj_mdp = &obj_md;
1054                 struct lov_tgt_desc *tgt;
1055                 int err;
1056
1057                 tgt = lov->lov_tgts[i];
1058                 if (!tgt)
1059                         continue;
1060
1061                 /* if called for a specific target, we don't
1062                    care if it is not active. */
1063                 if (!lov->lov_tgts[i]->ltd_active && ost_uuid == NULL) {
1064                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1065                         continue;
1066                 }
1067
1068                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
1069                         continue;
1070
1071                 CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
1072                        obd_uuid2str(ost_uuid));
1073
1074                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1075
1076                 LASSERT(lov->lov_tgts[i]->ltd_exp);
1077                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1078                 err = obd_create(lov->lov_tgts[i]->ltd_exp,
1079                                  tmp_oa, &obj_mdp, oti);
1080                 if (err)
1081                         /* This export will be disabled until it is recovered,
1082                            and then orphan recovery will be completed. */
1083                         CERROR("error in orphan recovery on OST idx %d/%d: "
1084                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
1085
1086                 if (ost_uuid)
1087                         break;
1088         }
1089         lov_putref(export->exp_obd);
1090
1091         OBDO_FREE(tmp_oa);
1092         RETURN(rc);
1093 }
1094
1095 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
1096                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
1097 {
1098         struct lov_stripe_md *obj_mdp, *lsm;
1099         struct lov_obd *lov = &exp->exp_obd->u.lov;
1100         unsigned ost_idx;
1101         int rc, i;
1102         ENTRY;
1103
1104         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1105                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
1106
1107         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
1108         if (obj_mdp == NULL)
1109                 RETURN(-ENOMEM);
1110
1111         ost_idx = src_oa->o_nlink;
1112         lsm = *ea;
1113         if (lsm == NULL)
1114                 GOTO(out, rc = -EINVAL);
1115         if (ost_idx >= lov->desc.ld_tgt_count ||
1116             !lov->lov_tgts[ost_idx])
1117                 GOTO(out, rc = -EINVAL);
1118
1119         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1120                 if (lsm->lsm_oinfo[i]->loi_ost_idx == ost_idx) {
1121                         if (lsm->lsm_oinfo[i]->loi_id != src_oa->o_id)
1122                                 GOTO(out, rc = -EINVAL);
1123                         break;
1124                 }
1125         }
1126         if (i == lsm->lsm_stripe_count)
1127                 GOTO(out, rc = -EINVAL);
1128
1129         rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp, src_oa, &obj_mdp, oti);
1130 out:
1131         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
1132         RETURN(rc);
1133 }
1134
1135 /* the LOV expects oa->o_id to be set to the LOV object id */
1136 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
1137                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
1138 {
1139         struct lov_obd *lov;
1140         struct obd_info oinfo;
1141         struct lov_request_set *set = NULL;
1142         struct lov_request *req;
1143         struct obd_statfs osfs;
1144         __u64 maxage;
1145         int rc = 0;
1146         ENTRY;
1147
1148         LASSERT(ea != NULL);
1149         if (exp == NULL)
1150                 RETURN(-EINVAL);
1151
1152         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1153             src_oa->o_flags == OBD_FL_DELORPHAN) {
1154                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
1155                 RETURN(rc);
1156         }
1157
1158         lov = &exp->exp_obd->u.lov;
1159         if (!lov->desc.ld_active_tgt_count)
1160                 RETURN(-EIO);
1161
1162         /* Recreate a specific object id at the given OST index */
1163         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1164             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1165                  rc = lov_recreate(exp, src_oa, ea, oti);
1166                  RETURN(rc);
1167         }
1168
1169         maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);
1170         obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);
1171
1172         rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);
1173         if (rc)
1174                 RETURN(rc);
1175
1176         list_for_each_entry(req, &set->set_list, rq_link) {
1177                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1178                 rc = obd_create(lov->lov_tgts[req->rq_idx]->ltd_exp,
1179                                 req->rq_oi.oi_oa, &req->rq_oi.oi_md, oti);
1180                 lov_update_create_set(set, req, rc);
1181         }
1182         rc = lov_fini_create_set(set, ea);
1183         RETURN(rc);
1184 }
1185
/*
 * Assert that @lsmp is non-NULL and that its lsm_magic is one of LOV_MAGIC
 * or LOV_MAGIC_JOIN.  The argument is evaluated more than once.
 */
#define ASSERT_LSM_MAGIC(lsmp)                                                  \
do {                                                                            \
        LASSERT((lsmp) != NULL);                                                \
        LASSERTF((lsmp)->lsm_magic == LOV_MAGIC ||                              \
                 (lsmp)->lsm_magic == LOV_MAGIC_JOIN,                           \
                 "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic);              \
} while (0)
1193
1194 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
1195                        struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1196                        struct obd_export *md_exp)
1197 {
1198         struct lov_request_set *set;
1199         struct obd_info oinfo;
1200         struct lov_request *req;
1201         struct list_head *pos;
1202         struct lov_obd *lov;
1203         int rc = 0, err;
1204         ENTRY;
1205
1206         ASSERT_LSM_MAGIC(lsm);
1207
1208         if (!exp || !exp->exp_obd)
1209                 RETURN(-ENODEV);
1210
1211         if (oa->o_valid & OBD_MD_FLCOOKIE) {
1212                 LASSERT(oti);
1213                 LASSERT(oti->oti_logcookies);
1214         }
1215
1216         lov = &exp->exp_obd->u.lov;
1217         rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);
1218         if (rc)
1219                 RETURN(rc);
1220
1221         list_for_each (pos, &set->set_list) {
1222                 int err;
1223                 req = list_entry(pos, struct lov_request, rq_link);
1224
1225                 if (oa->o_valid & OBD_MD_FLCOOKIE)
1226                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1227
1228                 err = obd_destroy(lov->lov_tgts[req->rq_idx]->ltd_exp,
1229                                   req->rq_oi.oi_oa, NULL, oti, NULL);
1230                 err = lov_update_common_set(set, req, err);
1231                 if (err) {
1232                         CERROR("error: destroying objid "LPX64" subobj "
1233                                LPX64" on OST idx %d: rc = %d\n",
1234                                oa->o_id, req->rq_oi.oi_oa->o_id,
1235                                req->rq_idx, err);
1236                         if (!rc)
1237                                 rc = err;
1238                 }
1239         }
1240
1241         if (rc == 0) {
1242                 LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
1243                 rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);
1244         }
1245         err = lov_fini_destroy_set(set);
1246         RETURN(rc ? rc : err);
1247 }
1248
1249 static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
1250 {
1251         struct lov_request_set *set;
1252         struct lov_request *req;
1253         struct list_head *pos;
1254         struct lov_obd *lov;
1255         int err = 0, rc = 0;
1256         ENTRY;
1257
1258         LASSERT(oinfo);
1259         ASSERT_LSM_MAGIC(oinfo->oi_md);
1260
1261         if (!exp || !exp->exp_obd)
1262                 RETURN(-ENODEV);
1263
1264         lov = &exp->exp_obd->u.lov;
1265
1266         rc = lov_prep_getattr_set(exp, oinfo, &set);
1267         if (rc)
1268                 RETURN(rc);
1269
1270         list_for_each (pos, &set->set_list) {
1271                 req = list_entry(pos, struct lov_request, rq_link);
1272
1273                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1274                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1275                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1276
1277                 rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
1278                                  &req->rq_oi);
1279                 err = lov_update_common_set(set, req, rc);
1280                 if (err) {
1281                         CERROR("error: getattr objid "LPX64" subobj "
1282                                LPX64" on OST idx %d: rc = %d\n",
1283                                oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,
1284                                req->rq_idx, err);
1285                         break;
1286                 }
1287         }
1288
1289         rc = lov_fini_getattr_set(set);
1290         if (err)
1291                 rc = err;
1292         RETURN(rc);
1293 }
1294
1295 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
1296                                  void *data, int rc)
1297 {
1298         struct lov_request_set *lovset = (struct lov_request_set *)data;
1299         int err;
1300         ENTRY;
1301
1302         /* don't do attribute merge if this aysnc op failed */
1303         if (rc)
1304                 lovset->set_completes = 0;
1305         err = lov_fini_getattr_set(lovset);
1306         RETURN(rc ? rc : err);
1307 }
1308
1309 static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
1310                               struct ptlrpc_request_set *rqset)
1311 {
1312         struct lov_request_set *lovset;
1313         struct lov_obd *lov;
1314         struct list_head *pos;
1315         struct lov_request *req;
1316         int rc = 0, err;
1317         ENTRY;
1318
1319         LASSERT(oinfo);
1320         ASSERT_LSM_MAGIC(oinfo->oi_md);
1321
1322         if (!exp || !exp->exp_obd)
1323                 RETURN(-ENODEV);
1324
1325         lov = &exp->exp_obd->u.lov;
1326
1327         rc = lov_prep_getattr_set(exp, oinfo, &lovset);
1328         if (rc)
1329                 RETURN(rc);
1330
1331         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1332                oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
1333                oinfo->oi_md->lsm_stripe_size);
1334
1335         list_for_each (pos, &lovset->set_list) {
1336                 req = list_entry(pos, struct lov_request, rq_link);
1337
1338                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1339                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1340                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1341                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1342                                        &req->rq_oi, rqset);
1343                 if (rc) {
1344                         CERROR("error: getattr objid "LPX64" subobj "
1345                                LPX64" on OST idx %d: rc = %d\n",
1346                                oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,
1347                                req->rq_idx, rc);
1348                         GOTO(out, rc);
1349                 }
1350         }
1351
1352         if (!list_empty(&rqset->set_requests)) {
1353                 LASSERT(rc == 0);
1354                 LASSERT (rqset->set_interpret == NULL);
1355                 rqset->set_interpret = lov_getattr_interpret;
1356                 rqset->set_arg = (void *)lovset;
1357                 RETURN(rc);
1358         }
1359 out:
1360         if (rc)
1361                 lovset->set_completes = 0;
1362         err = lov_fini_getattr_set(lovset);
1363         RETURN(rc ? rc : err);
1364 }
1365
1366 static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
1367                        struct obd_trans_info *oti)
1368 {
1369         struct lov_request_set *set;
1370         struct lov_obd *lov;
1371         struct list_head *pos;
1372         struct lov_request *req;
1373         int err = 0, rc = 0;
1374         ENTRY;
1375
1376         LASSERT(oinfo);
1377         ASSERT_LSM_MAGIC(oinfo->oi_md);
1378
1379         if (!exp || !exp->exp_obd)
1380                 RETURN(-ENODEV);
1381
1382         /* for now, we only expect the following updates here */
1383         LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
1384                                             OBD_MD_FLMODE | OBD_MD_FLATIME |
1385                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1386                                             OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
1387                                             OBD_MD_FLGROUP | OBD_MD_FLUID |
1388                                             OBD_MD_FLGID | OBD_MD_FLINLINE |
1389                                             OBD_MD_FLFID | OBD_MD_FLGENER)));
1390         lov = &exp->exp_obd->u.lov;
1391         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1392         if (rc)
1393                 RETURN(rc);
1394
1395         list_for_each (pos, &set->set_list) {
1396                 req = list_entry(pos, struct lov_request, rq_link);
1397
1398                 rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
1399                                  &req->rq_oi, NULL);
1400                 err = lov_update_setattr_set(set, req, rc);
1401                 if (err) {
1402                         CERROR("error: setattr objid "LPX64" subobj "
1403                                LPX64" on OST idx %d: rc = %d\n",
1404                                set->set_oi->oi_oa->o_id,
1405                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);
1406                         if (!rc)
1407                                 rc = err;
1408                 }
1409         }
1410         err = lov_fini_setattr_set(set);
1411         if (!rc)
1412                 rc = err;
1413         RETURN(rc);
1414 }
1415
1416 static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
1417                                  void *data, int rc)
1418 {
1419         struct lov_request_set *lovset = (struct lov_request_set *)data;
1420         int err;
1421         ENTRY;
1422
1423         if (rc)
1424                 lovset->set_completes = 0;
1425         err = lov_fini_setattr_set(lovset);
1426         RETURN(rc ? rc : err);
1427 }
1428
1429 /* If @oti is given, the request goes from MDS and responses from OSTs are not
1430    needed. Otherwise, a client is waiting for responses. */
1431 static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
1432                              struct obd_trans_info *oti,
1433                              struct ptlrpc_request_set *rqset)
1434 {
1435         struct lov_request_set *set;
1436         struct lov_request *req;
1437         struct list_head *pos;
1438         struct lov_obd *lov;
1439         int rc = 0;
1440         ENTRY;
1441
1442         LASSERT(oinfo);
1443         ASSERT_LSM_MAGIC(oinfo->oi_md);
1444         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
1445                 LASSERT(oti);
1446                 LASSERT(oti->oti_logcookies);
1447         }
1448
1449         if (!exp || !exp->exp_obd)
1450                 RETURN(-ENODEV);
1451
1452         lov = &exp->exp_obd->u.lov;
1453         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1454         if (rc)
1455                 RETURN(rc);
1456
1457         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1458                oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
1459                oinfo->oi_md->lsm_stripe_size);
1460
1461         list_for_each (pos, &set->set_list) {
1462                 req = list_entry(pos, struct lov_request, rq_link);
1463
1464                 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1465                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1466
1467                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1468                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1469                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1470
1471                 rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1472                                        &req->rq_oi, oti, rqset);
1473                 if (rc) {
1474                         CERROR("error: setattr objid "LPX64" subobj "
1475                                LPX64" on OST idx %d: rc = %d\n",
1476                                set->set_oi->oi_oa->o_id,
1477                                req->rq_oi.oi_oa->o_id,
1478                                req->rq_idx, rc);
1479                         break;
1480                 }
1481         }
1482
1483         /* If we are not waiting for responses on async requests, return. */
1484         if (rc || !rqset || list_empty(&rqset->set_requests)) {
1485                 int err;
1486                 if (rc)
1487                         set->set_completes = 0;
1488                 err = lov_fini_setattr_set(set);
1489                 RETURN(rc ? rc : err);
1490         }
1491
1492         LASSERT(rqset->set_interpret == NULL);
1493         rqset->set_interpret = lov_setattr_interpret;
1494         rqset->set_arg = (void *)set;
1495
1496         RETURN(0);
1497 }
1498
1499 static int lov_punch_interpret(struct ptlrpc_request_set *rqset,
1500                                void *data, int rc)
1501 {
1502         struct lov_request_set *lovset = (struct lov_request_set *)data;
1503         int err;
1504         ENTRY;
1505
1506         if (rc)
1507                 lovset->set_completes = 0;
1508         err = lov_fini_punch_set(lovset);
1509         RETURN(rc ? rc : err);
1510 }
1511
1512 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1513  * we can send this 'punch' to just the authoritative node and the nodes
1514  * that the punch will affect. */
1515 static int lov_punch(struct obd_export *exp, struct obd_info *oinfo,
1516                      struct obd_trans_info *oti,
1517                      struct ptlrpc_request_set *rqset)
1518 {
1519         struct lov_request_set *set;
1520         struct lov_obd *lov;
1521         struct list_head *pos;
1522         struct lov_request *req;
1523         int rc = 0;
1524         ENTRY;
1525
1526         LASSERT(oinfo);
1527         ASSERT_LSM_MAGIC(oinfo->oi_md);
1528
1529         if (!exp || !exp->exp_obd)
1530                 RETURN(-ENODEV);
1531
1532         lov = &exp->exp_obd->u.lov;
1533         rc = lov_prep_punch_set(exp, oinfo, oti, &set);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         list_for_each (pos, &set->set_list) {
1538                 req = list_entry(pos, struct lov_request, rq_link);
1539
1540                 rc = obd_punch(lov->lov_tgts[req->rq_idx]->ltd_exp,
1541                                &req->rq_oi, NULL, rqset);
1542                 if (rc) {
1543                         CERROR("error: punch objid "LPX64" subobj "LPX64
1544                                " on OST idx %d: rc = %d\n",
1545                                set->set_oi->oi_oa->o_id,
1546                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
1547                         break;
1548                 }
1549         }
1550
1551         if (rc || list_empty(&rqset->set_requests)) {
1552                 int err;
1553                 err = lov_fini_punch_set(set);
1554                 RETURN(rc ? rc : err);
1555         }
1556
1557         LASSERT(rqset->set_interpret == NULL);
1558         rqset->set_interpret = lov_punch_interpret;
1559         rqset->set_arg = (void *)set;
1560
1561         RETURN(0);
1562 }
1563
1564 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1565                     struct lov_stripe_md *lsm, obd_off start, obd_off end,
1566                     void *capa)
1567 {
1568         struct lov_request_set *set;
1569         struct obd_info oinfo;
1570         struct lov_obd *lov;
1571         struct list_head *pos;
1572         struct lov_request *req;
1573         int err = 0, rc = 0;
1574         ENTRY;
1575
1576         ASSERT_LSM_MAGIC(lsm);
1577
1578         if (!exp->exp_obd)
1579                 RETURN(-ENODEV);
1580
1581         lov = &exp->exp_obd->u.lov;
1582         rc = lov_prep_sync_set(exp, &oinfo, oa, lsm, start, end, &set);
1583         if (rc)
1584                 RETURN(rc);
1585
1586         list_for_each (pos, &set->set_list) {
1587                 req = list_entry(pos, struct lov_request, rq_link);
1588
1589                 rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
1590                               req->rq_oi.oi_oa, NULL,
1591                               req->rq_oi.oi_policy.l_extent.start,
1592                               req->rq_oi.oi_policy.l_extent.end, capa);
1593                 err = lov_update_common_set(set, req, rc);
1594                 if (err) {
1595                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1596                                " on OST idx %d: rc = %d\n",
1597                                set->set_oi->oi_oa->o_id,
1598                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
1599                         if (!rc)
1600                                 rc = err;
1601                 }
1602         }
1603         err = lov_fini_sync_set(set);
1604         if (!rc)
1605                 rc = err;
1606         RETURN(rc);
1607 }
1608
1609 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
1610                          obd_count oa_bufs, struct brw_page *pga)
1611 {
1612         struct obd_info oinfo = { { { 0 } } };
1613         int i, rc = 0;
1614
1615         oinfo.oi_oa = lov_oinfo->oi_oa;
1616
1617         /* The caller just wants to know if there's a chance that this
1618          * I/O can succeed */
1619         for (i = 0; i < oa_bufs; i++) {
1620                 int stripe = lov_stripe_number(lov_oinfo->oi_md, pga[i].off);
1621                 int ost = lov_oinfo->oi_md->lsm_oinfo[stripe]->loi_ost_idx;
1622                 obd_off start, end;
1623
1624                 if (!lov_stripe_intersects(lov_oinfo->oi_md, i, pga[i].off,
1625                                            pga[i].off + pga[i].count,
1626                                            &start, &end))
1627                         continue;
1628
1629                 if (!lov->lov_tgts[ost] || !lov->lov_tgts[ost]->ltd_active) {
1630                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1631                         return -EIO;
1632                 }
1633
1634                 rc = obd_brw(OBD_BRW_CHECK, lov->lov_tgts[ost]->ltd_exp, &oinfo,
1635                              1, &pga[i], NULL);
1636                 if (rc)
1637                         break;
1638         }
1639         return rc;
1640 }
1641
1642 static int lov_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1643                    obd_count oa_bufs, struct brw_page *pga,
1644                    struct obd_trans_info *oti)
1645 {
1646         struct lov_request_set *set;
1647         struct lov_request *req;
1648         struct list_head *pos;
1649         struct lov_obd *lov = &exp->exp_obd->u.lov;
1650         int err, rc = 0;
1651         ENTRY;
1652
1653         ASSERT_LSM_MAGIC(oinfo->oi_md);
1654
1655         if (cmd == OBD_BRW_CHECK) {
1656                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1657                 RETURN(rc);
1658         }
1659
1660         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &set);
1661         if (rc)
1662                 RETURN(rc);
1663
1664         list_for_each (pos, &set->set_list) {
1665                 struct obd_export *sub_exp;
1666                 struct brw_page *sub_pga;
1667                 req = list_entry(pos, struct lov_request, rq_link);
1668
1669                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1670                 sub_pga = set->set_pga + req->rq_pgaidx;
1671                 rc = obd_brw(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1672                              sub_pga, oti);
1673                 if (rc)
1674                         break;
1675                 lov_update_common_set(set, req, rc);
1676         }
1677
1678         err = lov_fini_brw_set(set);
1679         if (!rc)
1680                 rc = err;
1681         RETURN(rc);
1682 }
1683
1684 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1685                              int rc)
1686 {
1687         struct lov_request_set *lovset = (struct lov_request_set *)data;
1688         ENTRY;
1689
1690         if (rc) {
1691                 lovset->set_completes = 0;
1692                 lov_fini_brw_set(lovset);
1693         } else {
1694                 rc = lov_fini_brw_set(lovset);
1695         }
1696
1697         RETURN(rc);
1698 }
1699
1700 static int lov_brw_async(int cmd, struct obd_export *exp,
1701                          struct obd_info *oinfo, obd_count oa_bufs,
1702                          struct brw_page *pga, struct obd_trans_info *oti,
1703                          struct ptlrpc_request_set *set)
1704 {
1705         struct lov_request_set *lovset;
1706         struct lov_request *req;
1707         struct list_head *pos;
1708         struct lov_obd *lov = &exp->exp_obd->u.lov;
1709         int rc = 0;
1710         ENTRY;
1711
1712         LASSERT(oinfo);
1713         ASSERT_LSM_MAGIC(oinfo->oi_md);
1714
1715         if (cmd == OBD_BRW_CHECK) {
1716                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1717                 RETURN(rc);
1718         }
1719
1720         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &lovset);
1721         if (rc)
1722                 RETURN(rc);
1723
1724         list_for_each (pos, &lovset->set_list) {
1725                 struct obd_export *sub_exp;
1726                 struct brw_page *sub_pga;
1727                 req = list_entry(pos, struct lov_request, rq_link);
1728
1729                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1730                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1731                 rc = obd_brw_async(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1732                                    sub_pga, oti, set);
1733                 if (rc)
1734                         GOTO(out, rc);
1735                 lov_update_common_set(lovset, req, rc);
1736         }
1737         LASSERT(rc == 0);
1738         LASSERT(set->set_interpret == NULL);
1739         LASSERT(set->set_arg == NULL);
1740         rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
1741         if (rc)
1742                 GOTO(out, rc);
1743
1744         RETURN(rc);
1745 out:
1746         lov_fini_brw_set(lovset);
1747         RETURN(rc);
1748 }
1749
1750 static int lov_ap_make_ready(void *data, int cmd)
1751 {
1752         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1753
1754         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1755 }
1756
1757 static int lov_ap_refresh_count(void *data, int cmd)
1758 {
1759         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1760
1761         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1762                                                      cmd);
1763 }
1764
1765 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1766 {
1767         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1768
1769         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1770         /* XXX woah, shouldn't we be altering more here?  size? */
1771         oa->o_id = lap->lap_loi_id;
1772         oa->o_gr = lap->lap_loi_gr;
1773         oa->o_valid |= OBD_MD_FLGROUP;
1774         oa->o_stripe_idx = lap->lap_stripe;
1775 }
1776
1777 static void lov_ap_update_obdo(void *data, int cmd, struct obdo *oa,
1778                                obd_valid valid)
1779 {
1780         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1781
1782         lap->lap_caller_ops->ap_update_obdo(lap->lap_caller_data, cmd,oa,valid);
1783 }
1784
1785 static int lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1786 {
1787         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1788
1789         /* in a raid1 regime this would down a count of many ios
1790          * in flight, onl calling the caller_ops completion when all
1791          * the raid1 ios are complete */
1792         rc = lap->lap_caller_ops->ap_completion(lap->lap_caller_data,cmd,oa,rc);
1793         return rc;
1794 }
1795
1796 static struct obd_capa *lov_ap_lookup_capa(void *data, int cmd)
1797 {
1798         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1799         return lap->lap_caller_ops->ap_lookup_capa(lap->lap_caller_data, cmd);
1800 }
1801
1802 static struct obd_async_page_ops lov_async_page_ops = {
1803         .ap_make_ready =        lov_ap_make_ready,
1804         .ap_refresh_count =     lov_ap_refresh_count,
1805         .ap_fill_obdo =         lov_ap_fill_obdo,
1806         .ap_update_obdo =       lov_ap_update_obdo,
1807         .ap_completion =        lov_ap_completion,
1808         .ap_lookup_capa =       lov_ap_lookup_capa,
1809 };
1810
1811 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1812                            struct lov_oinfo *loi, cfs_page_t *page,
1813                            obd_off offset, struct obd_async_page_ops *ops,
1814                            void *data, void **res, int nocache,
1815                            struct lustre_handle *lockh)
1816 {
1817         struct lov_obd *lov = &exp->exp_obd->u.lov;
1818         struct lov_async_page *lap;
1819         struct lov_lock_handles *lov_lockh = NULL;
1820         int rc = 0;
1821         ENTRY;
1822
1823         if (!page) {
1824                 int i = 0;
1825                 /* Find an existing osc so we can get it's stupid sizeof(*oap).
1826                    Only because of this layering limitation will a client
1827                    mount with no osts fail */
1828                 while (!lov->lov_tgts || !lov->lov_tgts[i] ||
1829                        !lov->lov_tgts[i]->ltd_exp) {
1830                         i++;
1831                         if (i >= lov->desc.ld_tgt_count)
1832                                 RETURN(-ENOMEDIUM);
1833                 }
1834                 rc = size_round(sizeof(*lap)) +
1835                         obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
1836                                             NULL, NULL, 0, NULL, NULL, NULL, 0,
1837                                             NULL);
1838                 RETURN(rc);
1839         }
1840         ASSERT_LSM_MAGIC(lsm);
1841         LASSERT(loi == NULL);
1842
1843         lap = *res;
1844         lap->lap_magic = LOV_AP_MAGIC;
1845         lap->lap_caller_ops = ops;
1846         lap->lap_caller_data = data;
1847
1848         /* for now only raid 0 which passes through */
1849         lap->lap_stripe = lov_stripe_number(lsm, offset);
1850         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
1851         loi = lsm->lsm_oinfo[lap->lap_stripe];
1852
1853         /* so the callback doesn't need the lsm */
1854         lap->lap_loi_id = loi->loi_id;
1855         lap->lap_loi_gr = lsm->lsm_object_gr;
1856         LASSERT(lsm->lsm_object_gr > 0);
1857         
1858         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
1859
1860         if (lockh) {
1861                 lov_lockh = lov_handle2llh(lockh);
1862                 if (lov_lockh) {
1863                         lockh = lov_lockh->llh_handles + lap->lap_stripe;
1864                 }
1865         }
1866
1867         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1868                                  lsm, loi, page, lap->lap_sub_offset,
1869                                  &lov_async_page_ops, lap,
1870                                  &lap->lap_sub_cookie, nocache, lockh);
1871         if (lov_lockh)
1872                 lov_llh_put(lov_lockh);
1873         if (rc)
1874                 RETURN(rc);
1875         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1876                lap->lap_sub_cookie, offset);
1877         RETURN(0);
1878 }
1879
1880 static int lov_queue_async_io(struct obd_export *exp,
1881                               struct lov_stripe_md *lsm,
1882                               struct lov_oinfo *loi, void *cookie,
1883                               int cmd, obd_off off, int count,
1884                               obd_flag brw_flags, obd_flag async_flags)
1885 {
1886         struct lov_obd *lov = &exp->exp_obd->u.lov;
1887         struct lov_async_page *lap;
1888         int rc;
1889
1890         LASSERT(loi == NULL);
1891
1892         ASSERT_LSM_MAGIC(lsm);
1893
1894         lap = LAP_FROM_COOKIE(cookie);
1895
1896         loi = lsm->lsm_oinfo[lap->lap_stripe];
1897
1898         rc = obd_queue_async_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
1899                                 loi, lap->lap_sub_cookie, cmd, off, count,
1900                                 brw_flags, async_flags);
1901         RETURN(rc);
1902 }
1903
1904 static int lov_set_async_flags(struct obd_export *exp,
1905                                struct lov_stripe_md *lsm,
1906                                struct lov_oinfo *loi, void *cookie,
1907                                obd_flag async_flags)
1908 {
1909         struct lov_obd *lov = &exp->exp_obd->u.lov;
1910         struct lov_async_page *lap;
1911         int rc;
1912
1913         LASSERT(loi == NULL);
1914
1915         ASSERT_LSM_MAGIC(lsm);
1916
1917         lap = LAP_FROM_COOKIE(cookie);
1918
1919         loi = lsm->lsm_oinfo[lap->lap_stripe];
1920
1921         rc = obd_set_async_flags(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1922                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1923         RETURN(rc);
1924 }
1925
1926 static int lov_queue_group_io(struct obd_export *exp,
1927                               struct lov_stripe_md *lsm,
1928                               struct lov_oinfo *loi,
1929                               struct obd_io_group *oig, void *cookie,
1930                               int cmd, obd_off off, int count,
1931                               obd_flag brw_flags, obd_flag async_flags)
1932 {
1933         struct lov_obd *lov = &exp->exp_obd->u.lov;
1934         struct lov_async_page *lap;
1935         int rc;
1936
1937         LASSERT(loi == NULL);
1938
1939         ASSERT_LSM_MAGIC(lsm);
1940
1941         lap = LAP_FROM_COOKIE(cookie);
1942
1943         loi = lsm->lsm_oinfo[lap->lap_stripe];
1944
1945         rc = obd_queue_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
1946                                 loi, oig, lap->lap_sub_cookie, cmd, off, count,
1947                                 brw_flags, async_flags);
1948         RETURN(rc);
1949 }
1950
1951 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1952  * all stripes, but we don't record that fact at queue time.  so we
1953  * trigger sync io on all stripes. */
1954 static int lov_trigger_group_io(struct obd_export *exp,
1955                                 struct lov_stripe_md *lsm,
1956                                 struct lov_oinfo *loi,
1957                                 struct obd_io_group *oig)
1958 {
1959         struct lov_obd *lov = &exp->exp_obd->u.lov;
1960         int rc = 0, i, err;
1961
1962         LASSERT(loi == NULL);
1963
1964         ASSERT_LSM_MAGIC(lsm);
1965
1966         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1967                 loi = lsm->lsm_oinfo[i];
1968                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1969                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1970                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1971                         continue;
1972                 }
1973
1974                 err = obd_trigger_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1975                                            lsm, loi, oig);
1976                 if (rc == 0 && err != 0)
1977                         rc = err;
1978         };
1979         RETURN(rc);
1980 }
1981
1982 static int lov_teardown_async_page(struct obd_export *exp,
1983                                    struct lov_stripe_md *lsm,
1984                                    struct lov_oinfo *loi, void *cookie)
1985 {
1986         struct lov_obd *lov = &exp->exp_obd->u.lov;
1987         struct lov_async_page *lap;
1988         int rc;
1989
1990         LASSERT(loi == NULL);
1991
1992         ASSERT_LSM_MAGIC(lsm);
1993
1994         lap = LAP_FROM_COOKIE(cookie);
1995
1996         loi = lsm->lsm_oinfo[lap->lap_stripe];
1997
1998         rc = obd_teardown_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1999                                      lsm, loi, lap->lap_sub_cookie);
2000         if (rc) {
2001                 CERROR("unable to teardown sub cookie %p: %d\n",
2002                        lap->lap_sub_cookie, rc);
2003                 RETURN(rc);
2004         }
2005         RETURN(rc);
2006 }
2007
2008 static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
2009                                  void *data, int rc)
2010 {
2011         struct lov_request_set *lovset = (struct lov_request_set *)data;
2012         ENTRY;
2013         rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
2014         RETURN(rc);
2015 }
2016
2017 static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2018                        struct ldlm_enqueue_info *einfo,
2019                        struct ptlrpc_request_set *rqset)
2020 {
2021         ldlm_mode_t mode = einfo->ei_mode;
2022         struct lov_request_set *set;
2023         struct lov_request *req;
2024         struct list_head *pos;
2025         struct lov_obd *lov;
2026         ldlm_error_t rc;
2027         ENTRY;
2028
2029         LASSERT(oinfo);
2030         ASSERT_LSM_MAGIC(oinfo->oi_md);
2031         LASSERT(mode == (mode & -mode));
2032
2033         /* we should never be asked to replay a lock this way. */
2034         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
2035
2036         if (!exp || !exp->exp_obd)
2037                 RETURN(-ENODEV);
2038
2039         lov = &exp->exp_obd->u.lov;
2040         rc = lov_prep_enqueue_set(exp, oinfo, einfo, &set);
2041         if (rc)
2042                 RETURN(rc);
2043
2044         list_for_each (pos, &set->set_list) {
2045                 req = list_entry(pos, struct lov_request, rq_link);
2046
2047                 rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
2048                                  &req->rq_oi, einfo, rqset);
2049                 if (rc != ELDLM_OK)
2050                         GOTO(out, rc);
2051         }
2052
2053         if (rqset && !list_empty(&rqset->set_requests)) {
2054                 LASSERT(rc == 0);
2055                 LASSERT(rqset->set_interpret == NULL);
2056                 rqset->set_interpret = lov_enqueue_interpret;
2057                 rqset->set_arg = (void *)set;
2058                 RETURN(rc);
2059         }
2060 out:
2061         rc = lov_fini_enqueue_set(set, mode, rc, rqset);
2062         RETURN(rc);
2063 }
2064
2065 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2066                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2067                      int *flags, void *data, struct lustre_handle *lockh)
2068 {
2069         struct lov_request_set *set;
2070         struct obd_info oinfo;
2071         struct lov_request *req;
2072         struct list_head *pos;
2073         struct lov_obd *lov = &exp->exp_obd->u.lov;
2074         struct lustre_handle *lov_lockhp;
2075         int lov_flags, rc = 0;
2076         ENTRY;
2077
2078         ASSERT_LSM_MAGIC(lsm);
2079         LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
2080
2081         if (!exp || !exp->exp_obd)
2082                 RETURN(-ENODEV);
2083
2084         lov = &exp->exp_obd->u.lov;
2085         rc = lov_prep_match_set(exp, &oinfo, lsm, policy, mode, lockh, &set);
2086         if (rc)
2087                 RETURN(rc);
2088
2089         list_for_each (pos, &set->set_list) {
2090                 ldlm_policy_data_t sub_policy;
2091                 req = list_entry(pos, struct lov_request, rq_link);
2092                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
2093                 LASSERT(lov_lockhp);
2094
2095                 lov_flags = *flags;
2096                 sub_policy.l_extent = req->rq_oi.oi_policy.l_extent;
2097
2098                 rc = obd_match(lov->lov_tgts[req->rq_idx]->ltd_exp,
2099                                req->rq_oi.oi_md, type, &sub_policy,
2100                                mode, &lov_flags, data, lov_lockhp);
2101                 rc = lov_update_match_set(set, req, rc);
2102                 if (rc <= 0)
2103                         break;
2104         }
2105         lov_fini_match_set(set, mode, *flags);
2106         RETURN(rc);
2107 }
2108
2109 static int lov_change_cbdata(struct obd_export *exp,
2110                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
2111                              void *data)
2112 {
2113         struct lov_obd *lov;
2114         int rc = 0, i;
2115         ENTRY;
2116
2117         ASSERT_LSM_MAGIC(lsm);
2118
2119         if (!exp || !exp->exp_obd)
2120                 RETURN(-ENODEV);
2121
2122         LASSERT(lsm->lsm_object_gr > 0);
2123
2124         lov = &exp->exp_obd->u.lov;
2125         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2126                 struct lov_stripe_md submd;
2127                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2128
2129                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2130                         CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
2131                         continue;
2132                 }
2133                 
2134                 submd.lsm_object_id = loi->loi_id;
2135                 submd.lsm_object_gr = lsm->lsm_object_gr;
2136                 submd.lsm_stripe_count = 0;
2137                 rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2138                                        &submd, it, data);
2139         }
2140         RETURN(rc);
2141 }
2142
2143 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
2144                       __u32 mode, struct lustre_handle *lockh)
2145 {
2146         struct lov_request_set *set;
2147         struct obd_info oinfo;
2148         struct lov_request *req;
2149         struct list_head *pos;
2150         struct lov_obd *lov = &exp->exp_obd->u.lov;
2151         struct lustre_handle *lov_lockhp;
2152         int err = 0, rc = 0;
2153         ENTRY;
2154
2155         ASSERT_LSM_MAGIC(lsm);
2156
2157         if (!exp || !exp->exp_obd)
2158                 RETURN(-ENODEV);
2159
2160         LASSERT(lsm->lsm_object_gr > 0);
2161         LASSERT(lockh);
2162         lov = &exp->exp_obd->u.lov;
2163         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
2164         if (rc)
2165                 RETURN(rc);
2166
2167         list_for_each (pos, &set->set_list) {
2168                 req = list_entry(pos, struct lov_request, rq_link);
2169                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
2170
2171                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
2172                                 req->rq_oi.oi_md, mode, lov_lockhp);
2173                 rc = lov_update_common_set(set, req, rc);
2174                 if (rc) {
2175                         CERROR("error: cancel objid "LPX64" subobj "
2176                                LPX64" on OST idx %d: rc = %d\n",
2177                                lsm->lsm_object_id,
2178                                req->rq_oi.oi_md->lsm_object_id,
2179                                req->rq_idx, rc);
2180                         err = rc;
2181                 }
2182
2183         }
2184         lov_fini_cancel_set(set);
2185         RETURN(err);
2186 }
2187
2188 static int lov_cancel_unused(struct obd_export *exp,
2189                              struct lov_stripe_md *lsm,
2190                              int flags, void *opaque)
2191 {
2192         struct lov_obd *lov;
2193         int rc = 0, i;
2194         ENTRY;
2195
2196         if (!exp || !exp->exp_obd)
2197                 RETURN(-ENODEV);
2198
2199         lov = &exp->exp_obd->u.lov;
2200         if (lsm == NULL) {
2201                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2202                         int err;
2203                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2204                                 continue;
2205
2206                         err = obd_cancel_unused(lov->lov_tgts[i]->ltd_exp, NULL,
2207                                                 flags, opaque);
2208                         if (!rc)
2209                                 rc = err;
2210                 }
2211                 RETURN(rc);
2212         }
2213
2214         ASSERT_LSM_MAGIC(lsm);
2215
2216         LASSERT(lsm->lsm_object_gr > 0);
2217         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2218                 struct lov_stripe_md submd;
2219                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2220                 int err;
2221
2222                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2223                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
2224                         continue;
2225                 }
2226
2227                 if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
2228                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2229
2230                 submd.lsm_object_id = loi->loi_id;
2231                 submd.lsm_object_gr = lsm->lsm_object_gr;
2232                 submd.lsm_stripe_count = 0;
2233                 err = obd_cancel_unused(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2234                                         &submd, flags, opaque);
2235                 if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
2236                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2237                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2238                                loi->loi_id, loi->loi_ost_idx, err);
2239                         if (!rc)
2240                                 rc = err;
2241                 }
2242         }
2243         RETURN(rc);
2244 }
2245
2246 static int lov_join_lru(struct obd_export *exp,
2247                         struct lov_stripe_md *lsm, int join)
2248 {
2249         struct lov_obd *lov;
2250         int i, count = 0;
2251         ENTRY;
2252
2253         ASSERT_LSM_MAGIC(lsm);
2254         if (!exp || !exp->exp_obd)
2255                 RETURN(-ENODEV);
2256
2257         lov = &exp->exp_obd->u.lov;
2258         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2259                 struct lov_stripe_md submd;
2260                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2261                 int rc = 0;
2262
2263                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2264                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
2265                         continue;
2266                 }
2267
2268                 if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
2269                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2270
2271                 submd.lsm_object_id = loi->loi_id;
2272                 submd.lsm_object_gr = lsm->lsm_object_gr;
2273                 submd.lsm_stripe_count = 0;
2274                 rc = obd_join_lru(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2275                                   &submd, join);
2276                 if (rc < 0) {
2277                         CERROR("join lru failed. objid: "LPX64" subobj: "LPX64
2278                                " ostidx: %d rc: %d\n", lsm->lsm_object_id,
2279                                loi->loi_id, loi->loi_ost_idx, rc);
2280                         return rc;
2281                 } else {
2282                         count += rc;
2283                 }
2284         }
2285         RETURN(count);
2286 }
2287
2288 static int lov_statfs_interpret(struct ptlrpc_request_set *rqset,
2289                                 void *data, int rc)
2290 {
2291         struct lov_request_set *lovset = (struct lov_request_set *)data;
2292         int err;
2293         ENTRY;
2294
2295         if (rc)
2296                 lovset->set_completes = 0;
2297
2298         err = lov_fini_statfs_set(lovset);
2299         RETURN(rc ? rc : err);
2300 }
2301
2302 static int lov_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2303                             __u64 max_age, struct ptlrpc_request_set *rqset)
2304 {
2305         struct lov_request_set *set;
2306         struct lov_request *req;
2307         struct list_head *pos;
2308         struct lov_obd *lov;
2309         int rc = 0;
2310         ENTRY;
2311
2312         LASSERT(oinfo != NULL);
2313         LASSERT(oinfo->oi_osfs != NULL);
2314
2315         lov = &obd->u.lov;
2316         rc = lov_prep_statfs_set(obd, oinfo, &set);
2317         if (rc)
2318                 RETURN(rc);
2319
2320         list_for_each (pos, &set->set_list) {
2321                 struct obd_device *osc_obd;
2322
2323                 req = list_entry(pos, struct lov_request, rq_link);
2324
2325                 osc_obd = class_exp2obd(lov->lov_tgts[req->rq_idx]->ltd_exp);
2326                 rc = obd_statfs_async(osc_obd, &req->rq_oi, max_age, rqset);
2327                 if (rc)
2328                         break;
2329         }
2330
2331         if (rc || list_empty(&rqset->set_requests)) {
2332                 int err;
2333                 if (rc)
2334                         set->set_completes = 0;
2335                 err = lov_fini_statfs_set(set);
2336                 RETURN(rc ? rc : err);
2337         }
2338
2339         LASSERT(rqset->set_interpret == NULL);
2340         rqset->set_interpret = lov_statfs_interpret;
2341         rqset->set_arg = (void *)set;
2342         RETURN(0);
2343 }
2344
2345 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2346                       __u64 max_age, __u32 flags)
2347 {
2348         struct ptlrpc_request_set *set = NULL;
2349         struct obd_info oinfo = { { { 0 } } };
2350         int rc = 0;
2351         ENTRY;
2352
2353
2354         /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
2355          * statfs requests */
2356         set = ptlrpc_prep_set();
2357         if (set == NULL)
2358                 RETURN(-ENOMEM);
2359
2360         oinfo.oi_osfs = osfs;
2361         oinfo.oi_flags = flags;
2362         rc = lov_statfs_async(obd, &oinfo, max_age, set);
2363         if (rc == 0)
2364                 rc = ptlrpc_set_wait(set);
2365         ptlrpc_set_destroy(set);
2366
2367         RETURN(rc);
2368 }
2369
2370 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2371                          void *karg, void *uarg)
2372 {
2373         struct obd_device *obddev = class_exp2obd(exp);
2374         struct lov_obd *lov = &obddev->u.lov;
2375         int i, rc, count = lov->desc.ld_tgt_count;
2376         struct obd_uuid *uuidp;
2377         ENTRY;
2378
2379         switch (cmd) {
2380         case IOC_OBD_STATFS: {
2381                 struct obd_ioctl_data *data = karg;
2382                 struct obd_device *osc_obd;
2383                 struct obd_statfs stat_buf = {0};
2384                 __u32 index;
2385
2386                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
2387                 LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
2388
2389                 if ((index >= count))
2390                         RETURN(-ENODEV);
2391
2392                 if (!lov->lov_tgts[index])
2393                         /* Try again with the next index */
2394                         RETURN(-EAGAIN);
2395                 if (!lov->lov_tgts[index]->ltd_active)
2396                         RETURN(-ENODATA);
2397
2398                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
2399                 if (!osc_obd)
2400                         RETURN(-EINVAL);
2401
2402                 /* got statfs data */
2403                 rc = obd_statfs(osc_obd, &stat_buf,
2404                                 cfs_time_current_64() - HZ, 0);
2405                 if (rc)
2406                         RETURN(rc);
2407                 if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
2408                         RETURN(rc);
2409                 /* copy UUID */
2410                 rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
2411                                   data->ioc_plen2);
2412                 break;
2413         }
2414         case OBD_IOC_LOV_GET_CONFIG: {
2415                 struct obd_ioctl_data *data;
2416                 struct lov_desc *desc;
2417                 char *buf = NULL;
2418                 __u32 *genp;
2419
2420                 len = 0;
2421                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2422                         RETURN(-EINVAL);
2423
2424                 data = (struct obd_ioctl_data *)buf;
2425
2426                 if (sizeof(*desc) > data->ioc_inllen1) {
2427                         obd_ioctl_freedata(buf, len);
2428                         RETURN(-EINVAL);
2429                 }
2430
2431                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2432                         obd_ioctl_freedata(buf, len);
2433                         RETURN(-EINVAL);
2434                 }
2435
2436                 if (sizeof(__u32) * count > data->ioc_inllen3) {
2437                         obd_ioctl_freedata(buf, len);
2438                         RETURN(-EINVAL);
2439                 }
2440
2441                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2442                 memcpy(desc, &(lov->desc), sizeof(*desc));
2443
2444                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2445                 genp = (__u32 *)data->ioc_inlbuf3;
2446                 /* the uuid will be empty for deleted OSTs */
2447                 for (i = 0; i < count; i++, uuidp++, genp++) {
2448                         if (!lov->lov_tgts[i])
2449                                 continue;
2450                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
2451                         *genp = lov->lov_tgts[i]->ltd_gen;
2452                 }
2453
2454                 rc = copy_to_user((void *)uarg, buf, len);
2455                 if (rc)
2456                         rc = -EFAULT;
2457                 obd_ioctl_freedata(buf, len);
2458                 break;
2459         }
2460         case LL_IOC_LOV_SETSTRIPE:
2461                 rc = lov_setstripe(exp, karg, uarg);
2462                 break;
2463         case LL_IOC_LOV_GETSTRIPE:
2464                 rc = lov_getstripe(exp, karg, uarg);
2465                 break;
2466         case LL_IOC_LOV_SETEA:
2467                 rc = lov_setea(exp, karg, uarg);
2468                 break;
2469         default: {
2470                 int set = 0;
2471
2472                 if (count == 0)
2473                         RETURN(-ENOTTY);
2474
2475                 rc = 0;
2476                 for (i = 0; i < count; i++) {
2477                         int err;
2478
2479                         /* OST was disconnected */
2480                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2481                                 continue;
2482
2483                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
2484                                             len, karg, uarg);
2485                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
2486                                 RETURN(err);
2487                         } else if (err) {
2488                                 if (lov->lov_tgts[i]->ltd_active) {
2489                                         CDEBUG(err == -ENOTTY ?
2490                                                D_IOCTL : D_WARNING,
2491                                                "iocontrol OSC %s on OST "
2492                                                "idx %d cmd %x: err = %d\n",
2493                                                lov_uuid2str(lov, i),
2494                                                i, cmd, err);
2495                                         if (!rc)
2496                                                 rc = err;
2497                                 }
2498                         } else {
2499                                 set = 1;
2500                         }
2501                 }
2502                 if (!set && !rc)
2503                         rc = -EIO;
2504         }
2505         }
2506
2507         RETURN(rc);
2508 }
2509
2510 static int lov_get_info(struct obd_export *exp, __u32 keylen,
2511                         void *key, __u32 *vallen, void *val)
2512 {
2513         struct obd_device *obddev = class_exp2obd(exp);
2514         struct lov_obd *lov = &obddev->u.lov;
2515         int i, rc;
2516         ENTRY;
2517
2518         if (!vallen || !val)
2519                 RETURN(-EFAULT);
2520
2521         lov_getref(obddev);
2522
2523         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2524                 struct {
2525                         char name[16];
2526                         struct ldlm_lock *lock;
2527                         struct lov_stripe_md *lsm;
2528                 } *data = key;
2529                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2530                 struct lov_oinfo *loi;
2531                 __u32 *stripe = val;
2532
2533                 if (*vallen < sizeof(*stripe))
2534                         GOTO(out, rc = -EFAULT);
2535                 *vallen = sizeof(*stripe);
2536
2537                 /* XXX This is another one of those bits that will need to
2538                  * change if we ever actually support nested LOVs.  It uses
2539                  * the lock's export to find out which stripe it is. */
2540                 /* XXX - it's assumed all the locks for deleted OSTs have
2541                  * been cancelled. Also, the export for deleted OSTs will
2542                  * be NULL and won't match the lock's export. */
2543                 for (i = 0; i < data->lsm->lsm_stripe_count; i++) {
2544                         loi = data->lsm->lsm_oinfo[i];
2545                         if (!lov->lov_tgts[loi->loi_ost_idx])
2546                                 continue;
2547                         if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
2548                             data->lock->l_conn_export &&
2549                             osc_res_name_eq(loi->loi_id, loi->loi_gr, res_id)) {
2550                                 *stripe = i;
2551                                 GOTO(out, rc = 0);
2552                         }
2553                 }
2554                 LDLM_ERROR(data->lock, "lock on inode without such object");
2555                 dump_lsm(D_ERROR, data->lsm);
2556                 GOTO(out, rc = -ENXIO);
2557         } else if (KEY_IS(KEY_LAST_ID)) {
2558                 struct obd_id_info *info = val;
2559                 __u32 size = sizeof(obd_id);
2560                 struct lov_tgt_desc *tgt;
2561
2562                 LASSERT(*vallen == sizeof(struct obd_id_info));
2563                 tgt = lov->lov_tgts[info->idx];
2564
2565                 if (!tgt || !tgt->ltd_active)
2566                         GOTO(out, rc = -ESRCH);
2567
2568                 rc = obd_get_info(tgt->ltd_exp, keylen, key, &size, info->data);
2569                 GOTO(out, rc = 0);
2570         } else if (KEY_IS(KEY_LOVDESC)) {
2571                 struct lov_desc *desc_ret = val;
2572                 *desc_ret = lov->desc;
2573
2574                 GOTO(out, rc = 0);
2575         } else if (KEY_IS(KEY_LOV_IDX)) {
2576                 struct lov_tgt_desc *tgt;
2577
2578                 for(i = 0; i < lov->desc.ld_tgt_count; i++) {
2579                         tgt = lov->lov_tgts[i];
2580                         if (tgt && obd_uuid_equals(val, &tgt->ltd_uuid))
2581                                 GOTO(out, rc = i);
2582                 }
2583         }
2584
2585         rc = -EINVAL;
2586 out:
2587         lov_putref(obddev);
2588         RETURN(rc);
2589 }
2590
2591 static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
2592                               void *key, obd_count vallen, void *val,
2593                               struct ptlrpc_request_set *set)
2594 {
2595         struct obd_device *obddev = class_exp2obd(exp);
2596         struct lov_obd *lov = &obddev->u.lov;
2597         obd_count count;
2598         int i, rc = 0, err;
2599         struct lov_tgt_desc *tgt;
2600         unsigned incr, check_uuid,
2601                  do_inactive, no_set;
2602         unsigned next_id = 0,  mds_con = 0;
2603         ENTRY;
2604
2605         incr = check_uuid = do_inactive = no_set = 0;
2606         if (set == NULL) {
2607                 no_set = 1;
2608                 set = ptlrpc_prep_set();
2609                 if (!set)
2610                         RETURN(-ENOMEM);
2611         }
2612
2613         lov_getref(obddev);
2614         count = lov->desc.ld_tgt_count;
2615
2616         if (KEY_IS(KEY_NEXT_ID)) {
2617                 count = vallen / sizeof(struct obd_id_info);
2618                 vallen = sizeof(obd_id);
2619                 incr = sizeof(struct obd_id_info);
2620                 do_inactive = 1;
2621                 next_id = 1;
2622         } else if (KEY_IS(KEY_CHECKSUM)) {
2623                 do_inactive = 1;
2624         } else if (KEY_IS(KEY_UNLINKED)) {
2625                 check_uuid = val ? 1 : 0;
2626         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2627                 /* use defaults:  do_inactive = incr = 0; */
2628         } else if (KEY_IS(KEY_MDS_CONN)) {
2629                 mds_con = 1;
2630         }
2631
2632         for (i = 0; i < count; i++, val = (char *)val + incr) {
2633                 if (next_id) {
2634                         tgt = lov->lov_tgts[((struct obd_id_info*)val)->idx];
2635                 } else {
2636                         tgt = lov->lov_tgts[i];
2637                 }
2638                 /* OST was disconnected */
2639                 if (!tgt || !tgt->ltd_exp)
2640                         continue;
2641
2642                 /* OST is inactive and we don't want inactive OSCs */
2643                 if (!tgt->ltd_active && !do_inactive)
2644                         continue;
2645
2646                 if (mds_con) {
2647                         struct mds_group_info *mgi;
2648
2649                         LASSERT(vallen == sizeof(*mgi));
2650                         mgi = (struct mds_group_info *)val;
2651
2652                         /* Only want a specific OSC */
2653                         if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
2654                                                 &tgt->ltd_uuid))
2655                                 continue;
2656
2657                         err = obd_set_info_async(tgt->ltd_exp,
2658                                          keylen, key, sizeof(int),
2659                                          &mgi->group, set);
2660                 } else if (next_id) {
2661                         err = obd_set_info_async(tgt->ltd_exp,
2662                                          keylen, key, vallen,
2663                                          ((struct obd_id_info*)val)->data, set);
2664                 } else  {
2665                         /* Only want a specific OSC */
2666                         if (check_uuid &&
2667                             !obd_uuid_equals(val, &tgt->ltd_uuid))
2668                                 continue;
2669
2670                         err = obd_set_info_async(tgt->ltd_exp,
2671                                          keylen, key, vallen, val, set);
2672                 }
2673
2674                 if (!rc)
2675                         rc = err;
2676         }
2677
2678         lov_putref(obddev);
2679         if (no_set) {
2680                 err = ptlrpc_set_wait(set);
2681                 if (!rc)
2682                         rc = err;
2683                 ptlrpc_set_destroy(set);
2684         }
2685         RETURN(rc);
2686 }
2687
2688 static int lov_checkmd(struct obd_export *exp, struct obd_export *md_exp,
2689                        struct lov_stripe_md *lsm)
2690 {
2691         int rc;
2692         ENTRY;
2693
2694         if (!lsm)
2695                 RETURN(0);
2696         LASSERT(md_exp);
2697         LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
2698         rc = lsm_op_find(lsm->lsm_magic)->lsm_revalidate(lsm, md_exp->exp_obd);
2699
2700         RETURN(rc);
2701 }
2702
2703 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
2704 {
2705         int i, rc = 0;
2706         ENTRY;
2707
2708         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2709                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2710                 if (loi->loi_ar.ar_rc && !rc)
2711                         rc = loi->loi_ar.ar_rc;
2712                 loi->loi_ar.ar_rc = 0;
2713         }
2714         RETURN(rc);
2715 }
2716 EXPORT_SYMBOL(lov_test_and_clear_async_rc);
2717
2718
2719 static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
2720                            int cmd, __u64 *offset)
2721 {
2722         __u32 ssize = lsm->lsm_stripe_size;
2723         __u64 start;
2724
2725         start = *offset;
2726         do_div(start, ssize);
2727         start = start * ssize;
2728
2729         CDEBUG(D_DLMTRACE, "offset "LPU64", stripe %u, start "LPU64
2730                            ", end "LPU64"\n", *offset, ssize, start,
2731                            start + ssize - 1);
2732         if (cmd == OBD_CALC_STRIPE_END) {
2733                 *offset = start + ssize - 1;
2734         } else if (cmd == OBD_CALC_STRIPE_START) {
2735                 *offset = start;
2736         } else {
2737                 LBUG();
2738         }
2739
2740         RETURN(0);
2741 }
2742
2743
#if 0
/*
 * Everything up to the matching #endif is excluded from the build.
 *
 * NOTE(review): the code below cannot be re-enabled as written:
 *  - imp, lwi and lwd are used but not declared inside lov_complete_many();
 *  - the remove_wait_queue() loop refers to "lock", which is only declared
 *    inside the body of the preceding loop;
 *  - "queues" is allocated with OBD_ALLOC() and never freed in this function;
 *  - "loi = lsm->lsm_oinfo; loi++" walks lsm_oinfo as an array of structs,
 *    while the live code in this file reads lsm_oinfo[i] as a pointer;
 *  - the -EINTR / -ETIMEDOUT branch is empty.
 */

/* Per-stripe wait state used by lov_complete_many(). */
struct lov_multi_wait {
        struct ldlm_lock *lock;
        wait_queue_t      wait;
        int               completed;
        int               generation;
};

/*
 * Queues the current task on the wait queue of every sub-lock behind
 * \a lockh and then waits, with a timeout of obd_timeout seconds, for
 * check_multi_complete() to become true.
 */
int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
                      struct lustre_handle *lockh)
{
        struct lov_lock_handles *lov_lockh = NULL;
        struct lustre_handle *lov_lockhp;
        struct lov_obd *lov;
        struct lov_oinfo *loi;
        struct lov_multi_wait *queues;
        int rc = 0, i;
        ENTRY;

        ASSERT_LSM_MAGIC(lsm);

        if (!exp || !exp->exp_obd)
                RETURN(-ENODEV);

        LASSERT(lockh != NULL);
        /* with more than one stripe lockh refers to a set of handles,
         * otherwise it is the single sub-lock handle itself */
        if (lsm->lsm_stripe_count > 1) {
                lov_lockh = lov_handle2llh(lockh);
                if (lov_lockh == NULL) {
                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
                        RETURN(-EINVAL);
                }

                lov_lockhp = lov_lockh->llh_handles;
        } else {
                lov_lockhp = lockh;
        }

        OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
        if (queues == NULL)
                GOTO(out, rc = -ENOMEM);

        lov = &exp->exp_obd->u.lov;
        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
             i++, loi++, lov_lockhp++) {
                struct ldlm_lock *lock;
                struct obd_device *obd;

                lock = ldlm_handle2lock(lov_lockhp);
                if (lock == NULL) {
                        /* a stripe without a lock counts as completed */
                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
                               loi->loi_ost_idx, loi->loi_id);
                        queues[i].completed = 1;
                        continue;
                }

                queues[i].lock = lock;
                init_waitqueue_entry(&(queues[i].wait), current);
                add_wait_queue(lock->l_waitq, &(queues[i].wait));

                /* record the import generation seen when the wait began */
                obd = class_exp2obd(lock->l_conn_export);
                if (obd != NULL)
                        imp = obd->u.cli.cl_import;
                if (imp != NULL) {
                        spin_lock(&imp->imp_lock);
                        queues[i].generation = imp->imp_generation;
                        spin_unlock(&imp->imp_lock);
                }
        }

        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
                               interrupted_completion_wait, &lwd);
        rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);

        for (i = 0; i < lsm->lsm_stripe_count; i++)
                remove_wait_queue(lock->l_waitq, &(queues[i].wait));

        if (rc == -EINTR || rc == -ETIMEDOUT) {


        }

 out:
        if (lov_lockh != NULL)
                lov_llh_put(lov_lockh);
        RETURN(rc);
}
#endif
2831
/**
 * Takes the spinlock of \a md and records the calling process as its owner.
 *
 * Asserts that the caller is not already the recorded owner (which would
 * mean taking the lock recursively) and, once the spinlock is held, that
 * no owner is recorded.
 *
 * \see lov_stripe_unlock
 */
void lov_stripe_lock(struct lov_stripe_md *md)
{
        /* checked before spinning: the owner field is read unlocked here */
        LASSERT(md->lsm_lock_owner != cfs_curproc_pid());
        spin_lock(&md->lsm_lock);
        LASSERT(md->lsm_lock_owner == 0);
        md->lsm_lock_owner = cfs_curproc_pid();
}
EXPORT_SYMBOL(lov_stripe_lock);
2840
/**
 * Releases the spinlock of \a md taken by lov_stripe_lock().
 *
 * Asserts that the calling process is the recorded owner, then clears the
 * owner before dropping the spinlock.
 *
 * \see lov_stripe_lock
 */
void lov_stripe_unlock(struct lov_stripe_md *md)
{
        LASSERT(md->lsm_lock_owner == cfs_curproc_pid());
        /* clear the owner while the spinlock is still held */
        md->lsm_lock_owner = 0;
        spin_unlock(&md->lsm_lock);
}
EXPORT_SYMBOL(lov_stripe_unlock);
2848
2849 /**
2850  * Checks if requested extent lock is compatible with a lock under the page.
2851  *
2852  * Checks if the lock under \a page is compatible with a read or write lock
2853  * (specified by \a rw) for an extent [\a start , \a end].
2854  *
2855  * \param exp lov export
2856  * \param lsm striping information for the file
2857  * \param res lov_async_page placeholder
2858  * \param rw OBD_BRW_READ if requested for reading,
2859  *           OBD_BRW_WRITE if requested for writing
2860  * \param start start of the requested extent
2861  * \param end end of the requested extent
2862  * \param cookie transparent parameter for passing locking context
2863  *
2864  * \post result == 1, *cookie == context, appropriate lock is referenced or
2865  * \post result == 0
2866  *
2867  * \retval 1 owned lock is reused for the request
2868  * \retval 0 no lock reused for the request
2869  *
2870  * \see lov_release_short_lock
2871  */
2872 static int lov_reget_short_lock(struct obd_export *exp,
2873                                 struct lov_stripe_md *lsm,
2874                                 void **res, int rw,
2875                                 obd_off start, obd_off end,
2876                                 void **cookie)
2877 {
2878         struct lov_async_page *l = *res;
2879         obd_off stripe_start, stripe_end = start;
2880
2881         ENTRY;
2882
2883         /* ensure we don't cross stripe boundaries */
2884         lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
2885         if (stripe_end <= end)
2886                 RETURN(0);
2887
2888         /* map the region limits to the object limits */
2889         lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
2890         lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
2891
2892         RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
2893                                     lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
2894                                     ltd_exp, NULL, &l->lap_sub_cookie,
2895                                     rw, stripe_start, stripe_end, cookie));
2896 }
2897
2898 /**
2899  * Releases a reference to a lock taken in a "fast" way.
2900  *
2901  * Releases a read or a write (specified by \a rw) lock
2902  * referenced by \a cookie.
2903  *
2904  * \param exp lov export
2905  * \param lsm striping information for the file
2906  * \param end end of the locked extent
2907  * \param rw OBD_BRW_READ if requested for reading,
2908  *           OBD_BRW_WRITE if requested for writing
2909  * \param cookie transparent parameter for passing locking context
2910  *
2911  * \post appropriate lock is dereferenced
2912  *
2913  * \see lov_reget_short_lock
2914  */
2915 static int lov_release_short_lock(struct obd_export *exp,
2916                                   struct lov_stripe_md *lsm, obd_off end,
2917                                   void *cookie, int rw)
2918 {
2919         int stripe;
2920
2921         ENTRY;
2922
2923         stripe = lov_stripe_number(lsm, end);
2924
2925         RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
2926                                       lsm_oinfo[stripe]->loi_ost_idx]->
2927                                       ltd_exp, NULL, end, cookie, rw));
2928 }
2929
2930 struct obd_ops lov_obd_ops = {
2931         .o_owner               = THIS_MODULE,
2932         .o_setup               = lov_setup,
2933         .o_precleanup          = lov_precleanup,
2934         .o_cleanup             = lov_cleanup,
2935         .o_process_config      = lov_process_config,
2936         .o_connect             = lov_connect,
2937         .o_disconnect          = lov_disconnect,
2938         .o_statfs              = lov_statfs,
2939         .o_statfs_async        = lov_statfs_async,
2940         .o_packmd              = lov_packmd,
2941         .o_unpackmd            = lov_unpackmd,
2942         .o_checkmd             = lov_checkmd,
2943         .o_create              = lov_create,
2944         .o_destroy             = lov_destroy,
2945         .o_getattr             = lov_getattr,
2946         .o_getattr_async       = lov_getattr_async,
2947         .o_setattr             = lov_setattr,
2948         .o_setattr_async       = lov_setattr_async,
2949         .o_brw                 = lov_brw,
2950         .o_brw_async           = lov_brw_async,
2951         .o_prep_async_page     = lov_prep_async_page,
2952         .o_reget_short_lock    = lov_reget_short_lock,
2953         .o_release_short_lock  = lov_release_short_lock,
2954         .o_queue_async_io      = lov_queue_async_io,
2955         .o_set_async_flags     = lov_set_async_flags,
2956         .o_queue_group_io      = lov_queue_group_io,
2957         .o_trigger_group_io    = lov_trigger_group_io,
2958         .o_teardown_async_page = lov_teardown_async_page,
2959         .o_merge_lvb           = lov_merge_lvb,
2960         .o_adjust_kms          = lov_adjust_kms,
2961         .o_punch               = lov_punch,
2962         .o_sync                = lov_sync,
2963         .o_enqueue             = lov_enqueue,
2964         .o_match               = lov_match,
2965         .o_change_cbdata       = lov_change_cbdata,
2966         .o_cancel              = lov_cancel,
2967         .o_cancel_unused       = lov_cancel_unused,
2968         .o_join_lru            = lov_join_lru,
2969         .o_iocontrol           = lov_iocontrol,
2970         .o_get_info            = lov_get_info,
2971         .o_set_info_async      = lov_set_info_async,
2972         .o_extent_calc         = lov_extent_calc,
2973         .o_llog_init           = lov_llog_init,
2974         .o_llog_finish         = lov_llog_finish,
2975         .o_notify              = lov_notify,
2976         .o_register_page_removal_cb = lov_register_page_removal_cb,
2977         .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
2978         .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
2979         .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
2980 };
2981
/* Result of PORTAL_SYMBOL_GET(lov_quota_interface) in lov_init(); the put
 * paths test it for NULL before releasing the symbol. */
static quota_interface_t *quota_interface;
/* Defined outside this file; looked up after request_module("lquota"). */
extern quota_interface_t lov_quota_interface;

/* Cache for struct lov_oinfo, created in lov_init() and destroyed in
 * lov_exit() (or on a failed lov_init()). */
cfs_mem_cache_t *lov_oinfo_slab;
2986
2987 int __init lov_init(void)
2988 {
2989         struct lprocfs_static_vars lvars = { 0 };
2990         int rc, rc2;
2991         ENTRY;
2992
2993         lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
2994                                               sizeof(struct lov_oinfo), 
2995                                               0, SLAB_HWCACHE_ALIGN);
2996         if (lov_oinfo_slab == NULL)
2997                 return -ENOMEM;
2998         lprocfs_lov_init_vars(&lvars);
2999
3000         request_module("lquota");
3001         quota_interface = PORTAL_SYMBOL_GET(lov_quota_interface);
3002         init_obd_quota_ops(quota_interface, &lov_obd_ops);
3003
3004         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
3005                                  LUSTRE_LOV_NAME, NULL);
3006         if (rc) {
3007                 if (quota_interface)
3008                         PORTAL_SYMBOL_PUT(lov_quota_interface);
3009                 rc2 = cfs_mem_cache_destroy(lov_oinfo_slab);
3010                 LASSERT(rc2 == 0);
3011         }
3012
3013         RETURN(rc);
3014 }
3015
#ifdef __KERNEL__
/*
 * Module unload: releases the quota symbol reference if one is held,
 * unregisters the LOV device type and destroys the lov_oinfo cache,
 * asserting that the destruction succeeds.
 */
static void /*__exit*/ lov_exit(void)
{
        int rc;

        if (quota_interface != NULL)
                PORTAL_SYMBOL_PUT(lov_quota_interface);

        class_unregister_type(LUSTRE_LOV_NAME);

        rc = cfs_mem_cache_destroy(lov_oinfo_slab);
        LASSERT(rc == 0);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

cfs_module(lov, LUSTRE_VERSION_STRING, lov_init, lov_exit);
#endif