Whamcloud - gitweb
file setup.in was initially added on branch b_unify.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define EXPORT_SYMTAB
26 #define DEBUG_SUBSYSTEM S_LOV
27 #ifdef __KERNEL__
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/init.h>
31 #include <linux/random.h>
32 #include <linux/slab.h>
33 #include <linux/pagemap.h>
34 #include <asm/div64.h>
35 #else
36 #include <liblustre.h>
37 #endif
38
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_net.h>
42 #include <linux/lustre_idl.h>
43 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
44 #include <linux/lustre_mds.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_lov.h>
47 #include <linux/seq_file.h>
48 #include <linux/lprocfs_status.h>
49
50 struct lov_file_handles {
51         struct portals_handle lfh_handle;
52         atomic_t lfh_refcount;
53         struct list_head lfh_list;
54         int lfh_count;
55         struct obd_client_handle *lfh_och;
56 };
57
58 struct lov_lock_handles {
59         struct portals_handle llh_handle;
60         atomic_t llh_refcount;
61         int llh_stripe_count;
62         struct lustre_handle llh_handles[0];
63 };
64
65 /* lov_file_handles helpers */
66 static void lov_lfh_addref(void *lfhp)
67 {
68         struct lov_file_handles *lfh = lfhp;
69
70         atomic_inc(&lfh->lfh_refcount);
71         CDEBUG(D_INFO, "GETting lfh %p : new refcount %d\n", lfh,
72                atomic_read(&lfh->lfh_refcount));
73 }
74
75 static struct lov_file_handles *lov_lfh_new(void)
76 {
77         struct lov_file_handles *lfh;
78
79         OBD_ALLOC(lfh, sizeof *lfh);
80         if (lfh == NULL) {
81                 CERROR("out of memory\n");
82                 return NULL;
83         }
84
85         atomic_set(&lfh->lfh_refcount, 2);
86
87         INIT_LIST_HEAD(&lfh->lfh_handle.h_link);
88         class_handle_hash(&lfh->lfh_handle, lov_lfh_addref);
89
90         return lfh;
91 }
92
93 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
94 {
95         ENTRY;
96         LASSERT(handle != NULL);
97         RETURN(class_handle2object(handle->cookie));
98 }
99
100 static void lov_lfh_put(struct lov_file_handles *lfh)
101 {
102         CDEBUG(D_INFO, "PUTting lfh %p : new refcount %d\n", lfh,
103                atomic_read(&lfh->lfh_refcount) - 1);
104         LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
105                 atomic_read(&lfh->lfh_refcount) < 0x5a5a);
106         if (atomic_dec_and_test(&lfh->lfh_refcount)) {
107                 LASSERT(list_empty(&lfh->lfh_handle.h_link));
108                 OBD_FREE(lfh, sizeof *lfh);
109         }
110 }
111
112 static void lov_lfh_destroy(struct lov_file_handles *lfh)
113 {
114         class_handle_unhash(&lfh->lfh_handle);
115         lov_lfh_put(lfh);
116 }
117
118 static void lov_llh_addref(void *llhp)
119 {
120         struct lov_lock_handles *llh = llhp;
121
122         atomic_inc(&llh->llh_refcount);
123         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
124                atomic_read(&llh->llh_refcount));
125 }
126
127 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
128 {
129         struct lov_lock_handles *llh;
130
131         OBD_ALLOC(llh, sizeof *llh +
132                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
133         if (llh == NULL) {
134                 CERROR("out of memory\n");
135                 return NULL;
136         }
137         atomic_set(&llh->llh_refcount, 2);
138         llh->llh_stripe_count = lsm->lsm_stripe_count;
139         INIT_LIST_HEAD(&llh->llh_handle.h_link);
140         class_handle_hash(&llh->llh_handle, lov_llh_addref);
141         return llh;
142 }
143
144 static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
145 {
146         ENTRY;
147         LASSERT(handle != NULL);
148         RETURN(class_handle2object(handle->cookie));
149 }
150
151 static void lov_llh_put(struct lov_lock_handles *llh)
152 {
153         CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
154                atomic_read(&llh->llh_refcount) - 1);
155         LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
156                 atomic_read(&llh->llh_refcount) < 0x5a5a);
157         if (atomic_dec_and_test(&llh->llh_refcount)) {
158                 LASSERT(list_empty(&llh->llh_handle.h_link));
159                 OBD_FREE(llh, sizeof *llh +
160                          sizeof(*llh->llh_handles) * llh->llh_stripe_count);
161         }
162 }
163
164 static void lov_llh_destroy(struct lov_lock_handles *llh)
165 {
166         class_handle_unhash(&llh->llh_handle);
167         lov_llh_put(llh);
168 }
169
170 /* obd methods */
171 int lov_attach(struct obd_device *dev, obd_count len, void *data)
172 {
173         struct lprocfs_static_vars lvars;
174         struct proc_dir_entry *entry;
175         int rc;
176
177         lprocfs_init_vars(&lvars);
178         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
179         if (rc) 
180                 return rc;
181
182         entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
183         if (entry == NULL) 
184                 RETURN(-ENOMEM);
185         entry->proc_fops = &ll_proc_target_fops;
186         entry->data = dev;
187         
188         return rc;
189         
190 }
191
/*
 * OBD detach method: tear down the device's lprocfs registration and
 * hand back the result of lprocfs_obd_detach().
 */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
