Whamcloud - gitweb
8b3c13ac814919aaa202e6fb0d836469198df902
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29 #ifdef __KERNEL__
30 #include <linux/slab.h>
31 #include <linux/module.h>
32 #include <linux/init.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/seq_file.h>
36 #include <asm/div64.h>
37 #else
38 #include <liblustre.h>
39 #endif
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_net.h>
44 #include <linux/lustre_idl.h>
45 #include <linux/lustre_dlm.h>
46 #include <linux/lustre_mds.h>
47 #include <linux/obd_class.h>
48 #include <linux/obd_lov.h>
49 #include <linux/obd_ost.h>
50 #include <linux/lprocfs_status.h>
51
52 #include "lov_internal.h"
53
54 /* obd methods */
55 #define MAX_STRING_SIZE 128
56 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
57                            int activate, struct obd_connect_data *conn_data,
58                            unsigned long connect_flags)
59 {
60         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
61         struct obd_uuid *tgt_uuid = &tgt->uuid;
62
63 #ifdef __KERNEL__
64         struct proc_dir_entry *lov_proc_dir;
65 #endif
66         struct lov_obd *lov = &obd->u.lov;
67         struct lustre_handle conn = {0, };
68         struct obd_device *tgt_obd;
69         int rc;
70         ENTRY;
71
72         tgt_obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
73                                         &obd->obd_uuid);
74
75         if (!tgt_obd) {
76                 CERROR("Target %s not attached\n", tgt_uuid->uuid);
77                 RETURN(-EINVAL);
78         }
79
80         if (!tgt_obd->obd_set_up) {
81                 CERROR("Target %s not set up\n", tgt_uuid->uuid);
82                 RETURN(-EINVAL);
83         }
84
85         if (activate) {
86                 tgt_obd->obd_no_recov = 0;
87                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
88         }
89
90         if (tgt_obd->u.cli.cl_import->imp_invalid) {
91                 CERROR("not connecting OSC %s; administratively "
92                        "disabled\n", tgt_uuid->uuid);
93                 rc = obd_register_observer(tgt_obd, obd);
94                 if (rc) {
95                         CERROR("Target %s register_observer error %d; "
96                                "will not be able to reactivate\n",
97                                tgt_uuid->uuid, rc);
98                 }
99                 RETURN(0);
100         }
101
102         rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
103                          connect_flags);
104         if (rc) {
105                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
106                 RETURN(rc);
107         }
108         tgt->ltd_exp = class_conn2export(&conn);
109
110         rc = obd_register_observer(tgt_obd, obd);
111         if (rc) {
112                 CERROR("Target %s register_observer error %d\n",
113                        tgt_uuid->uuid, rc);
114                 obd_disconnect(tgt->ltd_exp, 0);
115                 tgt->ltd_exp = NULL;
116                 RETURN(rc);
117         }
118
119         tgt->active = 1;
120         lov->desc.ld_active_tgt_count++;
121
122 #ifdef __KERNEL__
123         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
124         if (lov_proc_dir) {
125                 struct obd_device *osc_obd = class_conn2obd(&conn);
126                 struct proc_dir_entry *osc_symlink;
127                 char name[MAX_STRING_SIZE + 1];
128
129                 LASSERT(osc_obd != NULL);
130                 LASSERT(osc_obd->obd_type != NULL);
131                 LASSERT(osc_obd->obd_type->typ_name != NULL);
132                 name[MAX_STRING_SIZE] = '\0';
133                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
134                          osc_obd->obd_type->typ_name,
135                          osc_obd->obd_name);
136                 osc_symlink = proc_symlink(osc_obd->obd_name, lov_proc_dir,
137                                            name);
138                 if (osc_symlink == NULL) {
139                         CERROR("could not register LOV target "
140                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
141                                obd->obd_type->typ_name, obd->obd_name,
142                                osc_obd->obd_name);
143                         lprocfs_remove(lov_proc_dir);
144                         lov_proc_dir = NULL;
145                 }
146         }
147 #endif
148
149         RETURN(0);
150 }
151
152 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
153                        struct obd_uuid *cluuid, struct obd_connect_data *data,
154                        unsigned long flags)
155 {
156 #ifdef __KERNEL__
157         struct proc_dir_entry *lov_proc_dir;
158 #endif
159         struct lov_obd *lov = &obd->u.lov;
160         struct lov_tgt_desc *tgt;
161         struct obd_export *exp;
162         int rc, rc2, i;
163         ENTRY;
164
165         rc = class_connect(conn, obd, cluuid);
166         if (rc)
167                 RETURN(rc);
168
169         exp = class_conn2export(conn);
170
171         /* We don't want to actually do the underlying connections more than
172          * once, so keep track. */
173         lov->refcount++;
174         if (lov->refcount > 1) {
175                 class_export_put(exp);
176                 RETURN(0);
177         }
178
179 #ifdef __KERNEL__
180         lov_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
181                                         NULL, NULL);
182         if (IS_ERR(lov_proc_dir)) {
183                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
184                        obd->obd_type->typ_name, obd->obd_name);
185                 lov_proc_dir = NULL;
186         }
187 #endif
188
189         /* connect_flags is the MDS number, save for use in lov_add_obd */
190         lov->lov_connect_flags = flags;
191         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
192                 if (obd_uuid_empty(&tgt->uuid))
193                         continue;
194                 rc = lov_connect_obd(obd, tgt, 0, data, flags);
195                 if (rc)
196                         GOTO(out_disc, rc);
197         }
198
199         class_export_put(exp);
200         RETURN (0);
201
202  out_disc:
203 #ifdef __KERNEL__
204         if (lov_proc_dir)
205                 lprocfs_remove(lov_proc_dir);
206 #endif
207
208         while (i-- > 0) {
209                 struct obd_uuid uuid;
210                 --tgt;
211                 --lov->desc.ld_active_tgt_count;
212                 tgt->active = 0;
213                 /* save for CERROR below; (we know it's terminated) */
214                 uuid = tgt->uuid;
215                 rc2 = obd_disconnect(tgt->ltd_exp, 0);
216                 if (rc2)
217                         CERROR("error: LOV target %s disconnect on OST idx %d: "
218                                "rc = %d\n", uuid.uuid, i, rc2);
219         }
220         class_disconnect(exp, 0);
221         RETURN (rc);
222 }
223
224 static int lov_disconnect_obd(struct obd_device *obd, 
225                               struct lov_tgt_desc *tgt,
226                               unsigned long flags)
227 {
228 #ifdef __KERNEL__
229         struct proc_dir_entry *lov_proc_dir;
230 #endif
231         struct obd_device *osc_obd = class_exp2obd(tgt->ltd_exp);
232         struct lov_obd *lov = &obd->u.lov;
233         int rc;
234         ENTRY;
235
236 #ifdef __KERNEL__
237         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
238         if (lov_proc_dir) {
239                 struct proc_dir_entry *osc_symlink;
240
241                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
242                 if (osc_symlink) {
243                         lprocfs_remove(osc_symlink);
244                 } else {
245                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
246                                obd->obd_type->typ_name, obd->obd_name,
247                                osc_obd->obd_name);
248                 }
249         }
250 #endif
251         if (obd->obd_no_recov) {
252                 /* Pass it on to our clients.
253                  * XXX This should be an argument to disconnect,
254                  * XXX not a back-door flag on the OBD.  Ah well.
255                  */
256                 if (osc_obd)
257                         osc_obd->obd_no_recov = 1;
258         }
259
260         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
261         rc = obd_disconnect(tgt->ltd_exp, flags);
262         if (rc) {
263                 if (tgt->active) {
264                         CERROR("Target %s disconnect error %d\n",
265                                tgt->uuid.uuid, rc);
266                 }
267                 rc = 0;
268         }
269
270         if (tgt->active) {
271                 tgt->active = 0;
272                 lov->desc.ld_active_tgt_count--;
273         }
274         tgt->ltd_exp = NULL;
275         RETURN(0);
276 }
277
278 static int lov_disconnect(struct obd_export *exp, unsigned long flags)
279 {
280         struct obd_device *obd = class_exp2obd(exp);
281 #ifdef __KERNEL__
282         struct proc_dir_entry *lov_proc_dir;
283 #endif
284         struct lov_obd *lov = &obd->u.lov;
285         struct lov_tgt_desc *tgt;
286         int rc, i;
287         ENTRY;
288
289         if (!lov->tgts)
290                 goto out_local;
291
292         /* Only disconnect the underlying layers on the final disconnect. */
293         lov->refcount--;
294         if (lov->refcount != 0)
295                 goto out_local;
296
297         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
298                 if (tgt->ltd_exp)
299                         lov_disconnect_obd(obd, tgt, flags);
300         }
301
302 #ifdef __KERNEL__
303         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
304         if (lov_proc_dir) {
305                 lprocfs_remove(lov_proc_dir);
306         } else {
307                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing.",
308                        obd->obd_type->typ_name, obd->obd_name);
309         }
310 #endif
311         
312  out_local:
313         rc = class_disconnect(exp, 0);
314         RETURN(rc);
315 }
316
317 /* Error codes:
318  *
319  *  -EINVAL  : UUID can't be found in the LOV's target list
320  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
321  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
322  */
323 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
324                               int activate)
325 {
326         struct lov_tgt_desc *tgt;
327         int i, rc = 0;
328         ENTRY;
329
330         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
331                lov, uuid->uuid, activate);
332
333         spin_lock(&lov->lov_lock);
334         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
335                 if (tgt->ltd_exp == NULL)
336                         continue;
337
338                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
339                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
340                 
341                 if (obd_uuid_equals(uuid, &tgt->uuid))
342                         break;
343         }
344
345         if (i == lov->desc.ld_tgt_count)
346                 GOTO(out, rc = -EINVAL);
347
348
349         if (tgt->active == activate) {
350                 CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,                       
351                         activate ? "" : "in");
352                 GOTO(out, rc);
353         }
354
355         CDEBUG(D_INFO, "Marking OSC %s %sactive\n", uuid->uuid,
356                activate ? "" : "in");
357
358         tgt->active = activate;
359         if (activate)
360                 lov->desc.ld_active_tgt_count++;
361         else
362                 lov->desc.ld_active_tgt_count--;
363
364         EXIT;
365  out:
366         spin_unlock(&lov->lov_lock);
367         return rc;
368 }
369
370 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
371                       int active, void *data)
372 {
373         struct obd_uuid *uuid;
374         int rc;
375         ENTRY;
376
377         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
378                 CERROR("unexpected notification of %s %s!\n",
379                        watched->obd_type->typ_name,
380                        watched->obd_name);
381                 return -EINVAL;
382         }
383         uuid = &watched->u.cli.cl_import->imp_target_uuid;
384
385         /* Set OSC as active before notifying the observer, so the
386          * observer can use the OSC normally.  
387          */
388         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
389         if (rc) {
390                 CERROR("%sactivation of %s failed: %d\n",
391                        active ? "" : "de", uuid->uuid, rc);
392                 RETURN(rc);
393         }
394
395         if (obd->obd_observer)
396                 /* Pass the notification up the chain. */
397                 rc = obd_notify(obd->obd_observer, watched, active, data);
398
399         RETURN(rc);
400 }
401
402 int lov_attach(struct obd_device *dev, obd_count len, void *data)
403 {
404         struct lprocfs_static_vars lvars;
405         int rc;
406
407         lprocfs_init_vars(lov, &lvars);
408         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
409         if (rc == 0) {
410 #ifdef __KERNEL__
411                 struct proc_dir_entry *entry;
412
413                 entry = create_proc_entry("target_obd_status", 0444, 
414                                           dev->obd_proc_entry);
415                 if (entry == NULL) {
416                         rc = -ENOMEM;
417                 } else {
418                         entry->proc_fops = &lov_proc_target_fops;
419                         entry->data = dev;
420                 }
421 #endif
422         }
423         return rc;
424 }
425
/*
 * Detach-time teardown: hands @dev to lprocfs_obd_detach() and returns its
 * result unchanged.
 */
