Whamcloud - gitweb
739fee30559c9019f5893ed6c56aaa75829a46d9
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/lov/lov_obd.c
37  *
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Peter Braam <braam@clusterfs.com>
40  * Author: Mike Shaver <shaver@clusterfs.com>
41  * Author: Nathan Rutman <nathan@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_LOV
48 #ifdef __KERNEL__
49 #include <libcfs/libcfs.h>
50 #else
51 #include <liblustre.h>
52 #endif
53
54 #include <obd_support.h>
55 #include <lustre_lib.h>
56 #include <lustre_net.h>
57 #include <lustre/lustre_idl.h>
58 #include <lustre_dlm.h>
59 #include <lustre_mds.h>
60 #include <lustre_debug.h>
61 #include <obd_class.h>
62 #include <obd_lov.h>
63 #include <obd_ost.h>
64 #include <lprocfs_status.h>
65 #include <lustre_param.h>
66 #include <lustre_cache.h>
67 #include <lustre/ll_fiemap.h>
68
69 #include "lov_internal.h"
70
71
72 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
73    Any function that expects lov_tgts to remain stationary must take a ref. */
74 void lov_getref(struct obd_device *obd)
75 {
76         struct lov_obd *lov = &obd->u.lov;
77
78         /* nobody gets through here until lov_putref is done */
79         mutex_down(&lov->lov_lock);
80         atomic_inc(&lov->lov_refcount);
81         mutex_up(&lov->lov_lock);
82         return;
83 }
84
85 static void __lov_del_obd(struct obd_device *obd, __u32 index);
86
87 void lov_putref(struct obd_device *obd)
88 {
89         struct lov_obd *lov = &obd->u.lov;
90         mutex_down(&lov->lov_lock);
91         /* ok to dec to 0 more than once -- ltd_exp's will be null */
92         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
93                 int i;
94                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
95                        lov->lov_death_row);
96                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
97                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_reap)
98                                 continue;
99                         /* Disconnect and delete from list */
100                         __lov_del_obd(obd, i);
101                         lov->lov_death_row--;
102                 }
103         }
104         mutex_up(&lov->lov_lock);
105 }
106
107 static int lov_register_page_removal_cb(struct obd_export *exp,
108                                         obd_page_removal_cb_t func,
109                                         obd_pin_extent_cb pin_cb)
110 {
111         struct lov_obd *lov = &exp->exp_obd->u.lov;
112         int i, rc = 0;
113
114         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
115                 return -EBUSY;
116
117         if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
118                 return -EBUSY;
119
120         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
121                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
122                         continue;
123                 rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
124                                                    func, pin_cb);
125         }
126
127         lov->lov_page_removal_cb = func;
128         lov->lov_page_pin_cb = pin_cb;
129
130         return rc;
131 }
132
133 static int lov_unregister_page_removal_cb(struct obd_export *exp,
134                                         obd_page_removal_cb_t func)
135 {
136         struct lov_obd *lov = &exp->exp_obd->u.lov;
137         int i, rc = 0;
138
139         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
140                 return -EINVAL;
141
142         lov->lov_page_removal_cb = NULL;
143         lov->lov_page_pin_cb = NULL;
144
145         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
146                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
147                         continue;
148                 rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
149                                                      func);
150         }
151
152         return rc;
153 }
154
155 static int lov_register_lock_cancel_cb(struct obd_export *exp,
156                                          obd_lock_cancel_cb func)
157 {
158         struct lov_obd *lov = &exp->exp_obd->u.lov;
159         int i, rc = 0;
160
161         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
162                 return -EBUSY;
163
164         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
165                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
166                         continue;
167                 rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
168                                                   func);
169         }
170
171         lov->lov_lock_cancel_cb = func;
172
173         return rc;
174 }
175
176 static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
177                                          obd_lock_cancel_cb func)
178 {
179         struct lov_obd *lov = &exp->exp_obd->u.lov;
180         int i, rc = 0;
181
182         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
183                 return -EINVAL;
184
185         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
186                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
187                         continue;
188                 rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
189                                                     func);
190         }
191         lov->lov_lock_cancel_cb = NULL;
192         return rc;
193 }
194
195 #define MAX_STRING_SIZE 128
196 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
197                            struct obd_connect_data *data)
198 {
199         struct lov_obd *lov = &obd->u.lov;
200         struct obd_uuid tgt_uuid;
201         struct obd_device *tgt_obd;
202         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
203         struct lustre_handle conn = {0, };
204         struct obd_import *imp;
205
206 #ifdef __KERNEL__
207         cfs_proc_dir_entry_t *lov_proc_dir;
208 #endif
209         int rc;
210         ENTRY;
211
212         if (!lov->lov_tgts[index])
213                 RETURN(-EINVAL);
214
215         tgt_uuid = lov->lov_tgts[index]->ltd_uuid;
216
217         tgt_obd = class_find_client_obd(&tgt_uuid, LUSTRE_OSC_NAME,
218                                         &obd->obd_uuid);
219
220         if (!tgt_obd) {
221                 CERROR("Target %s not attached\n", obd_uuid2str(&tgt_uuid));
222                 RETURN(-EINVAL);
223         }
224         if (!tgt_obd->obd_set_up) {
225                 CERROR("Target %s not set up\n", obd_uuid2str(&tgt_uuid));
226                 RETURN(-EINVAL);
227         }
228
229         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
230                 data->ocd_index = index;
231
232         /*
233          * Divine LOV knows that OBDs under it are OSCs.
234          */
235         imp = tgt_obd->u.cli.cl_import;
236
237         if (activate) {
238                 tgt_obd->obd_no_recov = 0;
239                 /* FIXME this is probably supposed to be 
240                    ptlrpc_set_import_active.  Horrible naming. */
241                 ptlrpc_activate_import(imp);
242         }
243
244         if (imp->imp_invalid) {
245                 CERROR("not connecting OSC %s; administratively "
246                        "disabled\n", obd_uuid2str(&tgt_uuid));
247                 rc = obd_register_observer(tgt_obd, obd);
248                 if (rc) {
249                         CERROR("Target %s register_observer error %d; "
250                                "will not be able to reactivate\n",
251                                obd_uuid2str(&tgt_uuid), rc);
252                 }
253                 RETURN(0);
254         }
255
256         rc = obd_connect(NULL, &conn, tgt_obd, &lov_osc_uuid, data, NULL);
257         if (rc) {
258                 CERROR("Target %s connect error %d\n",
259                        obd_uuid2str(&tgt_uuid), rc);
260                 RETURN(rc);
261         }
262         lov->lov_tgts[index]->ltd_exp = class_conn2export(&conn);
263         if (!lov->lov_tgts[index]->ltd_exp) {
264                 CERROR("Target %s: null export!\n", obd_uuid2str(&tgt_uuid));
265                 RETURN(-ENODEV);
266         }
267
268         rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
269                                           lov->lov_page_removal_cb,
270                                           lov->lov_page_pin_cb);
271         if (rc) {
272                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
273                 lov->lov_tgts[index]->ltd_exp = NULL;
274                 RETURN(rc);
275         }
276
277         rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
278                                          lov->lov_lock_cancel_cb);
279         if (rc) {
280                 obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
281                                                lov->lov_page_removal_cb);
282                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
283                 lov->lov_tgts[index]->ltd_exp = NULL;
284                 RETURN(rc);
285         }
286
287         rc = obd_register_observer(tgt_obd, obd);
288         if (rc) {
289                 CERROR("Target %s register_observer error %d\n",
290                        obd_uuid2str(&tgt_uuid), rc);
291                 obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
292                                               lov->lov_lock_cancel_cb);
293                 obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
294                                                lov->lov_page_removal_cb);
295                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
296                 lov->lov_tgts[index]->ltd_exp = NULL;
297                 RETURN(rc);
298         }
299
300         lov->lov_tgts[index]->ltd_reap = 0;
301         if (activate) {
302                 lov->lov_tgts[index]->ltd_active = 1;
303                 lov->desc.ld_active_tgt_count++;
304                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
305         }
306         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
307                obd_uuid2str(&tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
308
309 #ifdef __KERNEL__
310         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
311         if (lov_proc_dir) {
312                 struct obd_device *osc_obd = class_conn2obd(&conn);
313                 cfs_proc_dir_entry_t *osc_symlink;
314                 char name[MAX_STRING_SIZE];
315
316                 LASSERT(osc_obd != NULL);
317                 LASSERT(osc_obd->obd_magic == OBD_DEVICE_MAGIC);
318                 LASSERT(osc_obd->obd_type->typ_name != NULL);
319                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
320                          osc_obd->obd_type->typ_name,
321                          osc_obd->obd_name);
322                 osc_symlink = lprocfs_add_symlink(osc_obd->obd_name, lov_proc_dir,
323                                                   name);
324                 if (osc_symlink == NULL) {
325                         CERROR("could not register LOV target "
326                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
327                                obd->obd_type->typ_name, obd->obd_name,
328                                osc_obd->obd_name);
329                         lprocfs_remove(&lov_proc_dir);
330                 }
331         }
332 #endif
333
334         rc = qos_add_tgt(obd, index);
335         if (rc)
336                 CERROR("qos_add_tgt failed %d\n", rc);
337
338         RETURN(0);
339 }
340
341 static int lov_connect(const struct lu_env *env,
342                        struct lustre_handle *conn, struct obd_device *obd,
343                        struct obd_uuid *cluuid, struct obd_connect_data *data,
344                        void *localdata)
345 {
346         struct lov_obd *lov = &obd->u.lov;
347         struct lov_tgt_desc *tgt;
348         int i, rc;
349         ENTRY;
350
351         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
352
353         rc = class_connect(conn, obd, cluuid);
354         if (rc)
355                 RETURN(rc);
356
357         /* Why should there ever be more than 1 connect? */
358         lov->lov_connects++;
359         LASSERT(lov->lov_connects == 1);
360
361         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
362         if (data)
363                 lov->lov_ocd = *data;
364
365         lov_getref(obd);
366         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
367                 tgt = lov->lov_tgts[i];
368                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
369                         continue;
370                 /* Flags will be lowest common denominator */
371                 rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
372                 if (rc) {
373                         CERROR("%s: lov connect tgt %d failed: %d\n",
374                                obd->obd_name, i, rc);
375                         continue;
376                 }
377         }
378         lov_putref(obd);
379
380         RETURN(0);
381 }
382
383 static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
384 {
385         cfs_proc_dir_entry_t *lov_proc_dir;
386         struct lov_obd *lov = &obd->u.lov;
387         struct obd_device *osc_obd;
388         int rc;
389
390         ENTRY;
391
392         if (lov->lov_tgts[index] == NULL)
393                 RETURN(-EINVAL);
394
395         osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
396         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
397                obd->obd_name, osc_obd->obd_name);
398
399         obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
400                                       lov->lov_lock_cancel_cb);
401         obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
402                                        lov->lov_page_removal_cb);
403
404         if (lov->lov_tgts[index]->ltd_active) {
405                 lov->lov_tgts[index]->ltd_active = 0;
406                 lov->desc.ld_active_tgt_count--;
407                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
408         }
409
410         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
411         if (lov_proc_dir) {
412                 cfs_proc_dir_entry_t *osc_symlink;
413
414                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
415                 if (osc_symlink) {
416                         lprocfs_remove(&osc_symlink);
417                 } else {
418                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing.",
419                                obd->obd_type->typ_name, obd->obd_name,
420                                osc_obd->obd_name);
421                 }
422         }
423
424         if (osc_obd) {
425                 /* Pass it on to our clients.
426                  * XXX This should be an argument to disconnect,
427                  * XXX not a back-door flag on the OBD.  Ah well.
428                  */
429                 osc_obd->obd_force = obd->obd_force;
430                 osc_obd->obd_fail = obd->obd_fail;
431                 osc_obd->obd_no_recov = obd->obd_no_recov;
432         }
433
434         obd_register_observer(osc_obd, NULL);
435
436         rc = obd_disconnect(lov->lov_tgts[index]->ltd_exp);
437         if (rc) {
438                 CERROR("Target %s disconnect error %d\n",
439                        lov_uuid2str(lov, index), rc);
440                 rc = 0;
441         }
442
443         qos_del_tgt(obd, index);
444
445         lov->lov_tgts[index]->ltd_exp = NULL;
446         RETURN(0);
447 }
448
449 static int lov_del_target(struct obd_device *obd, __u32 index,
450                           struct obd_uuid *uuidp, int gen);
451
452 static int lov_disconnect(struct obd_export *exp)
453 {
454         struct obd_device *obd = class_exp2obd(exp);
455         struct lov_obd *lov = &obd->u.lov;
456         int i, rc;
457         ENTRY;
458
459         if (!lov->lov_tgts)
460                 goto out;
461
462         /* Only disconnect the underlying layers on the final disconnect. */
463         lov->lov_connects--;
464         if (lov->lov_connects != 0) {
465                 /* why should there be more than 1 connect? */
466                 CERROR("disconnect #%d\n", lov->lov_connects);
467                 goto out;
468         }
469
470         /* Let's hold another reference so lov_del_obd doesn't spin through
471            putref every time */
472         lov_getref(obd);
473         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
474                 if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
475                         /* Disconnection is the last we know about an obd */
476                         lov_del_target(obd, i, 0, lov->lov_tgts[i]->ltd_gen);
477                 }
478         }
479         lov_putref(obd);
480
481 out:
482         rc = class_disconnect(exp); /* bz 9811 */
483         RETURN(rc);
484 }
485
486 /* Error codes:
487  *
488  *  -EINVAL  : UUID can't be found in the LOV's target list
489  * - any other is lov index
490  */
491 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
492                               int activate)
493 {
494         struct lov_obd *lov = &obd->u.lov;
495         struct lov_tgt_desc *tgt;
496         int index;
497         ENTRY;
498
499         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
500                lov, uuid->uuid, activate);
501
502         lov_getref(obd);
503         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
504                 tgt = lov->lov_tgts[index];
505                 if (!tgt || !tgt->ltd_exp)
506                         continue;
507
508                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
509                        index, obd_uuid2str(&tgt->ltd_uuid),
510                        tgt->ltd_exp->exp_handle.h_cookie);
511                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
512                         break;
513         }
514
515         if (index == lov->desc.ld_tgt_count)
516                 GOTO(out, index = -EINVAL);
517
518         if (lov->lov_tgts[index]->ltd_active == activate) {
519                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,
520                        activate ? "" : "in");
521                 GOTO(out, index);
522         }
523
524         CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n", obd_uuid2str(uuid),
525                activate ? "" : "in");
526
527         lov->lov_tgts[index]->ltd_active = activate;
528
529         if (activate) {
530                 lov->desc.ld_active_tgt_count++;
531                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
532         } else {
533                 lov->desc.ld_active_tgt_count--;
534                 lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
535         }
536         /* remove any old qos penalty */
537         lov->lov_tgts[index]->ltd_qos.ltq_penalty = 0;
538
539  out:
540         lov_putref(obd);
541         RETURN(index);
542 }
543
544 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
545                       enum obd_notify_event ev, void *data)
546 {
547         int rc = 0;
548         ENTRY;
549
550         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
551                 struct obd_uuid *uuid;
552
553                 LASSERT(watched);
554
555                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
556                         CERROR("unexpected notification of %s %s!\n",
557                                watched->obd_type->typ_name,
558                                watched->obd_name);
559                         RETURN(-EINVAL);
560                 }
561                 uuid = &watched->u.cli.cl_target_uuid;
562
563                 /* Set OSC as active before notifying the observer, so the
564                  * observer can use the OSC normally.
565                  */
566                 rc = lov_set_osc_active(obd, uuid, ev == OBD_NOTIFY_ACTIVE);
567                 if (rc < 0) {
568                         CERROR("%sactivation of %s failed: %d\n",
569                                (ev == OBD_NOTIFY_ACTIVE) ? "" : "de",
570                                obd_uuid2str(uuid), rc);
571                         RETURN(rc);
572                 }
573                 data = &rc;
574         }
575
576         /* Pass the notification up the chain. */
577         if (watched) {
578                 rc = obd_notify_observer(obd, watched, ev, data);
579         } else {
580                 /* NULL watched means all osc's in the lov (only for syncs) */
581                 /* sync event should be send lov idx as data */
582                 struct lov_obd *lov = &obd->u.lov;
583                 struct obd_device *tgt_obd;
584                 int i;
585                 lov_getref(obd);
586                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
587                         if (!lov->lov_tgts[i])
588                                 continue;
589
590                         if ((ev == OBD_NOTIFY_SYNC) ||
591                             (ev == OBD_NOTIFY_SYNC_NONBLOCK))
592                                 data = &i;
593
594                         tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
595                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
596                         if (rc) {
597                                 CERROR("%s: notify %s of %s failed %d\n",
598                                        obd->obd_name,
599                                        obd->obd_observer->obd_name,
600                                        tgt_obd->obd_name, rc);
601                                 break;
602                         }
603                 }
604                 lov_putref(obd);
605         }
606
607         RETURN(rc);
608 }
609
610 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
611                           __u32 index, int gen, int active)
612 {
613         struct lov_obd *lov = &obd->u.lov;
614         struct lov_tgt_desc *tgt;
615         int rc;
616         ENTRY;
617
618         CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
619                uuidp->uuid, index, gen, active);
620
621         if (gen <= 0) {
622                 CERROR("request to add OBD %s with invalid generation: %d\n",
623                        uuidp->uuid, gen);
624                 RETURN(-EINVAL);
625         }
626
627         mutex_down(&lov->lov_lock);
628
629         if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
630                 tgt = lov->lov_tgts[index];
631                 CERROR("UUID %s already assigned at LOV target index %d\n",
632                        obd_uuid2str(&tgt->ltd_uuid), index);
633                 mutex_up(&lov->lov_lock);
634                 RETURN(-EEXIST);
635         }
636
637         if (index >= lov->lov_tgt_size) {
638                 /* We need to reallocate the lov target array. */
639                 struct lov_tgt_desc **newtgts, **old = NULL;
640                 __u32 newsize, oldsize = 0;
641
642                 newsize = max(lov->lov_tgt_size, (__u32)2);
643                 while (newsize < index + 1)
644                         newsize = newsize << 1;
645                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
646                 if (newtgts == NULL) {
647                         mutex_up(&lov->lov_lock);
648                         RETURN(-ENOMEM);
649                 }
650
651                 if (lov->lov_tgt_size) {
652                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
653                                lov->lov_tgt_size);
654                         old = lov->lov_tgts;
655                         oldsize = lov->lov_tgt_size;
656                 }
657
658                 lov->lov_tgts = newtgts;
659                 lov->lov_tgt_size = newsize;
660 #ifdef __KERNEL__
661                 smp_rmb();
662 #endif
663                 if (old)
664                         OBD_FREE(old, sizeof(*old) * oldsize);
665
666                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
667                        lov->lov_tgts, lov->lov_tgt_size);
668         }
669
670
671         OBD_ALLOC_PTR(tgt);
672         if (!tgt) {
673                 mutex_up(&lov->lov_lock);
674                 RETURN(-ENOMEM);
675         }
676
677         memset(tgt, 0, sizeof(*tgt));
678         tgt->ltd_uuid = *uuidp;
679         /* XXX - add a sanity check on the generation number. */
680         tgt->ltd_gen = gen;
681         tgt->ltd_index = index;
682         tgt->ltd_activate = active;
683         lov->lov_tgts[index] = tgt;
684         if (index >= lov->desc.ld_tgt_count)
685                 lov->desc.ld_tgt_count = index + 1;
686         mutex_up(&lov->lov_lock);
687
688         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
689                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
690
691         if (lov->lov_connects == 0) {
692                 /* lov_connect hasn't been called yet. We'll do the
693                    lov_connect_obd on this target when that fn first runs,
694                    because we don't know the connect flags yet. */
695                 RETURN(0);
696         }
697
698         lov_getref(obd);
699
700         rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
701         if (rc)
702                 GOTO(out, rc);
703
704         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
705                         active ? OBD_NOTIFY_ACTIVE : OBD_NOTIFY_INACTIVE,
706                         (void *)&index);
707
708 out:
709         if (rc) {
710                 CERROR("add failed (%d), deleting %s\n", rc,
711                        obd_uuid2str(&tgt->ltd_uuid));
712                 lov_del_target(obd, index, 0, 0);
713         }
714         lov_putref(obd);
715         RETURN(rc);
716 }
717
718 /* Schedule a target for deletion */
719 static int lov_del_target(struct obd_device *obd, __u32 index,
720                           struct obd_uuid *uuidp, int gen)
721 {
722         struct lov_obd *lov = &obd->u.lov;
723         int count = lov->desc.ld_tgt_count;
724         int rc = 0;
725         ENTRY;
726
727         if (index >= count) {
728                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
729                        index, count);
730                 RETURN(-EINVAL);
731         }
732
733         lov_getref(obd);
734
735         if (!lov->lov_tgts[index]) {
736                 CERROR("LOV target at index %d is not setup.\n", index);
737                 GOTO(out, rc = -EINVAL);
738         }
739
740         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
741                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
742                        lov_uuid2str(lov, index), index,
743                        obd_uuid2str(uuidp));
744                 GOTO(out, rc = -EINVAL);
745         }
746
747         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
748                lov_uuid2str(lov, index), index,
749                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
750                lov->lov_tgts[index]->ltd_active);
751
752         lov->lov_tgts[index]->ltd_reap = 1;
753         lov->lov_death_row++;
754         /* we really delete it from lov_putref */
755 out:
756         lov_putref(obd);
757
758         RETURN(rc);
759 }
760
761 /* We are holding lov_lock */
762 static void __lov_del_obd(struct obd_device *obd, __u32 index)
763 {
764         struct lov_obd *lov = &obd->u.lov;
765         struct obd_device *osc_obd;
766         struct lov_tgt_desc *tgt = lov->lov_tgts[index];
767
768         LASSERT(tgt);
769         LASSERT(tgt->ltd_reap);
770
771         osc_obd = class_exp2obd(tgt->ltd_exp);
772
773         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
774                lov_uuid2str(lov, index),
775                osc_obd ? osc_obd->obd_name : "<no obd>");
776
777         if (tgt->ltd_exp)
778                 lov_disconnect_obd(obd, index);
779
780         /* XXX - right now there is a dependency on ld_tgt_count being the
781          * maximum tgt index for computing the mds_max_easize. So we can't
782          * shrink it. */
783
784         lov->lov_tgts[index] = NULL;
785         OBD_FREE_PTR(tgt);
786
787         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
788            do it ourselves. And we can't do it from lov_cleanup,
789            because we just lost our only reference to it. */
790         if (osc_obd)
791                 class_manual_cleanup(osc_obd);
792 }
793
794 void lov_fix_desc_stripe_size(__u64 *val)
795 {
796         if (*val < PTLRPC_MAX_BRW_SIZE) {
797                 LCONSOLE_WARN("Increasing default stripe size to min %u\n",
798                               PTLRPC_MAX_BRW_SIZE);
799                 *val = PTLRPC_MAX_BRW_SIZE;
800         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
801                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
802                 LCONSOLE_WARN("Changing default stripe size to "LPU64" (a "
803                               "multiple of %u)\n",
804                               *val, LOV_MIN_STRIPE_SIZE);
805         }
806 }
807
808 void lov_fix_desc_stripe_count(__u32 *val)
809 {
810         if (*val == 0)
811                 *val = 1;
812 }
813
814 void lov_fix_desc_pattern(__u32 *val)
815 {
816         /* from lov_setstripe */
817         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
818                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
819                 *val = 0;
820         }
821 }
822
823 void lov_fix_desc_qos_maxage(__u32 *val)
824 {
825         /* fix qos_maxage */
826         if (*val == 0)
827                 *val = QOS_DEFAULT_MAXAGE;
828 }
829
830 void lov_fix_desc(struct lov_desc *desc)
831 {
832         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
833         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
834         lov_fix_desc_pattern(&desc->ld_pattern);
835         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
836 }
837
838 static int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
839 {
840         struct lprocfs_static_vars lvars = { 0 };
841         struct lov_desc *desc;
842         struct lov_obd *lov = &obd->u.lov;
843         int count;
844         ENTRY;
845
846         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
847                 CERROR("LOV setup requires a descriptor\n");
848                 RETURN(-EINVAL);
849         }
850
851         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
852
853         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
854                 CERROR("descriptor size wrong: %d > %d\n",
855                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
856                 RETURN(-EINVAL);
857         }
858
859         if (desc->ld_magic != LOV_DESC_MAGIC) {
860                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
861                             CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
862                                    obd->obd_name, desc);
863                             lustre_swab_lov_desc(desc);
864                 } else {
865                         CERROR("%s: Bad lov desc magic: %#x\n",
866                                obd->obd_name, desc->ld_magic);
867                         RETURN(-EINVAL);
868                 }
869         }
870
871         lov_fix_desc(desc);
872
873         /* Because of 64-bit divide/mod operations only work with a 32-bit
874          * divisor in a 32-bit kernel, we cannot support a stripe width
875          * of 4GB or larger on 32-bit CPUs. */
876         count = desc->ld_default_stripe_count;
877         if ((count > 0 ? count : desc->ld_tgt_count) *
878             desc->ld_default_stripe_size > 0xffffffff) {
879                 CERROR("LOV: stripe width "LPU64"x%u > 4294967295 bytes\n",
880                        desc->ld_default_stripe_size, count);
881                 RETURN(-EINVAL);
882         }
883
884         desc->ld_active_tgt_count = 0;
885         lov->desc = *desc;
886         lov->lov_tgt_size = 0;
887         sema_init(&lov->lov_lock, 1);
888         atomic_set(&lov->lov_refcount, 0);
889         CFS_INIT_LIST_HEAD(&lov->lov_qos.lq_oss_list);
890         init_rwsem(&lov->lov_qos.lq_rw_sem);
891         lov->lov_qos.lq_dirty = 1;
892         lov->lov_qos.lq_dirty_rr = 1;
893         lov->lov_qos.lq_reset = 1;
894         /* Default priority is toward free space balance */
895         lov->lov_qos.lq_prio_free = 232;
896
897         lprocfs_lov_init_vars(&lvars);
898         lprocfs_obd_setup(obd, lvars.obd_vars);
899 #ifdef LPROCFS
900         {
901                 int rc;
902
903                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
904                                         0444, &lov_proc_target_fops, obd);
905                 if (rc)
906                         CWARN("Error adding the target_obd file\n");
907         }
908 #endif
909
910         RETURN(0);
911 }
912
913 static int lov_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
914 {
915         int rc = 0;
916         ENTRY;
917
918         switch (stage) {
919         case OBD_CLEANUP_EARLY: {
920                 struct lov_obd *lov = &obd->u.lov;
921                 int i;
922                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
923                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
924                                 continue;
925                         obd_precleanup(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
926                                        OBD_CLEANUP_EARLY);
927                 }
928                 break;
929         }
930         case OBD_CLEANUP_EXPORTS:
931                 rc = obd_llog_finish(obd, 0);
932                 if (rc != 0)
933                         CERROR("failed to cleanup llogging subsystems\n");
934                 break;
935         }
936         RETURN(rc);
937 }
938
939 static int lov_cleanup(struct obd_device *obd)
940 {
941         struct lov_obd *lov = &obd->u.lov;
942
943         lprocfs_obd_cleanup(obd);
944         if (lov->lov_tgts) {
945                 int i;
946                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
947                         if (!lov->lov_tgts[i])
948                                 continue;
949
950                         /* Inactive targets may never have connected */
951                         if (lov->lov_tgts[i]->ltd_active ||
952                             atomic_read(&lov->lov_refcount))
953                             /* We should never get here - these
954                                should have been removed in the
955                              disconnect. */
956                                 CERROR("lov tgt %d not cleaned!"
957                                        " deathrow=%d, lovrc=%d\n",
958                                        i, lov->lov_death_row,
959                                        atomic_read(&lov->lov_refcount));
960                         lov_del_target(obd, i, 0, 0);
961                 }
962                 OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
963                          lov->lov_tgt_size);
964                 lov->lov_tgt_size = 0;
965         }
966
967         if (lov->lov_qos.lq_rr_size)
968                 OBD_FREE(lov->lov_qos.lq_rr_array, lov->lov_qos.lq_rr_size);
969
970         RETURN(0);
971 }
972
973 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
974 {
975         struct lustre_cfg *lcfg = buf;
976         struct obd_uuid obd_uuid;
977         int cmd;
978         int rc = 0;
979         ENTRY;
980
981         switch(cmd = lcfg->lcfg_command) {
982         case LCFG_LOV_ADD_OBD:
983         case LCFG_LOV_ADD_INA:
984         case LCFG_LOV_DEL_OBD: {
985                 __u32 index;
986                 int gen;
987                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
988                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
989                         GOTO(out, rc = -EINVAL);
990
991                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
992
993                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
994                         GOTO(out, rc = -EINVAL);
995                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
996                         GOTO(out, rc = -EINVAL);
997                 if (cmd == LCFG_LOV_ADD_OBD)
998                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
999                 else if (cmd == LCFG_LOV_ADD_INA)
1000                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
1001                 else
1002                         rc = lov_del_target(obd, index, &obd_uuid, gen);
1003                 GOTO(out, rc);
1004         }
1005         case LCFG_PARAM: {
1006                 struct lprocfs_static_vars lvars = { 0 };
1007                 struct lov_desc *desc = &(obd->u.lov.desc);
1008
1009                 if (!desc)
1010                         GOTO(out, rc = -EINVAL);
1011
1012                 lprocfs_lov_init_vars(&lvars);
1013
1014                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
1015                                               lcfg, obd);
1016                 GOTO(out, rc);
1017         }
1018         default: {
1019                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1020                 GOTO(out, rc = -EINVAL);
1021
1022         }
1023         }
1024 out:
1025         RETURN(rc);
1026 }
1027
/*
 * Fallback log2(): only defined when nothing else provides one.
 * It expands to ffz() applied to the bitwise complement of its argument.
 * NOTE(review): presumably ffz() yields the index of the first zero bit,
 * which would make this the index of the lowest set bit of n - confirm.
 */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