196
197 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
198                        struct obd_uuid *cluuid)
199 {
200         struct ptlrpc_request *req = NULL;
201         struct lov_obd *lov = &obd->u.lov;
202         struct client_obd *mdc = &lov->mdcobd->u.cli;
203         struct lov_desc *desc = &lov->desc;
204         struct lov_desc *mdesc;
205         struct lov_tgt_desc *tgts;
206         struct obd_export *exp;
207         struct lustre_handle mdc_conn;
208         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
209         struct obd_uuid *uuids;
210         int rc, rc2, i;
211         ENTRY;
212
213         rc = class_connect(conn, obd, cluuid);
214         if (rc)
215                 RETURN(rc);
216
217         /* We don't want to actually do the underlying connections more than
218          * once, so keep track. */
219         lov->refcount++;
220         if (lov->refcount > 1)
221                 RETURN(0);
222
223         exp = class_conn2export(conn);
224         spin_lock_init(&exp->exp_lov_data.led_lock);
225         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
226
227         /* retrieve LOV metadata from MDS */
228         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
229         if (rc) {
230                 CERROR("cannot connect to mdc: rc = %d\n", rc);
231                 GOTO(out_conn, rc);
232         }
233
234         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
235         rc2 = obd_disconnect(&mdc_conn, 0);
236         if (rc) {
237                 CERROR("cannot get lov info %d\n", rc);
238                 GOTO(out_conn, rc);
239         }
240
241         if (rc2) {
242                 CERROR("error disconnecting from MDS %d\n", rc2);
243                 GOTO(out_req, rc = rc2);
244         }
245
246         /* mdc_getlovinfo() has checked and swabbed the reply.  It has also
247          * done some simple checks (e.g. #uuids consistent with desc, uuid
248          * array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
249          * terminated), but I still need to verify it makes overall
250          * sense */
251         mdesc = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*mdesc));
252         LASSERT (mdesc != NULL);
253         LASSERT_REPSWABBED (req, 0);
254
255         *desc = *mdesc;
256
257         if (!obd_uuid_equals(&obd->obd_uuid, &desc->ld_uuid)) {
258                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
259                        obd->obd_uuid.uuid, desc->ld_uuid.uuid);
260                 GOTO(out_req, rc = -EINVAL);
261         }
262
263         /* Because of 64-bit divide/mod operations only work with a 32-bit
264          * divisor in a 32-bit kernel, we cannot support a stripe width
265          * of 4GB or larger on 32-bit CPUs.
266          */
267         if ((desc->ld_default_stripe_count ?
268              desc->ld_default_stripe_count : desc->ld_tgt_count) *
269              desc->ld_default_stripe_size > ~0UL) {
270                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
271                        desc->ld_default_stripe_size,
272                        desc->ld_default_stripe_count ?
273                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
274                 GOTO(out_req, rc = -EINVAL);
275         }
276
277         /* We know ld_tgt_count is reasonable (the array of UUIDS fits in
278          * the maximum buffer size, so we won't be making outrageous
279          * demands on memory here. */
280         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
281         OBD_ALLOC(lov->tgts, lov->bufsize);
282         if (!lov->tgts) {
283                 CERROR("Out of memory\n");
284                 GOTO(out_req, rc = -ENOMEM);
285         }
286
287         uuids = lustre_msg_buf(req->rq_repmsg, 1,
288                                sizeof(*uuids) * desc->ld_tgt_count);
289         LASSERT (uuids != NULL);
290         LASSERT_REPSWABBED (req, 1);
291
292         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
293                 struct obd_uuid *uuid = &tgts->uuid;
294                 struct obd_device *tgt_obd;
295                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
296
297                 /* NULL termination already checked */
298                 *uuid = uuids[i];
299
300                 tgt_obd = client_tgtuuid2obd(uuid);
301
302                 if (!tgt_obd) {
303                         CERROR("Target %s not attached\n", uuid->uuid);
304                         GOTO(out_disc, rc = -EINVAL);
305                 }
306
307                 if (!tgt_obd->obd_set_up) {
308                         CERROR("Target %s not set up\n", uuid->uuid);
309                         GOTO(out_disc, rc = -EINVAL);
310                 }
311
312                 rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid);
313
314                 if (rc) {
315                         CERROR("Target %s connect error %d\n", uuid->uuid, rc);
316                         GOTO(out_disc, rc);
317                 }
318
319                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
320                                    sizeof(struct obd_device *), obd, NULL);
321                 if (rc) {
322                         CERROR("Target %s REGISTER_LOV error %d\n",
323                                uuid->uuid, rc);
324                         obd_disconnect(&tgts->conn, 0);
325                         GOTO(out_disc, rc);
326                 }
327
328                 desc->ld_active_tgt_count++;
329                 tgts->active = 1;
330         }
331
332         mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
333         ptlrpc_req_finished (req);
334         class_export_put(exp);
335         RETURN (0);
336
337  out_disc:
338         while (i-- > 0) {
339                 struct obd_uuid uuid;
340                 --tgts;
341                 --desc->ld_active_tgt_count;
342                 tgts->active = 0;
343                 /* save for CERROR below; (we know it's terminated) */
344                 uuid = tgts->uuid;
345                 rc2 = obd_disconnect(&tgts->conn, 0);
346                 if (rc2)
347                         CERROR("error: LOV target %s disconnect on OST idx %d: "
348                                "rc = %d\n", uuid.uuid, i, rc2);
349         }
350         OBD_FREE(lov->tgts, lov->bufsize);
351  out_req:
352         ptlrpc_req_finished (req);
353  out_conn:
354         class_export_put(exp);
355         class_disconnect(conn, 0);
356         RETURN (rc);
357 }
358
359 static int lov_disconnect(struct lustre_handle *conn, int failover)
360 {
361         struct obd_device *obd = class_conn2obd(conn);
362         struct lov_obd *lov = &obd->u.lov;
363         struct obd_export *exp;
364         struct list_head *p, *n;
365         int rc, i;
366         ENTRY;
367
368         if (!lov->tgts)
369                 goto out_local;
370
371         /* Only disconnect the underlying layers on the final disconnect. */
372         lov->refcount--;
373         if (lov->refcount != 0)
374                 goto out_local;
375
376         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
377                 if (obd->obd_no_recov) {
378                         /* Pass it on to our clients.
379                          * XXX This should be an argument to disconnect,
380                          * XXX not a back-door flag on the OBD.  Ah well.
381                          */
382                         struct obd_device *osc_obd =
383                                 class_conn2obd(&lov->tgts[i].conn);
384                         osc_obd->obd_no_recov = 1;
385                 }
386                 rc = obd_disconnect(&lov->tgts[i].conn, failover);
387                 if (rc) {
388                         if (lov->tgts[i].active) {
389                                 CERROR("Target %s disconnect error %d\n",
390                                        lov->tgts[i].uuid.uuid, rc);
391                         }
392                         rc = 0;
393                 }
394                 if (lov->tgts[i].active) {
395                         lov->desc.ld_active_tgt_count--;
396                         lov->tgts[i].active = 0;
397                 }
398         }
399         OBD_FREE(lov->tgts, lov->bufsize);
400         lov->bufsize = 0;
401         lov->tgts = NULL;
402
403         exp = class_conn2export(conn);
404         if (exp == NULL) {
405                 CERROR("export handle "LPU64" invalid!  If you can reproduce, "
406                        "please send a full debug log to phik\n", conn->cookie);
407                 RETURN(0);
408         }
409         spin_lock(&exp->exp_lov_data.led_lock);
410         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
411                 /* XXX close these, instead of just discarding them? */
412                 struct lov_file_handles *lfh;
413                 lfh = list_entry(p, typeof(*lfh), lfh_list);
414                 CERROR("discarding open LOV handle %p:"LPX64"\n",
415                        lfh, lfh->lfh_handle.h_cookie);
416                 list_del(&lfh->lfh_list);
417                 OBD_FREE(lfh->lfh_och, lfh->lfh_count * FD_OSTDATA_SIZE);
418                 lov_lfh_destroy(lfh);
419                 lov_lfh_put(lfh);
420         }
421         spin_unlock(&exp->exp_lov_data.led_lock);
422         class_export_put(exp);
423
424  out_local:
425         rc = class_disconnect(conn, 0);
426         RETURN(rc);
427 }
428
429 /* Error codes:
430  *
431  *  -EINVAL  : UUID can't be found in the LOV's target list
432  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
433  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
434  */
435 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
436                               int activate)
437 {
438         struct obd_device *obd;
439         struct lov_tgt_desc *tgt;
440         int i, rc = 0;
441         ENTRY;
442
443         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
444                lov, uuid->uuid, activate);
445
446         spin_lock(&lov->lov_lock);
447         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
448                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
449                        i, tgt->uuid.uuid, tgt->conn.cookie);
450                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
451                         break;
452         }
453
454         if (i == lov->desc.ld_tgt_count)
455                 GOTO(out, rc = -EINVAL);
456
457         obd = class_conn2obd(&tgt->conn);
458         if (obd == NULL) {
459                 /* This can happen if OST failure races with node shutdown */
460                 GOTO(out, rc = -ENOTCONN);
461         }
462
463         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
464                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
465                obd->obd_type->typ_name, i);
466         LASSERT(strcmp(obd->obd_type->typ_name, "osc") == 0);
467
468         if (tgt->active == activate) {
469                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
470                        activate ? "" : "in");
471                 GOTO(out, rc);
472         }
473
474         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
475
476         tgt->active = activate;
477         if (activate) {
478                 /*
479                  * foreach(export)
480                  *     foreach(open_file)
481                  *         if (file_handle uses this_osc)
482                  *             if (has_no_filehandle)
483                  *                 open(file_handle, this_osc);
484                  */
485                 /* XXX reconnect? */
486                 lov->desc.ld_active_tgt_count++;
487         } else {
488                 /*
489                  * Should I invalidate filehandles that refer to this OSC, so
490                  * that I reopen them during reactivation?
491                  */
492                 /* XXX disconnect from OSC? */
493                 lov->desc.ld_active_tgt_count--;
494         }
495
496 #warning "FIXME: walk open files list for objects that need opening"
497         EXIT;
498  out:
499         spin_unlock(&lov->lov_lock);
500         return rc;
501 }
502
503 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
504 {
505         struct obd_ioctl_data *data = buf;
506         struct lov_obd *lov = &obd->u.lov;
507         int rc = 0;
508         ENTRY;
509
510         if (data->ioc_inllen1 < 1) {
511                 CERROR("LOV setup requires an MDC name\n");
512                 RETURN(-EINVAL);
513         }
514
515         spin_lock_init(&lov->lov_lock);
516         lov->mdcobd = class_name2obd(data->ioc_inlbuf1);
517         if (!lov->mdcobd) {
518                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
519                        data->ioc_inlbuf1);
520                 rc = -EINVAL;
521         }
522         RETURN(rc);
523 }
524
525 /* compute object size given "stripeno" and the ost size */
526 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
527                                 int stripeno)
528 {
529         unsigned long ssize  = lsm->lsm_stripe_size;
530         unsigned long swidth = ssize * lsm->lsm_stripe_count;
531         unsigned long stripe_size;
532         obd_size lov_size;
533
534         if (ost_size == 0)
535                 return 0;
536
537         /* do_div(a, b) returns a % b, and a = a / b */
538         stripe_size = do_div(ost_size, ssize);
539
540         if (stripe_size)
541                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
542         else
543                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
544
545         return lov_size;
546 }
547
548 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
549                             struct lov_stripe_md *lsm, int stripeno, int *set)
550 {
551         if (*set) {
552                 if (valid & OBD_MD_FLSIZE) {
553                         /* this handles sparse files properly */
554                         obd_size lov_size;
555
556                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
557                         if (lov_size > tgt->o_size)
558                                 tgt->o_size = lov_size;
559                 }
560                 if (valid & OBD_MD_FLBLOCKS)
561                         tgt->o_blocks += src->o_blocks;
562                 if (valid & OBD_MD_FLBLKSZ)
563                         tgt->o_blksize += src->o_blksize;
564                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
565                         tgt->o_ctime = src->o_ctime;
566                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
567                         tgt->o_mtime = src->o_mtime;
568         } else {
569                 obdo_cpy_md(tgt, src, valid);
570                 if (valid & OBD_MD_FLSIZE)
571                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
572                 *set = 1;
573         }
574 }
575
576 /* the LOV expects oa->o_id to be set to the LOV object id */
577 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
578                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
579 {
580         struct obd_export *export = class_conn2export(conn);
581         struct lov_obd *lov;
582         struct lov_stripe_md *lsm;
583         struct lov_oinfo *loi;
584         struct obdo *tmp;
585         unsigned ost_count, ost_idx;
586         int set = 0, obj_alloc = 0;
587         int rc = 0, i;
588         ENTRY;
589
590         LASSERT(ea);
591
592         if (!export)
593                 GOTO(out_exp, rc = -EINVAL);
594
595         lov = &export->exp_obd->u.lov;
596
597         if (!lov->desc.ld_active_tgt_count)
598                 GOTO(out_exp, rc = -EIO);
599
600         tmp = obdo_alloc();
601         if (!tmp)
602                 GOTO(out_exp, rc = -ENOMEM);
603
604         lsm = *ea;
605
606         if (!lsm) {
607                 rc = obd_alloc_memmd(conn, &lsm);
608                 if (rc < 0)
609                         GOTO(out_tmp, rc);
610
611                 rc = 0;
612                 lsm->lsm_magic = LOV_MAGIC;
613         }
614
615         ost_count = lov->desc.ld_tgt_count;
616
617         LASSERT(oa->o_valid & OBD_MD_FLID);
618         lsm->lsm_object_id = oa->o_id;
619         if (!lsm->lsm_stripe_size)
620                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
621
622         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
623                 get_random_bytes(&ost_idx, 2);
624                 ost_idx %= ost_count;
625         } else
626                 ost_idx = lsm->lsm_stripe_offset;
627
628         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
629                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
630
631         loi = lsm->lsm_oinfo;
632         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
633                 struct lov_stripe_md obj_md;
634                 struct lov_stripe_md *obj_mdp = &obj_md;
635                 int err;
636
637                 if (lov->tgts[ost_idx].active == 0) {
638                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
639                         continue;
640                 }
641
642                 /* create data objects with "parent" OA */
643                 memcpy(tmp, oa, sizeof(*tmp));
644                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
645                 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
646                 if (err) {
647                         if (lov->tgts[ost_idx].active) {
648                                 CERROR("error creating objid "LPX64" sub-object"
649                                        " on OST idx %d/%d: rc = %d\n", oa->o_id,
650                                        ost_idx, lsm->lsm_stripe_count, err);
651                                 if (err > 0) {
652                                         CERROR("obd_create returned invalid "
653                                                "err %d\n", err);
654                                         err = -EIO;
655                                 }
656                         }
657                         if (!rc)
658                                 rc = err;
659                         continue;
660                 }
661                 loi->loi_id = tmp->o_id;
662                 loi->loi_ost_idx = ost_idx;
663                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
664                        lsm->lsm_object_id, loi->loi_id, ost_idx);
665
666                 if (set == 0)
667                         lsm->lsm_stripe_offset = ost_idx;
668                 lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
669                 ot_init(&loi->loi_dirty_ot_inline);
670                 loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
671
672                 ++obj_alloc;
673                 ++loi;
674
675                 /* If we have allocated enough objects, we are OK */
676                 if (obj_alloc == lsm->lsm_stripe_count)
677                         GOTO(out_done, rc = 0);
678         }
679
680         if (*ea != NULL) {
681                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
682                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
683                 if (rc == 0)
684                         rc = -EFBIG;
685                 GOTO(out_cleanup, rc);
686         } else {
687                 struct lov_stripe_md *lsm_new;
688                 /* XXX LOV STACKING call into osc for sizes */
689                 unsigned size = lov_stripe_md_size(obj_alloc);
690
691                 CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
692                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count);
693                 OBD_ALLOC(lsm_new, size);
694                 if (!lsm_new)
695                         GOTO(out_cleanup, rc = -ENOMEM);
696                 memcpy(lsm_new, lsm, size);
697                 lsm_new->lsm_stripe_count = obj_alloc;
698
699                 /* XXX LOV STACKING call into osc for sizes */
700                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
701                 lsm = lsm_new;
702
703                 rc = 0;
704         }
705  out_done:
706         *ea = lsm;
707
708  out_tmp:
709         obdo_free(tmp);
710  out_exp:
711         class_export_put(export);
712         return rc;
713
714  out_cleanup:
715         while (obj_alloc-- > 0) {
716                 int err;
717
718                 --loi;
719                 /* destroy already created objects here */
720                 memcpy(tmp, oa, sizeof(*tmp));
721                 tmp->o_id = loi->loi_id;
722                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL,
723                                   NULL);
724                 if (err)
725                         CERROR("Failed to uncreate objid "LPX64" subobj "
726                                LPX64" on OST idx %d: rc = %d\n",
727                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
728                                err);
729         }
730         if (*ea == NULL)
731                 obd_free_memmd(conn, &lsm);
732         goto out_tmp;
733 }
734
/*
 * Evaluate to 1 (after logging why) when LSMP is NULL or does not carry
 * LOV_MAGIC in lsm_magic, and to 0 otherwise.  LSMP is evaluated exactly
 * once.
 */