int lov_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
430
431 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
432 {
433         struct lov_obd *lov = &obd->u.lov;
434         struct lustre_cfg *lcfg = buf;
435         struct lov_desc *desc;
436         int count;
437         ENTRY;
438
439         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
440                 CERROR("LOV setup requires a descriptor\n");
441                 RETURN(-EINVAL);
442         }
443
444         desc = (struct lov_desc *)lustre_cfg_string(lcfg, 1);
445         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
446                 CERROR("descriptor size wrong: %d > %d\n",
447                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
448                 RETURN(-EINVAL);
449         }
450  
451         /* Because of 64-bit divide/mod operations only work with a 32-bit
452          * divisor in a 32-bit kernel, we cannot support a stripe width
453          * of 4GB or larger on 32-bit CPUs.
454          */
455        
456         count = desc->ld_default_stripe_count;
457         if (count && (count * desc->ld_default_stripe_size) > ~0UL) {
458                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
459                        desc->ld_default_stripe_size, count, ~0UL);
460                 RETURN(-EINVAL);
461         }
462         if (desc->ld_tgt_count > 0) {
463                 lov->bufsize= sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
464         } else {
465                 lov->bufsize = sizeof(struct lov_tgt_desc) * LOV_MAX_TGT_COUNT;  
466         }
467         OBD_ALLOC(lov->tgts, lov->bufsize);
468         if (lov->tgts == NULL) {
469                 lov->bufsize = 0;
470                 CERROR("couldn't allocate %d bytes for target table.\n",
471                        lov->bufsize);
472                 RETURN(-EINVAL);
473         }
474
475         desc->ld_tgt_count = 0;
476         desc->ld_active_tgt_count = 0;
477         lov->desc = *desc;
478         spin_lock_init(&lov->lov_lock);
479         sema_init(&lov->lov_llog_sem, 1);
480
481         RETURN(0);
482 }
483
484 static int lov_cleanup(struct obd_device *obd, int flags)
485 {
486         struct lov_obd *lov = &obd->u.lov;
487
488         OBD_FREE(lov->tgts, lov->bufsize);
489         RETURN(0);
490 }
491
492 static int
493 lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
494 {
495         struct lov_obd *lov = &obd->u.lov;
496         struct lov_tgt_desc *tgt;
497         int rc;
498         ENTRY;
499
500         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
501                uuidp->uuid, index, gen);
502
503         if ((index < 0) || (index >= LOV_MAX_TGT_COUNT)) {
504                 CERROR("request to add OBD %s at invalid index: %d\n",
505                        uuidp->uuid, index);
506                 RETURN(-EINVAL);
507         }
508
509         if (gen <= 0) {
510                 CERROR("request to add OBD %s with invalid generation: %d\n",
511                        uuidp->uuid, gen);
512                 RETURN(-EINVAL);
513         }
514
515         tgt = lov->tgts + index;
516         if (!obd_uuid_empty(&tgt->uuid)) {
517                 CERROR("OBD already assigned at LOV target index %d\n",
518                        index);
519                 RETURN(-EEXIST);
520         }
521
522         tgt->uuid = *uuidp;
523         /* XXX - add a sanity check on the generation number. */
524         tgt->ltd_gen = gen;
525
526         if (index >= lov->desc.ld_tgt_count)
527                 lov->desc.ld_tgt_count = index + 1;
528
529         CDEBUG(D_CONFIG, "idx: %d ltd_gen: %d ld_tgt_count: %d\n",
530                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
531
532         if (lov->refcount == 0)
533                 RETURN(0);
534
535         if (tgt->ltd_exp) {
536                 struct obd_device *osc_obd;
537
538                 osc_obd = class_exp2obd(tgt->ltd_exp);
539                 if (osc_obd)
540                         osc_obd->obd_no_recov = 0;
541         }
542
543         rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
544         if (rc)
545                 GOTO(out, rc);
546
547         if (obd->obd_observer) {
548                 /* tell the mds_lov about the new target */
549                 rc = obd_notify(obd->obd_observer, tgt->ltd_exp->exp_obd, 1,
550                                 (void *)index);
551         }
552
553         GOTO(out, rc);
554  out:
555         if (rc && tgt->ltd_exp != NULL)
556                 lov_disconnect_obd(obd, tgt, 0);
557         return rc;
558 }
559
560 static int
561 lov_del_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
562 {
563         struct lov_obd *lov = &obd->u.lov;
564         struct lov_tgt_desc *tgt;
565         int count = lov->desc.ld_tgt_count;
566         int rc = 0;
567         ENTRY;
568
569         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d\n",
570                uuidp->uuid, index, gen);
571
572         if (index >= count) {
573                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
574                        index, count);
575                 RETURN(-EINVAL);
576         }
577
578         tgt = lov->tgts + index;
579
580         if (obd_uuid_empty(&tgt->uuid)) {
581                 CERROR("LOV target at index %d is not setup.\n", index);
582                 RETURN(-EINVAL);
583         }
584
585         if (!obd_uuid_equals(uuidp, &tgt->uuid)) {
586                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
587                        tgt->uuid.uuid, index, uuidp->uuid);
588                 RETURN(-EINVAL);
589         }
590
591         if (tgt->ltd_exp) {
592                 struct obd_device *osc_obd;
593
594                 osc_obd = class_exp2obd(tgt->ltd_exp);
595                 if (osc_obd) {
596                         osc_obd->obd_no_recov = 1;
597                         rc = obd_llog_finish(osc_obd, &osc_obd->obd_llogs, 1);
598                         if (rc)
599                                 CERROR("osc_llog_finish error: %d\n", rc);
600                 }
601                 lov_disconnect_obd(obd, tgt, 0);
602         }
603
604         /* XXX - right now there is a dependency on ld_tgt_count being the
605          * maximum tgt index for computing the mds_max_easize. So we can't
606          * shrink it. */
607
608         /* lt_gen = 0 will mean it will not match the gen of any valid loi */
609         memset(tgt, 0, sizeof(*tgt));
610
611         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
612                tgt->uuid.uuid, index, tgt->ltd_gen, tgt->ltd_exp, tgt->active);
613
614         RETURN(rc);
615 }
616
617 static int lov_process_config(struct obd_device *obd, obd_count len, void *buf)
618 {
619         struct lustre_cfg *lcfg = buf;
620         struct obd_uuid obd_uuid;
621         int cmd;
622         int index;
623         int gen;
624         int rc = 0;
625         ENTRY;
626
627         switch(cmd = lcfg->lcfg_command) {
628         case LCFG_LOV_ADD_OBD:
629         case LCFG_LOV_DEL_OBD: {
630                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
631                         GOTO(out, rc = -EINVAL);
632
633                 obd_str2uuid(&obd_uuid, lustre_cfg_string(lcfg, 1));
634
635                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
636                         GOTO(out, rc = -EINVAL);
637                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
638                         GOTO(out, rc = -EINVAL);
639                 if (cmd == LCFG_LOV_ADD_OBD)
640                         rc = lov_add_obd(obd, &obd_uuid, index, gen);
641                 else
642                         rc = lov_del_obd(obd, &obd_uuid, index, gen);
643                 GOTO(out, rc);
644         }
645         default: {
646                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
647                 GOTO(out, rc = -EINVAL);
648
649         }
650         }
651 out:
652         RETURN(rc);
653 }
654
/* Fallback used only when log2 is not already defined: expands to
 * ffz(~(n)).
 * NOTE(review): presumably ffz() is "find first zero bit", which would make
 * this the index of the lowest set bit of n (equal to log2 only when n is a
 * power of two) -- confirm against the ffz() definition. */