1031
1032 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
1033                              struct lov_stripe_md **ea,
1034                              struct obd_trans_info *oti)
1035 {
1036         struct lov_obd *lov;
1037         struct obdo *tmp_oa;
1038         struct obd_uuid *ost_uuid = NULL;
1039         int rc = 0, i;
1040         ENTRY;
1041
1042         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1043                 src_oa->o_flags == OBD_FL_DELORPHAN);
1044
1045         lov = &export->exp_obd->u.lov;
1046
1047         OBDO_ALLOC(tmp_oa);
1048         if (tmp_oa == NULL)
1049                 RETURN(-ENOMEM);
1050
1051         if (oti->oti_ost_uuid) {
1052                 ost_uuid = oti->oti_ost_uuid;
1053                 CDEBUG(D_HA, "clearing orphans only for %s\n",
1054                        ost_uuid->uuid);
1055         }
1056
1057         lov_getref(export->exp_obd);
1058         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1059                 struct lov_stripe_md obj_md;
1060                 struct lov_stripe_md *obj_mdp = &obj_md;
1061                 struct lov_tgt_desc *tgt;
1062                 int err;
1063
1064                 tgt = lov->lov_tgts[i];
1065                 if (!tgt)
1066                         continue;
1067
1068                 /* if called for a specific target, we don't
1069                    care if it is not active. */
1070                 if (!lov->lov_tgts[i]->ltd_active && ost_uuid == NULL) {
1071                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1072                         continue;
1073                 }
1074
1075                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &tgt->ltd_uuid))
1076                         continue;
1077
1078                 CDEBUG(D_CONFIG,"Clear orphans for %d:%s\n", i,
1079                        obd_uuid2str(ost_uuid));
1080
1081                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1082
1083                 LASSERT(lov->lov_tgts[i]->ltd_exp);
1084                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1085                 err = obd_create(lov->lov_tgts[i]->ltd_exp,
1086                                  tmp_oa, &obj_mdp, oti);
1087                 if (err)
1088                         /* This export will be disabled until it is recovered,
1089                            and then orphan recovery will be completed. */
1090                         CERROR("error in orphan recovery on OST idx %d/%d: "
1091                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
1092
1093                 if (ost_uuid)
1094                         break;
1095         }
1096         lov_putref(export->exp_obd);
1097
1098         OBDO_FREE(tmp_oa);
1099         RETURN(rc);
1100 }
1101
1102 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
1103                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
1104 {
1105         struct lov_stripe_md *obj_mdp, *lsm;
1106         struct lov_obd *lov = &exp->exp_obd->u.lov;
1107         unsigned ost_idx;
1108         int rc, i;
1109         ENTRY;
1110
1111         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1112                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
1113
1114         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
1115         if (obj_mdp == NULL)
1116                 RETURN(-ENOMEM);
1117
1118         ost_idx = src_oa->o_nlink;
1119         lsm = *ea;
1120         if (lsm == NULL)
1121                 GOTO(out, rc = -EINVAL);
1122         if (ost_idx >= lov->desc.ld_tgt_count ||
1123             !lov->lov_tgts[ost_idx])
1124                 GOTO(out, rc = -EINVAL);
1125
1126         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1127                 if (lsm->lsm_oinfo[i]->loi_ost_idx == ost_idx) {
1128                         if (lsm->lsm_oinfo[i]->loi_id != src_oa->o_id)
1129                                 GOTO(out, rc = -EINVAL);
1130                         break;
1131                 }
1132         }
1133         if (i == lsm->lsm_stripe_count)
1134                 GOTO(out, rc = -EINVAL);
1135
1136         rc = obd_create(lov->lov_tgts[ost_idx]->ltd_exp, src_oa, &obj_mdp, oti);
1137 out:
1138         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
1139         RETURN(rc);
1140 }
1141
1142 /* the LOV expects oa->o_id to be set to the LOV object id */
1143 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
1144                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
1145 {
1146         struct lov_obd *lov;
1147         struct obd_info oinfo;
1148         struct lov_request_set *set = NULL;
1149         struct lov_request *req;
1150         struct obd_statfs osfs;
1151         __u64 maxage;
1152         int rc = 0;
1153         ENTRY;
1154
1155         LASSERT(ea != NULL);
1156         if (exp == NULL)
1157                 RETURN(-EINVAL);
1158
1159         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1160             src_oa->o_flags == OBD_FL_DELORPHAN) {
1161                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
1162                 RETURN(rc);
1163         }
1164
1165         lov = &exp->exp_obd->u.lov;
1166         if (!lov->desc.ld_active_tgt_count)
1167                 RETURN(-EIO);
1168
1169         /* Recreate a specific object id at the given OST index */
1170         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1171             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1172                  rc = lov_recreate(exp, src_oa, ea, oti);
1173                  RETURN(rc);
1174         }
1175
1176         maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);
1177         obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);
1178
1179         rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);
1180         if (rc)
1181                 RETURN(rc);
1182
1183         list_for_each_entry(req, &set->set_list, rq_link) {
1184                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1185                 rc = obd_create(lov->lov_tgts[req->rq_idx]->ltd_exp,
1186                                 req->rq_oi.oi_oa, &req->rq_oi.oi_md, oti);
1187                 lov_update_create_set(set, req, rc);
1188         }
1189         rc = lov_fini_create_set(set, ea);
1190         RETURN(rc);
1191 }
1192
/*
 * Assert that a stripe md pointer is non-NULL and that its lsm_magic is
 * either LOV_MAGIC or LOV_MAGIC_JOIN.  The argument is evaluated more
 * than once, so it must be free of side effects.
 */