#define lsm_bad_magic(LSMP)                                             \
({                                                                      \
        struct lov_stripe_md *_checked_lsm = (LSMP);                    \
        int _is_bad = 1;                                                \
                                                                        \
        if (_checked_lsm == NULL) {                                     \
                CERROR("LOV requires striping ea\n");                   \
        } else if (_checked_lsm->lsm_magic != LOV_MAGIC) {              \
                CERROR("LOV striping magic bad %#x != %#x\n",           \
                       _checked_lsm->lsm_magic, LOV_MAGIC);             \
        } else {                                                        \
                _is_bad = 0;                                            \
        }                                                               \
        _is_bad;                                                        \
})
749
750 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
751                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
752 {
753         struct obdo tmp;
754         struct obd_export *export = class_conn2export(conn);
755         struct lov_obd *lov;
756         struct lov_oinfo *loi;
757         struct lov_file_handles *lfh = NULL;
758         int rc = 0, i;
759         ENTRY;
760
761         if (lsm_bad_magic(lsm))
762                 GOTO(out, rc = -EINVAL);
763
764         if (!export || !export->exp_obd)
765                 GOTO(out, rc = -ENODEV);
766
767         if (oa->o_valid & OBD_MD_FLHANDLE)
768                 lfh = lov_handle2lfh(obdo_handle(oa));
769
770         lov = &export->exp_obd->u.lov;
771         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
772                 int err;
773                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
774                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
775                         /* Orphan clean up will (someday) fix this up. */
776                         continue;
777                 }
778
779                 memcpy(&tmp, oa, sizeof(tmp));
780                 tmp.o_id = loi->loi_id;
781                 if (lfh)
782                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
783                                FD_OSTDATA_SIZE);
784                 else
785                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
786                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
787                                   NULL, NULL);
788                 if (err && lov->tgts[loi->loi_ost_idx].active) {
789                         CERROR("error: destroying objid "LPX64" subobj "
790                                LPX64" on OST idx %d: rc = %d\n",
791                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
792                         if (!rc)
793                                 rc = err;
794                 }
795         }
796         if (lfh != NULL)
797                 lov_lfh_put(lfh);
798         EXIT;
799  out:
800         class_export_put(export);
801         return rc;
802 }
803
804 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
805                        struct lov_stripe_md *lsm)
806 {
807         struct obdo tmp;
808         struct obd_export *export = class_conn2export(conn);
809         struct lov_obd *lov;
810         struct lov_oinfo *loi;
811         struct lov_file_handles *lfh = NULL;
812         int i, rc = 0, set = 0;
813         ENTRY;
814
815         if (lsm_bad_magic(lsm))
816                 GOTO(out, rc = -EINVAL);
817
818         if (!export || !export->exp_obd)
819                 GOTO(out, rc = -ENODEV);
820
821         lov = &export->exp_obd->u.lov;
822
823         if (oa->o_valid & OBD_MD_FLHANDLE)
824                 lfh = lov_handle2lfh(obdo_handle(oa));
825
826         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
827                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
828         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
829                 int err;
830
831                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
832                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
833                         continue;
834                 }
835
836                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
837                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
838                 /* create data objects with "parent" OA */
839                 memcpy(&tmp, oa, sizeof(tmp));
840                 tmp.o_id = loi->loi_id;
841                 if (lfh)
842                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
843                                FD_OSTDATA_SIZE);
844                 else
845                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
846
847                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
848                 if (err) {
849                         if (lov->tgts[loi->loi_ost_idx].active) {
850                                 CERROR("error: getattr objid "LPX64" subobj "
851                                        LPX64" on OST idx %d: rc = %d\n",
852                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
853                                        err);
854                                 GOTO(out, rc = err);
855                         }
856                 } else {
857                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
858                 }
859         }
860         if (!set)
861                 rc = -EIO;
862         GOTO(out, rc);
863  out:
864         if (lfh != NULL)
865                 lov_lfh_put(lfh);
866         class_export_put(export);
867         return rc;
868 }
869
870 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
871                                  struct lov_getattr_async_args *aa, int rc)
872 {
873         struct lov_stripe_md *lsm = aa->aa_lsm;
874         struct obdo          *oa = aa->aa_oa;
875         struct obdo          *obdos = aa->aa_stripe_oas;
876         struct lov_oinfo     *loi;
877         int                   i;
878         int                   set = 0;
879         ENTRY;
880
881         if (rc == 0) {
882                 /* NB all stripe requests succeeded to get here */
883
884                 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
885                      i++,loi++) {
886                         if (obdos[i].o_valid == 0)      /* inactive stripe */
887                                 continue;
888
889                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
890                                         i, &set);
891                 }
892
893                 if (!set) {
894                         CERROR ("No stripes had valid attrs\n");
895                         rc = -EIO;
896                 }
897         }
898
899         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
900         RETURN (rc);
901 }
902
903 static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
904                               struct lov_stripe_md *lsm,
905                               struct ptlrpc_request_set *rqset)
906 {
907         struct obdo *obdos;
908         struct obd_export *export = class_conn2export(conn);
909         struct lov_obd *lov;
910         struct lov_oinfo *loi;
911         struct lov_file_handles *lfh = NULL;
912         struct lov_getattr_async_args *aa;
913         int i;
914         int set = 0;
915         int rc = 0;
916         ENTRY;
917
918         if (!lsm) {
919                 CERROR("LOV requires striping ea\n");
920                 GOTO(out, rc = -EINVAL);
921         }
922
923         if (lsm->lsm_magic != LOV_MAGIC) {
924                 CERROR("LOV striping magic bad %#x != %#x\n",
925                        lsm->lsm_magic, LOV_MAGIC);
926                 GOTO(out, rc = -EINVAL);
927         }
928
929         if (!export || !export->exp_obd)
930                 GOTO(out, rc = -ENODEV);
931
932         lov = &export->exp_obd->u.lov;
933
934         OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
935         if (obdos == NULL)
936                 GOTO (out, rc = -ENOMEM);
937
938         if (oa->o_valid & OBD_MD_FLHANDLE)
939                 lfh = lov_handle2lfh(obdo_handle(oa));
940
941         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
942                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
943         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
944                 int err;
945
946                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
947                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
948                         /* leaves obdos[i].obd_valid unset */
949                         continue;
950                 }
951
952                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
953                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
954                 /* create data objects with "parent" OA */
955                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
956                 obdos[i].o_id = loi->loi_id;
957                 if (lfh)
958                         memcpy(obdo_handle(&obdos[i]), lfh->lfh_och + i,
959                                FD_OSTDATA_SIZE);
960                 else
961                         obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
962
963                 err = obd_getattr_async (&lov->tgts[loi->loi_ost_idx].conn,
964                                          &obdos[i], NULL, rqset);
965                 if (err) {
966                         CERROR("error: getattr objid "LPX64" subobj "
967                                LPX64" on OST idx %d: rc = %d\n",
968                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
969                                err);
970                         GOTO(out_obdos, rc = err);
971                 }
972                 set = 1;
973         }
974         if (!set)
975                 GOTO (out_obdos, rc = -EIO);
976
977         LASSERT (rqset->set_interpret == NULL);
978         rqset->set_interpret = lov_getattr_interpret;
979         LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
980         aa = (struct lov_getattr_async_args *)&rqset->set_args;
981         aa->aa_lsm = lsm;
982         aa->aa_oa = oa;
983         aa->aa_stripe_oas = obdos;
984         GOTO (out, rc = 0);
985
986  out_obdos:
987         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
988  out:
989         if (lfh != NULL)
990                 lov_lfh_put(lfh);
991         class_export_put(export);
992         RETURN (rc);
993 }
994
995 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
996                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
997 {
998         struct obdo *tmp;
999         struct obd_export *export = class_conn2export(conn);
1000         struct lov_obd *lov;
1001         struct lov_oinfo *loi;
1002         struct lov_file_handles *lfh = NULL;
1003         int rc = 0, i, set = 0;
1004         ENTRY;
1005
1006         if (lsm_bad_magic(lsm))
1007                 GOTO(out, rc = -EINVAL);
1008
1009         if (!export || !export->exp_obd)
1010                 GOTO(out, rc = -ENODEV);
1011
1012         /* size changes should go through punch and not setattr */
1013         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
1014
1015         /* for now, we only expect mtime updates here */
1016         LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
1017
1018         tmp = obdo_alloc();
1019         if (!tmp)
1020                 GOTO(out, rc = -ENOMEM);
1021
1022         if (oa->o_valid & OBD_MD_FLHANDLE)
1023                 lfh = lov_handle2lfh(obdo_handle(oa));
1024
1025         lov = &export->exp_obd->u.lov;
1026         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1027                 int err;
1028
1029                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1030                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1031                         continue;
1032                 }
1033
1034                 obdo_cpy_md(tmp, oa, oa->o_valid);
1035
1036                 if (lfh)
1037                         memcpy(obdo_handle(tmp), lfh->lfh_och + i,
1038                                FD_OSTDATA_SIZE);
1039                 else
1040                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
1041
1042                 tmp->o_id = loi->loi_id;
1043
1044                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1045                                   NULL, NULL);
1046                 if (err) {
1047                         if (lov->tgts[loi->loi_ost_idx].active) {
1048                                 CERROR("error: setattr objid "LPX64" subobj "
1049                                        LPX64" on OST idx %d: rc = %d\n",
1050                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
1051                                        err);
1052                                 if (!rc)
1053                                         rc = err;
1054                         }
1055                 } else
1056                         set = 1;
1057         }
1058         obdo_free(tmp);
1059         if (!set && !rc)
1060                 rc = -EIO;
1061         if (lfh != NULL)
1062                 lov_lfh_put(lfh);
1063         GOTO(out, rc);
1064  out:
1065         class_export_put(export);
1066         return rc;
1067 }
1068
1069 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
1070                     struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1071                     struct obd_client_handle *och)
1072 {
1073         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
1074         struct obd_export *export = class_conn2export(conn);
1075         struct lov_obd *lov;
1076         struct lov_oinfo *loi;
1077         struct lov_file_handles *lfh = NULL;
1078         int set = 0, rc = 0, i;
1079         ENTRY;
1080         LASSERT(och != NULL);
1081
1082         if (lsm_bad_magic(lsm))
1083                 GOTO(out_exp, rc = -EINVAL);
1084
1085         if (!export || !export->exp_obd)
1086                 GOTO(out_exp, rc = -ENODEV);
1087
1088         tmp = obdo_alloc();
1089         if (!tmp)
1090                 GOTO(out_exp, rc = -ENOMEM);
1091
1092         lfh = lov_lfh_new();
1093         if (lfh == NULL)
1094                 GOTO(out_tmp, rc = -ENOMEM);
1095         OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof *och);
1096         if (!lfh->lfh_och)
1097                 GOTO(out_lfh, rc = -ENOMEM);
1098
1099         lov = &export->exp_obd->u.lov;
1100         oa->o_size = 0;
1101         oa->o_blocks = 0;
1102         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1103                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1104                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1105                         continue;
1106                 }
1107
1108                 /* create data objects with "parent" OA */
1109                 memcpy(tmp, oa, sizeof(*tmp));
1110                 tmp->o_id = loi->loi_id;
1111
1112                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1113                               NULL, NULL, lfh->lfh_och + i);
1114                 if (rc) {
1115                         if (!lov->tgts[loi->loi_ost_idx].active) {
1116                                 rc = 0;
1117                                 continue;
1118                         }
1119                         CERROR("error: open objid "LPX64" subobj "LPX64
1120                                " on OST idx %d: rc = %d\n",
1121                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
1122                                loi->loi_ost_idx, rc);
1123                         goto out_handles;
1124                 }
1125
1126                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
1127         }
1128
1129         lfh->lfh_count = lsm->lsm_stripe_count;
1130         och->och_fh.cookie = lfh->lfh_handle.h_cookie;
1131         obdo_handle(oa)->cookie = lfh->lfh_handle.h_cookie;
1132         oa->o_valid |= OBD_MD_FLHANDLE;
1133
1134         /* llfh refcount transfers to list */
1135         spin_lock(&export->exp_lov_data.led_lock);
1136         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
1137         spin_unlock(&export->exp_lov_data.led_lock);
1138
1139         GOTO(out_tmp, rc);
1140  out_tmp:
1141         obdo_free(tmp);
1142  out_exp:
1143         class_export_put(export);
1144         return rc;
1145
1146  out_handles:
1147         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
1148                 int err;
1149
1150                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1151                         continue;
1152
1153                 memcpy(tmp, oa, sizeof(*tmp));
1154                 tmp->o_id = loi->loi_id;
1155                 memcpy(obdo_handle(tmp), lfh->lfh_och + i, FD_OSTDATA_SIZE);
1156
1157                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1158                                 NULL, NULL);
1159                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1160                         CERROR("error: closing objid "LPX64" subobj "LPX64
1161                                " on OST idx %d after open error: rc=%d\n",
1162                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
1163                 }
1164         }
1165
1166         OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1167  out_lfh:
1168         lov_lfh_destroy(lfh);
1169         lov_lfh_put(lfh);
1170         goto out_tmp;
1171 }
1172
1173 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
1174                      struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1175 {
1176         struct obdo tmp;
1177         struct obd_export *export = class_conn2export(conn);
1178         struct lov_obd *lov;
1179         struct lov_oinfo *loi;
1180         struct lov_file_handles *lfh = NULL;
1181         int rc = 0, i;
1182         ENTRY;
1183
1184         if (lsm_bad_magic(lsm))
1185                 GOTO(out, rc = -EINVAL);
1186
1187         if (!export || !export->exp_obd)
1188                 GOTO(out, rc = -ENODEV);
1189
1190         if (oa->o_valid & OBD_MD_FLHANDLE)
1191                 lfh = lov_handle2lfh(obdo_handle(oa));
1192
1193         lov = &export->exp_obd->u.lov;
1194         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1195                 int err;
1196
1197                 /* create data objects with "parent" OA */
1198                 memcpy(&tmp, oa, sizeof(tmp));
1199                 tmp.o_id = loi->loi_id;
1200                 if (lfh)
1201                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1202                                FD_OSTDATA_SIZE);
1203                 else
1204                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1205
1206                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
1207                                 NULL, NULL);
1208                 if (err) {
1209                         if (lov->tgts[loi->loi_ost_idx].active) {
1210                                 CERROR("error: close objid "LPX64" subobj "LPX64
1211                                        " on OST idx %d: rc = %d\n", oa->o_id,
1212                                        loi->loi_id, loi->loi_ost_idx, err);
1213                         }
1214                         if (!rc)
1215                                 rc = err;
1216                 }
1217         }
1218         if (lfh != NULL) {
1219                 spin_lock(&export->exp_lov_data.led_lock);
1220                 list_del(&lfh->lfh_list);
1221                 spin_unlock(&export->exp_lov_data.led_lock);
1222                 lov_lfh_put(lfh); /* drop the reference owned by the list */
1223
1224                 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1225                 lov_lfh_destroy(lfh);
1226                 lov_lfh_put(lfh); /* balance handle2lfh above */
1227         }
1228         GOTO(out, rc);
1229  out:
1230         class_export_put(export);
1231         return rc;
1232 }
1233
/* Fallback log2() for environments that do not already define one.
 * NOTE(review): ffz(~(n)) presumably yields the index of the lowest set bit
 * of n, which equals log2(n) only when n is a power of two -- confirm. */