#ifndef log2
#define log2(n) ffz(~(n))
#endif
658
659 static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
660                              struct lov_stripe_md **ea,
661                              struct obd_trans_info *oti)
662 {
663         struct lov_obd *lov;
664         struct obdo *tmp_oa;
665         struct obd_uuid *ost_uuid = NULL;
666         int rc = 0, i;
667         ENTRY;
668
669         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
670                 src_oa->o_flags == OBD_FL_DELORPHAN);
671
672         lov = &export->exp_obd->u.lov;
673
674         tmp_oa = obdo_alloc();
675         if (tmp_oa == NULL)
676                 RETURN(-ENOMEM);
677
678         if (src_oa->o_valid & OBD_MD_FLINLINE) {
679                 ost_uuid = (struct obd_uuid *)src_oa->o_inline;
680                 CDEBUG(D_HA, "clearing orphans only for %s\n",
681                        ost_uuid->uuid);
682         }
683
684         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
685                 struct lov_stripe_md obj_md;
686                 struct lov_stripe_md *obj_mdp = &obj_md;
687                 int err;
688
689                 /* if called for a specific target, we don't
690                    care if it is not active. */
691                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
692                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
693                         continue;
694                 }
695
696                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
697                         continue;
698
699                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
700
701                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
702                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
703                                  &obj_mdp, oti);
704                 if (err)
705                         /* This export will be disabled until it is recovered,
706                            and then orphan recovery will be completed. */
707                         CERROR("error in orphan recovery on OST idx %d/%d: "
708                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
709
710                 if (ost_uuid)
711                         break;
712         }
713         obdo_free(tmp_oa);
714         RETURN(rc);
715 }
716
717 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
718                         void *acl, int acl_size,
719                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
720 {
721         struct lov_stripe_md *obj_mdp, *lsm;
722         struct lov_obd *lov = &exp->exp_obd->u.lov;
723         unsigned ost_idx;
724         int rc, i;
725         ENTRY;
726
727         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
728                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
729
730         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
731         if (obj_mdp == NULL)
732                 RETURN(-ENOMEM);
733
734         ost_idx = src_oa->o_nlink;
735         lsm = *ea;
736         if (lsm == NULL)
737                 GOTO(out, rc = -EINVAL);
738         if (ost_idx >= lov->desc.ld_tgt_count)
739                 GOTO(out, rc = -EINVAL);
740
741         for (i = 0; i < lsm->lsm_stripe_count; i++) {
742                 if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
743                         if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
744                                 GOTO(out, rc = -EINVAL);
745                         break;
746                 }
747         }
748         if (i == lsm->lsm_stripe_count)
749                 GOTO(out, rc = -EINVAL);
750
751         rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size,
752                         &obj_mdp, oti);
753 out:
754         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
755         RETURN(rc);
756 }
757
758 /* the LOV expects oa->o_id to be set to the LOV object id */
759 static int lov_create(struct obd_export *exp, struct obdo *src_oa,
760                       void *acl, int acl_size,
761                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
762 {
763         struct lov_request_set *set = NULL;
764         struct list_head *pos;
765         struct lov_obd *lov;
766         int rc = 0;
767         ENTRY;
768
769         LASSERT(ea != NULL);
770         if (exp == NULL)
771                 RETURN(-EINVAL);
772
773         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
774             src_oa->o_flags == OBD_FL_DELORPHAN) {
775                 rc = lov_clear_orphans(exp, src_oa, ea, oti);
776                 RETURN(rc);
777         }
778
779         lov = &exp->exp_obd->u.lov;
780         if (!lov->desc.ld_active_tgt_count)
781                 RETURN(-EIO);
782
783         /* Recreate a specific object id at the given OST index */
784         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
785             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
786                  rc = lov_recreate(exp, src_oa, acl, acl_size, ea, oti);
787                  RETURN(rc);
788         }
789
790         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
791         if (rc)
792                 RETURN(rc);
793
794         list_for_each (pos, &set->set_list) {
795                 struct lov_request *req = 
796                         list_entry(pos, struct lov_request, rq_link);
797
798                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
799                 rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
800                                 acl, acl_size, &req->rq_md, oti);
801                 lov_update_create_set(set, req, rc);
802         }
803         rc = lov_fini_create_set(set, ea);
804         RETURN(rc);
805 }
806
/*
 * Evaluates to 1 (after logging the reason) when LSMP is NULL or its
 * lsm_magic differs from LOV_MAGIC, and to 0 otherwise.  LSMP is evaluated
 * exactly once.
 */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_md__ = (LSMP);                   \
        int _bad__ = 1;                                         \
        if (_md__ == NULL) {                                    \
                CERROR("LOV requires striping ea\n");           \
        } else if (_md__->lsm_magic != LOV_MAGIC) {             \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _md__->lsm_magic, LOV_MAGIC);            \
        } else {                                                \
                _bad__ = 0;                                     \
        }                                                       \
        _bad__;                                                 \
})
821
822 static int lov_destroy(struct obd_export *exp, struct obdo *oa,
823                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
824 {
825         struct lov_request_set *set;
826         struct lov_request *req;
827         struct list_head *pos;
828         struct lov_obd *lov;
829         int rc = 0;
830         ENTRY;
831
832         if (lsm_bad_magic(lsm))
833                 RETURN(-EINVAL);
834
835         if (!exp || !exp->exp_obd)
836                 RETURN(-ENODEV);
837
838         lov = &exp->exp_obd->u.lov;
839         rc = lov_prep_destroy_set(exp, oa, lsm, oti, &set);
840         if (rc)
841                 RETURN(rc);
842
843         list_for_each (pos, &set->set_list) {
844                 int err;
845                 req = list_entry(pos, struct lov_request, rq_link);
846
847                 /* XXX update the cookie position */
848                 oti->oti_logcookies = set->set_cookies + req->rq_stripe;
849                 rc = obd_destroy(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
850                                  NULL, oti);
851                 err = lov_update_common_set(set, req, rc);
852                 if (rc) {
853                         CERROR("error: destroying objid "LPX64" subobj "
854                                LPX64" on OST idx %d: rc = %d\n", 
855                                set->set_oa->o_id, req->rq_oa->o_id, 
856                                req->rq_idx, rc);
857                         if (!rc)
858                                 rc = err;
859                 }
860         }
861         lov_fini_destroy_set(set);
862         RETURN(rc);
863 }
864
865 static int lov_getattr(struct obd_export *exp, struct obdo *oa,
866                        struct lov_stripe_md *lsm)
867 {
868         struct lov_request_set *set;
869         struct lov_request *req;
870         struct list_head *pos;
871         struct lov_obd *lov;
872         int err = 0, rc = 0;
873         ENTRY;
874
875         if (lsm_bad_magic(lsm))
876                 RETURN(-EINVAL);
877
878         if (!exp || !exp->exp_obd)
879                 RETURN(-ENODEV);
880
881         lov = &exp->exp_obd->u.lov;
882         
883         rc = lov_prep_getattr_set(exp, oa, lsm, &set);
884         if (rc)
885                 RETURN(rc);
886
887         list_for_each (pos, &set->set_list) {
888                 req = list_entry(pos, struct lov_request, rq_link);
889                 
890                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
891                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
892                        req->rq_idx);
893
894                 rc = obd_getattr(lov->tgts[req->rq_idx].ltd_exp, 
895                                  req->rq_oa, NULL);
896                 err = lov_update_common_set(set, req, rc);
897                 if (err) {
898                         CERROR("error: getattr objid "LPX64" subobj "
899                                LPX64" on OST idx %d: rc = %d\n",
900                                set->set_oa->o_id, req->rq_oa->o_id, 
901                                req->rq_idx, err);
902                         break;
903                 }
904         }
905         
906         rc = lov_fini_getattr_set(set);
907         if (err)
908                 rc = err;
909         RETURN(rc);
910 }
911
912 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data,
913                                  int rc)
914 {
915         struct lov_request_set *lovset = (struct lov_request_set *)data;
916         ENTRY;
917
918         /* don't do attribute merge if this aysnc op failed */
919         if (rc) {
920                 lovset->set_completes = 0;
921                 lov_fini_getattr_set(lovset);
922         } else {
923                 rc = lov_fini_getattr_set(lovset);
924         }
925         RETURN (rc);
926 }
927
928 static int lov_getattr_async(struct obd_export *exp, struct obdo *oa,
929                               struct lov_stripe_md *lsm,
930                               struct ptlrpc_request_set *rqset)
931 {
932         struct lov_request_set *lovset;
933         struct lov_obd *lov;
934         struct list_head *pos;
935         struct lov_request *req;
936         int rc = 0;
937         ENTRY;
938
939         if (lsm_bad_magic(lsm))
940                 RETURN(-EINVAL);
941
942         if (!exp || !exp->exp_obd)
943                 RETURN(-ENODEV);
944
945         lov = &exp->exp_obd->u.lov;
946
947         rc = lov_prep_getattr_set(exp, oa, lsm, &lovset);
948         if (rc)
949                 RETURN(rc);
950
951         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
952                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
953
954         list_for_each (pos, &lovset->set_list) {
955                 req = list_entry(pos, struct lov_request, rq_link);
956                 
957                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
958                        "%u\n", oa->o_id, req->rq_stripe, req->rq_oa->o_id, 
959                        req->rq_idx);
960                 rc = obd_getattr_async(lov->tgts[req->rq_idx].ltd_exp,
961                                        req->rq_oa, NULL, rqset);
962                 if (rc) {
963                         CERROR("error: getattr objid "LPX64" subobj "
964                                LPX64" on OST idx %d: rc = %d\n",
965                                lovset->set_oa->o_id, req->rq_oa->o_id, 
966                                req->rq_idx, rc);
967                         GOTO(out, rc);
968                 }
969                 lov_update_common_set(lovset, req, rc);
970         }
971         
972         LASSERT(rc == 0);
973         LASSERT (rqset->set_interpret == NULL);
974         rqset->set_interpret = lov_getattr_interpret;
975         rqset->set_arg = (void *)lovset;
976         RETURN(rc);
977 out:
978         LASSERT(rc);
979         lov_fini_getattr_set(lovset);
980         RETURN(rc);
981 }
982
983 static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
984                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
985 {
986         struct lov_request_set *set;
987         struct lov_obd *lov;
988         struct list_head *pos;
989         struct lov_request *req;
990         int err = 0, rc = 0;
991         ENTRY;
992
993         if (lsm_bad_magic(lsm))
994                 RETURN(-EINVAL);
995
996         if (!exp || !exp->exp_obd)
997                 RETURN(-ENODEV);
998
999         /* for now, we only expect time updates here */
1000         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
1001                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
1002                                       OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
1003                                       OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
1004
1005         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
1006
1007         lov = &exp->exp_obd->u.lov;
1008         rc = lov_prep_setattr_set(exp, src_oa, lsm, NULL, &set);
1009         if (rc)
1010                 RETURN(rc);
1011
1012         list_for_each (pos, &set->set_list) {
1013                 req = list_entry(pos, struct lov_request, rq_link);
1014                 
1015                 rc = obd_setattr(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
1016                                  NULL, NULL);
1017                 err = lov_update_common_set(set, req, rc);
1018                 if (err) {
1019                         CERROR("error: setattr objid "LPX64" subobj "
1020                                LPX64" on OST idx %d: rc = %d\n",
1021                                set->set_oa->o_id, req->rq_oa->o_id,
1022                                req->rq_idx, err);
1023                         if (!rc)
1024                                 rc = err;
1025                 }
1026         }
1027         err = lov_fini_setattr_set(set);
1028         if (!rc)
1029                 rc = err;
1030         RETURN(rc);
1031 }
1032
1033 static int lov_revalidate_policy(struct lov_obd *lov, struct lov_stripe_md *lsm)
1034 {
1035         static int next_idx = 0;
1036         struct lov_tgt_desc *tgt;
1037         int i, count;
1038
1039         /* XXX - we should do something clever and take lsm
1040          * into account but just do round robin for now. */
1041
1042         /* last_idx must always be less that count because
1043          * ld_tgt_count currently cannot shrink. */
1044         count = lov->desc.ld_tgt_count;
1045
1046         for (i = next_idx, tgt = lov->tgts + i; i < count; i++, tgt++) {
1047                 if (tgt->active) {
1048                         next_idx = (i + 1) % count;
1049                         RETURN(i);
1050                 }
1051         }
1052
1053         for (i = 0, tgt = lov->tgts; i < next_idx; i++, tgt++) {
1054                 if (tgt->active) {
1055                         next_idx = (i + 1) % count;
1056                         RETURN(i);
1057                 }
1058         }
1059
1060         RETURN(-EIO);
1061 }
1062
1063 static int lov_revalidate_md(struct obd_export *exp, struct obdo *src_oa,
1064                              struct lov_stripe_md *ea,
1065                              struct obd_trans_info *oti)
1066 {
1067         struct obd_export *osc_exp;
1068         struct lov_obd *lov = &exp->exp_obd->u.lov;
1069         struct lov_stripe_md *lsm = ea;
1070         struct lov_stripe_md obj_md;
1071         struct lov_stripe_md *obj_mdp = &obj_md;
1072         struct lov_oinfo *loi;
1073         struct obdo *tmp_oa;
1074         int ost_idx, updates = 0, i;
1075         ENTRY;
1076
1077         tmp_oa = obdo_alloc();
1078         if (tmp_oa == NULL)
1079                 RETURN(-ENOMEM);
1080
1081         loi = lsm->lsm_oinfo;
1082         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1083                 int rc;
1084                 if (!obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1085                         continue;
1086
1087                 ost_idx = lov_revalidate_policy(lov, lsm);
1088                 if (ost_idx < 0) {
1089                         /* FIXME: punt for now. */
1090                         CERROR("lov_revalidate_policy failed; no active "
1091                                "OSCs?\n");
1092                         continue;
1093                 }
1094
1095                 /* create a new object */
1096                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1097                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
1098                 osc_exp = lov->tgts[ost_idx].ltd_exp;
1099                 rc = obd_create(osc_exp, tmp_oa, NULL, 0, &obj_mdp, oti);
1100                 if (rc) {
1101                         CERROR("error creating new subobj at idx %d; "
1102                                "rc = %d\n", ost_idx, rc);
1103                         continue;
1104                 }
1105                 if (oti->oti_objid)
1106                         oti->oti_objid[ost_idx] = tmp_oa->o_id;
1107                 loi->loi_id = tmp_oa->o_id;
1108                 loi->loi_gr = tmp_oa->o_gr;
1109                 loi->loi_ost_idx = ost_idx;
1110                 loi->loi_ost_gen = lov->tgts[ost_idx].ltd_gen;
1111                 CDEBUG(D_INODE, "replacing objid "LPX64" subobj "LPX64
1112                        " with idx %d gen %d.\n", lsm->lsm_object_id,
1113                        loi->loi_id, ost_idx, loi->loi_ost_gen);
1114                 updates = 1;
1115         }
1116
1117         /* If we got an error revalidating an entry there's no need to
1118          * cleanup up objects we allocated here because the bad entry
1119          * still points to a deleted OST. */
1120
1121         obdo_free(tmp_oa);
1122         RETURN(updates);
1123 }
1124
1125 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1126  * we can send this 'punch' to just the authoritative node and the nodes
1127  * that the punch will affect. */
1128 static int lov_punch(struct obd_export *exp, struct obdo *oa,
1129                      struct lov_stripe_md *lsm,
1130                      obd_off start, obd_off end, struct obd_trans_info *oti)
1131 {
1132         struct lov_request_set *set;
1133         struct lov_obd *lov;
1134         struct list_head *pos;
1135         struct lov_request *req;
1136         int err = 0, rc = 0;
1137         ENTRY;
1138
1139         if (lsm_bad_magic(lsm))
1140                 RETURN(-EINVAL);
1141
1142         if (!exp || !exp->exp_obd)
1143                 RETURN(-ENODEV);
1144
1145         lov = &exp->exp_obd->u.lov;
1146         rc = lov_prep_punch_set(exp, oa, lsm, start, end, oti, &set);
1147         if (rc)
1148                 RETURN(rc);
1149
1150         list_for_each (pos, &set->set_list) {
1151                 req = list_entry(pos, struct lov_request, rq_link);
1152
1153                 rc = obd_punch(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1154                                NULL, req->rq_extent.start, 
1155                                req->rq_extent.end, NULL);
1156                 err = lov_update_punch_set(set, req, rc);
1157                 if (err) {
1158                         CERROR("error: punch objid "LPX64" subobj "LPX64
1159                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1160                                req->rq_oa->o_id, req->rq_idx, rc);
1161                         if (!rc)
1162                                 rc = err;
1163                 }
1164         }
1165         err = lov_fini_punch_set(set);
1166         if (!rc)
1167                 rc = err;
1168         RETURN(rc);
1169 }
1170
1171 static int lov_sync(struct obd_export *exp, struct obdo *oa,
1172                     struct lov_stripe_md *lsm, obd_off start, obd_off end)
1173 {
1174         struct lov_request_set *set;
1175         struct lov_obd *lov;
1176         struct list_head *pos;
1177         struct lov_request *req;
1178         int err = 0, rc = 0;
1179         ENTRY;
1180
1181         if (lsm_bad_magic(lsm))
1182                 RETURN(-EINVAL);
1183
1184         if (!exp->exp_obd)
1185                 RETURN(-ENODEV);
1186
1187         lov = &exp->exp_obd->u.lov;
1188         rc = lov_prep_sync_set(exp, oa, lsm, start, end, &set);
1189         if (rc)
1190                 RETURN(rc);
1191
1192         list_for_each (pos, &set->set_list) {
1193                 req = list_entry(pos, struct lov_request, rq_link);
1194
1195                 rc = obd_sync(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, 
1196                               NULL, req->rq_extent.start, req->rq_extent.end);
1197                 err = lov_update_common_set(set, req, rc);
1198                 if (err) {
1199                         CERROR("error: fsync objid "LPX64" subobj "LPX64
1200                                " on OST idx %d: rc = %d\n", set->set_oa->o_id,
1201                                req->rq_oa->o_id, req->rq_idx, rc);
1202                         if (!rc)
1203                                 rc = err;
1204                 }
1205         }
1206         err = lov_fini_sync_set(set);
1207         if (!rc)
1208                 rc = err;
1209         RETURN(rc);
1210 }
1211
1212 static int lov_brw_check(struct lov_obd *lov, struct obdo *oa,
1213                          struct lov_stripe_md *lsm,
1214                          obd_count oa_bufs, struct brw_page *pga)
1215 {
1216         int i, rc = 0;
1217         ENTRY;
1218
1219         /* The caller just wants to know if there's a chance that this
1220          * I/O can succeed */
1221         for (i = 0; i < oa_bufs; i++) {
1222                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
1223                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1224                 obd_off start, end;
1225
1226                 if (!lov_stripe_intersects(lsm, i, pga[i].disk_offset,
1227                                            pga[i].disk_offset + pga[i].count,
1228                                            &start, &end))
1229                         continue;
1230
1231                 if (lov->tgts[ost].active == 0) {
1232                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1233                         RETURN(-EIO);
1234                 }
1235                 rc = obd_brw(OBD_BRW_CHECK, lov->tgts[ost].ltd_exp, oa,
1236                              NULL, 1, &pga[i], NULL);
1237                 if (rc)
1238                         break;
1239         }
1240         RETURN(rc);
1241 }
1242
1243 static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa,
1244                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1245                    struct brw_page *pga, struct obd_trans_info *oti)
1246 {
1247         struct lov_request_set *set;
1248         struct lov_request *req;
1249         struct list_head *pos;
1250         struct lov_obd *lov = &exp->exp_obd->u.lov;
1251         int err, rc = 0;
1252         ENTRY;
1253
1254         if (lsm_bad_magic(lsm))
1255                 RETURN(-EINVAL);
1256
1257         if (cmd == OBD_BRW_CHECK) {
1258                 rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga);
1259                 RETURN(rc);
1260         }
1261
1262         rc = lov_prep_brw_set(exp, src_oa, lsm, oa_bufs, pga, oti, &set);
1263         if (rc)
1264                 RETURN(rc);
1265
1266         list_for_each (pos, &set->set_list) {
1267                 struct obd_export *sub_exp;
1268                 struct brw_page *sub_pga;
1269                 req = list_entry(pos, struct lov_request, rq_link);
1270                 
1271                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1272                 sub_pga = set->set_pga + req->rq_pgaidx;
1273                 rc = obd_brw(cmd, sub_exp, req->rq_oa, req->rq_md, 
1274                              req->rq_oabufs, sub_pga, oti);
1275                 if (rc)
1276                         break;
1277                 lov_update_common_set(set, req, rc);
1278         }
1279
1280         err = lov_fini_brw_set(set);
1281         if (!rc)
1282                 rc = err;
1283         RETURN(rc);
1284 }
1285
1286 static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data,
1287                              int rc)
1288 {
1289         struct lov_request_set *lovset = (struct lov_request_set *)data;
1290         ENTRY;
1291         
1292         if (rc) {
1293                 lovset->set_completes = 0;
1294                 lov_fini_brw_set(lovset);
1295         } else {
1296                 rc = lov_fini_brw_set(lovset);
1297         }
1298                 
1299         RETURN(rc);
1300 }
1301
1302 static int lov_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1303                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1304                          struct brw_page *pga, struct ptlrpc_request_set *set,
1305                          struct obd_trans_info *oti)
1306 {
1307         struct lov_request_set *lovset;
1308         struct lov_request *req;
1309         struct list_head *pos;
1310         struct lov_obd *lov = &exp->exp_obd->u.lov;
1311         int rc = 0;
1312         ENTRY;
1313
1314         if (lsm_bad_magic(lsm))
1315                 RETURN(-EINVAL);
1316
1317         if (cmd == OBD_BRW_CHECK) {
1318                 rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga);
1319                 RETURN(rc);
1320         }
1321
1322         rc = lov_prep_brw_set(exp, oa, lsm, oa_bufs, pga, oti, &lovset);
1323         if (rc)
1324                 RETURN(rc);
1325
1326         list_for_each (pos, &lovset->set_list) {
1327                 struct obd_export *sub_exp;
1328                 struct brw_page *sub_pga;
1329                 req = list_entry(pos, struct lov_request, rq_link);
1330                 
1331                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
1332                 sub_pga = lovset->set_pga + req->rq_pgaidx;
1333                 rc = obd_brw_async(cmd, sub_exp, req->rq_oa, req->rq_md,
1334                                    req->rq_oabufs, sub_pga, set, oti);
1335                 if (rc)
1336                         GOTO(out, rc);
1337                 lov_update_common_set(lovset, req, rc);
1338         }
1339         LASSERT(rc == 0);
1340         LASSERT(set->set_interpret == NULL);
1341         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1342         set->set_arg = (void *)lovset;
1343         
1344         RETURN(rc);
1345 out:
1346         lov_fini_brw_set(lovset);
1347         RETURN(rc);
1348 }
1349
1350 static int lov_ap_make_ready(void *data, int cmd)
1351 {
1352         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1353
1354         return lap->lap_caller_ops->ap_make_ready(lap->lap_caller_data, cmd);
1355 }
1356 static int lov_ap_refresh_count(void *data, int cmd)
1357 {
1358         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1359
1360         return lap->lap_caller_ops->ap_refresh_count(lap->lap_caller_data,
1361                                                      cmd);
1362 }
1363 static void lov_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
1364 {
1365         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1366
1367         lap->lap_caller_ops->ap_fill_obdo(lap->lap_caller_data, cmd, oa);
1368         /* XXX woah, shouldn't we be altering more here?  size? */
1369         oa->o_id = lap->lap_loi_id;
1370 }
1371
1372 static void lov_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1373 {
1374         struct lov_async_page *lap = LAP_FROM_COOKIE(data);
1375
1376         /* in a raid1 regime this would down a count of many ios
1377          * in flight, onl calling the caller_ops completion when all
1378          * the raid1 ios are complete */
1379         lap->lap_caller_ops->ap_completion(lap->lap_caller_data, cmd, oa, rc);
1380 }
1381
1382 static struct obd_async_page_ops lov_async_page_ops = {
1383         .ap_make_ready =        lov_ap_make_ready,
1384         .ap_refresh_count =     lov_ap_refresh_count,
1385         .ap_fill_obdo =         lov_ap_fill_obdo,
1386         .ap_completion =        lov_ap_completion,
1387 };
1388
1389 static int lov_prep_async_page(struct obd_export *exp,
1390                                struct lov_stripe_md *lsm,
1391                                struct lov_oinfo *loi, struct page *page,
1392                                obd_off offset, struct obd_async_page_ops *ops,
1393                                void *data, void **res)
1394 {
1395         struct lov_obd *lov = &exp->exp_obd->u.lov;
1396         struct lov_async_page *lap;
1397         int rc, stripe;
1398         ENTRY;
1399
1400         if (lsm_bad_magic(lsm))
1401                 RETURN(-EINVAL);
1402         LASSERT(loi == NULL);
1403
1404         stripe = lov_stripe_number(lsm, offset);
1405         loi = &lsm->lsm_oinfo[stripe];
1406
1407         if (obd_uuid_empty(&lov->tgts[loi->loi_ost_idx].uuid))
1408                 RETURN(-EIO);
1409         if (lov->tgts[loi->loi_ost_idx].active == 0)
1410                 RETURN(-EIO);
1411         if (lov->tgts[loi->loi_ost_idx].ltd_exp == NULL) {
1412                 CERROR("ltd_exp == NULL, but OST idx %d doesn't appear to be "
1413                        "deleted or inactive.\n", loi->loi_ost_idx);
1414                 RETURN(-EIO);
1415         }
1416
1417         OBD_ALLOC(lap, sizeof(*lap));
1418         if (lap == NULL)
1419                 RETURN(-ENOMEM);
1420
1421         lap->lap_magic = LAP_MAGIC;
1422         lap->lap_caller_ops = ops;
1423         lap->lap_caller_data = data;
1424
1425         /* FIXME handle multiple oscs after landing b_raid1 */
1426         lap->lap_stripe = stripe;
1427         switch (lsm->lsm_pattern) {
1428                 case LOV_PATTERN_RAID0:
1429                         lov_stripe_offset(lsm, offset, lap->lap_stripe, 
1430                                           &lap->lap_sub_offset);
1431                         break;
1432                 case LOV_PATTERN_CMOBD:
1433                         lap->lap_sub_offset = offset;
1434                         break;
1435                 default:
1436                         LBUG();
1437         }
1438
1439         /* so the callback doesn't need the lsm */
1440         lap->lap_loi_id = loi->loi_id;
1441
1442         rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1443                                  lsm, loi, page, lap->lap_sub_offset,
1444                                  &lov_async_page_ops, lap,
1445                                  &lap->lap_sub_cookie);
1446         if (rc) {
1447                 OBD_FREE(lap, sizeof(*lap));
1448                 RETURN(rc);
1449         }
1450         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
1451                lap->lap_sub_cookie, offset);
1452         *res = lap;
1453         RETURN(0);
1454 }
1455
1456 static int lov_queue_async_io(struct obd_export *exp,
1457                               struct lov_stripe_md *lsm,
1458                               struct lov_oinfo *loi, void *cookie,
1459                               int cmd, obd_off off, int count,
1460                               obd_flags brw_flags, obd_flags async_flags)
1461 {
1462         struct lov_obd *lov = &exp->exp_obd->u.lov;
1463         struct lov_async_page *lap;
1464         int rc;
1465
1466         LASSERT(loi == NULL);
1467
1468         if (lsm_bad_magic(lsm))
1469                 RETURN(-EINVAL);
1470
1471         lap = LAP_FROM_COOKIE(cookie);
1472
1473         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1474
1475         rc = obd_queue_async_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm,
1476                                 loi, lap->lap_sub_cookie, cmd, off, count,
1477                                 brw_flags, async_flags);
1478         RETURN(rc);
1479 }
1480
1481 static int lov_set_async_flags(struct obd_export *exp,
1482                                struct lov_stripe_md *lsm,
1483                                struct lov_oinfo *loi, void *cookie,
1484                                obd_flags async_flags)
1485 {
1486         struct lov_obd *lov = &exp->exp_obd->u.lov;
1487         struct lov_async_page *lap;
1488         int rc;
1489
1490         LASSERT(loi == NULL);
1491
1492         if (lsm_bad_magic(lsm))
1493                 RETURN(-EINVAL);
1494
1495         lap = LAP_FROM_COOKIE(cookie);
1496
1497         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1498
1499         rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp,
1500                                  lsm, loi, lap->lap_sub_cookie, async_flags);
1501         RETURN(rc);
1502 }
1503
1504 static int lov_queue_group_io(struct obd_export *exp,
1505                               struct lov_stripe_md *lsm,
1506                               struct lov_oinfo *loi,
1507                               struct obd_io_group *oig, void *cookie,
1508                               int cmd, obd_off off, int count,
1509                               obd_flags brw_flags, obd_flags async_flags)
1510 {
1511         struct lov_obd *lov = &exp->exp_obd->u.lov;
1512         struct lov_async_page *lap;
1513         int rc;
1514
1515         LASSERT(loi == NULL);
1516
1517         if (lsm_bad_magic(lsm))
1518                 RETURN(-EINVAL);
1519
1520         lap = LAP_FROM_COOKIE(cookie);
1521
1522         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1523
1524         rc = obd_queue_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi,
1525                                 oig, lap->lap_sub_cookie, cmd, off, count,
1526                                 brw_flags, async_flags);
1527         RETURN(rc);
1528 }
1529
1530 /* this isn't exactly optimal.  we may have queued sync io in oscs on
1531  * all stripes, but we don't record that fact at queue time.  so we
1532  * trigger sync io on all stripes. */
1533 static int lov_trigger_group_io(struct obd_export *exp,
1534                                 struct lov_stripe_md *lsm,
1535                                 struct lov_oinfo *loi,
1536                                 struct obd_io_group *oig)
1537 {
1538         struct lov_obd *lov = &exp->exp_obd->u.lov;
1539         int rc = 0, i, err;
1540
1541         LASSERT(loi == NULL);
1542
1543         if (lsm_bad_magic(lsm))
1544                 RETURN(-EINVAL);
1545
1546         loi = lsm->lsm_oinfo;
1547         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1548                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1549                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1550                         continue;
1551                 }
1552
1553                 err = obd_trigger_group_io(lov->tgts[loi->loi_ost_idx].ltd_exp,
1554                                            lsm, loi, oig);
1555                 if (rc == 0 && err != 0)
1556                         rc = err;
1557         };
1558         RETURN(rc);
1559 }
1560
1561 static int lov_teardown_async_page(struct obd_export *exp,
1562                                    struct lov_stripe_md *lsm,
1563                                    struct lov_oinfo *loi, void *cookie)
1564 {
1565         struct lov_obd *lov = &exp->exp_obd->u.lov;
1566         struct lov_async_page *lap;
1567         int rc;
1568
1569         LASSERT(loi == NULL);
1570
1571         if (lsm_bad_magic(lsm))
1572                 RETURN(-EINVAL);
1573
1574         lap = LAP_FROM_COOKIE(cookie);
1575
1576         loi = &lsm->lsm_oinfo[lap->lap_stripe];
1577
1578         rc = obd_teardown_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp,
1579                                      lsm, loi, lap->lap_sub_cookie);
1580         if (rc) {
1581                 CERROR("unable to teardown sub cookie %p: %d\n",
1582                        lap->lap_sub_cookie, rc);
1583                 RETURN(rc);
1584         }
1585         OBD_FREE(lap, sizeof(*lap));
1586         RETURN(rc);
1587 }
1588
1589 static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
1590                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1591                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
1592                        void *data,__u32 lvb_len, void *lvb_swabber,
1593                        struct lustre_handle *lockh)
1594 {
1595         struct lov_request_set *set;
1596         struct lov_request *req;
1597         struct list_head *pos;
1598         struct lustre_handle *lov_lockhp;
1599         struct lov_obd *lov;
1600         ldlm_error_t rc;
1601         int save_flags = *flags;
1602         ENTRY;
1603
1604         if (lsm_bad_magic(lsm))
1605                 RETURN(-EINVAL);
1606
1607         /* we should never be asked to replay a lock this way. */
1608         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1609
1610         if (!exp || !exp->exp_obd)
1611                 RETURN(-ENODEV);
1612
1613         lov = &exp->exp_obd->u.lov;
1614         rc = lov_prep_enqueue_set(exp, lsm, policy, mode, lockh, &set);
1615         if (rc)
1616                 RETURN(rc);
1617
1618         list_for_each (pos, &set->set_list) {
1619                 ldlm_policy_data_t sub_policy;
1620                 req = list_entry(pos, struct lov_request, rq_link);
1621                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1622                 LASSERT(lov_lockhp);
1623
1624                 *flags = save_flags;
1625                 sub_policy.l_extent.start = req->rq_extent.start;
1626                 sub_policy.l_extent.end = req->rq_extent.end;
1627
1628                 rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1629                                  type, &sub_policy, mode, flags, bl_cb,
1630                                  cp_cb, gl_cb, data, lvb_len, lvb_swabber,
1631                                  lov_lockhp);
1632                 rc = lov_update_enqueue_set(set, req, rc, save_flags);
1633                 if (rc != ELDLM_OK)
1634                         break;
1635         }
1636
1637         lov_fini_enqueue_set(set, mode);
1638         RETURN(rc);
1639 }
1640
1641 static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
1642                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
1643                      int *flags, void *data, struct lustre_handle *lockh)
1644 {
1645         struct lov_request_set *set;
1646         struct lov_request *req;
1647         struct list_head *pos;
1648         struct lov_obd *lov = &exp->exp_obd->u.lov;
1649         struct lustre_handle *lov_lockhp;
1650         int lov_flags, rc = 0;
1651         ENTRY;
1652
1653         if (lsm_bad_magic(lsm))
1654                 RETURN(-EINVAL);
1655
1656         if (!exp || !exp->exp_obd)
1657                 RETURN(-ENODEV);
1658
1659         lov = &exp->exp_obd->u.lov;
1660         rc = lov_prep_match_set(exp, lsm, policy, mode, lockh, &set);
1661         if (rc)
1662                 RETURN(rc);
1663
1664         list_for_each (pos, &set->set_list) {
1665                 ldlm_policy_data_t sub_policy;
1666                 req = list_entry(pos, struct lov_request, rq_link);
1667                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1668                 LASSERT(lov_lockhp);
1669
1670                 sub_policy.l_extent.start = req->rq_extent.start;
1671                 sub_policy.l_extent.end = req->rq_extent.end;
1672                 lov_flags = *flags;
1673
1674                 rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1675                                type, &sub_policy, mode, &lov_flags, data,
1676                                lov_lockhp);
1677                 rc = lov_update_match_set(set, req, rc);
1678                 if (rc != 1)
1679                         break;
1680         }
1681         lov_fini_match_set(set, mode, *flags);
1682         RETURN(rc);
1683 }
1684
1685 static int lov_change_cbdata(struct obd_export *exp,
1686                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1687                              void *data)
1688 {
1689         struct lov_obd *lov;
1690         struct lov_oinfo *loi;
1691         int rc = 0, i;
1692         ENTRY;
1693
1694         if (lsm_bad_magic(lsm))
1695                 RETURN(-EINVAL);
1696
1697         if (!exp || !exp->exp_obd)
1698                 RETURN(-ENODEV);
1699
1700         LASSERT(lsm->lsm_object_gr > 0);
1701
1702         lov = &exp->exp_obd->u.lov;
1703         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1704                 struct lov_stripe_md submd;
1705                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1706                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1707                         continue;
1708                 }
1709
1710                 submd.lsm_object_id = loi->loi_id;
1711                 submd.lsm_object_gr = lsm->lsm_object_gr;
1712                 submd.lsm_stripe_count = 0;
1713                 rc = obd_change_cbdata(lov->tgts[loi->loi_ost_idx].ltd_exp,
1714                                        &submd, it, data);
1715         }
1716         RETURN(rc);
1717 }
1718
1719 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1720                       __u32 mode, struct lustre_handle *lockh)
1721 {
1722         struct lov_request_set *set;
1723         struct lov_request *req;
1724         struct list_head *pos;
1725         struct lov_obd *lov = &exp->exp_obd->u.lov;
1726         struct lustre_handle *lov_lockhp;
1727         int err = 0, rc = 0;
1728         ENTRY;
1729
1730         if (lsm_bad_magic(lsm))
1731                 RETURN(-EINVAL);
1732
1733         if (!exp || !exp->exp_obd)
1734                 RETURN(-ENODEV);
1735
1736         LASSERT(lsm->lsm_object_gr > 0);
1737
1738         LASSERT(lockh);
1739         lov = &exp->exp_obd->u.lov;
1740         rc = lov_prep_cancel_set(exp, lsm, mode, lockh, &set);
1741         if (rc)
1742                 RETURN(rc);
1743
1744         list_for_each (pos, &set->set_list) {
1745                 req = list_entry(pos, struct lov_request, rq_link);
1746                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1747
1748                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
1749                                 mode, lov_lockhp);
1750                 rc = lov_update_common_set(set, req, rc);
1751                 if (rc) {
1752                         CERROR("error: cancel objid "LPX64" subobj "
1753                                LPX64" on OST idx %d: rc = %d\n",
1754                                lsm->lsm_object_id,
1755                                req->rq_md->lsm_object_id, req->rq_idx, rc);
1756                         err = rc;
1757                 }
1758  
1759         }
1760         lov_fini_cancel_set(set);
1761         RETURN(err);
1762 }
1763
1764 static int lov_cancel_unused(struct obd_export *exp,
1765                              struct lov_stripe_md *lsm, 
1766                              int flags, void *opaque)
1767 {
1768         struct lov_obd *lov;
1769         struct lov_oinfo *loi;
1770         int rc = 0, i;
1771         ENTRY;
1772
1773         lov = &exp->exp_obd->u.lov;
1774         if (lsm == NULL) {
1775                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1776                         int err = obd_cancel_unused(lov->tgts[i].ltd_exp,
1777                                                     NULL, flags, opaque);
1778                         if (!rc)
1779                                 rc = err;
1780                 }
1781                 RETURN(rc);
1782         }
1783
1784         if (lsm_bad_magic(lsm))
1785                 RETURN(-EINVAL);
1786
1787         if (!exp || !exp->exp_obd)
1788                 RETURN(-ENODEV);
1789
1790         LASSERT(lsm->lsm_object_gr > 0);
1791
1792         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1793                 struct lov_stripe_md submd;
1794                 int err;
1795
1796                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1797                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1798
1799                 submd.lsm_object_id = loi->loi_id;
1800                 submd.lsm_object_gr = lsm->lsm_object_gr;
1801                 submd.lsm_stripe_count = 0;
1802                 err = obd_cancel_unused(lov->tgts[loi->loi_ost_idx].ltd_exp,
1803                                         &submd, flags, opaque);
1804                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1805                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1806                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1807                                loi->loi_id, loi->loi_ost_idx, err);
1808                         if (!rc)
1809                                 rc = err;
1810                 }
1811         }
1812         RETURN(rc);
1813 }
1814
/* Largest value representable in a __u64; used as the saturation marker. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating accumulate: add @add to @tot, pinning @tot at LOV_U64_MAX when
 * the unsigned sum wraps around.  Both arguments are evaluated more than
 * once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1823
1824 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1825                       unsigned long max_age)
1826 {
1827         struct lov_obd *lov = &obd->u.lov;
1828         struct obd_statfs lov_sfs;
1829         int set = 0;
1830         int rc = 0;
1831         int i;
1832         ENTRY;
1833
1834
1835         /* We only get block data from the OBD */
1836         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1837                 int err;
1838                 if (!lov->tgts[i].active) {
1839                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1840                         continue;
1841                 }
1842
1843                 err = obd_statfs(class_exp2obd(lov->tgts[i].ltd_exp), &lov_sfs,
1844                                  max_age);
1845                 if (err) {
1846                         if (lov->tgts[i].active && !rc)
1847                                 rc = err;
1848                         continue;
1849                 }
1850
1851                 if (!set) {
1852                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1853                         set = 1;
1854                 } else {
1855                         osfs->os_bfree += lov_sfs.os_bfree;
1856                         osfs->os_bavail += lov_sfs.os_bavail;
1857                         osfs->os_blocks += lov_sfs.os_blocks;
1858                         /* XXX not sure about this one - depends on policy.
1859                          *   - could be minimum if we always stripe on all OBDs
1860                          *     (but that would be wrong for any other policy,
1861                          *     if one of the OBDs has no more objects left)
1862                          *   - could be sum if we stripe whole objects
1863                          *   - could be average, just to give a nice number
1864                          *
1865                          * To give a "reasonable" (if not wholly accurate)
1866                          * number, we divide the total number of free objects
1867                          * by expected stripe count (watch out for overflow).
1868                          */
1869                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
1870                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
1871                 }
1872         }
1873
1874         if (set) {
1875                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
1876                                          lov->desc.ld_default_stripe_count :
1877                                          lov->desc.ld_active_tgt_count;
1878
1879                 if (osfs->os_files != LOV_U64_MAX)
1880                         do_div(osfs->os_files, expected_stripes);
1881                 if (osfs->os_ffree != LOV_U64_MAX)
1882                         do_div(osfs->os_ffree, expected_stripes);
1883         } else if (!rc)
1884                 rc = -EIO;
1885
1886         RETURN(rc);
1887 }
1888
1889 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1890                          void *karg, void *uarg)
1891 {
1892         struct obd_device *obddev = class_exp2obd(exp);
1893         struct lov_obd *lov = &obddev->u.lov;
1894         int i, rc, count = lov->desc.ld_tgt_count;
1895         struct obd_uuid *uuidp;
1896         ENTRY;
1897
1898         switch (cmd) {
1899         case OBD_IOC_LOV_GET_CONFIG: {
1900                 struct obd_ioctl_data *data = karg;
1901                 struct lov_tgt_desc *tgtdesc;
1902                 struct lov_desc *desc;
1903                 char *buf = NULL;
1904                 __u32 *genp;
1905
1906                 buf = NULL;
1907                 len = 0;
1908                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1909                         RETURN(-EINVAL);
1910
1911                 data = (struct obd_ioctl_data *)buf;
1912
1913                 if (sizeof(*desc) > data->ioc_inllen1) {
1914                         obd_ioctl_freedata(buf, len);
1915                         RETURN(-EINVAL);
1916                 }
1917
1918                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1919                         obd_ioctl_freedata(buf, len);
1920                         RETURN(-EINVAL);
1921                 }
1922
1923                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1924                         obd_ioctl_freedata(buf, len);
1925                         RETURN(-EINVAL);
1926                 }
1927
1928                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1929                 memcpy(desc, &(lov->desc), sizeof(*desc));
1930
1931                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1932                 genp = (__u32 *)data->ioc_inlbuf3;
1933                 tgtdesc = lov->tgts;
1934                 /* the uuid will be empty for deleted OSTs */
1935                 for (i = 0; i < count; i++, uuidp++, genp++, tgtdesc++) {
1936                         obd_str2uuid(uuidp, (char *)tgtdesc->uuid.uuid);
1937                         *genp = tgtdesc->ltd_gen;
1938                 }
1939
1940                 rc = copy_to_user((void *)uarg, buf, len);
1941                 if (rc)
1942                         rc = -EFAULT;
1943                 obd_ioctl_freedata(buf, len);
1944                 break;
1945         }
1946         case LL_IOC_LOV_SETSTRIPE:
1947                 rc = lov_setstripe(exp, karg, uarg);
1948                 break;
1949         case LL_IOC_LOV_GETSTRIPE:
1950                 rc = lov_getstripe(exp, karg, uarg);
1951                 break;
1952         case LL_IOC_LOV_SETEA:
1953                 rc = lov_setea(exp, karg, uarg);
1954                 break;
1955         default: {
1956                 int set = 0;
1957                 if (count == 0)
1958                         RETURN(-ENOTTY);
1959                 rc = 0;
1960                 for (i = 0; i < count; i++) {
1961                         int err;
1962
1963                         /* OST was deleted */
1964                         if (obd_uuid_empty(&lov->tgts[i].uuid))
1965                                 continue;
1966
1967                         err = obd_iocontrol(cmd, lov->tgts[i].ltd_exp,
1968                                             len, karg, uarg);
1969                         if (err) {
1970                                 if (lov->tgts[i].active) {
1971                                         CERROR("error: iocontrol OSC %s on OST "
1972                                                "idx %d cmd %x: err = %d\n",
1973                                                lov->tgts[i].uuid.uuid, i,
1974                                                cmd, err);
1975                                         if (!rc)
1976                                                 rc = err;
1977                                 }
1978                         } else
1979                                 set = 1;
1980                 }
1981                 if (!set && !rc)
1982                         rc = -EIO;
1983         }
1984         }
1985
1986         RETURN(rc);
1987 }
1988
1989 static int lov_get_info(struct obd_export *exp, __u32 keylen,
1990                         void *key, __u32 *vallen, void *val)
1991 {
1992         struct obd_device *obddev = class_exp2obd(exp);
1993         struct lov_obd *lov = &obddev->u.lov;
1994         int i;
1995         ENTRY;
1996
1997         if (!vallen || !val)
1998                 RETURN(-EFAULT);
1999
2000         if (keylen > strlen("lock_to_stripe") &&
2001             strcmp(key, "lock_to_stripe") == 0) {
2002                 struct {
2003                         char name[16];
2004                         struct ldlm_lock *lock;
2005                         struct lov_stripe_md *lsm;
2006                 } *data = key;
2007                 struct lov_oinfo *loi;
2008                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2009                 __u32 *stripe = val;
2010
2011                 if (*vallen < sizeof(*stripe))
2012                         RETURN(-EFAULT);
2013                 *vallen = sizeof(*stripe);
2014
2015                 /* XXX This is another one of those bits that will need to
2016                  * change if we ever actually support nested LOVs.  It uses
2017                  * the lock's export to find out which stripe it is. */
2018                 /* XXX - it's assumed all the locks for deleted OSTs have
2019                  * been cancelled. Also, the export for deleted OSTs will
2020                  * be NULL and won't match the lock's export. */
2021                 for (i = 0, loi = data->lsm->lsm_oinfo;
2022                      i < data->lsm->lsm_stripe_count;
2023                      i++, loi++) {
2024                         if (lov->tgts[loi->loi_ost_idx].ltd_exp ==
2025                                         data->lock->l_conn_export &&
2026                             loi->loi_id == res_id->name[0] &&
2027                             loi->loi_gr == res_id->name[2]) {
2028                                 *stripe = i;
2029                                 RETURN(0);
2030                         }
2031                 }
2032                 LDLM_ERROR(data->lock, "lock on inode without such object\n");
2033                 dump_lsm(D_ERROR, data->lsm);
2034                 RETURN(-ENXIO);
2035         } else if (keylen >= strlen("size_to_stripe") &&
2036                    strcmp(key, "size_to_stripe") == 0) {
2037                 struct {
2038                         int stripe_number;
2039                         __u64 size;
2040                         struct lov_stripe_md *lsm;
2041                 } *data = val;
2042
2043                 if (*vallen < sizeof(*data))
2044                         RETURN(-EFAULT);
2045
2046                 data->size = lov_size_to_stripe(data->lsm, data->size,
2047                                                 data->stripe_number);
2048                 RETURN(0);
2049         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2050                 __u32 size = sizeof(obd_id);
2051                 obd_id *ids = val;
2052                 int rc = 0;
2053
2054                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2055                         if (!lov->tgts[i].active)
2056                                 continue;
2057                         rc = obd_get_info(lov->tgts[i].ltd_exp,
2058                                           keylen, key, &size, &(ids[i]));
2059                         if (rc != 0)
2060                                 RETURN(rc);
2061                 }
2062                 RETURN(0);
2063         } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
2064                 struct lov_desc *desc_ret = val;
2065                 *desc_ret = lov->desc;
2066
2067                 RETURN(0);
2068         }
2069
2070         RETURN(-EINVAL);
2071 }
2072
2073 static int lov_set_info(struct obd_export *exp, obd_count keylen,
2074                         void *key, obd_count vallen, void *val)
2075 {
2076         struct obd_device *obddev = class_exp2obd(exp);
2077         struct lov_obd *lov = &obddev->u.lov;
2078         int i, rc = 0, err;
2079         ENTRY;
2080
2081 #define KEY_IS(str) \
2082         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
2083
2084         if (KEY_IS("next_id")) {
2085                 if (vallen != lov->desc.ld_tgt_count)
2086                         RETURN(-EINVAL);
2087                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2088                         /* initialize all OSCs, even inactive ones */
2089                         if (obd_uuid_empty(&lov->tgts[i].uuid))
2090                                 continue;
2091                         err = obd_set_info(lov->tgts[i].ltd_exp,
2092                                           keylen, key, sizeof(obd_id),
2093                                           ((obd_id*)val) + i);
2094                         if (!rc)
2095                                 rc = err;
2096                 }
2097                 RETURN(rc);
2098         }
2099         if (KEY_IS("async")) {
2100                 struct lov_desc *desc = &lov->desc;
2101                 struct lov_tgt_desc *tgts = lov->tgts;
2102
2103                 if (vallen != sizeof(int))
2104                         RETURN(-EINVAL);
2105                 lov->async = *((int*) val);
2106
2107                 for (i = 0; i < desc->ld_tgt_count; i++, tgts++) {
2108                         struct obd_uuid *tgt_uuid = &tgts->uuid;
2109                         struct obd_device *tgt_obd;
2110
2111                         tgt_obd = class_find_client_obd(tgt_uuid,
2112                                                         LUSTRE_OSC_NAME,
2113                                                         &obddev->obd_uuid);
2114                         if (!tgt_obd) {
2115                                 CERROR("Target %s not attached\n",
2116                                         tgt_uuid->uuid);
2117                                 if (!rc)
2118                                         rc = -EINVAL;
2119                                 continue;
2120                         }
2121
2122                         err = obd_set_info(tgt_obd->obd_self_export,
2123                                            keylen, key, vallen, val);
2124                         if (err) {
2125                                 CERROR("Failed to set async on target %s\n",
2126                                         tgt_obd->obd_name);
2127                                 if (!rc)
2128                                         rc = err;
2129                         }
2130                 }
2131                 RETURN(rc);
2132         }
2133
2134         if (KEY_IS("growth_count")) {
2135                 if (vallen != sizeof(int))
2136                         RETURN(-EINVAL);
2137         } else if (KEY_IS("mds_conn")) {
2138                 if (vallen != sizeof(__u32))
2139                         RETURN(-EINVAL);
2140         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
2141                 if (vallen != 0)
2142                         RETURN(-EINVAL);
2143         } else if (KEY_IS("sec")) {
2144                 struct lov_tgt_desc *tgt;
2145                 struct obd_export *exp;
2146                 int rc = 0, err, i;
2147
2148                 spin_lock(&lov->lov_lock);
2149                 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
2150                      i++, tgt++) {
2151                         exp = tgt->ltd_exp;
2152                         /* during setup time the connections to osc might
2153                          * haven't been established.
2154                          */
2155                         if (exp == NULL) {
2156                                 struct obd_device *tgt_obd;
2157
2158                                 tgt_obd = class_find_client_obd(&tgt->uuid,
2159                                                                 LUSTRE_OSC_NAME,
2160                                                                 &obddev->obd_uuid);
2161                                 if (!tgt_obd) {
2162                                         CERROR("can't set security flavor, "
2163                                                "device %s not attached?\n",
2164                                                 tgt->uuid.uuid);
2165                                         rc = -EINVAL;
2166                                         continue;
2167                                 }
2168                                 exp = tgt_obd->obd_self_export;
2169                         }
2170
2171                         err = obd_set_info(exp, keylen, key, vallen, val);
2172                         if (!rc)
2173                                 rc = err;
2174                 }
2175                 spin_unlock(&lov->lov_lock);
2176
2177                 RETURN(rc);
2178         } else {
2179                 RETURN(-EINVAL);
2180         }
2181
2182         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2183                 if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid))
2184                         continue;
2185
2186                 if (!val && !lov->tgts[i].active)
2187                         continue;
2188
2189                 err = obd_set_info(lov->tgts[i].ltd_exp,
2190                                   keylen, key, vallen, val);
2191                 if (!rc)
2192                         rc = err;
2193         }
2194         RETURN(rc);
2195 #undef KEY_IS
2196 }
2197
/*
 * Compiled out: everything between this #if 0 and the matching #endif is
 * excluded from the build.
 *
 * NOTE(review): the draft below is not complete as written.  Within this
 * function `imp`, `lwi` and `lwd` are used without a visible declaration,
 * `lock` is referenced in the remove_wait_queue() loop although it is only
 * declared inside the body of the preceding loop, the array allocated into
 * `queues` is never released, and the -EINTR/-ETIMEDOUT branch is empty.
 */