#define ASSERT_LSM_MAGIC(lsmp)                                          \
do {                                                                    \
        LASSERT((lsmp) != NULL);                                        \
        LASSERTF(((lsmp)->lsm_magic == LOV_MAGIC ||                     \
                 (lsmp)->lsm_magic == LOV_MAGIC_JOIN),                  \
                 "%p->lsm_magic=%x\n",                                  \
                 (lsmp), (lsmp)->lsm_magic);                            \
} while (0)
1200
1201 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
1202                        struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1203                        struct obd_export *md_exp)
1204 {
1205         struct lov_request_set *set;
1206         struct obd_info oinfo;
1207         struct lov_request *req;
1208         struct list_head *pos;
1209         struct lov_obd *lov;
1210         int rc = 0, err;
1211         ENTRY;
1212
1213         ASSERT_LSM_MAGIC(lsm);
1214
1215         if (!exp || !exp->exp_obd)
1216                 RETURN(-ENODEV);
1217
1218         if (oa->o_valid & OBD_MD_FLCOOKIE) {
1219                 LASSERT(oti);
1220                 LASSERT(oti->oti_logcookies);
1221         }
1222
1223         lov = &exp->exp_obd->u.lov;
1224         rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);
1225         if (rc)
1226                 RETURN(rc);
1227
1228         list_for_each (pos, &set->set_list) {
1229                 int err;
1230                 req = list_entry(pos, struct lov_request, rq_link);
1231
1232                 if (oa->o_valid & OBD_MD_FLCOOKIE)
1233                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1234
1235                 err = obd_destroy(lov->lov_tgts[req->rq_idx]->ltd_exp,
1236                                   req->rq_oi.oi_oa, NULL, oti, NULL);
1237                 err = lov_update_common_set(set, req, err);
1238                 if (err) {
1239                         CERROR("error: destroying objid "LPX64" subobj "
1240                                LPX64" on OST idx %d: rc = %d\n",
1241                                oa->o_id, req->rq_oi.oi_oa->o_id,
1242                                req->rq_idx, err);
1243                         if (!rc)
1244                                 rc = err;
1245                 }
1246         }
1247
1248         if (rc == 0) {
1249                 LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
1250                 rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);
1251         }
1252         err = lov_fini_destroy_set(set);
1253         RETURN(rc ? rc : err);
1254 }
1255
1256 static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo)
1257 {
1258         struct lov_request_set *set;
1259         struct lov_request *req;
1260         struct list_head *pos;
1261         struct lov_obd *lov;
1262         int err = 0, rc = 0;
1263         ENTRY;
1264
1265         LASSERT(oinfo);
1266         ASSERT_LSM_MAGIC(oinfo->oi_md);
1267
1268         if (!exp || !exp->exp_obd)
1269                 RETURN(-ENODEV);
1270
1271         lov = &exp->exp_obd->u.lov;
1272
1273         rc = lov_prep_getattr_set(exp, oinfo, &set);
1274         if (rc)
1275                 RETURN(rc);
1276
1277         list_for_each (pos, &set->set_list) {
1278                 req = list_entry(pos, struct lov_request, rq_link);
1279
1280                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1281                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1282                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1283
1284                 rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
1285                                  &req->rq_oi);
1286                 err = lov_update_common_set(set, req, rc);
1287                 if (err) {
1288                         CERROR("error: getattr objid "LPX64" subobj "
1289                                LPX64" on OST idx %d: rc = %d\n",
1290                                oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,
1291                                req->rq_idx, err);
1292                         break;
1293                 }
1294         }
1295
1296         rc = lov_fini_getattr_set(set);
1297         if (err)
1298                 rc = err;
1299         RETURN(rc);
1300 }
1301
1302 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
1303                                  void *data, int rc)
1304 {
1305         struct lov_request_set *lovset = (struct lov_request_set *)data;
1306         int err;
1307         ENTRY;
1308
1309         /* don't do attribute merge if this aysnc op failed */
1310         if (rc)
1311                 lovset->set_completes = 0;
1312         err = lov_fini_getattr_set(lovset);
1313         RETURN(rc ? rc : err);
1314 }
1315
1316 static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
1317                               struct ptlrpc_request_set *rqset)
1318 {
1319         struct lov_request_set *lovset;
1320         struct lov_obd *lov;
1321         struct list_head *pos;
1322         struct lov_request *req;
1323         int rc = 0, err;
1324         ENTRY;
1325
1326         LASSERT(oinfo);
1327         ASSERT_LSM_MAGIC(oinfo->oi_md);
1328
1329         if (!exp || !exp->exp_obd)
1330                 RETURN(-ENODEV);
1331
1332         lov = &exp->exp_obd->u.lov;
1333
1334         rc = lov_prep_getattr_set(exp, oinfo, &lovset);
1335         if (rc)
1336                 RETURN(rc);
1337
1338         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1339                oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
1340                oinfo->oi_md->lsm_stripe_size);
1341
1342         list_for_each (pos, &lovset->set_list) {
1343                 req = list_entry(pos, struct lov_request, rq_link);
1344
1345                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1346                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1347                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1348                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1349                                        &req->rq_oi, rqset);
1350                 if (rc) {
1351                         CERROR("error: getattr objid "LPX64" subobj "
1352                                LPX64" on OST idx %d: rc = %d\n",
1353                                oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,
1354                                req->rq_idx, rc);
1355                         GOTO(out, rc);
1356                 }
1357         }
1358
1359         if (!list_empty(&rqset->set_requests)) {
1360                 LASSERT(rc == 0);
1361                 LASSERT (rqset->set_interpret == NULL);
1362                 rqset->set_interpret = lov_getattr_interpret;
1363                 rqset->set_arg = (void *)lovset;
1364                 RETURN(rc);
1365         }
1366 out:
1367         if (rc)
1368                 lovset->set_completes = 0;
1369         err = lov_fini_getattr_set(lovset);
1370         RETURN(rc ? rc : err);
1371 }
1372
1373 static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,
1374                        struct obd_trans_info *oti)
1375 {
1376         struct lov_request_set *set;
1377         struct lov_obd *lov;
1378         struct list_head *pos;
1379         struct lov_request *req;
1380         int err = 0, rc = 0;
1381         ENTRY;
1382
1383         LASSERT(oinfo);
1384         ASSERT_LSM_MAGIC(oinfo->oi_md);
1385
1386         if (!exp || !exp->exp_obd)
1387                 RETURN(-ENODEV);
1388
1389         /* for now, we only expect the following updates here */
1390         LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
1391                                             OBD_MD_FLMODE | OBD_MD_FLATIME |
1392                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1393                                             OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
1394                                             OBD_MD_FLGROUP | OBD_MD_FLUID |
1395                                             OBD_MD_FLGID | OBD_MD_FLFID |
1396                                             OBD_MD_FLGENER)));
1397         lov = &exp->exp_obd->u.lov;
1398         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1399         if (rc)
1400                 RETURN(rc);
1401
1402         list_for_each (pos, &set->set_list) {
1403                 req = list_entry(pos, struct lov_request, rq_link);
1404
1405                 rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,
1406                                  &req->rq_oi, NULL);
1407                 err = lov_update_setattr_set(set, req, rc);
1408                 if (err) {
1409                         CERROR("error: setattr objid "LPX64" subobj "
1410                                LPX64" on OST idx %d: rc = %d\n",
1411                                set->set_oi->oi_oa->o_id,
1412                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);
1413                         if (!rc)
1414                                 rc = err;
1415                 }
1416         }
1417         err = lov_fini_setattr_set(set);
1418         if (!rc)
1419                 rc = err;
1420         RETURN(rc);
1421 }
1422
1423 static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
1424                                  void *data, int rc)
1425 {
1426         struct lov_request_set *lovset = (struct lov_request_set *)data;
1427         int err;
1428         ENTRY;
1429
1430         if (rc)
1431                 lovset->set_completes = 0;
1432         err = lov_fini_setattr_set(lovset);
1433         RETURN(rc ? rc : err);
1434 }
1435
1436 /* If @oti is given, the request goes from MDS and responses from OSTs are not
1437    needed. Otherwise, a client is waiting for responses. */
1438 static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
1439                              struct obd_trans_info *oti,
1440                              struct ptlrpc_request_set *rqset)
1441 {
1442         struct lov_request_set *set;
1443         struct lov_request *req;
1444         struct list_head *pos;
1445         struct lov_obd *lov;
1446         int rc = 0;
1447         ENTRY;
1448
1449         LASSERT(oinfo);
1450         ASSERT_LSM_MAGIC(oinfo->oi_md);
1451         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
1452                 LASSERT(oti);
1453                 LASSERT(oti->oti_logcookies);
1454         }
1455
1456         if (!exp || !exp->exp_obd)
1457                 RETURN(-ENODEV);
1458
1459         lov = &exp->exp_obd->u.lov;
1460         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1461         if (rc)
1462                 RETURN(rc);
1463
1464         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1465                oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,
1466                oinfo->oi_md->lsm_stripe_size);
1467
1468         list_for_each (pos, &set->set_list) {
1469                 req = list_entry(pos, struct lov_request, rq_link);
1470
1471                 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1472                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1473
1474                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1475                        "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,
1476                        req->rq_oi.oi_oa->o_id, req->rq_idx);
1477
1478                 rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1479                                        &req->rq_oi, oti, rqset);
1480                 if (rc) {
1481                         CERROR("error: setattr objid "LPX64" subobj "
1482                                LPX64" on OST idx %d: rc = %d\n",
1483                                set->set_oi->oi_oa->o_id,
1484                                req->rq_oi.oi_oa->o_id,
1485                                req->rq_idx, rc);
1486                         break;
1487                 }
1488         }
1489
1490         /* If we are not waiting for responses on async requests, return. */
1491         if (rc || !rqset || list_empty(&rqset->set_requests)) {
1492                 int err;
1493                 if (rc)
1494                         set->set_completes = 0;
1495                 err = lov_fini_setattr_set(set);
1496                 RETURN(rc ? rc : err);
1497         }
1498
1499         LASSERT(rqset->set_interpret == NULL);
1500         rqset->set_interpret = lov_setattr_interpret;
1501         rqset->set_arg = (void *)set;
1502
1503         RETURN(0);
1504 }
1505
1506 static int lov_punch_interpret(struct ptlrpc_request_set *rqset,
1507                                void *data, int rc)
1508 {
1509         struct lov_request_set *lovset = (struct lov_request_set *)data;
1510         int err;
1511         ENTRY;
1512
1513         if (rc)
1514                 lovset->set_completes = 0;
1515         err = lov_fini_punch_set(lovset);
1516         RETURN(rc ? rc : err);
1517 }
1518
1519 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1520  * we can send this 'punch' to just the authoritative node and the nodes
1521  * that the punch will affect. */
1522 static int lov_punch(struct obd_export *exp, struct obd_info *oinfo,
1523                      struct obd_trans_info *oti,
1524                      struct ptlrpc_request_set *rqset)
1525 {
1526         struct lov_request_set *set;
1527         struct lov_obd *lov;
1528         struct list_head *pos;
1529         struct lov_request *req;
1530         int rc = 0;
1531         ENTRY;
1532
1533         LASSERT(oinfo);
1534         ASSERT_LSM_MAGIC(oinfo->oi_md);
1535
1536         if (!exp || !exp->exp_obd)
1537                 RETURN(-ENODEV);
1538
1539         lov = &exp->exp_obd->u.lov;
1540         rc = lov_prep_punch_set(exp, oinfo, oti, &set);
1541         if (rc)
1542                 RETURN(rc);
1543
1544         list_for_each (pos, &set->set_list) {
1545                 req = list_entry(pos, struct lov_request, rq_link);
1546
1547                 rc = obd_punch(lov->lov_tgts[req->rq_idx]->ltd_exp,
1548                                &req->rq_oi, NULL, rqset);
1549                 if (rc) {
1550                         CERROR("error: punch objid "LPX64" subobj "LPX64
1551                                " on OST idx %d: rc = %d\n",
1552                                set->set_oi->oi_oa->o_id,
1553                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
1554                         break;
1555                 }
1556         }
1557
1558         if (rc || list_empty(&rqset->set_requests)) {
1559                 int err;
1560                 err = lov_fini_punch_set(set);
1561                 RETURN(rc ? rc : err);
1562         }
1563
1564         LASSERT(rqset->set_interpret == NULL);
1565         rqset->set_interpret = lov_punch_interpret;
1566         rqset->set_arg = (void *)set;
1567
1568         RETURN(0);
1569 }
1570
1571 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1572                     struct lov_stripe_md *lsm, obd_off start, obd_off end,
1573                     void *capa)
1574 {
1575         struct lov_request_set *set;
1576         struct obd_info oinfo;
1577         struct lov_obd *lov;
1578         struct list_head *pos;
1579         struct lov_request *req;
1580         int err = 0, rc = 0;
1581         ENTRY;
1582
1583         ASSERT_LSM_MAGIC(lsm);
1584
1585         if (!exp->exp_obd)
1586                 RETURN(-ENODEV);
1587
1588         lov = &exp->exp_obd->u.lov;
1589         rc = lov_prep_sync_set(exp, &oinfo, oa, lsm, start, end, &set);
1590         if (rc)
1591                 RETURN(rc);
1592
1593         list_for_each (pos, &set->set_list) {
1594                 req = list_entry(pos, struct lov_request, rq_link);
1595
1596                 rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
1597                               req->rq_oi.oi_oa, NULL,
1598                               req->rq_oi.oi_policy.l_extent.start,
1599                               req->rq_oi.oi_policy.l_extent.end, capa);
1600                 err = lov_update_common_set(set, req, rc);
1601                 if (err) {
1602                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1603                                " on OST idx %d: rc = %d\n",
1604                                set->set_oi->oi_oa->o_id,
1605                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
1606                         if (!rc)
1607                                 rc = err;
1608                 }
1609         }
1610         err = lov_fini_sync_set(set);
1611         if (!rc)
1612                 rc = err;
1613         RETURN(rc);
1614 }
1615
1616 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
1617                          obd_count oa_bufs, struct brw_page *pga)
1618 {
1619         struct obd_info oinfo = { { { 0 } } };
1620         int i, rc = 0;
1621
1622         oinfo.oi_oa = lov_oinfo->oi_oa;
1623
1624         /* The caller just wants to know if there's a chance that this
1625          * I/O can succeed */
1626         for (i = 0; i < oa_bufs; i++) {
1627                 int stripe = lov_stripe_number(lov_oinfo->oi_md, pga[i].off);
1628                 int ost = lov_oinfo->oi_md->lsm_oinfo[stripe]->loi_ost_idx;
1629                 obd_off start, end;
1630
1631                 if (!lov_stripe_intersects(lov_oinfo->oi_md, i, pga[i].off,
1632                                            pga[i].off + pga[i].count,
1633                                            &start, &end))
1634                         continue;
1635
1636                 if (!lov->lov_tgts[ost] || !lov->lov_tgts[ost]->ltd_active) {
1637                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1638                         return -EIO;
1639                 }
1640
1641                 rc = obd_brw(OBD_BRW_CHECK, lov->lov_tgts[ost]->ltd_exp, &oinfo,
1642                              1, &pga[i], NULL);
1643                 if (rc)
1644                         break;
1645         }
1646         return rc;
1647 }
1648
1649 static int lov_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1650                    obd_count oa_bufs, struct brw_page *pga,
1651                    struct obd_trans_info *oti)
1652 {
1653         struct lov_request_set *set;
1654         struct lov_request *req;
1655         struct list_head *pos;
1656         struct lov_obd *lov = &exp->exp_obd->u.lov;
1657         int err, rc = 0;
1658         ENTRY;
1659
1660         ASSERT_LSM_MAGIC(oinfo->oi_md);
1661
1662         if (cmd == OBD_BRW_CHECK) {
1663                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1664                 RETURN(rc);
1665         }
1666
1667         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &set);
1668         if (rc)
1669                 RETURN(rc);
1670
1671         list_for_each (pos, &set->set_list) {
1672                 struct obd_export *sub_exp;
1673                 struct brw_page *sub_pga;
1674                 req = list_entry(pos, struct lov_request, rq_link);
1675
1676                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1677                 sub_pga = set->set_pga + req->rq_pgaidx;
1678                 rc = obd_brw(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1679                              sub_pga, oti);
1680                 if (rc)
1681                         break;
1682                 lov_update_common_set(set, req, rc);
1683         }
1684
1685         err = lov_fini_brw_set(set);
1686         if (!rc)
1687                 rc = err;
1688         RETURN(rc);
1689 }
1690
1691 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1692                              int rc)
1693 {
1694         struct lov_request_set *lovset = (struct lov_request_set *)data;
1695         ENTRY;
1696
1697         if (rc) {
1698                 lovset->set_completes = 0;
1699                 lov_fini_brw_set(lovset);
1700         } else {
1701                 rc = lov_fini_brw_set(lovset);
1702         }
1703
1704         RETURN(rc);
1705 }
1706
1707 static int lov_brw_async(int cmd, struct obd_export *exp,
1708                          struct obd_info *oinfo, obd_count oa_bufs,
1709                          struct brw_page *pga, struct obd_trans_info *oti,
1710                          struct ptlrpc_request_set *set)
1711 {
1712         struct lov_request_set *lovset;
1713         struct lov_request *req;
1714         struct list_head *pos;
1715         struct lov_obd *lov = &exp->exp_obd->u.lov;
1716         int rc = 0;
1717         ENTRY;
1718
1719         LASSERT(oinfo);
1720         ASSERT_LSM_MAGIC(oinfo->oi_md);
1721
1722         if (cmd == OBD_BRW_CHECK) {
1723                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1724                 RETURN(rc);
1725         }
1726
1727         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &lovset);
1728         if (rc)
1729                 RETURN(rc);
1730
1731         list_for_each (pos, &lovset->set_list) {
1732                 struct obd_export *sub_exp;
1733                 struct brw_page *sub_pga;
1734                 req = list_entry(pos, struct lov_request, rq_link);
1735
1736                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1737                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1738                 rc = obd_brw_async(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1739                                    sub_pga, oti, set);
1740                 if (rc)
1741                         GOTO(out, rc);
1742                 lov_update_common_set(lovset, req, rc);
1743         }
1744         LASSERT(rc == 0);
1745         LASSERT(set->set_interpret == NULL);
1746         LASSERT(set->set_arg == NULL);
1747         rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
1748         if (rc)
1749                 GOTO(out, rc);
1750
1751         RETURN(rc);
1752 out:
1753         lov_fini_brw_set(lovset);
1754         RETURN(rc);
1755 }
1756
1757 static int lov_ap_make_ready(void *data, int cmd)
1758 {
1759         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1760
1761         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1762 }
1763
1764 static int lov_ap_refresh_count(void *data, int cmd)
1765 {
1766         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1767
1768         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1769                                                      cmd);
1770 }
1771
1772 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1773 {
1774         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1775
1776         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1777         /* XXX woah, shouldn't we be altering more here?  size? */
1778         oa->o_id = lap->lap_loi_id;
1779         oa->o_gr = lap->lap_loi_gr;
1780         oa->o_valid |= OBD_MD_FLGROUP;
1781         oa->o_stripe_idx = lap->lap_stripe;
1782 }
1783
1784 static void lov_ap_update_obdo(void *data, int cmd, struct obdo *oa,
1785                                obd_valid valid)
1786 {
1787         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1788
1789         lap->lap_caller_ops->ap_update_obdo(lap->lap_caller_data, cmd,oa,valid);
1790 }
1791
1792 static int lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1793 {
1794         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1795
1796         /* in a raid1 regime this would down a count of many ios
1797          * in flight, onl calling the caller_ops completion when all
1798          * the raid1 ios are complete */
1799         rc = lap->lap_caller_ops->ap_completion(lap->lap_caller_data,cmd,oa,rc);
1800         return rc;
1801 }
1802
1803 static struct obd_capa *lov_ap_lookup_capa(void *data, int cmd)
1804 {
1805         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1806         return lap->lap_caller_ops->ap_lookup_capa(lap->lap_caller_data, cmd);
1807 }
1808
1809 static struct obd_async_page_ops lov_async_page_ops = {
1810         .ap_make_ready =        lov_ap_make_ready,
1811         .ap_refresh_count =     lov_ap_refresh_count,
1812         .ap_fill_obdo =         lov_ap_fill_obdo,
1813         .ap_update_obdo =       lov_ap_update_obdo,
1814         .ap_completion =        lov_ap_completion,
1815         .ap_lookup_capa =       lov_ap_lookup_capa,
1816 };
1817
1818 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1819                            struct lov_oinfo *loi, cfs_page_t *page,
1820                            obd_off offset, struct obd_async_page_ops *ops,
1821                            void *data, void **res, int nocache,
1822                            struct lustre_handle *lockh)
1823 {
1824         struct lov_obd *lov = &exp->exp_obd->u.lov;
1825         struct lov_async_page *lap;
1826         struct lov_lock_handles *lov_lockh = NULL;
1827         int rc = 0;
1828         ENTRY;
1829
1830         if (!page) {
1831                 int i = 0;
1832                 /* Find an existing osc so we can get it's stupid sizeof(*oap).
1833                    Only because of this layering limitation will a client
1834                    mount with no osts fail */
1835                 while (!lov->lov_tgts || !lov->lov_tgts[i] ||
1836                        !lov->lov_tgts[i]->ltd_exp) {
1837                         i++;
1838                         if (i >= lov->desc.ld_tgt_count)
1839                                 RETURN(-ENOMEDIUM);
1840                 }
1841                 rc = size_round(sizeof(*lap)) +
1842                         obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
1843                                             NULL, NULL, 0, NULL, NULL, NULL, 0,
1844                                             NULL);
1845                 RETURN(rc);
1846         }
1847         ASSERT_LSM_MAGIC(lsm);
1848         LASSERT(loi == NULL);
1849
1850         lap = *res;
1851         lap->lap_magic = LOV_AP_MAGIC;
1852         lap->lap_caller_ops = ops;
1853         lap->lap_caller_data = data;
1854
1855         /* for now only raid 0 which passes through */
1856         lap->lap_stripe = lov_stripe_number(lsm, offset);
1857         lov_stripe_offset(lsm, offset, lap->lap_stripe, &lap->lap_sub_offset);
1858         loi = lsm->lsm_oinfo[lap->lap_stripe];
1859
1860         /* so the callback doesn't need the lsm */
1861         lap->lap_loi_id = loi->loi_id;
1862         lap->lap_loi_gr = lsm->lsm_object_gr;
1863         LASSERT(lsm->lsm_object_gr > 0);
1864         
1865         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
1866
1867         if (lockh) {
1868                 lov_lockh = lov_handle2llh(lockh);
1869                 if (lov_lockh) {
1870                         lockh = lov_lockh->llh_handles + lap->lap_stripe;
1871                 }
1872         }
1873
1874         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1875                                  lsm, loi, page, lap->lap_sub_offset,
1876                                  &lov_async_page_ops, lap,
1877                                  &lap->lap_sub_cookie, nocache, lockh);
1878         if (lov_lockh)
1879                 lov_llh_put(lov_lockh);
1880         if (rc)
1881                 RETURN(rc);
1882         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1883                lap->lap_sub_cookie, offset);
1884         RETURN(0);
1885 }
1886
1887 static int lov_queue_async_io(struct obd_export *exp,
1888                               struct lov_stripe_md *lsm,
1889                               struct lov_oinfo *loi, void *cookie,
1890                               int cmd, obd_off off, int count,
1891                               obd_flag brw_flags, obd_flag async_flags)
1892 {
1893         struct lov_obd *lov = &exp->exp_obd->u.lov;
1894         struct lov_async_page *lap;
1895         int rc;
1896
1897         LASSERT(loi == NULL);
1898
1899         ASSERT_LSM_MAGIC(lsm);
1900
1901         lap = LAP_FROM_COOKIE(cookie);
1902
1903         loi = lsm->lsm_oinfo[lap->lap_stripe];
1904
1905         rc = obd_queue_async_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
1906                                 loi, lap->lap_sub_cookie, cmd, off, count,
1907                                 brw_flags, async_flags);
1908         RETURN(rc);
1909 }
1910
1911 static int lov_set_async_flags(struct obd_export *exp,
1912                                struct lov_stripe_md *lsm,
1913                                struct lov_oinfo *loi, void *cookie,
1914                                obd_flag async_flags)
1915 {
1916         struct lov_obd *lov = &exp->exp_obd->u.lov;
1917         struct lov_async_page *lap;
1918         int rc;
1919
1920         LASSERT(loi == NULL);
1921
1922         ASSERT_LSM_MAGIC(lsm);
1923
1924         lap = LAP_FROM_COOKIE(cookie);
1925
1926         loi = lsm->lsm_oinfo[lap->lap_stripe];
1927
1928         rc = obd_set_async_flags(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1929                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1930         RETURN(rc);
1931 }
1932
1933 static int lov_queue_group_io(struct obd_export *exp,
1934                               struct lov_stripe_md *lsm,
1935                               struct lov_oinfo *loi,
1936                               struct obd_io_group *oig, void *cookie,
1937                               int cmd, obd_off off, int count,
1938                               obd_flag brw_flags, obd_flag async_flags)
1939 {
1940         struct lov_obd *lov = &exp->exp_obd->u.lov;
1941         struct lov_async_page *lap;
1942         int rc;
1943
1944         LASSERT(loi == NULL);
1945
1946         ASSERT_LSM_MAGIC(lsm);
1947
1948         lap = LAP_FROM_COOKIE(cookie);
1949
1950         loi = lsm->lsm_oinfo[lap->lap_stripe];
1951
1952         rc = obd_queue_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm,
1953                                 loi, oig, lap->lap_sub_cookie, cmd, off, count,
1954                                 brw_flags, async_flags);
1955         RETURN(rc);
1956 }
1957
1958 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1959  * all stripes, but we don't record that fact at queue time.  so we
1960  * trigger sync io on all stripes. */
1961 static int lov_trigger_group_io(struct obd_export *exp,
1962                                 struct lov_stripe_md *lsm,
1963                                 struct lov_oinfo *loi,
1964                                 struct obd_io_group *oig)
1965 {
1966         struct lov_obd *lov = &exp->exp_obd->u.lov;
1967         int rc = 0, i, err;
1968
1969         LASSERT(loi == NULL);
1970
1971         ASSERT_LSM_MAGIC(lsm);
1972
1973         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1974                 loi = lsm->lsm_oinfo[i];
1975                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1976                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1977                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1978                         continue;
1979                 }
1980
1981                 err = obd_trigger_group_io(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1982                                            lsm, loi, oig);
1983                 if (rc == 0 && err != 0)
1984                         rc = err;
1985         };
1986         RETURN(rc);
1987 }
1988
1989 static int lov_teardown_async_page(struct obd_export *exp,
1990                                    struct lov_stripe_md *lsm,
1991                                    struct lov_oinfo *loi, void *cookie)
1992 {
1993         struct lov_obd *lov = &exp->exp_obd->u.lov;
1994         struct lov_async_page *lap;
1995         int rc;
1996
1997         LASSERT(loi == NULL);
1998
1999         ASSERT_LSM_MAGIC(lsm);
2000
2001         lap = LAP_FROM_COOKIE(cookie);
2002
2003         loi = lsm->lsm_oinfo[lap->lap_stripe];
2004
2005         rc = obd_teardown_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2006                                      lsm, loi, lap->lap_sub_cookie);
2007         if (rc) {
2008                 CERROR("unable to teardown sub cookie %p: %d\n",
2009                        lap->lap_sub_cookie, rc);
2010                 RETURN(rc);
2011         }
2012         RETURN(rc);
2013 }
2014
2015 static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
2016                                  void *data, int rc)
2017 {
2018         struct lov_request_set *lovset = (struct lov_request_set *)data;
2019         ENTRY;
2020         rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
2021         RETURN(rc);
2022 }
2023
2024 static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2025                        struct ldlm_enqueue_info *einfo,
2026                        struct ptlrpc_request_set *rqset)
2027 {
2028         ldlm_mode_t mode = einfo->ei_mode;
2029         struct lov_request_set *set;
2030         struct lov_request *req;
2031         struct list_head *pos;
2032         struct lov_obd *lov;
2033         ldlm_error_t rc;
2034         ENTRY;
2035
2036         LASSERT(oinfo);
2037         ASSERT_LSM_MAGIC(oinfo->oi_md);
2038         LASSERT(mode == (mode & -mode));
2039
2040         /* we should never be asked to replay a lock this way. */
2041         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
2042
2043         if (!exp || !exp->exp_obd)
2044                 RETURN(-ENODEV);
2045
2046         lov = &exp->exp_obd->u.lov;
2047         rc = lov_prep_enqueue_set(exp, oinfo, einfo, &set);
2048         if (rc)
2049                 RETURN(rc);
2050
2051         list_for_each (pos, &set->set_list) {
2052                 req = list_entry(pos, struct lov_request, rq_link);
2053
2054                 rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
2055                                  &req->rq_oi, einfo, rqset);
2056                 if (rc != ELDLM_OK)
2057                         GOTO(out, rc);
2058         }
2059
2060         if (rqset && !list_empty(&rqset->set_requests)) {
2061                 LASSERT(rc == 0);
2062                 LASSERT(rqset->set_interpret == NULL);
2063                 rqset->set_interpret = lov_enqueue_interpret;
2064                 rqset->set_arg = (void *)set;
2065                 RETURN(rc);
2066         }
2067 out:
2068         rc = lov_fini_enqueue_set(set, mode, rc, rqset);
2069         RETURN(rc);
2070 }
2071
2072 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2073                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2074                      int *flags, void *data, struct lustre_handle *lockh)
2075 {
2076         struct lov_request_set *set;
2077         struct obd_info oinfo;
2078         struct lov_request *req;
2079         struct list_head *pos;
2080         struct lov_obd *lov = &exp->exp_obd->u.lov;
2081         struct lustre_handle *lov_lockhp;
2082         int lov_flags, rc = 0;
2083         ENTRY;
2084
2085         ASSERT_LSM_MAGIC(lsm);
2086         LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
2087
2088         if (!exp || !exp->exp_obd)
2089                 RETURN(-ENODEV);
2090
2091         lov = &exp->exp_obd->u.lov;
2092         rc = lov_prep_match_set(exp, &oinfo, lsm, policy, mode, lockh, &set);
2093         if (rc)
2094                 RETURN(rc);
2095
2096         list_for_each (pos, &set->set_list) {
2097                 ldlm_policy_data_t sub_policy;
2098                 req = list_entry(pos, struct lov_request, rq_link);
2099                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
2100                 LASSERT(lov_lockhp);
2101
2102                 lov_flags = *flags;
2103                 sub_policy.l_extent = req->rq_oi.oi_policy.l_extent;
2104
2105                 rc = obd_match(lov->lov_tgts[req->rq_idx]->ltd_exp,
2106                                req->rq_oi.oi_md, type, &sub_policy,
2107                                mode, &lov_flags, data, lov_lockhp);
2108                 rc = lov_update_match_set(set, req, rc);
2109                 if (rc <= 0)
2110                         break;
2111         }
2112         lov_fini_match_set(set, mode, *flags);
2113         RETURN(rc);
2114 }
2115
2116 static int lov_change_cbdata(struct obd_export *exp,
2117                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
2118                              void *data)
2119 {
2120         struct lov_obd *lov;
2121         int rc = 0, i;
2122         ENTRY;
2123
2124         ASSERT_LSM_MAGIC(lsm);
2125
2126         if (!exp || !exp->exp_obd)
2127                 RETURN(-ENODEV);
2128
2129         LASSERT(lsm->lsm_object_gr > 0);
2130
2131         lov = &exp->exp_obd->u.lov;
2132         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2133                 struct lov_stripe_md submd;
2134                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2135
2136                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2137                         CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
2138                         continue;
2139                 }
2140                 
2141                 submd.lsm_object_id = loi->loi_id;
2142                 submd.lsm_object_gr = lsm->lsm_object_gr;
2143                 submd.lsm_stripe_count = 0;
2144                 rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2145                                        &submd, it, data);
2146         }
2147         RETURN(rc);
2148 }
2149
2150 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
2151                       __u32 mode, struct lustre_handle *lockh)
2152 {
2153         struct lov_request_set *set;
2154         struct obd_info oinfo;
2155         struct lov_request *req;
2156         struct list_head *pos;
2157         struct lov_obd *lov = &exp->exp_obd->u.lov;
2158         struct lustre_handle *lov_lockhp;
2159         int err = 0, rc = 0;
2160         ENTRY;
2161
2162         ASSERT_LSM_MAGIC(lsm);
2163
2164         if (!exp || !exp->exp_obd)
2165                 RETURN(-ENODEV);
2166
2167         LASSERT(lsm->lsm_object_gr > 0);
2168         LASSERT(lockh);
2169         lov = &exp->exp_obd->u.lov;
2170         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
2171         if (rc)
2172                 RETURN(rc);
2173
2174         list_for_each (pos, &set->set_list) {
2175                 req = list_entry(pos, struct lov_request, rq_link);
2176                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
2177
2178                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
2179                                 req->rq_oi.oi_md, mode, lov_lockhp);
2180                 rc = lov_update_common_set(set, req, rc);
2181                 if (rc) {
2182                         CERROR("error: cancel objid "LPX64" subobj "
2183                                LPX64" on OST idx %d: rc = %d\n",
2184                                lsm->lsm_object_id,
2185                                req->rq_oi.oi_md->lsm_object_id,
2186                                req->rq_idx, rc);
2187                         err = rc;
2188                 }
2189
2190         }
2191         lov_fini_cancel_set(set);
2192         RETURN(err);
2193 }
2194
2195 static int lov_cancel_unused(struct obd_export *exp,
2196                              struct lov_stripe_md *lsm,
2197                              int flags, void *opaque)
2198 {
2199         struct lov_obd *lov;
2200         int rc = 0, i;
2201         ENTRY;
2202
2203         if (!exp || !exp->exp_obd)
2204                 RETURN(-ENODEV);
2205
2206         lov = &exp->exp_obd->u.lov;
2207         if (lsm == NULL) {
2208                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2209                         int err;
2210                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2211                                 continue;
2212
2213                         err = obd_cancel_unused(lov->lov_tgts[i]->ltd_exp, NULL,
2214                                                 flags, opaque);
2215                         if (!rc)
2216                                 rc = err;
2217                 }
2218                 RETURN(rc);
2219         }
2220
2221         ASSERT_LSM_MAGIC(lsm);
2222
2223         LASSERT(lsm->lsm_object_gr > 0);
2224         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2225                 struct lov_stripe_md submd;
2226                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2227                 int err;
2228
2229                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2230                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
2231                         continue;
2232                 }
2233
2234                 if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
2235                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2236
2237                 submd.lsm_object_id = loi->loi_id;
2238                 submd.lsm_object_gr = lsm->lsm_object_gr;
2239                 submd.lsm_stripe_count = 0;
2240                 err = obd_cancel_unused(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2241                                         &submd, flags, opaque);
2242                 if (err && lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
2243                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2244                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2245                                loi->loi_id, loi->loi_ost_idx, err);
2246                         if (!rc)
2247                                 rc = err;
2248                 }
2249         }
2250         RETURN(rc);
2251 }
2252
2253 static int lov_join_lru(struct obd_export *exp,
2254                         struct lov_stripe_md *lsm, int join)
2255 {
2256         struct lov_obd *lov;
2257         int i, count = 0;
2258         ENTRY;
2259
2260         ASSERT_LSM_MAGIC(lsm);
2261         if (!exp || !exp->exp_obd)
2262                 RETURN(-ENODEV);
2263
2264         lov = &exp->exp_obd->u.lov;
2265         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2266                 struct lov_stripe_md submd;
2267                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
2268                 int rc = 0;
2269
2270                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
2271                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
2272                         continue;
2273                 }
2274
2275                 if (!lov->lov_tgts[loi->loi_ost_idx]->ltd_active)
2276                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2277
2278                 submd.lsm_object_id = loi->loi_id;
2279                 submd.lsm_object_gr = lsm->lsm_object_gr;
2280                 submd.lsm_stripe_count = 0;
2281                 rc = obd_join_lru(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
2282                                   &submd, join);
2283                 if (rc < 0) {
2284                         CERROR("join lru failed. objid: "LPX64" subobj: "LPX64
2285                                " ostidx: %d rc: %d\n", lsm->lsm_object_id,
2286                                loi->loi_id, loi->loi_ost_idx, rc);
2287                         return rc;
2288                 } else {
2289                         count += rc;
2290                 }
2291         }
2292         RETURN(count);
2293 }
2294
2295 static int lov_statfs_interpret(struct ptlrpc_request_set *rqset,
2296                                 void *data, int rc)
2297 {
2298         struct lov_request_set *lovset = (struct lov_request_set *)data;
2299         int err;
2300         ENTRY;
2301
2302         if (rc)
2303                 lovset->set_completes = 0;
2304
2305         err = lov_fini_statfs_set(lovset);
2306         RETURN(rc ? rc : err);
2307 }
2308
2309 static int lov_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2310                             __u64 max_age, struct ptlrpc_request_set *rqset)
2311 {
2312         struct lov_request_set *set;
2313         struct lov_request *req;
2314         struct list_head *pos;
2315         struct lov_obd *lov;
2316         int rc = 0;
2317         ENTRY;
2318
2319         LASSERT(oinfo != NULL);
2320         LASSERT(oinfo->oi_osfs != NULL);
2321
2322         lov = &obd->u.lov;
2323         rc = lov_prep_statfs_set(obd, oinfo, &set);
2324         if (rc)
2325                 RETURN(rc);
2326
2327         list_for_each (pos, &set->set_list) {
2328                 struct obd_device *osc_obd;
2329
2330                 req = list_entry(pos, struct lov_request, rq_link);
2331
2332                 osc_obd = class_exp2obd(lov->lov_tgts[req->rq_idx]->ltd_exp);
2333                 rc = obd_statfs_async(osc_obd, &req->rq_oi, max_age, rqset);
2334                 if (rc)
2335                         break;
2336         }
2337
2338         if (rc || list_empty(&rqset->set_requests)) {
2339                 int err;
2340                 if (rc)
2341                         set->set_completes = 0;
2342                 err = lov_fini_statfs_set(set);
2343                 RETURN(rc ? rc : err);
2344         }
2345
2346         LASSERT(rqset->set_interpret == NULL);
2347         rqset->set_interpret = lov_statfs_interpret;
2348         rqset->set_arg = (void *)set;
2349         RETURN(0);
2350 }
2351
2352 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2353                       __u64 max_age, __u32 flags)
2354 {
2355         struct ptlrpc_request_set *set = NULL;
2356         struct obd_info oinfo = { { { 0 } } };
2357         int rc = 0;
2358         ENTRY;
2359
2360
2361         /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
2362          * statfs requests */
2363         set = ptlrpc_prep_set();
2364         if (set == NULL)
2365                 RETURN(-ENOMEM);
2366
2367         oinfo.oi_osfs = osfs;
2368         oinfo.oi_flags = flags;
2369         rc = lov_statfs_async(obd, &oinfo, max_age, set);
2370         if (rc == 0)
2371                 rc = ptlrpc_set_wait(set);
2372         ptlrpc_set_destroy(set);
2373
2374         RETURN(rc);
2375 }
2376
2377 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2378                          void *karg, void *uarg)
2379 {
2380         struct obd_device *obddev = class_exp2obd(exp);
2381         struct lov_obd *lov = &obddev->u.lov;
2382         int i, rc, count = lov->desc.ld_tgt_count;
2383         struct obd_uuid *uuidp;
2384         ENTRY;
2385
2386         switch (cmd) {
2387         case IOC_OBD_STATFS: {
2388                 struct obd_ioctl_data *data = karg;
2389                 struct obd_device *osc_obd;
2390                 struct obd_statfs stat_buf = {0};
2391                 __u32 index;
2392
2393                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
2394                 LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
2395
2396                 if ((index >= count))
2397                         RETURN(-ENODEV);
2398
2399                 if (!lov->lov_tgts[index])
2400                         /* Try again with the next index */
2401                         RETURN(-EAGAIN);
2402                 if (!lov->lov_tgts[index]->ltd_active)
2403                         RETURN(-ENODATA);
2404
2405                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
2406                 if (!osc_obd)
2407                         RETURN(-EINVAL);
2408
2409                 /* got statfs data */
2410                 rc = obd_statfs(osc_obd, &stat_buf,
2411                                 cfs_time_current_64() - HZ, 0);
2412                 if (rc)
2413                         RETURN(rc);
2414                 if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
2415                         RETURN(rc);
2416                 /* copy UUID */
2417                 rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
2418                                   data->ioc_plen2);
2419                 break;
2420         }
2421         case OBD_IOC_LOV_GET_CONFIG: {
2422                 struct obd_ioctl_data *data;
2423                 struct lov_desc *desc;
2424                 char *buf = NULL;
2425                 __u32 *genp;
2426
2427                 len = 0;
2428                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2429                         RETURN(-EINVAL);
2430
2431                 data = (struct obd_ioctl_data *)buf;
2432
2433                 if (sizeof(*desc) > data->ioc_inllen1) {
2434                         obd_ioctl_freedata(buf, len);
2435                         RETURN(-EINVAL);
2436                 }
2437
2438                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2439                         obd_ioctl_freedata(buf, len);
2440                         RETURN(-EINVAL);
2441                 }
2442
2443                 if (sizeof(__u32) * count > data->ioc_inllen3) {
2444                         obd_ioctl_freedata(buf, len);
2445                         RETURN(-EINVAL);
2446                 }
2447
2448                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2449                 memcpy(desc, &(lov->desc), sizeof(*desc));
2450
2451                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2452                 genp = (__u32 *)data->ioc_inlbuf3;
2453                 /* the uuid will be empty for deleted OSTs */
2454                 for (i = 0; i < count; i++, uuidp++, genp++) {
2455                         if (!lov->lov_tgts[i])
2456                                 continue;
2457                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
2458                         *genp = lov->lov_tgts[i]->ltd_gen;
2459                 }
2460
2461                 rc = copy_to_user((void *)uarg, buf, len);
2462                 if (rc)
2463                         rc = -EFAULT;
2464                 obd_ioctl_freedata(buf, len);
2465                 break;
2466         }
2467         case LL_IOC_LOV_SETSTRIPE:
2468                 rc = lov_setstripe(exp, karg, uarg);
2469                 break;
2470         case LL_IOC_LOV_GETSTRIPE:
2471                 rc = lov_getstripe(exp, karg, uarg);
2472                 break;
2473         case LL_IOC_LOV_SETEA:
2474                 rc = lov_setea(exp, karg, uarg);
2475                 break;
2476         default: {
2477                 int set = 0;
2478
2479                 if (count == 0)
2480                         RETURN(-ENOTTY);
2481
2482                 rc = 0;
2483                 for (i = 0; i < count; i++) {
2484                         int err;
2485
2486                         /* OST was disconnected */
2487                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2488                                 continue;
2489
2490                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
2491                                             len, karg, uarg);
2492                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
2493                                 RETURN(err);
2494                         } else if (err) {
2495                                 if (lov->lov_tgts[i]->ltd_active) {
2496                                         CDEBUG(err == -ENOTTY ?
2497                                                D_IOCTL : D_WARNING,
2498                                                "iocontrol OSC %s on OST "
2499                                                "idx %d cmd %x: err = %d\n",
2500                                                lov_uuid2str(lov, i),
2501                                                i, cmd, err);
2502                                         if (!rc)
2503                                                 rc = err;
2504                                 }
2505                         } else {
2506                                 set = 1;
2507                         }
2508                 }
2509                 if (!set && !rc)
2510                         rc = -EIO;
2511         }
2512         }
2513
2514         RETURN(rc);
2515 }
2516
2517 #define FIEMAP_BUFFER_SIZE 4096
2518
2519 /**
2520  * Non-zero fe_logical indicates that this is a continuation FIEMAP
2521  * call. The local end offset and the device are sent in the first
2522  * fm_extent. This function calculates the stripe number from the index.
2523  * This function returns a stripe_no on which mapping is to be restarted.
2524  *
2525  * This function returns fm_end_offset which is the in-OST offset at which
2526  * mapping should be restarted. If fm_end_offset=0 is returned then caller
2527  * will re-calculate proper offset in next stripe.
2528  * Note that the first extent is passed to lov_get_info via the value field.
2529  *
2530  * \param fiemap fiemap request header
2531  * \param lsm striping information for the file
2532  * \param fm_start logical start of mapping
2533  * \param fm_end logical end of mapping
2534  * \param start_stripe starting stripe will be returned in this
2535  */
2536 obd_size fiemap_calc_fm_end_offset(struct ll_user_fiemap *fiemap,
2537                                    struct lov_stripe_md *lsm, obd_size fm_start,
2538                                    obd_size fm_end, int *start_stripe)
2539 {
2540         obd_size local_end = fiemap->fm_extents[0].fe_logical;
2541         obd_off lun_start, lun_end;
2542         obd_size fm_end_offset;
2543         int stripe_no = -1, i;
2544
2545         if (fiemap->fm_extent_count == 0 ||
2546             fiemap->fm_extents[0].fe_logical == 0)
2547                 return 0;
2548
2549         /* Find out stripe_no from ost_index saved in the fe_device */
2550         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2551                 if (lsm->lsm_oinfo[i]->loi_ost_idx ==
2552                                         fiemap->fm_extents[0].fe_device) {
2553                         stripe_no = i;
2554                         break;
2555                 }
2556         }
2557
2558         /* If we have finished mapping on previous device, shift logical
2559          * offset to start of next device */
2560         if ((lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
2561                                    &lun_start, &lun_end)) != 0 &&
2562                                    local_end < lun_end) {
2563                 fm_end_offset = local_end;
2564                 *start_stripe = stripe_no;
2565         } else {
2566                 /* This is a special value to indicate that caller should
2567                  * calculate offset in next stripe. */
2568                 fm_end_offset = 0;
2569                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
2570         }
2571
2572         return fm_end_offset;
2573 }
2574
2575 /**
2576  * We calculate on which OST the mapping will end. If the length of mapping
2577  * is greater than (stripe_size * stripe_count) then the last_stripe will
2578  * will be one just before start_stripe. Else we check if the mapping
2579  * intersects each OST and find last_stripe.
2580  * This function returns the last_stripe and also sets the stripe_count
2581  * over which the mapping is spread
2582  *
2583  * \param lsm striping information for the file
2584  * \param fm_start logical start of mapping
2585  * \param fm_end logical end of mapping
2586  * \param start_stripe starting stripe of the mapping
2587  * \param stripe_count the number of stripes across which to map is returned
2588  *
2589  * \retval last_stripe return the last stripe of the mapping
2590  */
2591 int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, obd_size fm_start,
2592                             obd_size fm_end, int start_stripe,
2593                             int *stripe_count)
2594 {
2595         int last_stripe;
2596         obd_off obd_start, obd_end;
2597         int i, j;
2598
2599         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
2600                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
2601                                                               start_stripe - 1);
2602                 *stripe_count = lsm->lsm_stripe_count;
2603         } else {
2604                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
2605                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
2606                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
2607                                                    &obd_start, &obd_end)) == 0)
2608                                 break;
2609                 }
2610                 *stripe_count = j;
2611                 last_stripe = (start_stripe + j - 1) %lsm->lsm_stripe_count;
2612         }
2613
2614         return last_stripe;
2615 }
2616
2617 /**
2618  * Set fe_device and copy extents from local buffer into main return buffer.
2619  *
2620  * \param fiemap fiemap request header
2621  * \param lcl_fm_ext array of local fiemap extents to be copied
2622  * \param ost_index OST index to be written into the fm_device field for each
2623                     extent
2624  * \param ext_count number of extents to be copied
2625  * \param current_extent where to start copying in main extent array
2626  */
2627 void fiemap_prepare_and_copy_exts(struct ll_user_fiemap *fiemap,
2628                                   struct ll_fiemap_extent *lcl_fm_ext,
2629                                   int ost_index, unsigned int ext_count,
2630                                   int current_extent)
2631 {
2632         char *to;
2633         int ext;
2634
2635         for (ext = 0; ext < ext_count; ext++) {
2636                 lcl_fm_ext[ext].fe_device = ost_index;
2637                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
2638         }
2639
2640         /* Copy fm_extent's from fm_local to return buffer */
2641         to = (char *)fiemap + fiemap_count_to_size(current_extent);
2642         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent));
2643 }
2644
2645 /**
2646  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
2647  * This also handles the restarting of FIEMAP calls in case mapping overflows
2648  * the available number of extents in single call.
2649  */
2650 static int lov_fiemap(struct lov_obd *lov, __u32 keylen, void *key,
2651                       __u32 *vallen, void *val, struct lov_stripe_md *lsm)
2652 {
2653         struct ll_fiemap_info_key *fm_key = key;
2654         struct ll_user_fiemap *fiemap = val;
2655         struct ll_user_fiemap *fm_local = NULL;
2656         struct ll_fiemap_extent *lcl_fm_ext;
2657         int count_local;
2658         unsigned int get_num_extents = 0;
2659         int ost_index = 0, actual_start_stripe, start_stripe;
2660         obd_size fm_start, fm_end, fm_length, fm_end_offset = 0;
2661         obd_size curr_loc;
2662         int current_extent = 0, rc = 0, i;
2663         int ost_eof = 0; /* EOF for object */
2664         int ost_done = 0; /* done with required mapping for this OST? */
2665         int last_stripe;
2666         int cur_stripe = 0, cur_stripe_wrap = 0, stripe_count;
2667         unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
2668
2669         if (lsm == NULL)
2670                 GOTO(out, rc = 0);
2671
2672         if (fiemap_count_to_size(fm_key->fiemap.fm_extent_count) < buffer_size)
2673                 buffer_size = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
2674
2675         OBD_ALLOC(fm_local, buffer_size);
2676         if (fm_local == NULL)
2677                 GOTO(out, rc = -ENOMEM);
2678         lcl_fm_ext = &fm_local->fm_extents[0];
2679
2680         count_local = fiemap_size_to_count(buffer_size);
2681
2682         memcpy(fiemap, &fm_key->fiemap, sizeof(*fiemap));
2683         fm_start = fiemap->fm_start;
2684         fm_length = fiemap->fm_length;
2685         /* Calculate start stripe, last stripe and length of mapping */
2686         actual_start_stripe = start_stripe = lov_stripe_number(lsm, fm_start);
2687         fm_end = (fm_length == ~0ULL ? fm_key->oa.o_size :
2688                                                 fm_start + fm_length - 1);
2689         /* If fm_length != ~0ULL but fm_start+fm_length-1 exceeds file size */
2690         if (fm_end > fm_key->oa.o_size)
2691                 fm_end = fm_key->oa.o_size;
2692
2693         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
2694                                             actual_start_stripe, &stripe_count);
2695
2696         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
2697                                                   &start_stripe);
2698
2699         if (fiemap->fm_extent_count == 0) {
2700                 get_num_extents = 1;
2701                 count_local = 0;
2702         }
2703
2704         /* Check each stripe */
2705         for (cur_stripe = start_stripe, i = 0; i < stripe_count;
2706              i++, cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
2707                 obd_size req_fm_len; /* Stores length of required mapping */
2708                 obd_size len_mapped_single_call;
2709                 obd_off lun_start, lun_end, obd_object_end;
2710                 unsigned int ext_count;
2711
2712                 cur_stripe_wrap = cur_stripe;
2713
2714                 /* Find out range of mapping on this stripe */
2715                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
2716                                            &lun_start, &obd_object_end)) == 0)
2717                         continue;
2718
2719                 /* If this is a continuation FIEMAP call and we are on
2720                  * starting stripe then lun_start needs to be set to
2721                  * fm_end_offset */
2722                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
2723                         lun_start = fm_end_offset;
2724
2725                 if (fm_length != ~0ULL) {
2726                         /* Handle fm_start + fm_length overflow */
2727                         if (fm_start + fm_length < fm_start)
2728                                 fm_length = ~0ULL - fm_start;
2729                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
2730                                                      cur_stripe);
2731                 } else {
2732                         lun_end = ~0ULL;
2733                 }
2734
2735                 if (lun_start == lun_end)
2736                         continue;
2737
2738                 req_fm_len = obd_object_end - lun_start;
2739                 fm_local->fm_length = 0;
2740                 len_mapped_single_call = 0;
2741
2742                 /* If the output buffer is very large and the objects have many
2743                  * extents we may need to loop on a single OST repeatedly */
2744                 ost_eof = 0;
2745                 ost_done = 0;
2746                 do {
2747                         if (get_num_extents == 0) {
2748                                 /* Don't get too many extents. */
2749                                 if (current_extent + count_local >
2750                                     fiemap->fm_extent_count)
2751                                         count_local = fiemap->fm_extent_count -
2752                                                                  current_extent;
2753                         }
2754
2755                         lun_start += len_mapped_single_call;
2756                         fm_local->fm_length = req_fm_len - len_mapped_single_call;
2757                         req_fm_len = fm_local->fm_length;
2758                         fm_local->fm_extent_count = count_local;
2759                         fm_local->fm_mapped_extents = 0;
2760                         fm_local->fm_flags = fiemap->fm_flags;
2761
2762                         fm_key->oa.o_id = lsm->lsm_oinfo[cur_stripe]->loi_id;
2763                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
2764
2765                         if (ost_index < 0 || ost_index >=lov->desc.ld_tgt_count)
2766                                 GOTO(out, rc = -EINVAL);
2767
2768                         /* If OST is inactive, return extent with UNKNOWN flag */
2769                         if (lov && !lov->lov_tgts[ost_index]->ltd_active) {
2770                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
2771                                 fm_local->fm_mapped_extents = 1;
2772
2773                                 lcl_fm_ext[0].fe_logical = lun_start;
2774                                 lcl_fm_ext[0].fe_length = obd_object_end -
2775                                                                       lun_start;
2776                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
2777
2778                                 goto inactive_tgt;
2779                         }
2780
2781                         fm_local->fm_start = lun_start;
2782                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
2783                         memcpy(&fm_key->fiemap, fm_local, sizeof(*fm_local));
2784                         *vallen=fiemap_count_to_size(fm_local->fm_extent_count);
2785                         rc = obd_get_info(lov->lov_tgts[ost_index]->ltd_exp,
2786                                           keylen, key, vallen, fm_local, lsm);
2787                         if (rc != 0)
2788                                 GOTO(out, rc);
2789
2790 inactive_tgt:
2791                         ext_count = fm_local->fm_mapped_extents;
2792                         if (ext_count == 0) {
2793                                 ost_done = 1;
2794                                 /* If last stripe has hole at the end,
2795                                  * then we need to return */
2796                                 if (cur_stripe_wrap == last_stripe) {
2797                                         fiemap->fm_mapped_extents = 0;
2798                                         goto finish;
2799                                 }
2800                                 break;
2801                         }
2802
2803                         /* If we just need num of extents then go to next device */
2804                         if (get_num_extents) {
2805                                 current_extent += ext_count;
2806                                 break;
2807                         }
2808
2809                         len_mapped_single_call = lcl_fm_ext[ext_count-1].fe_logical -
2810                                   lun_start + lcl_fm_ext[ext_count - 1].fe_length;
2811
2812                         /* Have we finished mapping on this device? */
2813                         if (req_fm_len <= len_mapped_single_call)
2814                                 ost_done = 1;
2815
2816                         /* Clear the EXTENT_LAST flag which can be present on
2817                          * last extent */
2818                         if (lcl_fm_ext[ext_count-1].fe_flags & FIEMAP_EXTENT_LAST)
2819                                 lcl_fm_ext[ext_count - 1].fe_flags &=
2820                                                             ~FIEMAP_EXTENT_LAST;
2821
2822                         curr_loc = lov_stripe_size(lsm,
2823                                            lcl_fm_ext[ext_count - 1].fe_logical+
2824                                            lcl_fm_ext[ext_count - 1].fe_length,
2825                                            cur_stripe);
2826                         if (curr_loc >= fm_key->oa.o_size)
2827                                 ost_eof = 1;
2828
2829                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
2830                                                      ost_index, ext_count,
2831                                                      current_extent);
2832
2833                         current_extent += ext_count;
2834
2835                         /* Ran out of available extents? */
2836                         if (current_extent >= fiemap->fm_extent_count)
2837                                 goto finish;
2838                 } while (ost_done == 0 && ost_eof == 0);
2839
2840                 if (cur_stripe_wrap == last_stripe)
2841                         goto finish;
2842         }
2843
2844 finish:
2845         /* Indicate that we are returning device offsets unless file just has
2846          * single stripe */
2847         if (lsm->lsm_stripe_count > 1)
2848                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
2849
2850         if (get_num_extents)
2851                 goto skip_last_device_calc;
2852
2853         /* Check if we have reached the last stripe and whether mapping for that
2854          * stripe is done. */
2855         if (cur_stripe_wrap == last_stripe) {
2856                 if (ost_done || ost_eof)
2857                         fiemap->fm_extents[current_extent - 1].fe_flags |=
2858                                                              FIEMAP_EXTENT_LAST;
2859         }
2860
2861 skip_last_device_calc:
2862         fiemap->fm_mapped_extents = current_extent;
2863
2864 out:
2865         OBD_FREE(fm_local, buffer_size);
2866         return rc;
2867 }
2868
2869 static int lov_get_info(struct obd_export *exp, __u32 keylen,
2870                         void *key, __u32 *vallen, void *val,
2871                         struct lov_stripe_md *lsm)
2872 {
2873         struct obd_device *obddev = class_exp2obd(exp);
2874         struct lov_obd *lov = &obddev->u.lov;
2875         int i, rc;
2876         ENTRY;
2877
2878         if (!vallen || !val)
2879                 RETURN(-EFAULT);
2880
2881         lov_getref(obddev);
2882
2883         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2884                 struct {
2885                         char name[16];
2886                         struct ldlm_lock *lock;
2887                 } *data = key;
2888                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2889                 struct lov_oinfo *loi;
2890                 __u32 *stripe = val;
2891
2892                 if (*vallen < sizeof(*stripe))
2893                         GOTO(out, rc = -EFAULT);
2894                 *vallen = sizeof(*stripe);
2895
2896                 /* XXX This is another one of those bits that will need to
2897                  * change if we ever actually support nested LOVs.  It uses
2898                  * the lock's export to find out which stripe it is. */
2899                 /* XXX - it's assumed all the locks for deleted OSTs have
2900                  * been cancelled. Also, the export for deleted OSTs will
2901                  * be NULL and won't match the lock's export. */
2902                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2903                         loi = lsm->lsm_oinfo[i];
2904                         if (!lov->lov_tgts[loi->loi_ost_idx])
2905                                 continue;
2906                         if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
2907                             data->lock->l_conn_export &&
2908                             osc_res_name_eq(loi->loi_id, loi->loi_gr, res_id)) {
2909                                 *stripe = i;
2910                                 GOTO(out, rc = 0);
2911                         }
2912                 }
2913                 LDLM_ERROR(data->lock, "lock on inode without such object");
2914                 dump_lsm(D_ERROR, lsm);
2915                 GOTO(out, rc = -ENXIO);
2916         } else if (KEY_IS(KEY_LAST_ID)) {
2917                 struct obd_id_info *info = val;
2918                 __u32 size = sizeof(obd_id);
2919                 struct lov_tgt_desc *tgt;
2920
2921                 LASSERT(*vallen == sizeof(struct obd_id_info));
2922                 tgt = lov->lov_tgts[info->idx];
2923
2924                 if (!tgt || !tgt->ltd_active)
2925                         GOTO(out, rc = -ESRCH);
2926
2927                 rc = obd_get_info(tgt->ltd_exp, keylen, key, &size, info->data, NULL);
2928                 GOTO(out, rc = 0);
2929         } else if (KEY_IS(KEY_LOVDESC)) {
2930                 struct lov_desc *desc_ret = val;
2931                 *desc_ret = lov->desc;
2932
2933                 GOTO(out, rc = 0);
2934         } else if (KEY_IS(KEY_FIEMAP)) {
2935                 rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
2936                 GOTO(out, rc);
2937         }
2938
2939         rc = -EINVAL;
2940
2941 out:
2942         lov_putref(obddev);
2943         RETURN(rc);
2944 }
2945
2946 static int lov_set_info_async(struct obd_export *exp, obd_count keylen,
2947                               void *key, obd_count vallen, void *val,
2948                               struct ptlrpc_request_set *set)
2949 {
2950         struct obd_device *obddev = class_exp2obd(exp);
2951         struct lov_obd *lov = &obddev->u.lov;
2952         obd_count count;
2953         int i, rc = 0, err;
2954         struct lov_tgt_desc *tgt;
2955         unsigned incr, check_uuid,
2956                  do_inactive, no_set;
2957         unsigned next_id = 0,  mds_con = 0;
2958         ENTRY;
2959
2960         incr = check_uuid = do_inactive = no_set = 0;
2961         if (set == NULL) {
2962                 no_set = 1;
2963                 set = ptlrpc_prep_set();
2964                 if (!set)
2965                         RETURN(-ENOMEM);
2966         }
2967
2968         lov_getref(obddev);
2969         count = lov->desc.ld_tgt_count;
2970
2971         if (KEY_IS(KEY_NEXT_ID)) {
2972                 count = vallen / sizeof(struct obd_id_info);
2973                 vallen = sizeof(obd_id);
2974                 incr = sizeof(struct obd_id_info);
2975                 do_inactive = 1;
2976                 next_id = 1;
2977         } else if (KEY_IS(KEY_CHECKSUM)) {
2978                 do_inactive = 1;
2979         } else if (KEY_IS(KEY_UNLINKED)) {
2980                 check_uuid = val ? 1 : 0;
2981         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2982                 /* use defaults:  do_inactive = incr = 0; */
2983         } else if (KEY_IS(KEY_MDS_CONN)) {
2984                 mds_con = 1;
2985         }
2986
2987         for (i = 0; i < count; i++, val = (char *)val + incr) {
2988                 if (next_id) {
2989                         tgt = lov->lov_tgts[((struct obd_id_info*)val)->idx];
2990                 } else {
2991                         tgt = lov->lov_tgts[i];
2992                 }
2993                 /* OST was disconnected */
2994                 if (!tgt || !tgt->ltd_exp)
2995                         continue;
2996
2997                 /* OST is inactive and we don't want inactive OSCs */
2998                 if (!tgt->ltd_active && !do_inactive)
2999                         continue;
3000
3001                 if (mds_con) {
3002                         struct mds_group_info *mgi;
3003
3004                         LASSERT(vallen == sizeof(*mgi));
3005                         mgi = (struct mds_group_info *)val;
3006
3007                         /* Only want a specific OSC */
3008                         if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
3009                                                 &tgt->ltd_uuid))
3010                                 continue;
3011
3012                         err = obd_set_info_async(tgt->ltd_exp,
3013                                          keylen, key, sizeof(int),
3014                                          &mgi->group, set);
3015                 } else if (next_id) {
3016                         err = obd_set_info_async(tgt->ltd_exp,
3017                                          keylen, key, vallen,
3018                                          ((struct obd_id_info*)val)->data, set);
3019                 } else  {
3020                         /* Only want a specific OSC */
3021                         if (check_uuid &&
3022                             !obd_uuid_equals(val, &tgt->ltd_uuid))
3023                                 continue;
3024
3025                         err = obd_set_info_async(tgt->ltd_exp,
3026                                          keylen, key, vallen, val, set);
3027                 }
3028
3029                 if (!rc)
3030                         rc = err;
3031         }
3032
3033         lov_putref(obddev);
3034         if (no_set) {
3035                 err = ptlrpc_set_wait(set);
3036                 if (!rc)
3037                         rc = err;
3038                 ptlrpc_set_destroy(set);
3039         }
3040         RETURN(rc);
3041 }
3042
3043 static int lov_checkmd(struct obd_export *exp, struct obd_export *md_exp,
3044                        struct lov_stripe_md *lsm)
3045 {
3046         int rc;
3047         ENTRY;
3048
3049         if (!lsm)
3050                 RETURN(0);
3051         LASSERT(md_exp);
3052         LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
3053         rc = lsm_op_find(lsm->lsm_magic)->lsm_revalidate(lsm, md_exp->exp_obd);
3054
3055         RETURN(rc);
3056 }
3057
3058 int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
3059 {
3060         int i, rc = 0;
3061         ENTRY;
3062
3063         for (i = 0; i < lsm->lsm_stripe_count; i++) {
3064                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
3065                 if (loi->loi_ar.ar_rc && !rc)
3066                         rc = loi->loi_ar.ar_rc;
3067                 loi->loi_ar.ar_rc = 0;
3068         }
3069         RETURN(rc);
3070 }
3071 EXPORT_SYMBOL(lov_test_and_clear_async_rc);
3072
3073
3074 static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
3075                            int cmd, __u64 *offset)
3076 {
3077         __u32 ssize = lsm->lsm_stripe_size;
3078         __u64 start;
3079
3080         start = *offset;
3081         do_div(start, ssize);
3082         start = start * ssize;
3083
3084         CDEBUG(D_DLMTRACE, "offset "LPU64", stripe %u, start "LPU64
3085                            ", end "LPU64"\n", *offset, ssize, start,
3086                            start + ssize - 1);
3087         if (cmd == OBD_CALC_STRIPE_END) {
3088                 *offset = start + ssize - 1;
3089         } else if (cmd == OBD_CALC_STRIPE_START) {
3090                 *offset = start;
3091         } else {
3092                 LBUG();
3093         }
3094
3095         RETURN(0);
3096 }
3097
3098
3099 #if 0 /* Compiled out: everything up to the matching #endif is an unfinished draft. */
3100 struct lov_multi_wait { /* per-stripe wait state used by lov_complete_many() */
3101         struct ldlm_lock *lock; /* lock found for the stripe's handle */
3102         wait_queue_t      wait; /* entry added to that lock's wait queue */
3103         int               completed; /* set to 1 when the handle yields no lock */
3104         int               generation; /* copy of imp_generation read under imp_lock */
3105 };
3106 /* Queues the current task on the wait queue of each per-stripe lock behind lockh, then waits for check_multi_complete(). */
3107 int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
3108                       struct lustre_handle *lockh)
3109 {
3110         struct lov_lock_handles *lov_lockh = NULL;
3111         struct lustre_handle *lov_lockhp;
3112         struct lov_obd *lov;
3113         struct lov_oinfo *loi;
3114         struct lov_multi_wait *queues;
3115         int rc = 0, i;
3116         ENTRY;
3117
3118         ASSERT_LSM_MAGIC(lsm);
3119
3120         if (!exp || !exp->exp_obd)
3121                 RETURN(-ENODEV);
3122
3123         LASSERT(lockh != NULL);
3124         if (lsm->lsm_stripe_count > 1) { /* striped file: lockh stands for a set of per-stripe handles */
3125                 lov_lockh = lov_handle2llh(lockh);
3126                 if (lov_lockh == NULL) {
3127                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
3128                         RETURN(-EINVAL);
3129                 }
3130
3131                 lov_lockhp = lov_lockh->llh_handles;
3132         } else { /* otherwise lockh is used directly */
3133                 lov_lockhp = lockh;
3134         }
3135
3136         OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues)); /* NOTE(review): no matching OBD_FREE for queues is visible in this function */
3137         if (queues == NULL)
3138                 GOTO(out, rc = -ENOMEM);
3139
3140         lov = &exp->exp_obd->u.lov;
3141         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
3142              i++, loi++, lov_lockhp++) { /* NOTE(review): steps loi through lsm_oinfo as an array of structs, while the live code in this file reads lsm_oinfo[i] as a pointer */
3143                 struct ldlm_lock *lock;
3144                 struct obd_device *obd;
3145
3146                 lock = ldlm_handle2lock(lov_lockhp);
3147                 if (lock == NULL) { /* nothing to wait for on this stripe */
3148                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
3149                                loi->loi_ost_idx, loi->loi_id);
3150                         queues[i].completed = 1;
3151                         continue;
3152                 }
3153
3154                 queues[i].lock = lock;
3155                 init_waitqueue_entry(&(queues[i].wait), current);
3156                 add_wait_queue(lock->l_waitq, &(queues[i].wait));
3157
3158                 obd = class_exp2obd(lock->l_conn_export);
3159                 if (obd != NULL)
3160                         imp = obd->u.cli.cl_import; /* NOTE(review): imp is not declared in this function */
3161                 if (imp != NULL) {
3162                         spin_lock(&imp->imp_lock);
3163                         queues[i].generation = imp->imp_generation;
3164                         spin_unlock(&imp->imp_lock);
3165                 }
3166         }
3167
3168         lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
3169                                interrupted_completion_wait, &lwd); /* NOTE(review): lwi and lwd are not declared in this function */
3170         rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);
3171
3172         for (i = 0; i < lsm->lsm_stripe_count; i++)
3173                 remove_wait_queue(lock->l_waitq, &(queues[i].wait)); /* NOTE(review): lock is only declared inside the loop above */
3174
3175         if (rc == -EINTR || rc == -ETIMEDOUT) { /* NOTE(review): empty body, interruption and timeout are not handled */
3176
3177
3178         }
3179
3180  out:
3181         if (lov_lockh != NULL)
3182                 lov_llh_put(lov_lockh);
3183         RETURN(rc);
3184 }
3185 #endif /* 0 */
3186 /* Take md->lsm_lock and record the calling process as its owner. */
3187 void lov_stripe_lock(struct lov_stripe_md *md)
3188 {
3189         LASSERT(md->lsm_lock_owner != cfs_curproc_pid()); /* the caller must not already be recorded as owner */
3190         spin_lock(&md->lsm_lock);
3191         LASSERT(md->lsm_lock_owner == 0); /* no owner may be recorded once the spinlock is held */
3192         md->lsm_lock_owner = cfs_curproc_pid();
3193 }
3194 EXPORT_SYMBOL(lov_stripe_lock);
3195
/**
 * Releases the spinlock of a stripe md taken by lov_stripe_lock().
 *
 * Asserts that the caller is the recorded owner, clears the owner field and
 * then drops md->lsm_lock.
 *
 * \param md striping information whose lsm_lock is released
 *
 * \see lov_stripe_lock
 */