#ifndef log2
#define log2(n) ffz(~(n))
#endif
1237
1238 /* we have an offset in file backed by an lov and want to find out where
1239  * that offset lands in our given stripe of the file.  for the easy
1240  * case where the offset is within the stripe, we just have to scale the
1241  * offset down to make it relative to the stripe instead of the lov.
1242  *
1243  * the harder case is what to do when the offset doesn't intersect the
1244  * stripe.  callers will want start offsets clamped ahead to the start
1245  * of the nearest stripe in the file.  end offsets similarly clamped to the
1246  * nearest ending byte of a stripe in the file:
1247  *
1248  * all this function does is move offsets to the nearest region of the
1249  * stripe, and it does its work "mod" the full length of all the stripes.
1250  * consider a file with 3 stripes:
1251  *
1252  *             S                                              E
1253  * ---------------------------------------------------------------------
1254  * |    0    |     1     |     2     |    0    |     1     |     2     |
1255  * ---------------------------------------------------------------------
1256  *
1257  * to find stripe 1's offsets for S and E, it divides by the full stripe
1258  * width and does its math in the context of a single set of stripes:
1259  *
1260  *             S         E
1261  * -----------------------------------
1262  * |    0    |     1     |     2     |
1263  * -----------------------------------
1264  *
1265  * it'll notice that E is outside stripe 1 and clamp it to the end of the
1266  * stripe, then multiply it back out by lov_off to give the real offsets in
1267  * the stripe:
1268  *
1269  *   S                   E
1270  * ---------------------------------------------------------------------
1271  * |    1    |     1     |     1     |    1    |     1     |     1     |
1272  * ---------------------------------------------------------------------
1273  *
1274  * it would have done similarly and pulled S forward to the start of a 1
1275  * stripe if, say, S had landed in a 0 stripe.
1276  *
1277  * this rounding isn't always correct.  consider an E lov offset that lands
1278  * on a 0 stripe, the "mod stripe width" math will pull it forward to the
1279  * start of a 1 stripe, when in fact it wanted to be rounded back to the end
1280  * of a previous 1 stripe.  this logic is handled by callers and this is why:
1281  *
1282  * this function returns < 0 when the offset was "before" the stripe and
1283  * was moved forward to the start of the stripe in question;  0 when it
1284  * falls in the stripe and no shifting was done; > 0 when the offset
1285  * was outside the stripe and was pulled back to its final byte. */
1286 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1287                              int stripeno, obd_off *obd_off)
1288 {
1289         unsigned long ssize  = lsm->lsm_stripe_size;
1290         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1291         unsigned long stripe_off, this_stripe;
1292         int ret = 0;
1293
1294         if (lov_off == OBD_OBJECT_EOF) {
1295                 *obd_off = OBD_OBJECT_EOF;
1296                 return 0;
1297         }
1298
1299         /* do_div(a, b) returns a % b, and a = a / b */
1300         stripe_off = do_div(lov_off, swidth);
1301
1302         this_stripe = stripeno * ssize;
1303         if (stripe_off < this_stripe) {
1304                 stripe_off = 0;
1305                 ret = -1;
1306         } else {
1307                 stripe_off -= this_stripe;
1308
1309                 if (stripe_off >= ssize) {
1310                         stripe_off = ssize;
1311                         ret = 1;
1312                 }
1313         }
1314
1315         *obd_off = lov_off * ssize + stripe_off;
1316         return ret;
1317 }
1318
1319 /* given an extent in an lov and a stripe, calculate the extent of the stripe
1320  * that is contained within the lov extent.  this returns true if the given
1321  * stripe does intersect with the lov extent. */
1322 static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
1323                                  obd_off start, obd_off end,
1324                                  obd_off *obd_start, obd_off *obd_end)
1325 {
1326         int start_side, end_side;
1327
1328         start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
1329         end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
1330
1331         CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
1332                start, end, start_side, *obd_start, *obd_end, end_side);
1333
1334         /* this stripe doesn't intersect the file extent when neither
1335          * start or the end intersected the stripe and obd_start and
1336          * obd_end got rounded up to the save value. */
1337         if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
1338                 return 0;
1339
1340         /* as mentioned in the lov_stripe_offset commentary, end
1341          * might have been shifted in the wrong direction.  This
1342          * happens when an end offset is before the stripe when viewed
1343          * through the "mod stripe size" math. we detect it being shifted
1344          * in the wrong direction and touch it up.
1345          * interestingly, this can't underflow since end must be > start
1346          * if we passed through the previous check.
1347          * (should we assert for that somewhere?) */
1348         if (end_side != 0)
1349                 (*obd_end)--;
1350
1351         return 1;
1352 }
1353
1354 /* compute which stripe number "lov_off" will be written into */
1355 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1356 {
1357         unsigned long ssize  = lsm->lsm_stripe_size;
1358         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1359         unsigned long stripe_off;
1360
1361         stripe_off = do_div(lov_off, swidth);
1362
1363         return stripe_off / ssize;
1364 }
1365
1366 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1367  * we can send this 'punch' to just the authoritative node and the nodes
1368  * that the punch will affect. */
1369 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1370                      struct lov_stripe_md *lsm,
1371                      obd_off start, obd_off end, struct obd_trans_info *oti)
1372 {
1373         struct obdo tmp;
1374         struct obd_export *export = class_conn2export(conn);
1375         struct lov_obd *lov;
1376         struct lov_oinfo *loi;
1377         struct lov_file_handles *lfh = NULL;
1378         int rc = 0, i;
1379         ENTRY;
1380
1381         if (lsm_bad_magic(lsm))
1382                 GOTO(out, rc = -EINVAL);
1383
1384         if (!export || !export->exp_obd)
1385                 GOTO(out, rc = -ENODEV);
1386
1387         if (oa->o_valid & OBD_MD_FLHANDLE)
1388                 lfh = lov_handle2lfh(obdo_handle(oa));
1389
1390         lov = &export->exp_obd->u.lov;
1391         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1392                 obd_off starti, endi;
1393                 int err;
1394
1395                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1396                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1397                         continue;
1398                 }
1399
1400                 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1401                         continue;
1402
1403                 /* create data objects with "parent" OA */
1404                 memcpy(&tmp, oa, sizeof(tmp));
1405                 tmp.o_id = loi->loi_id;
1406                 if (lfh)
1407                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1408                                FD_OSTDATA_SIZE);
1409                 else
1410                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1411
1412                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1413                                 starti, endi, NULL);
1414                 if (err) {
1415                         if (lov->tgts[loi->loi_ost_idx].active) {
1416                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1417                                        " on OST idx %d: rc = %d\n", oa->o_id,
1418                                        loi->loi_id, loi->loi_ost_idx, err);
1419                         }
1420                         if (!rc)
1421                                 rc = err;
1422                 }
1423         }
1424         if (lfh != NULL)
1425                 lov_lfh_put(lfh);
1426         GOTO(out, rc);
1427  out:
1428         class_export_put(export);
1429         return rc;
1430 }
1431
1432 static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm,
1433                          obd_count oa_bufs, struct brw_page *pga)
1434 {
1435         int i;
1436
1437         /* The caller just wants to know if there's a chance that this
1438          * I/O can succeed */
1439         for (i = 0; i < oa_bufs; i++) {
1440                 int stripe = lov_stripe_number(lsm, pga[i].off);
1441                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1442                 struct ldlm_extent ext, subext;
1443                 ext.start = pga[i].off;
1444                 ext.start = pga[i].off + pga[i].count;
1445
1446                 if (!lov_stripe_intersects(lsm, i, ext.start, ext.end,
1447                                            &subext.start, &subext.end))
1448                         continue;
1449
1450                 if (lov->tgts[ost].active == 0) {
1451                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1452                         return -EIO;
1453                 }
1454         }
1455         return 0;
1456 }
1457
1458 static int lov_brw(int cmd, struct lustre_handle *conn,
1459                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1460                    struct brw_page *pga, struct obd_trans_info *oti)
1461 {
1462         struct {
1463                 int bufct;
1464                 int index;
1465                 int subcount;
1466                 struct lov_stripe_md lsm;
1467                 int ost_idx;
1468         } *stripeinfo, *si, *si_last;
1469         struct obd_export *export = class_conn2export(conn);
1470         struct lov_obd *lov;
1471         struct brw_page *ioarr;
1472         struct lov_oinfo *loi;
1473         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1474         ENTRY;
1475
1476         if (lsm_bad_magic(lsm))
1477                 GOTO(out_exp, rc = -EINVAL);
1478
1479         lov = &export->exp_obd->u.lov;
1480
1481         if (cmd == OBD_BRW_CHECK) {
1482                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1483                 GOTO(out_exp, rc);
1484         }
1485
1486         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1487         if (!stripeinfo)
1488                 GOTO(out_exp, rc = -ENOMEM);
1489
1490         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1491         if (!where)
1492                 GOTO(out_sinfo, rc = -ENOMEM);
1493
1494         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1495         if (!ioarr)
1496                 GOTO(out_where, rc = -ENOMEM);
1497
1498         for (i = 0; i < oa_bufs; i++) {
1499                 where[i] = lov_stripe_number(lsm, pga[i].off);
1500                 stripeinfo[where[i]].bufct++;
1501         }
1502
1503         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1504              i < stripe_count; i++, loi++, si_last = si, si++) {
1505                 if (i > 0)
1506                         si->index = si_last->index + si_last->bufct;
1507                 si->lsm.lsm_object_id = loi->loi_id;
1508                 si->ost_idx = loi->loi_ost_idx;
1509         }
1510
1511         for (i = 0; i < oa_bufs; i++) {
1512                 int which = where[i];
1513                 int shift;
1514
1515                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1516                 LASSERT(shift < oa_bufs);
1517                 ioarr[shift] = pga[i];
1518                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1519                 stripeinfo[which].subcount++;
1520         }
1521
1522         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1523                 int shift = si->index;
1524
1525                 if (lov->tgts[si->ost_idx].active == 0) {
1526                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1527                         GOTO(out_ioarr, rc = -EIO);
1528                 }
1529
1530                 if (si->bufct) {
1531                         LASSERT(shift < oa_bufs);
1532                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1533                                      &si->lsm, si->bufct, &ioarr[shift],
1534                                      oti);
1535                         if (rc)
1536                                 GOTO(out_ioarr, rc);
1537                 }
1538         }
1539         GOTO(out_ioarr, rc);
1540  out_ioarr:
1541         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1542  out_where:
1543         OBD_FREE(where, sizeof(*where) * oa_bufs);
1544  out_sinfo:
1545         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1546  out_exp:
1547         class_export_put(export);
1548         return rc;
1549 }
1550
1551 static int lov_brw_interpret (struct ptlrpc_request_set *set,
1552                               struct lov_brw_async_args *aa, int rc)
1553 {
1554         obd_count        oa_bufs = aa->aa_oa_bufs;
1555         struct brw_page *ioarr = aa->aa_ioarr;
1556         ENTRY;
1557
1558         OBD_FREE (ioarr, sizeof (*ioarr) * oa_bufs);
1559         RETURN (rc);
1560 }
1561
1562 static int lov_brw_async(int cmd, struct lustre_handle *conn,
1563                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1564                          struct brw_page *pga, struct ptlrpc_request_set *set,
1565                          struct obd_trans_info *oti)
1566 {
1567         struct {
1568                 int bufct;
1569                 int index;
1570                 int subcount;
1571                 struct lov_stripe_md lsm;
1572                 int ost_idx;
1573         } *stripeinfo, *si, *si_last;
1574         struct obd_export *export = class_conn2export(conn);
1575         struct lov_obd *lov;
1576         struct brw_page *ioarr;
1577         struct lov_oinfo *loi;
1578         struct lov_brw_async_args *aa;
1579         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1580         ENTRY;
1581
1582         if (lsm_bad_magic(lsm))
1583                 GOTO(out_exp, rc = -EINVAL);
1584
1585         lov = &export->exp_obd->u.lov;
1586
1587         if (cmd == OBD_BRW_CHECK) {
1588                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1589                 GOTO(out_exp, rc);
1590         }
1591
1592         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1593         if (!stripeinfo)
1594                 GOTO(out_exp, rc = -ENOMEM);
1595
1596         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1597         if (!where)
1598                 GOTO(out_sinfo, rc = -ENOMEM);
1599
1600         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1601         if (!ioarr)
1602                 GOTO(out_where, rc = -ENOMEM);
1603
1604         for (i = 0; i < oa_bufs; i++) {
1605                 where[i] = lov_stripe_number(lsm, pga[i].off);
1606                 stripeinfo[where[i]].bufct++;
1607         }
1608
1609         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1610              i < stripe_count; i++, loi++, si_last = si, si++) {
1611                 if (i > 0)
1612                         si->index = si_last->index + si_last->bufct;
1613                 si->lsm.lsm_object_id = loi->loi_id;
1614                 si->ost_idx = loi->loi_ost_idx;
1615         }
1616
1617         for (i = 0; i < oa_bufs; i++) {
1618                 int which = where[i];
1619                 int shift;
1620
1621                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1622                 LASSERT(shift < oa_bufs);
1623                 ioarr[shift] = pga[i];
1624                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1625                 stripeinfo[which].subcount++;
1626         }
1627
1628         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1629                 int shift = si->index;
1630
1631                 if (si->bufct == 0)
1632                         continue;
1633
1634                 if (lov->tgts[si->ost_idx].active == 0) {
1635                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1636                         GOTO(out_ioarr, rc = -EIO);
1637                 }
1638
1639                 LASSERT(shift < oa_bufs);
1640                 rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
1641                                    &si->lsm, si->bufct, &ioarr[shift],
1642                                    set, oti);
1643                 if (rc)
1644                         GOTO(out_ioarr, rc);
1645         }
1646         LASSERT (rc == 0);
1647         LASSERT (set->set_interpret == NULL);
1648         set->set_interpret = lov_brw_interpret;
1649         LASSERT (sizeof (set->set_args) >= sizeof (struct lov_brw_async_args));
1650         aa = (struct lov_brw_async_args *)&set->set_args;
1651         aa->aa_oa_bufs = oa_bufs;
1652         aa->aa_ioarr = ioarr;
1653         GOTO(out_where, rc);
1654  out_ioarr:
1655         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1656  out_where:
1657         OBD_FREE(where, sizeof(*where) * oa_bufs);
1658  out_sinfo:
1659         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1660  out_exp:
1661         class_export_put(export);
1662         return rc;
1663 }
1664
1665 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1666                        struct lustre_handle *parent_lock,
1667                        __u32 type, void *cookie, int cookielen, __u32 mode,
1668                        int *flags, void *cb, void *data,
1669                        struct lustre_handle *lockh)
1670 {
1671         struct obd_export *export = class_conn2export(conn);
1672         struct lov_lock_handles *lov_lockh = NULL;
1673         struct lustre_handle *lov_lockhp;
1674         struct lov_obd *lov;
1675         struct lov_oinfo *loi;
1676         struct lov_stripe_md submd;
1677         ldlm_error_t rc;
1678         int i;
1679         ENTRY;
1680
1681         if (lsm_bad_magic(lsm))
1682                 GOTO(out_exp, rc = -EINVAL);
1683
1684         /* we should never be asked to replay a lock this way. */
1685         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1686
1687         if (!export || !export->exp_obd)
1688                 GOTO(out_exp, rc = -ENODEV);
1689
1690         if (lsm->lsm_stripe_count > 1) {
1691                 lov_lockh = lov_llh_new(lsm);
1692                 if (lov_lockh == NULL)
1693                         GOTO(out_exp, rc = -ENOMEM);
1694
1695                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1696                 lov_lockhp = lov_lockh->llh_handles;
1697         } else {
1698                 lov_lockhp = lockh;
1699         }
1700
1701         lov = &export->exp_obd->u.lov;
1702         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1703              i++, loi++, lov_lockhp++) {
1704                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1705                 struct ldlm_extent sub_ext;
1706
1707                 *flags = 0;
1708                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1709                                            &sub_ext.start, &sub_ext.end))
1710                         continue;
1711
1712                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1713                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1714                         continue;
1715                 }
1716
1717                 /* XXX LOV STACKING: submd should be from the subobj */
1718                 submd.lsm_object_id = loi->loi_id;
1719                 submd.lsm_stripe_count = 0;
1720                 /* XXX submd is not fully initialized here */
1721                 *flags = 0;
1722                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1723                                   parent_lock, type, &sub_ext, sizeof(sub_ext),
1724                                   mode, flags, cb, data, lov_lockhp);
1725
1726                 // XXX add a lock debug statement here
1727                 if (rc != ELDLM_OK) {
1728                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1729                         if (lov->tgts[loi->loi_ost_idx].active) {
1730                                 CERROR("error: enqueue objid "LPX64" subobj "
1731                                        LPX64" on OST idx %d: rc = %d\n",
1732                                        lsm->lsm_object_id, loi->loi_id,
1733                                        loi->loi_ost_idx, rc);
1734                                 goto out_locks;
1735                         }
1736                 }
1737         }
1738         if (lsm->lsm_stripe_count > 1)
1739                 lov_llh_put(lov_lockh);
1740         GOTO(out_exp, rc = ELDLM_OK);
1741
1742  out_locks:
1743         while (loi--, lov_lockhp--, i-- > 0) {
1744                 struct lov_stripe_md submd;
1745                 int err;
1746
1747                 if (lov_lockhp->cookie == 0)
1748                         continue;
1749
1750                 /* XXX LOV STACKING: submd should be from the subobj */
1751                 submd.lsm_object_id = loi->loi_id;
1752                 submd.lsm_stripe_count = 0;
1753                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1754                                  mode, lov_lockhp);
1755                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1756                         CERROR("error: cancelling objid "LPX64" on OST "
1757                                "idx %d after enqueue error: rc = %d\n",
1758                                loi->loi_id, loi->loi_ost_idx, err);
1759                 }
1760         }
1761
1762         if (lsm->lsm_stripe_count > 1) {
1763                 lov_llh_destroy(lov_lockh);
1764                 lov_llh_put(lov_lockh);
1765         }
1766  out_exp:
1767         class_export_put(export);
1768         RETURN(rc);
1769 }
1770
1771 static int lov_match(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1772                      __u32 type, void *cookie, int cookielen, __u32 mode,
1773                      int *flags, void *data, struct lustre_handle *lockh)
1774 {
1775         struct obd_export *export = class_conn2export(conn);
1776         struct lov_lock_handles *lov_lockh = NULL;
1777         struct lustre_handle *lov_lockhp;
1778         struct lov_obd *lov;
1779         struct lov_oinfo *loi;
1780         struct lov_stripe_md submd;
1781         ldlm_error_t rc = 0;
1782         int i;
1783         ENTRY;
1784
1785         if (lsm_bad_magic(lsm))
1786                 GOTO(out_exp, rc = -EINVAL);
1787
1788         if (!export || !export->exp_obd)
1789                 GOTO(out_exp, rc = -ENODEV);
1790
1791         if (lsm->lsm_stripe_count > 1) {
1792                 lov_lockh = lov_llh_new(lsm);
1793                 if (lov_lockh == NULL)
1794                         GOTO(out_exp, rc = -ENOMEM);
1795
1796                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1797                 lov_lockhp = lov_lockh->llh_handles;
1798         } else {
1799                 lov_lockhp = lockh;
1800         }
1801
1802         lov = &export->exp_obd->u.lov;
1803         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1804              i++, loi++, lov_lockhp++) {
1805                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1806                 struct ldlm_extent sub_ext;
1807                 int lov_flags;
1808
1809                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1810                                            &sub_ext.start, &sub_ext.end))
1811                         continue;
1812
1813                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1814                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1815                         rc = -EIO;
1816                         break;
1817                 }
1818
1819                 /* XXX LOV STACKING: submd should be from the subobj */
1820                 submd.lsm_object_id = loi->loi_id;
1821                 submd.lsm_stripe_count = 0;
1822                 lov_flags = *flags;
1823                 /* XXX submd is not fully initialized here */
1824                 rc = obd_match(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1825                                type, &sub_ext, sizeof(sub_ext), mode,
1826                                &lov_flags, data, lov_lockhp);
1827                 if (rc != 1)
1828                         break;
1829         }
1830         if (rc == 1) {
1831                 if (lsm->lsm_stripe_count > 1)
1832                         lov_llh_put(lov_lockh);
1833                 GOTO(out_exp, 1);
1834         }
1835
1836         while (loi--, lov_lockhp--, i-- > 0) {
1837                 struct lov_stripe_md submd;
1838                 int err;
1839
1840                 if (lov_lockhp->cookie == 0)
1841                         continue;
1842
1843                 /* XXX LOV STACKING: submd should be from the subobj */
1844                 submd.lsm_object_id = loi->loi_id;
1845                 submd.lsm_stripe_count = 0;
1846                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1847                                  mode, lov_lockhp);
1848                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1849                         CERROR("error: cancelling objid "LPX64" on OST "
1850                                "idx %d after match failure: rc = %d\n",
1851                                loi->loi_id, loi->loi_ost_idx, err);
1852                 }
1853         }
1854
1855         if (lsm->lsm_stripe_count > 1) {
1856                 lov_llh_destroy(lov_lockh);
1857                 lov_llh_put(lov_lockh);
1858         }
1859  out_exp:
1860         class_export_put(export);
1861         RETURN(rc);
1862 }
1863
1864 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1865                       __u32 mode, struct lustre_handle *lockh)
1866 {
1867         struct obd_export *export = class_conn2export(conn);
1868         struct lov_lock_handles *lov_lockh = NULL;
1869         struct lustre_handle *lov_lockhp;
1870         struct lov_obd *lov;
1871         struct lov_oinfo *loi;
1872         int rc = 0, i;
1873         ENTRY;
1874
1875         if (lsm_bad_magic(lsm))
1876                 GOTO(out, rc = -EINVAL);
1877
1878         if (!export || !export->exp_obd)
1879                 GOTO(out, rc = -ENODEV);
1880
1881         LASSERT(lockh);
1882         if (lsm->lsm_stripe_count > 1) {
1883                 lov_lockh = lov_handle2llh(lockh);
1884                 if (!lov_lockh) {
1885                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1886                         GOTO(out, rc = -EINVAL);
1887                 }
1888
1889                 lov_lockhp = lov_lockh->llh_handles;
1890         } else {
1891                 lov_lockhp = lockh;
1892         }
1893
1894         lov = &export->exp_obd->u.lov;
1895         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1896              i++, loi++, lov_lockhp++) {
1897                 struct lov_stripe_md submd;
1898                 int err;
1899
1900                 if (lov_lockhp->cookie == 0) {
1901                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
1902                                loi->loi_ost_idx, loi->loi_id);
1903                         continue;
1904                 }
1905
1906                 /* XXX LOV STACKING: submd should be from the subobj */
1907                 submd.lsm_object_id = loi->loi_id;
1908                 submd.lsm_stripe_count = 0;
1909                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1910                                  mode, lov_lockhp);
1911                 if (err) {
1912                         if (lov->tgts[loi->loi_ost_idx].active) {
1913                                 CERROR("error: cancel objid "LPX64" subobj "
1914                                        LPX64" on OST idx %d: rc = %d\n",
1915                                        lsm->lsm_object_id,
1916                                        loi->loi_id, loi->loi_ost_idx, err);
1917                                 if (!rc)
1918                                         rc = err;
1919                         }
1920                 }
1921         }
1922
1923         if (lsm->lsm_stripe_count > 1)
1924                 lov_llh_destroy(lov_lockh);
1925         if (lov_lockh != NULL)
1926                 lov_llh_put(lov_lockh);
1927         GOTO(out, rc);
1928  out:
1929         class_export_put(export);
1930         return rc;
1931 }
1932
1933 static int lov_cancel_unused(struct lustre_handle *conn,
1934                              struct lov_stripe_md *lsm, int flags, void *opaque)
1935 {
1936         struct obd_export *export = class_conn2export(conn);
1937         struct lov_obd *lov;
1938         struct lov_oinfo *loi;
1939         int rc = 0, i;
1940         ENTRY;
1941
1942         if (lsm_bad_magic(lsm))
1943                 GOTO(out, rc = -EINVAL);
1944
1945         if (!export || !export->exp_obd)
1946                 GOTO(out, rc = -ENODEV);
1947
1948         lov = &export->exp_obd->u.lov;
1949         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1950                 struct lov_stripe_md submd;
1951                 int err;
1952
1953                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1954                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1955
1956                 submd.lsm_object_id = loi->loi_id;
1957                 submd.lsm_stripe_count = 0;
1958                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1959                                         &submd, flags, opaque);
1960                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1961                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1962                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1963                                loi->loi_id, loi->loi_ost_idx, err);
1964                         if (!rc)
1965                                 rc = err;
1966                 }
1967         }
1968         GOTO(out, rc);
1969  out:
1970         class_export_put(export);
1971         return rc;
1972 }
1973
/* Largest value representable in a __u64; used as the saturation mark. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating add: tot += add, except that a sum which would wrap past the
 * top of the range pins tot at LOV_U64_MAX instead.  Both arguments are
 * evaluated more than once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) <= (tot) + (add))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1982
1983 static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
1984 {
1985         struct obd_export *tgt_export;
1986         struct lov_obd *lov;
1987         struct obd_statfs lov_sfs;
1988         int set = 0;
1989         int rc = 0;
1990         int i;
1991         ENTRY;
1992
1993         if (!export || !export->exp_obd)
1994                 RETURN(-ENODEV);
1995
1996         lov = &export->exp_obd->u.lov;
1997
1998         /* We only get block data from the OBD */
1999         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2000                 int err;
2001
2002                 if (!lov->tgts[i].active) {
2003                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
2004                         continue;
2005                 }
2006
2007                 tgt_export = class_conn2export(&lov->tgts[i].conn);
2008                 if (!tgt_export) {
2009                         CDEBUG(D_HA, "lov idx %d NULL export\n", i);
2010                         continue;
2011                 }
2012
2013                 err = obd_statfs(tgt_export, &lov_sfs);
2014                 class_export_put(tgt_export);
2015                 if (err) {
2016                         if (lov->tgts[i].active) {
2017                                 CERROR("error: statfs OSC %s on OST idx %d: "
2018                                        "err = %d\n",
2019                                        lov->tgts[i].uuid.uuid, i, err);
2020                                 if (!rc)
2021                                         rc = err;
2022                         }
2023                         continue;
2024                 }
2025                 if (!set) {
2026                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
2027                         set = 1;
2028                 } else {
2029                         osfs->os_bfree += lov_sfs.os_bfree;
2030                         osfs->os_bavail += lov_sfs.os_bavail;
2031                         osfs->os_blocks += lov_sfs.os_blocks;
2032                         /* XXX not sure about this one - depends on policy.
2033                          *   - could be minimum if we always stripe on all OBDs
2034                          *     (but that would be wrong for any other policy,
2035                          *     if one of the OBDs has no more objects left)
2036                          *   - could be sum if we stripe whole objects
2037                          *   - could be average, just to give a nice number
2038                          *
2039                          * To give a "reasonable" (if not wholly accurate)
2040                          * number, we divide the total number of free objects
2041                          * by expected stripe count (watch out for overflow).
2042                          */
2043                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
2044                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
2045                 }
2046         }
2047         if (set) {
2048                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
2049                                          lov->desc.ld_default_stripe_count :
2050                                          lov->desc.ld_active_tgt_count;
2051
2052                 if (osfs->os_files != LOV_U64_MAX)
2053                         do_div(osfs->os_files, expected_stripes);
2054                 if (osfs->os_ffree != LOV_U64_MAX)
2055                         do_div(osfs->os_ffree, expected_stripes);
2056         } else if (!rc)
2057                 rc = -EIO;
2058         RETURN(rc);
2059 }
2060
2061 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
2062                          void *karg, void *uarg)
2063 {
2064         struct obd_device *obddev = class_conn2obd(conn);
2065         struct lov_obd *lov = &obddev->u.lov;
2066         int i, count = lov->desc.ld_tgt_count;
2067         struct obd_uuid *uuidp;
2068         int rc;
2069
2070         ENTRY;
2071
2072         switch (cmd) {
2073         case IOC_LOV_SET_OSC_ACTIVE: {
2074                 struct obd_ioctl_data *data = karg;
2075                 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
2076                 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
2077                 break;
2078         }
2079         case OBD_IOC_LOV_GET_CONFIG: {
2080                 struct obd_ioctl_data *data = karg;
2081                 struct lov_tgt_desc *tgtdesc;
2082                 struct lov_desc *desc;
2083                 char *buf = NULL;
2084
2085                 buf = NULL;
2086                 len = 0;
2087                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2088                         RETURN(-EINVAL);
2089
2090                 data = (struct obd_ioctl_data *)buf;
2091
2092                 if (sizeof(*desc) > data->ioc_inllen1) {
2093                         OBD_FREE(buf, len);
2094                         RETURN(-EINVAL);
2095                 }
2096
2097                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2098                         OBD_FREE(buf, len);
2099                         RETURN(-EINVAL);
2100                 }
2101
2102                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2103                 memcpy(desc, &(lov->desc), sizeof(*desc));
2104
2105                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2106                 tgtdesc = lov->tgts;
2107                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
2108                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
2109
2110                 rc = copy_to_user((void *)uarg, buf, len);
2111                 if (rc)
2112                         rc = -EFAULT;
2113                 obd_ioctl_freedata(buf, len);
2114                 break;
2115         }
2116         case LL_IOC_LOV_SETSTRIPE:
2117                 rc = lov_setstripe(conn, karg, uarg);
2118                 break;
2119         case LL_IOC_LOV_GETSTRIPE:
2120                 rc = lov_getstripe(conn, karg, uarg);
2121                 break;
2122         default: {
2123                 int set = 0;
2124                 if (count == 0)
2125                         RETURN(-ENOTTY);
2126                 rc = 0;
2127                 for (i = 0; i < count; i++) {
2128                         int err;
2129
2130                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
2131                                             len, karg, uarg);
2132                         if (err) {
2133                                 if (lov->tgts[i].active) {
2134                                         CERROR("error: iocontrol OSC %s on OST"
2135                                                "idx %d: err = %d\n",
2136                                                lov->tgts[i].uuid.uuid, i, err);
2137                                         if (!rc)
2138                                                 rc = err;
2139                                 }
2140                         } else
2141                                 set = 1;
2142                 }
2143                 if (!set && !rc)
2144                         rc = -EIO;
2145         }
2146         }
2147
2148         RETURN(rc);
2149 }
2150
2151 static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
2152                         void *key, __u32 *vallen, void *val)
2153 {
2154         struct obd_device *obddev = class_conn2obd(conn);
2155         struct lov_obd *lov = &obddev->u.lov;
2156         int i;
2157         ENTRY;
2158
2159         if (!vallen || !val)
2160                 RETURN(-EFAULT);
2161
2162         if (keylen > strlen("lock_to_stripe") &&
2163             strcmp(key, "lock_to_stripe") == 0) {
2164                 struct {
2165                         char name[16];
2166                         struct ldlm_lock *lock;
2167                         struct lov_stripe_md *lsm;
2168                 } *data = key;
2169                 __u32 *stripe = val;
2170                 struct lov_oinfo *loi;
2171
2172                 if (*vallen < sizeof(*stripe))
2173                         RETURN(-EFAULT);
2174                 *vallen = sizeof(*stripe);
2175
2176                 /* XXX This is another one of those bits that will need to
2177                  * change if we ever actually support nested LOVs.  It uses
2178                  * the lock's connection to find out which stripe it is. */
2179                 for (i = 0, loi = data->lsm->lsm_oinfo;
2180                      i < data->lsm->lsm_stripe_count;
2181                      i++, loi++) {
2182                         if (lov->tgts[loi->loi_ost_idx].conn.cookie ==
2183                             data->lock->l_connh->cookie) {
2184                                 *stripe = i;
2185                                 RETURN(0);
2186                         }
2187                 }
2188                 RETURN(-ENXIO);
2189         }
2190
2191         RETURN(-EINVAL);
2192 }
2193
2194 static int lov_mark_page_dirty(struct lustre_handle *conn, 
2195                                struct lov_stripe_md *lsm, unsigned long offset)
2196 {
2197         struct lov_obd *lov = &class_conn2obd(conn)->u.lov;
2198         struct lov_oinfo *loi;
2199         struct lov_stripe_md *submd;
2200         int stripe, rc;
2201         obd_off off;
2202         ENTRY;
2203
2204         if (lsm_bad_magic(lsm))
2205                 RETURN(-EINVAL);
2206
2207         OBD_ALLOC(submd, lov_stripe_md_size(1));
2208         if (submd == NULL)
2209                 RETURN(-ENOMEM);
2210
2211         stripe = lov_stripe_number(lsm, (obd_off)offset << PAGE_CACHE_SHIFT);
2212         lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe, 
2213                           &off);
2214         off >>= PAGE_CACHE_SHIFT;
2215
2216         loi = &lsm->lsm_oinfo[stripe];
2217         CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset, 
2218                (unsigned long)off, stripe);
2219         submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2220
2221         rc = obd_mark_page_dirty(&lov->tgts[loi->loi_ost_idx].conn, submd, off);
2222         OBD_FREE(submd, lov_stripe_md_size(1));
2223         RETURN(rc);
2224 }
2225
2226 static int lov_clear_dirty_pages(struct lustre_handle *conn, 
2227                                  struct lov_stripe_md *lsm, unsigned long start,
2228                                  unsigned long end, unsigned long *cleared)
2229
2230 {
2231         struct obd_export *export = class_conn2export(conn);
2232         __u64 start_off = (__u64)start << PAGE_CACHE_SHIFT;
2233         __u64 end_off = (__u64)end << PAGE_CACHE_SHIFT;
2234         __u64 obd_start, obd_end;
2235         struct lov_stripe_md *submd = NULL;
2236         struct lov_obd *lov;
2237         struct lov_oinfo *loi;
2238         int i, rc;
2239         unsigned long osc_cleared;
2240         ENTRY;
2241
2242         *cleared = 0;
2243
2244         if (lsm_bad_magic(lsm))
2245                 GOTO(out_exp, rc = -EINVAL);
2246
2247         if (!export || !export->exp_obd)
2248                 GOTO(out_exp, rc = -ENODEV);
2249
2250         OBD_ALLOC(submd, lov_stripe_md_size(1));
2251         if (submd == NULL)
2252                 GOTO(out_exp, rc = -ENOMEM);
2253
2254         lov = &export->exp_obd->u.lov;
2255         rc = 0;
2256         for (i = 0, loi = lsm->lsm_oinfo;
2257              i < lsm->lsm_stripe_count;
2258              i++, loi++) {
2259                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
2260                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2261                         continue;
2262                 }
2263
2264                 if(!lov_stripe_intersects(lsm, i, start_off, end_off,
2265                                           &obd_start, &obd_end))
2266                         continue;
2267                 obd_start >>= PAGE_CACHE_SHIFT;
2268                 obd_end >>= PAGE_CACHE_SHIFT;
2269
2270                 CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n", 
2271                        start, end, (unsigned long)obd_start, 
2272                        (unsigned long)obd_end, loi->loi_ost_idx);
2273                 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2274                 rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn, 
2275                                            submd, obd_start, obd_end,
2276                                            &osc_cleared);
2277                 if (rc)
2278                         break;
2279                 *cleared += osc_cleared;
2280         }
2281 out_exp:
2282         if (submd)
2283                 OBD_FREE(submd, lov_stripe_md_size(1));
2284         class_export_put(export);
2285         RETURN(rc);
2286 }
2287
2288 static int lov_last_dirty_offset(struct lustre_handle *conn,
2289                                  struct lov_stripe_md *lsm,
2290                                  unsigned long *offset)
2291 {
2292         struct obd_export *export = class_conn2export(conn);
2293         struct lov_stripe_md *submd = NULL;
2294         struct lov_obd *lov;
2295         struct lov_oinfo *loi;
2296         unsigned long tmp, count, skip;
2297         int err, i, rc;
2298         ENTRY;
2299
2300         if (lsm_bad_magic(lsm))
2301                 GOTO(out_exp, rc = -EINVAL);
2302
2303         if (!export || !export->exp_obd)
2304                 GOTO(out_exp, rc = -ENODEV);
2305
2306         OBD_ALLOC(submd, lov_stripe_md_size(1));
2307         if (submd == NULL)
2308                 GOTO(out_exp, rc = -ENOMEM);
2309
2310         *offset = 0;
2311         lov = &export->exp_obd->u.lov;
2312         rc = -ENOENT;
2313         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; 
2314                                           i++, loi++) {
2315
2316                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
2317                 skip = (lsm->lsm_stripe_count - 1) * count;
2318
2319                 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2320
2321                 err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn, 
2322                                             submd, &tmp);
2323                 if (err == -ENOENT)
2324                         continue;
2325                 if (err)
2326                         GOTO(out_exp, rc = err);
2327
2328                 rc = 0;
2329                 if (tmp != ~0) 
2330                         tmp += (tmp/count * skip) + (i * count);
2331                 if (tmp > *offset)
2332                         *offset = tmp;
2333         }
2334 out_exp:
2335         if (submd)
2336                 OBD_FREE(submd, lov_stripe_md_size(1));
2337         class_export_put(export);
2338         RETURN(rc);
2339 }
2340
2341 struct obd_ops lov_obd_ops = {
2342         o_owner:       THIS_MODULE,
2343         o_attach:      lov_attach,
2344         o_detach:      lov_detach,
2345         o_setup:       lov_setup,
2346         o_connect:     lov_connect,
2347         o_disconnect:  lov_disconnect,
2348         o_statfs:      lov_statfs,
2349         o_packmd:      lov_packmd,
2350         o_unpackmd:    lov_unpackmd,
2351         o_create:      lov_create,
2352         o_destroy:     lov_destroy,
2353         o_getattr:     lov_getattr,
2354         o_getattr_async: lov_getattr_async,
2355         o_setattr:     lov_setattr,
2356         o_open:        lov_open,
2357         o_close:       lov_close,
2358         o_brw:         lov_brw,
2359         o_brw_async:   lov_brw_async,
2360         o_punch:       lov_punch,
2361         o_enqueue:     lov_enqueue,
2362         o_match:       lov_match,
2363         o_cancel:      lov_cancel,
2364         o_cancel_unused: lov_cancel_unused,
2365         o_iocontrol:   lov_iocontrol,
2366         o_get_info:    lov_get_info,
2367         .o_mark_page_dirty =    lov_mark_page_dirty,
2368         .o_clear_dirty_pages =    lov_clear_dirty_pages,
2369         .o_last_dirty_offset =    lov_last_dirty_offset,
2370 };
2371
2372 int __init lov_init(void)
2373 {
2374         struct lprocfs_static_vars lvars;
2375         int rc;
2376
2377         printk(KERN_INFO "Lustre Logical Object Volume driver; "
2378                "info@clusterfs.com\n");
2379         lprocfs_init_vars(&lvars);
2380         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2381                                  OBD_LOV_DEVICENAME);
2382         RETURN(rc);
2383 }
2384
2385 static void __exit lov_exit(void)
2386 {
2387         class_unregister_type(OBD_LOV_DEVICENAME);
2388 }
2389
#ifdef __KERNEL__
/* Kernel builds only: module entry points followed by module metadata. */
module_init(lov_init);
module_exit(lov_exit);

MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
#endif