#if 0
/* Per-stripe wait state used by lov_complete_many(). */
struct lov_multi_wait {
        struct ldlm_lock *lock;
        wait_queue_t      wait;
        int               completed;
        int               generation;
};

/*
 * Queue the current task on the wait queue of every stripe lock under @lockh
 * and wait via l_wait_event_added() on check_multi_complete().
 */
int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm,
                      struct lustre_handle *lockh)
{
        struct lov_lock_handles *lov_lockh = NULL;
        struct lustre_handle *lov_lockhp;
        struct lov_obd *lov;
        struct lov_oinfo *loi;
        struct lov_multi_wait *queues;
        int rc = 0, i;
        ENTRY;

        if (lsm_bad_magic(lsm))
                RETURN(-EINVAL);

        if (!exp || !exp->exp_obd)
                RETURN(-ENODEV);

        LASSERT(lockh != NULL);
        /* multi-stripe objects keep their per-stripe handles in a
         * lov_lock_handles; a single stripe uses lockh directly */
        if (lsm->lsm_stripe_count > 1) {
                lov_lockh = lov_handle2llh(lockh);
                if (lov_lockh == NULL) {
                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
                        RETURN(-EINVAL);
                }

                lov_lockhp = lov_lockh->llh_handles;
        } else {
                lov_lockhp = lockh;
        }