3196 void lov_stripe_unlock(struct lov_stripe_md *md)
3197 {
             /* only the process recorded by lov_stripe_lock() may unlock */
3198         LASSERT(md->lsm_lock_owner == cfs_curproc_pid());
             /* ownership is cleared while the spinlock is still held */
3199         md->lsm_lock_owner = 0;
3200         spin_unlock(&md->lsm_lock);
3201 }
3202 EXPORT_SYMBOL(lov_stripe_unlock);
3203
3204 /**
3205  * Checks if requested extent lock is compatible with a lock under the page.
3206  *
3207  * Checks if the lock under \a page is compatible with a read or write lock
3208  * (specified by \a rw) for an extent [\a start , \a end].
3209  *
3210  * \param exp lov export
3211  * \param lsm striping information for the file
3212  * \param res lov_async_page placeholder
3213  * \param rw OBD_BRW_READ if requested for reading,
3214  *           OBD_BRW_WRITE if requested for writing
3215  * \param start start of the requested extent
3216  * \param end end of the requested extent
3217  * \param cookie transparent parameter for passing locking context
3218  *
3219  * \post result == 1, *cookie == context, appropriate lock is referenced or
3220  * \post result == 0
3221  *
3222  * \retval 1 owned lock is reused for the request
3223  * \retval 0 no lock reused for the request
3224  *
3225  * \see lov_release_short_lock
3226  */
3227 static int lov_reget_short_lock(struct obd_export *exp,
3228                                 struct lov_stripe_md *lsm,
3229                                 void **res, int rw,
3230                                 obd_off start, obd_off end,
3231                                 void **cookie)
3232 {
3233         struct lov_async_page *l = *res;
3234         obd_off stripe_start, stripe_end = start;
3235
3236         ENTRY;
3237
3238         /* ensure we don't cross stripe boundaries */
3239         lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
3240         if (stripe_end <= end)
3241                 RETURN(0);
3242
3243         /* map the region limits to the object limits */
3244         lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
3245         lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
3246
3247         RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
3248                                     lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
3249                                     ltd_exp, NULL, &l->lap_sub_cookie,
3250                                     rw, stripe_start, stripe_end, cookie));
3251 }
3252
3253 /**
3254  * Releases a reference to a lock taken in a "fast" way.
3255  *
3256  * Releases a read or a write (specified by \a rw) lock
3257  * referenced by \a cookie.
3258  *
3259  * \param exp lov export
3260  * \param lsm striping information for the file
3261  * \param end end of the locked extent
3262  * \param rw OBD_BRW_READ if requested for reading,
3263  *           OBD_BRW_WRITE if requested for writing
3264  * \param cookie transparent parameter for passing locking context
3265  *
3266  * \post appropriate lock is dereferenced
3267  *
3268  * \see lov_reget_short_lock
3269  */
3270 static int lov_release_short_lock(struct obd_export *exp,
3271                                   struct lov_stripe_md *lsm, obd_off end,
3272                                   void *cookie, int rw)
3273 {
3274         int stripe;
3275
3276         ENTRY;
3277
3278         stripe = lov_stripe_number(lsm, end);
3279
3280         RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
3281                                       lsm_oinfo[stripe]->loi_ost_idx]->
3282                                       ltd_exp, NULL, end, cookie, rw));
3283 }
3284
3285 struct obd_ops lov_obd_ops = {
3286         .o_owner               = THIS_MODULE,
3287         .o_setup               = lov_setup,
3288         .o_precleanup          = lov_precleanup,
3289         .o_cleanup             = lov_cleanup,
3290         .o_process_config      = lov_process_config,
3291         .o_connect             = lov_connect,
3292         .o_disconnect          = lov_disconnect,
3293         .o_statfs              = lov_statfs,
3294         .o_statfs_async        = lov_statfs_async,
3295         .o_packmd              = lov_packmd,
3296         .o_unpackmd            = lov_unpackmd,
3297         .o_checkmd             = lov_checkmd,
3298         .o_create              = lov_create,
3299         .o_destroy             = lov_destroy,
3300         .o_getattr             = lov_getattr,
3301         .o_getattr_async       = lov_getattr_async,
3302         .o_setattr             = lov_setattr,
3303         .o_setattr_async       = lov_setattr_async,
3304         .o_brw                 = lov_brw,
3305         .o_brw_async           = lov_brw_async,
3306         .o_prep_async_page     = lov_prep_async_page,
3307         .o_reget_short_lock    = lov_reget_short_lock,
3308         .o_release_short_lock  = lov_release_short_lock,
3309         .o_queue_async_io      = lov_queue_async_io,
3310         .o_set_async_flags     = lov_set_async_flags,
3311         .o_queue_group_io      = lov_queue_group_io,
3312         .o_trigger_group_io    = lov_trigger_group_io,
3313         .o_teardown_async_page = lov_teardown_async_page,
3314         .o_merge_lvb           = lov_merge_lvb,
3315         .o_adjust_kms          = lov_adjust_kms,
3316         .o_punch               = lov_punch,
3317         .o_sync                = lov_sync,
3318         .o_enqueue             = lov_enqueue,
3319         .o_match               = lov_match,
3320         .o_change_cbdata       = lov_change_cbdata,
3321         .o_cancel              = lov_cancel,
3322         .o_cancel_unused       = lov_cancel_unused,
3323         .o_join_lru            = lov_join_lru,
3324         .o_iocontrol           = lov_iocontrol,
3325         .o_get_info            = lov_get_info,
3326         .o_set_info_async      = lov_set_info_async,
3327         .o_extent_calc         = lov_extent_calc,
3328         .o_llog_init           = lov_llog_init,
3329         .o_llog_finish         = lov_llog_finish,
3330         .o_notify              = lov_notify,
3331         .o_register_page_removal_cb = lov_register_page_removal_cb,
3332         .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
3333         .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
3334         .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
3335 };
3336
3337 static quota_interface_t *quota_interface;
3338 extern quota_interface_t lov_quota_interface;
3339
3340 cfs_mem_cache_t *lov_oinfo_slab;
3341
3342 int __init lov_init(void)
3343 {
3344         struct lprocfs_static_vars lvars = { 0 };
3345         int rc, rc2;
3346         ENTRY;
3347
3348         lov_oinfo_slab = cfs_mem_cache_create("lov_oinfo",
3349                                               sizeof(struct lov_oinfo), 
3350                                               0, SLAB_HWCACHE_ALIGN);
3351         if (lov_oinfo_slab == NULL)
3352                 return -ENOMEM;
3353         lprocfs_lov_init_vars(&lvars);
3354
3355         request_module("lquota");
3356         quota_interface = PORTAL_SYMBOL_GET(lov_quota_interface);
3357         init_obd_quota_ops(quota_interface, &lov_obd_ops);
3358
3359         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
3360                                  LUSTRE_LOV_NAME, NULL);
3361         if (rc) {
3362                 if (quota_interface)
3363                         PORTAL_SYMBOL_PUT(lov_quota_interface);
3364                 rc2 = cfs_mem_cache_destroy(lov_oinfo_slab);
3365                 LASSERT(rc2 == 0);
3366         }
3367
3368         RETURN(rc);
3369 }
3370
3371 #ifdef __KERNEL__
/**
 * Tears down what lov_init() set up; passed as the exit handler to
 * cfs_module() below.
 *
 * Puts the quota interface symbol if one was obtained, unregisters the
 * LUSTRE_LOV_NAME obd type and destroys the lov_oinfo slab cache.
 */
3372 static void /*__exit*/ lov_exit(void)
3373 {
3374         int rc;
3375         
             /* quota_interface is NULL when lov_init() obtained no symbol */
3376         if (quota_interface)
3377                 PORTAL_SYMBOL_PUT(lov_quota_interface);
3378
3379         class_unregister_type(LUSTRE_LOV_NAME);
             /* the cache is destroyed only after the type is unregistered;
              * a non-zero result from the destroy call trips the assertion */
3380         rc = cfs_mem_cache_destroy(lov_oinfo_slab);
3381         LASSERT(rc == 0);
3382 }
3383
3384 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3385 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
3386 MODULE_LICENSE("GPL");
3387
3388 cfs_module(lov, LUSTRE_VERSION_STRING, lov_init, lov_exit);
3389 #endif