        OBD_ALLOC(queues, lsm->lsm_stripe_count * sizeof(*queues));
        if (queues == NULL)
                GOTO(out, rc = -ENOMEM);

        lov = &exp->exp_obd->u.lov;
        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
             i++, loi++, lov_lockhp++) {
                struct ldlm_lock *lock;
                struct obd_device *obd;
                unsigned long irqflags;

                lock = ldlm_handle2lock(lov_lockhp);
                if (lock == NULL) {
                        CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
                               loi->loi_ost_idx, loi->loi_id);
                        /* nothing to wait for on this stripe */
                        queues[i].completed = 1;
                        continue;
                }

                queues[i].lock = lock;
                init_waitqueue_entry(&(queues[i].wait), current);
                add_wait_queue(lock->l_waitq, &(queues[i].wait));

                /* record the import generation seen when queueing */
                obd = class_exp2obd(lock->l_conn_export);
                if (obd != NULL)
                        imp = obd->u.cli.cl_import;
                if (imp != NULL) {
                        spin_lock_irqsave(&imp->imp_lock, irqflags);
                        queues[i].generation = imp->imp_generation;
                        spin_unlock_irqrestore(&imp->imp_lock, irqflags);
                }
        }

        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
                               interrupted_completion_wait, &lwd);
        rc = l_wait_event_added(check_multi_complete(queues, lsm), &lwi);

        for (i = 0; i < lsm->lsm_stripe_count; i++)
                remove_wait_queue(lock->l_waitq, &(queues[i].wait));

        /* placeholder: no handling implemented for these results */
        if (rc == -EINTR || rc == -ETIMEDOUT) {


        }

 out:
        if (lov_lockh != NULL)
                lov_llh_put(lov_lockh);
        RETURN(rc);
}
#endif
2287
2288 struct obd_ops lov_obd_ops = {
2289         .o_owner               = THIS_MODULE,
2290         .o_attach              = lov_attach,
2291         .o_detach              = lov_detach,
2292         .o_setup               = lov_setup,
2293         .o_cleanup             = lov_cleanup,
2294         .o_process_config      = lov_process_config,
2295         .o_connect             = lov_connect,
2296         .o_disconnect          = lov_disconnect,
2297         .o_statfs              = lov_statfs,
2298         .o_packmd              = lov_packmd,
2299         .o_unpackmd            = lov_unpackmd,
2300         .o_revalidate_md       = lov_revalidate_md,
2301         .o_create              = lov_create,
2302         .o_destroy             = lov_destroy,
2303         .o_getattr             = lov_getattr,
2304         .o_getattr_async       = lov_getattr_async,
2305         .o_setattr             = lov_setattr,
2306         .o_brw                 = lov_brw,
2307         .o_brw_async           = lov_brw_async,
2308         .o_prep_async_page     = lov_prep_async_page,
2309         .o_queue_async_io      = lov_queue_async_io,
2310         .o_set_async_flags     = lov_set_async_flags,
2311         .o_queue_group_io      = lov_queue_group_io,
2312         .o_trigger_group_io    = lov_trigger_group_io,
2313         .o_teardown_async_page = lov_teardown_async_page,
2314         .o_adjust_kms          = lov_adjust_kms,
2315         .o_punch               = lov_punch,
2316         .o_sync                = lov_sync,
2317         .o_enqueue             = lov_enqueue,
2318         .o_match               = lov_match,
2319         .o_change_cbdata       = lov_change_cbdata,
2320         .o_cancel              = lov_cancel,
2321         .o_cancel_unused       = lov_cancel_unused,
2322         .o_iocontrol           = lov_iocontrol,
2323         .o_get_info            = lov_get_info,
2324         .o_set_info            = lov_set_info,
2325         .o_llog_init           = lov_llog_init,
2326         .o_llog_finish         = lov_llog_finish,
2327         .o_notify              = lov_notify,
2328 };
2329
2330 int __init lov_init(void)
2331 {
2332         struct lprocfs_static_vars lvars;
2333         int rc;
2334         ENTRY;
2335
2336         lprocfs_init_vars(lov, &lvars);
2337         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2338                                  OBD_LOV_DEVICENAME);
2339         RETURN(rc);
2340 }
2341
/* Kernel-module glue; built only when __KERNEL__ is defined. */
#ifdef __KERNEL__
/* Module unload hook: unregister the device type that lov_init() registered
 * under OBD_LOV_DEVICENAME. */
static void /*__exit*/ lov_exit(void)
{
        class_unregister_type(OBD_LOV_DEVICENAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

/* lov_init() runs at module load, lov_exit() at module unload */
module_init(lov_init);
module_exit(lov_exit);
#endif