Whamcloud - gitweb
merge b_devel into HEAD (20030626 merge tag) for 0.7.1
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define EXPORT_SYMTAB
26 #define DEBUG_SUBSYSTEM S_LOV
27 #ifdef __KERNEL__
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/init.h>
31 #include <linux/random.h>
32 #include <linux/slab.h>
33 #include <asm/div64.h>
34 #else
35 #include <liblustre.h>
36 #endif
37
38 #include <linux/obd_support.h>
39 #include <linux/lustre_lib.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_idl.h>
42 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
43 #include <linux/lustre_mds.h>
44 #include <linux/obd_class.h>
45 #include <linux/obd_lov.h>
46 #include <linux/seq_file.h>
47 #include <linux/lprocfs_status.h>
48
/* Per-open-file state kept by the LOV.  Instances are created by
 * lov_lfh_new() and looked up from a lustre_handle cookie by
 * lov_handle2lfh(). */
struct lov_file_handles {
        /* entry registered with class_handle_hash() in lov_lfh_new() */
        struct portals_handle lfh_handle;
        /* starts at 2 in lov_lfh_new(); the structure is freed by
         * lov_lfh_put() when this drops to zero */
        atomic_t lfh_refcount;
        /* linkage on the export's exp_lov_data.led_open_head list */
        struct list_head lfh_list;
        /* number of entries in lfh_och; lov_disconnect() frees
         * lfh_count * FD_OSTDATA_SIZE bytes for that array */
        int lfh_count;
        /* array of sub-object handles, indexed by stripe number in
         * lov_destroy() and lov_getattr() */
        struct obd_client_handle *lfh_och;
};
56
/* Per-lock state kept by the LOV: one lustre_handle for each stripe of the
 * object.  Instances are created by lov_llh_new() and looked up from a
 * lustre_handle cookie by lov_handle2llh(). */
struct lov_lock_handles {
        /* entry registered with class_handle_hash() in lov_llh_new() */
        struct portals_handle llh_handle;
        /* starts at 2 in lov_llh_new(); the structure is freed by
         * lov_llh_put() when this drops to zero */
        atomic_t llh_refcount;
        /* copy of lsm_stripe_count taken at allocation time; lov_llh_put()
         * uses it to recompute the allocation size when freeing */
        int llh_stripe_count;
        /* trailing array with llh_stripe_count entries, allocated together
         * with the structure */
        struct lustre_handle llh_handles[0];
};
63
64 /* lov_file_handles helpers */
65 static void lov_lfh_addref(void *lfhp)
66 {
67         struct lov_file_handles *lfh = lfhp;
68
69         atomic_inc(&lfh->lfh_refcount);
70         CDEBUG(D_INFO, "GETting lfh %p : new refcount %d\n", lfh,
71                atomic_read(&lfh->lfh_refcount));
72 }
73
74 static struct lov_file_handles *lov_lfh_new(void)
75 {
76         struct lov_file_handles *lfh;
77
78         OBD_ALLOC(lfh, sizeof *lfh);
79         if (lfh == NULL) {
80                 CERROR("out of memory\n");
81                 return NULL;
82         }
83
84         atomic_set(&lfh->lfh_refcount, 2);
85
86         INIT_LIST_HEAD(&lfh->lfh_handle.h_link);
87         class_handle_hash(&lfh->lfh_handle, lov_lfh_addref);
88
89         return lfh;
90 }
91
92 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
93 {
94         ENTRY;
95         LASSERT(handle != NULL);
96         RETURN(class_handle2object(handle->cookie));
97 }
98
99 static void lov_lfh_put(struct lov_file_handles *lfh)
100 {
101         CDEBUG(D_INFO, "PUTting lfh %p : new refcount %d\n", lfh,
102                atomic_read(&lfh->lfh_refcount) - 1);
103         LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
104                 atomic_read(&lfh->lfh_refcount) < 0x5a5a);
105         if (atomic_dec_and_test(&lfh->lfh_refcount)) {
106                 LASSERT(list_empty(&lfh->lfh_handle.h_link));
107                 OBD_FREE(lfh, sizeof *lfh);
108         }
109 }
110
111 static void lov_lfh_destroy(struct lov_file_handles *lfh)
112 {
113         class_handle_unhash(&lfh->lfh_handle);
114         lov_lfh_put(lfh);
115 }
116
117 static void lov_llh_addref(void *llhp)
118 {
119         struct lov_lock_handles *llh = llhp;
120
121         atomic_inc(&llh->llh_refcount);
122         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
123                atomic_read(&llh->llh_refcount));
124 }
125
126 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
127 {
128         struct lov_lock_handles *llh;
129
130         OBD_ALLOC(llh, sizeof *llh +
131                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
132         if (llh == NULL) {
133                 CERROR("out of memory\n");
134                 return NULL;
135         }
136         atomic_set(&llh->llh_refcount, 2);
137         llh->llh_stripe_count = lsm->lsm_stripe_count;
138         INIT_LIST_HEAD(&llh->llh_handle.h_link);
139         class_handle_hash(&llh->llh_handle, lov_llh_addref);
140         return llh;
141 }
142
143 static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
144 {
145         ENTRY;
146         LASSERT(handle != NULL);
147         RETURN(class_handle2object(handle->cookie));
148 }
149
150 static void lov_llh_put(struct lov_lock_handles *llh)
151 {
152         CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
153                atomic_read(&llh->llh_refcount) - 1);
154         LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
155                 atomic_read(&llh->llh_refcount) < 0x5a5a);
156         if (atomic_dec_and_test(&llh->llh_refcount)) {
157                 LASSERT(list_empty(&llh->llh_handle.h_link));
158                 OBD_FREE(llh, sizeof *llh +
159                          sizeof(*llh->llh_handles) * llh->llh_stripe_count);
160         }
161 }
162
163 static void lov_llh_destroy(struct lov_lock_handles *llh)
164 {
165         class_handle_unhash(&llh->llh_handle);
166         lov_llh_put(llh);
167 }
168
169 /* obd methods */
170 int lov_attach(struct obd_device *dev, obd_count len, void *data)
171 {
172         struct lprocfs_static_vars lvars;
173         struct proc_dir_entry *entry;
174         int rc;
175
176         lprocfs_init_vars(&lvars);
177         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
178         if (rc) 
179                 return rc;
180
181         entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
182         if (entry == NULL) 
183                 RETURN(-ENOMEM);
184         entry->proc_fops = &ll_proc_target_fops;
185         entry->data = dev;
186         
187         return rc;
188         
189 }
190
/* Detach method: hand the device to lprocfs_obd_detach() and pass its
 * result back to the caller. */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
195
196 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
197                        struct obd_uuid *cluuid)
198 {
199         struct ptlrpc_request *req = NULL;
200         struct lov_obd *lov = &obd->u.lov;
201         struct client_obd *mdc = &lov->mdcobd->u.cli;
202         struct lov_desc *desc = &lov->desc;
203         struct lov_desc *mdesc;
204         struct lov_tgt_desc *tgts;
205         struct obd_export *exp;
206         struct lustre_handle mdc_conn;
207         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
208         struct obd_uuid *uuids;
209         int rc, rc2, i;
210         ENTRY;
211
212         rc = class_connect(conn, obd, cluuid);
213         if (rc)
214                 RETURN(rc);
215
216         /* We don't want to actually do the underlying connections more than
217          * once, so keep track. */
218         lov->refcount++;
219         if (lov->refcount > 1)
220                 RETURN(0);
221
222         exp = class_conn2export(conn);
223         spin_lock_init(&exp->exp_lov_data.led_lock);
224         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
225
226         /* retrieve LOV metadata from MDS */
227         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
228         if (rc) {
229                 CERROR("cannot connect to mdc: rc = %d\n", rc);
230                 GOTO(out_conn, rc);
231         }
232
233         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
234         rc2 = obd_disconnect(&mdc_conn, 0);
235         if (rc) {
236                 CERROR("cannot get lov info %d\n", rc);
237                 GOTO(out_conn, rc);
238         }
239
240         if (rc2) {
241                 CERROR("error disconnecting from MDS %d\n", rc2);
242                 GOTO(out_req, rc = rc2);
243         }
244
245         /* mdc_getlovinfo() has checked and swabbed the reply.  It has also
246          * done some simple checks (e.g. #uuids consistent with desc, uuid
247          * array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
248          * terminated), but I still need to verify it makes overall
249          * sense */
250         mdesc = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*mdesc));
251         LASSERT (mdesc != NULL);
252         LASSERT_REPSWABBED (req, 0);
253
254         *desc = *mdesc;
255
256         if (!obd_uuid_equals(&obd->obd_uuid, &desc->ld_uuid)) {
257                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
258                        obd->obd_uuid.uuid, desc->ld_uuid.uuid);
259                 GOTO(out_req, rc = -EINVAL);
260         }
261
262         /* Because of 64-bit divide/mod operations only work with a 32-bit
263          * divisor in a 32-bit kernel, we cannot support a stripe width
264          * of 4GB or larger on 32-bit CPUs.
265          */
266         if ((desc->ld_default_stripe_count ?
267              desc->ld_default_stripe_count : desc->ld_tgt_count) *
268              desc->ld_default_stripe_size > ~0UL) {
269                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
270                        desc->ld_default_stripe_size,
271                        desc->ld_default_stripe_count ?
272                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
273                 GOTO(out_req, rc = -EINVAL);
274         }
275
276         /* We know ld_tgt_count is reasonable (the array of UUIDS fits in
277          * the maximum buffer size, so we won't be making outrageous
278          * demands on memory here. */
279         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
280         OBD_ALLOC(lov->tgts, lov->bufsize);
281         if (!lov->tgts) {
282                 CERROR("Out of memory\n");
283                 GOTO(out_req, rc = -ENOMEM);
284         }
285
286         uuids = lustre_msg_buf(req->rq_repmsg, 1,
287                                sizeof(*uuids) * desc->ld_tgt_count);
288         LASSERT (uuids != NULL);
289         LASSERT_REPSWABBED (req, 1);
290
291         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
292                 struct obd_uuid *uuid = &tgts->uuid;
293                 struct obd_device *tgt_obd;
294                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
295
296                 /* NULL termination already checked */
297                 *uuid = uuids[i];
298
299                 tgt_obd = client_tgtuuid2obd(uuid);
300
301                 if (!tgt_obd) {
302                         CERROR("Target %s not attached\n", uuid->uuid);
303                         GOTO(out_disc, rc = -EINVAL);
304                 }
305
306                 if (!tgt_obd->obd_set_up) {
307                         CERROR("Target %s not set up\n", uuid->uuid);
308                         GOTO(out_disc, rc = -EINVAL);
309                 }
310
311                 rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid);
312
313                 if (rc) {
314                         CERROR("Target %s connect error %d\n", uuid->uuid, rc);
315                         GOTO(out_disc, rc);
316                 }
317
318                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
319                                    sizeof(struct obd_device *), obd, NULL);
320                 if (rc) {
321                         CERROR("Target %s REGISTER_LOV error %d\n",
322                                uuid->uuid, rc);
323                         obd_disconnect(&tgts->conn, 0);
324                         GOTO(out_disc, rc);
325                 }
326
327                 desc->ld_active_tgt_count++;
328                 tgts->active = 1;
329         }
330
331         mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
332         ptlrpc_req_finished (req);
333         class_export_put(exp);
334         RETURN (0);
335
336  out_disc:
337         while (i-- > 0) {
338                 struct obd_uuid uuid;
339                 --tgts;
340                 --desc->ld_active_tgt_count;
341                 tgts->active = 0;
342                 /* save for CERROR below; (we know it's terminated) */
343                 uuid = tgts->uuid;
344                 rc2 = obd_disconnect(&tgts->conn, 0);
345                 if (rc2)
346                         CERROR("error: LOV target %s disconnect on OST idx %d: "
347                                "rc = %d\n", uuid.uuid, i, rc2);
348         }
349         OBD_FREE(lov->tgts, lov->bufsize);
350  out_req:
351         ptlrpc_req_finished (req);
352  out_conn:
353         class_export_put(exp);
354         class_disconnect(conn, 0);
355         RETURN (rc);
356 }
357
358 static int lov_disconnect(struct lustre_handle *conn, int failover)
359 {
360         struct obd_device *obd = class_conn2obd(conn);
361         struct lov_obd *lov = &obd->u.lov;
362         struct obd_export *exp;
363         struct list_head *p, *n;
364         int rc, i;
365         ENTRY;
366
367         if (!lov->tgts)
368                 goto out_local;
369
370         /* Only disconnect the underlying layers on the final disconnect. */
371         lov->refcount--;
372         if (lov->refcount != 0)
373                 goto out_local;
374
375         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
376                 if (obd->obd_no_recov) {
377                         /* Pass it on to our clients.
378                          * XXX This should be an argument to disconnect,
379                          * XXX not a back-door flag on the OBD.  Ah well.
380                          */
381                         struct obd_device *osc_obd =
382                                 class_conn2obd(&lov->tgts[i].conn);
383                         osc_obd->obd_no_recov = 1;
384                 }
385                 rc = obd_disconnect(&lov->tgts[i].conn, failover);
386                 if (rc) {
387                         if (lov->tgts[i].active) {
388                                 CERROR("Target %s disconnect error %d\n",
389                                        lov->tgts[i].uuid.uuid, rc);
390                         }
391                         rc = 0;
392                 }
393                 if (lov->tgts[i].active) {
394                         lov->desc.ld_active_tgt_count--;
395                         lov->tgts[i].active = 0;
396                 }
397         }
398         OBD_FREE(lov->tgts, lov->bufsize);
399         lov->bufsize = 0;
400         lov->tgts = NULL;
401
402         exp = class_conn2export(conn);
403         if (exp == NULL) {
404                 CERROR("export handle "LPU64" invalid!  If you can reproduce, "
405                        "please send a full debug log to phik\n", conn->cookie);
406                 RETURN(0);
407         }
408         spin_lock(&exp->exp_lov_data.led_lock);
409         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
410                 /* XXX close these, instead of just discarding them? */
411                 struct lov_file_handles *lfh;
412                 lfh = list_entry(p, typeof(*lfh), lfh_list);
413                 CERROR("discarding open LOV handle %p:"LPX64"\n",
414                        lfh, lfh->lfh_handle.h_cookie);
415                 list_del(&lfh->lfh_list);
416                 OBD_FREE(lfh->lfh_och, lfh->lfh_count * FD_OSTDATA_SIZE);
417                 lov_lfh_destroy(lfh);
418                 lov_lfh_put(lfh);
419         }
420         spin_unlock(&exp->exp_lov_data.led_lock);
421         class_export_put(exp);
422
423  out_local:
424         rc = class_disconnect(conn, 0);
425         RETURN(rc);
426 }
427
428 /* Error codes:
429  *
430  *  -EINVAL  : UUID can't be found in the LOV's target list
431  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
432  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
433  */
434 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
435                               int activate)
436 {
437         struct obd_device *obd;
438         struct lov_tgt_desc *tgt;
439         int i, rc = 0;
440         ENTRY;
441
442         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
443                lov, uuid->uuid, activate);
444
445         spin_lock(&lov->lov_lock);
446         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
447                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
448                        i, tgt->uuid.uuid, tgt->conn.cookie);
449                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
450                         break;
451         }
452
453         if (i == lov->desc.ld_tgt_count)
454                 GOTO(out, rc = -EINVAL);
455
456         obd = class_conn2obd(&tgt->conn);
457         if (obd == NULL) {
458                 /* This can happen if OST failure races with node shutdown */
459                 GOTO(out, rc = -ENOTCONN);
460         }
461
462         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
463                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
464                obd->obd_type->typ_name, i);
465         LASSERT(strcmp(obd->obd_type->typ_name, "osc") == 0);
466
467         if (tgt->active == activate) {
468                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
469                        activate ? "" : "in");
470                 GOTO(out, rc);
471         }
472
473         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
474
475         tgt->active = activate;
476         if (activate) {
477                 /*
478                  * foreach(export)
479                  *     foreach(open_file)
480                  *         if (file_handle uses this_osc)
481                  *             if (has_no_filehandle)
482                  *                 open(file_handle, this_osc);
483                  */
484                 /* XXX reconnect? */
485                 lov->desc.ld_active_tgt_count++;
486         } else {
487                 /*
488                  * Should I invalidate filehandles that refer to this OSC, so
489                  * that I reopen them during reactivation?
490                  */
491                 /* XXX disconnect from OSC? */
492                 lov->desc.ld_active_tgt_count--;
493         }
494
495 #warning "FIXME: walk open files list for objects that need opening"
496         EXIT;
497  out:
498         spin_unlock(&lov->lov_lock);
499         return rc;
500 }
501
502 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
503 {
504         struct obd_ioctl_data *data = buf;
505         struct lov_obd *lov = &obd->u.lov;
506         struct obd_uuid uuid;
507         int rc = 0;
508         ENTRY;
509
510         if (data->ioc_inllen1 < 1) {
511                 CERROR("LOV setup requires an MDC UUID\n");
512                 RETURN(-EINVAL);
513         }
514
515         if (data->ioc_inllen1 > 37) {
516                 CERROR("mdc UUID must be 36 characters or less\n");
517                 RETURN(-EINVAL);
518         }
519
520         spin_lock_init(&lov->lov_lock);
521         obd_str2uuid(&uuid, data->ioc_inlbuf1);
522         lov->mdcobd = class_uuid2obd(&uuid);
523         if (!lov->mdcobd) {
524                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
525                        data->ioc_inlbuf1);
526                 rc = -EINVAL;
527         }
528         RETURN(rc);
529 }
530
531 /* compute object size given "stripeno" and the ost size */
532 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
533                                 int stripeno)
534 {
535         unsigned long ssize  = lsm->lsm_stripe_size;
536         unsigned long swidth = ssize * lsm->lsm_stripe_count;
537         unsigned long stripe_size;
538         obd_size lov_size;
539
540         if (ost_size == 0)
541                 return 0;
542
543         /* do_div(a, b) returns a % b, and a = a / b */
544         stripe_size = do_div(ost_size, ssize);
545
546         if (stripe_size)
547                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
548         else
549                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
550
551         return lov_size;
552 }
553
554 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
555                             struct lov_stripe_md *lsm, int stripeno, int *set)
556 {
557         if (*set) {
558                 if (valid & OBD_MD_FLSIZE) {
559                         /* this handles sparse files properly */
560                         obd_size lov_size;
561
562                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
563                         if (lov_size > tgt->o_size)
564                                 tgt->o_size = lov_size;
565                 }
566                 if (valid & OBD_MD_FLBLOCKS)
567                         tgt->o_blocks += src->o_blocks;
568                 if (valid & OBD_MD_FLBLKSZ)
569                         tgt->o_blksize += src->o_blksize;
570                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
571                         tgt->o_ctime = src->o_ctime;
572                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
573                         tgt->o_mtime = src->o_mtime;
574         } else {
575                 obdo_cpy_md(tgt, src, valid);
576                 if (valid & OBD_MD_FLSIZE)
577                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
578                 *set = 1;
579         }
580 }
581
582 /* the LOV expects oa->o_id to be set to the LOV object id */
583 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
584                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
585 {
586         struct obd_export *export = class_conn2export(conn);
587         struct lov_obd *lov;
588         struct lov_stripe_md *lsm;
589         struct lov_oinfo *loi;
590         struct obdo *tmp;
591         unsigned ost_count, ost_idx;
592         int set = 0, obj_alloc = 0;
593         int rc = 0, i;
594         ENTRY;
595
596         LASSERT(ea);
597
598         if (!export)
599                 GOTO(out_exp, rc = -EINVAL);
600
601         lov = &export->exp_obd->u.lov;
602
603         if (!lov->desc.ld_active_tgt_count)
604                 GOTO(out_exp, rc = -EIO);
605
606         tmp = obdo_alloc();
607         if (!tmp)
608                 GOTO(out_exp, rc = -ENOMEM);
609
610         lsm = *ea;
611
612         if (!lsm) {
613                 rc = obd_alloc_memmd(conn, &lsm);
614                 if (rc < 0)
615                         GOTO(out_tmp, rc);
616
617                 rc = 0;
618                 lsm->lsm_magic = LOV_MAGIC;
619         }
620
621         ost_count = lov->desc.ld_tgt_count;
622
623         LASSERT(oa->o_valid & OBD_MD_FLID);
624         lsm->lsm_object_id = oa->o_id;
625         if (!lsm->lsm_stripe_size)
626                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
627
628         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
629                 get_random_bytes(&ost_idx, 2);
630                 ost_idx %= ost_count;
631         } else
632                 ost_idx = lsm->lsm_stripe_offset;
633
634         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
635                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
636
637         loi = lsm->lsm_oinfo;
638         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
639                 struct lov_stripe_md obj_md;
640                 struct lov_stripe_md *obj_mdp = &obj_md;
641                 int err;
642
643                 if (lov->tgts[ost_idx].active == 0) {
644                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
645                         continue;
646                 }
647
648                 /* create data objects with "parent" OA */
649                 memcpy(tmp, oa, sizeof(*tmp));
650                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
651                 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
652                 if (err) {
653                         if (lov->tgts[ost_idx].active) {
654                                 CERROR("error creating objid "LPX64" sub-object"
655                                        " on OST idx %d/%d: rc = %d\n", oa->o_id,
656                                        ost_idx, lsm->lsm_stripe_count, err);
657                                 if (err > 0) {
658                                         CERROR("obd_create returned invalid "
659                                                "err %d\n", err);
660                                         err = -EIO;
661                                 }
662                         }
663                         if (!rc)
664                                 rc = err;
665                         continue;
666                 }
667                 loi->loi_id = tmp->o_id;
668                 loi->loi_ost_idx = ost_idx;
669                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
670                        lsm->lsm_object_id, loi->loi_id, ost_idx);
671
672                 if (!set)
673                         lsm->lsm_stripe_offset = ost_idx;
674                 lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
675
676                 ++obj_alloc;
677                 ++loi;
678
679                 /* If we have allocated enough objects, we are OK */
680                 if (obj_alloc == lsm->lsm_stripe_count)
681                         GOTO(out_done, rc = 0);
682         }
683
684         if (*ea != NULL) {
685                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
686                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
687                 if (rc == 0)
688                         rc = -EFBIG;
689                 GOTO(out_cleanup, rc);
690         } else {
691                 struct lov_stripe_md *lsm_new;
692                 /* XXX LOV STACKING call into osc for sizes */
693                 unsigned size = lov_stripe_md_size(obj_alloc);
694
695                 CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
696                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count);
697                 OBD_ALLOC(lsm_new, size);
698                 if (!lsm_new)
699                         GOTO(out_cleanup, rc = -ENOMEM);
700                 memcpy(lsm_new, lsm, size);
701                 lsm_new->lsm_stripe_count = obj_alloc;
702
703                 /* XXX LOV STACKING call into osc for sizes */
704                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
705                 lsm = lsm_new;
706
707                 rc = 0;
708         }
709  out_done:
710         *ea = lsm;
711
712  out_tmp:
713         obdo_free(tmp);
714  out_exp:
715         class_export_put(export);
716         return rc;
717
718  out_cleanup:
719         while (obj_alloc-- > 0) {
720                 int err;
721
722                 --loi;
723                 /* destroy already created objects here */
724                 memcpy(tmp, oa, sizeof(*tmp));
725                 tmp->o_id = loi->loi_id;
726                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL,
727                                   NULL);
728                 if (err)
729                         CERROR("Failed to uncreate objid "LPX64" subobj "
730                                LPX64" on OST idx %d: rc = %d\n",
731                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
732                                err);
733         }
734         if (*ea == NULL)
735                 obd_free_memmd(conn, &lsm);
736         goto out_tmp;
737 }
738
739 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
740                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
741 {
742         struct obdo tmp;
743         struct obd_export *export = class_conn2export(conn);
744         struct lov_obd *lov;
745         struct lov_oinfo *loi;
746         struct lov_file_handles *lfh = NULL;
747         int rc = 0, i;
748         ENTRY;
749
750         if (!lsm) {
751                 CERROR("LOV requires striping ea for destruction\n");
752                 GOTO(out, rc = -EINVAL);
753         }
754
755         if (lsm->lsm_magic != LOV_MAGIC) {
756                 CERROR("LOV striping magic bad %#x != %#x\n",
757                        lsm->lsm_magic, LOV_MAGIC);
758                 GOTO(out, rc = -EINVAL);
759         }
760
761         if (!export || !export->exp_obd)
762                 GOTO(out, rc = -ENODEV);
763
764         if (oa->o_valid & OBD_MD_FLHANDLE)
765                 lfh = lov_handle2lfh(obdo_handle(oa));
766
767         lov = &export->exp_obd->u.lov;
768         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
769                 int err;
770                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
771                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
772                         /* Orphan clean up will (someday) fix this up. */
773                         continue;
774                 }
775
776                 memcpy(&tmp, oa, sizeof(tmp));
777                 tmp.o_id = loi->loi_id;
778                 if (lfh)
779                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
780                                FD_OSTDATA_SIZE);
781                 else
782                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
783                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
784                                   NULL, NULL);
785                 if (err && lov->tgts[loi->loi_ost_idx].active) {
786                         CERROR("error: destroying objid "LPX64" subobj "
787                                LPX64" on OST idx %d: rc = %d\n",
788                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
789                         if (!rc)
790                                 rc = err;
791                 }
792         }
793         if (lfh != NULL)
794                 lov_lfh_put(lfh);
795         EXIT;
796  out:
797         class_export_put(export);
798         return rc;
799 }
800
801 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
802                        struct lov_stripe_md *lsm)
803 {
804         struct obdo tmp;
805         struct obd_export *export = class_conn2export(conn);
806         struct lov_obd *lov;
807         struct lov_oinfo *loi;
808         struct lov_file_handles *lfh = NULL;
809         int i, rc = 0, set = 0;
810         ENTRY;
811
812         if (!lsm) {
813                 CERROR("LOV requires striping ea\n");
814                 GOTO(out, rc = -EINVAL);
815         }
816
817         if (lsm->lsm_magic != LOV_MAGIC) {
818                 CERROR("LOV striping magic bad %#x != %#x\n",
819                        lsm->lsm_magic, LOV_MAGIC);
820                 GOTO(out, rc = -EINVAL);
821         }
822
823         if (!export || !export->exp_obd)
824                 GOTO(out, rc = -ENODEV);
825
826         lov = &export->exp_obd->u.lov;
827
828         if (oa->o_valid & OBD_MD_FLHANDLE)
829                 lfh = lov_handle2lfh(obdo_handle(oa));
830
831         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
832                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
833         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
834                 int err;
835
836                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
837                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
838                         continue;
839                 }
840
841                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
842                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
843                 /* create data objects with "parent" OA */
844                 memcpy(&tmp, oa, sizeof(tmp));
845                 tmp.o_id = loi->loi_id;
846                 if (lfh)
847                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
848                                FD_OSTDATA_SIZE);
849                 else
850                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
851
852                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
853                 if (err) {
854                         if (lov->tgts[loi->loi_ost_idx].active) {
855                                 CERROR("error: getattr objid "LPX64" subobj "
856                                        LPX64" on OST idx %d: rc = %d\n",
857                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
858                                        err);
859                                 GOTO(out, rc = err);
860                         }
861                 } else {
862                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
863                 }
864         }
865         if (!set)
866                 rc = -EIO;
867         GOTO(out, rc);
868  out:
869         if (lfh != NULL)
870                 lov_lfh_put(lfh);
871         class_export_put(export);
872         return rc;
873 }
874
875 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
876                                  struct lov_getattr_async_args *aa, int rc)
877 {
878         struct lov_stripe_md *lsm = aa->aa_lsm;
879         struct obdo          *oa = aa->aa_oa;
880         struct obdo          *obdos = aa->aa_stripe_oas;
881         struct lov_oinfo     *loi;
882         int                   i;
883         int                   set = 0;
884         ENTRY;
885
886         if (rc == 0) {
887                 /* NB all stripe requests succeeded to get here */
888
889                 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
890                      i++,loi++) {
891                         if (obdos[i].o_valid == 0)      /* inactive stripe */
892                                 continue;
893
894                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
895                                         i, &set);
896                 }
897
898                 if (!set) {
899                         CERROR ("No stripes had valid attrs\n");
900                         rc = -EIO;
901                 }
902         }
903
904         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
905         RETURN (rc);
906 }
907
908 static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
909                               struct lov_stripe_md *lsm,
910                               struct ptlrpc_request_set *rqset)
911 {
912         struct obdo *obdos;
913         struct obd_export *export = class_conn2export(conn);
914         struct lov_obd *lov;
915         struct lov_oinfo *loi;
916         struct lov_file_handles *lfh = NULL;
917         struct lov_getattr_async_args *aa;
918         int i;
919         int set = 0;
920         int rc = 0;
921         ENTRY;
922
923         if (!lsm) {
924                 CERROR("LOV requires striping ea\n");
925                 GOTO(out, rc = -EINVAL);
926         }
927
928         if (lsm->lsm_magic != LOV_MAGIC) {
929                 CERROR("LOV striping magic bad %#x != %#x\n",
930                        lsm->lsm_magic, LOV_MAGIC);
931                 GOTO(out, rc = -EINVAL);
932         }
933
934         if (!export || !export->exp_obd)
935                 GOTO(out, rc = -ENODEV);
936
937         lov = &export->exp_obd->u.lov;
938
939         OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
940         if (obdos == NULL)
941                 GOTO (out, rc = -ENOMEM);
942
943         if (oa->o_valid & OBD_MD_FLHANDLE)
944                 lfh = lov_handle2lfh(obdo_handle(oa));
945
946         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
947                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
948         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
949                 int err;
950
951                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
952                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
953                         /* leaves obdos[i].obd_valid unset */
954                         continue;
955                 }
956
957                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
958                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
959                 /* create data objects with "parent" OA */
960                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
961                 obdos[i].o_id = loi->loi_id;
962                 if (lfh)
963                         memcpy(obdo_handle(&obdos[i]), lfh->lfh_och + i,
964                                FD_OSTDATA_SIZE);
965                 else
966                         obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
967
968                 err = obd_getattr_async (&lov->tgts[loi->loi_ost_idx].conn,
969                                          &obdos[i], NULL, rqset);
970                 if (err) {
971                         CERROR("error: getattr objid "LPX64" subobj "
972                                LPX64" on OST idx %d: rc = %d\n",
973                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
974                                err);
975                         GOTO(out_obdos, rc = err);
976                 }
977                 set = 1;
978         }
979         if (!set)
980                 GOTO (out_obdos, rc = -EIO);
981
982         LASSERT (rqset->set_interpret == NULL);
983         rqset->set_interpret = lov_getattr_interpret;
984         LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
985         aa = (struct lov_getattr_async_args *)&rqset->set_args;
986         aa->aa_lsm = lsm;
987         aa->aa_oa = oa;
988         aa->aa_stripe_oas = obdos;
989         GOTO (out, rc = 0);
990
991  out_obdos:
992         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
993  out:
994         if (lfh != NULL)
995                 lov_lfh_put(lfh);
996         class_export_put(export);
997         RETURN (rc);
998 }
999
1000 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
1001                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1002 {
1003         struct obdo *tmp;
1004         struct obd_export *export = class_conn2export(conn);
1005         struct lov_obd *lov;
1006         struct lov_oinfo *loi;
1007         struct lov_file_handles *lfh = NULL;
1008         int rc = 0, i, set = 0;
1009         ENTRY;
1010
1011         if (!lsm) {
1012                 CERROR("LOV requires striping ea\n");
1013                 GOTO(out, rc = -EINVAL);
1014         }
1015
1016         if (lsm->lsm_magic != LOV_MAGIC) {
1017                 CERROR("LOV striping magic bad %#x != %#x\n",
1018                        lsm->lsm_magic, LOV_MAGIC);
1019                 GOTO(out, rc = -EINVAL);
1020         }
1021
1022         if (!export || !export->exp_obd)
1023                 GOTO(out, rc = -ENODEV);
1024
1025         /* size changes should go through punch and not setattr */
1026         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
1027
1028         /* for now, we only expect mtime updates here */
1029         LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
1030
1031         tmp = obdo_alloc();
1032         if (!tmp)
1033                 GOTO(out, rc = -ENOMEM);
1034
1035         if (oa->o_valid & OBD_MD_FLHANDLE)
1036                 lfh = lov_handle2lfh(obdo_handle(oa));
1037
1038         lov = &export->exp_obd->u.lov;
1039         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1040                 int err;
1041
1042                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1043                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1044                         continue;
1045                 }
1046
1047                 obdo_cpy_md(tmp, oa, oa->o_valid);
1048
1049                 if (lfh)
1050                         memcpy(obdo_handle(tmp), lfh->lfh_och + i,
1051                                FD_OSTDATA_SIZE);
1052                 else
1053                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
1054
1055                 tmp->o_id = loi->loi_id;
1056
1057                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1058                                   NULL, NULL);
1059                 if (err) {
1060                         if (lov->tgts[loi->loi_ost_idx].active) {
1061                                 CERROR("error: setattr objid "LPX64" subobj "
1062                                        LPX64" on OST idx %d: rc = %d\n",
1063                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
1064                                        err);
1065                                 if (!rc)
1066                                         rc = err;
1067                         }
1068                 } else
1069                         set = 1;
1070         }
1071         obdo_free(tmp);
1072         if (!set && !rc)
1073                 rc = -EIO;
1074         if (lfh != NULL)
1075                 lov_lfh_put(lfh);
1076         GOTO(out, rc);
1077  out:
1078         class_export_put(export);
1079         return rc;
1080 }
1081
1082 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
1083                     struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1084                     struct obd_client_handle *och)
1085 {
1086         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
1087         struct obd_export *export = class_conn2export(conn);
1088         struct lov_obd *lov;
1089         struct lov_oinfo *loi;
1090         struct lov_file_handles *lfh = NULL;
1091         int set = 0, rc = 0, i;
1092         ENTRY;
1093         LASSERT(och != NULL);
1094
1095         if (!lsm) {
1096                 CERROR("LOV requires striping ea for opening\n");
1097                 GOTO(out_exp, rc = -EINVAL);
1098         }
1099
1100         if (lsm->lsm_magic != LOV_MAGIC) {
1101                 CERROR("LOV striping magic bad %#x != %#x\n",
1102                        lsm->lsm_magic, LOV_MAGIC);
1103                 GOTO(out_exp, rc = -EINVAL);
1104         }
1105
1106         if (!export || !export->exp_obd)
1107                 GOTO(out_exp, rc = -ENODEV);
1108
1109         tmp = obdo_alloc();
1110         if (!tmp)
1111                 GOTO(out_exp, rc = -ENOMEM);
1112
1113         lfh = lov_lfh_new();
1114         if (lfh == NULL)
1115                 GOTO(out_tmp, rc = -ENOMEM);
1116         OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof *och);
1117         if (!lfh->lfh_och)
1118                 GOTO(out_lfh, rc = -ENOMEM);
1119
1120         lov = &export->exp_obd->u.lov;
1121         oa->o_size = 0;
1122         oa->o_blocks = 0;
1123         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1124                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1125                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1126                         continue;
1127                 }
1128
1129                 /* create data objects with "parent" OA */
1130                 memcpy(tmp, oa, sizeof(*tmp));
1131                 tmp->o_id = loi->loi_id;
1132
1133                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1134                               NULL, NULL, lfh->lfh_och + i);
1135                 if (rc) {
1136                         if (!lov->tgts[loi->loi_ost_idx].active) {
1137                                 rc = 0;
1138                                 continue;
1139                         }
1140                         CERROR("error: open objid "LPX64" subobj "LPX64
1141                                " on OST idx %d: rc = %d\n",
1142                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
1143                                loi->loi_ost_idx, rc);
1144                         goto out_handles;
1145                 }
1146
1147                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
1148         }
1149
1150         lfh->lfh_count = lsm->lsm_stripe_count;
1151         och->och_fh.cookie = lfh->lfh_handle.h_cookie;
1152         obdo_handle(oa)->cookie = lfh->lfh_handle.h_cookie;
1153         oa->o_valid |= OBD_MD_FLHANDLE;
1154
1155         /* llfh refcount transfers to list */
1156         spin_lock(&export->exp_lov_data.led_lock);
1157         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
1158         spin_unlock(&export->exp_lov_data.led_lock);
1159
1160         GOTO(out_tmp, rc);
1161  out_tmp:
1162         obdo_free(tmp);
1163  out_exp:
1164         class_export_put(export);
1165         return rc;
1166
1167  out_handles:
1168         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
1169                 int err;
1170
1171                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1172                         continue;
1173
1174                 memcpy(tmp, oa, sizeof(*tmp));
1175                 tmp->o_id = loi->loi_id;
1176                 memcpy(obdo_handle(tmp), lfh->lfh_och + i, FD_OSTDATA_SIZE);
1177
1178                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1179                                 NULL, NULL);
1180                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1181                         CERROR("error: closing objid "LPX64" subobj "LPX64
1182                                " on OST idx %d after open error: rc=%d\n",
1183                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
1184                 }
1185         }
1186
1187         OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1188  out_lfh:
1189         lov_lfh_destroy(lfh);
1190         lov_lfh_put(lfh);
1191         goto out_tmp;
1192 }
1193
1194 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
1195                      struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1196 {
1197         struct obdo tmp;
1198         struct obd_export *export = class_conn2export(conn);
1199         struct lov_obd *lov;
1200         struct lov_oinfo *loi;
1201         struct lov_file_handles *lfh = NULL;
1202         int rc = 0, i;
1203         ENTRY;
1204
1205         if (!lsm) {
1206                 CERROR("LOV requires striping ea\n");
1207                 GOTO(out, rc = -EINVAL);
1208         }
1209
1210         if (lsm->lsm_magic != LOV_MAGIC) {
1211                 CERROR("LOV striping magic bad %#x != %#x\n",
1212                        lsm->lsm_magic, LOV_MAGIC);
1213                 GOTO(out, rc = -EINVAL);
1214         }
1215
1216         if (!export || !export->exp_obd)
1217                 GOTO(out, rc = -ENODEV);
1218
1219         if (oa->o_valid & OBD_MD_FLHANDLE)
1220                 lfh = lov_handle2lfh(obdo_handle(oa));
1221
1222         lov = &export->exp_obd->u.lov;
1223         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1224                 int err;
1225
1226                 /* create data objects with "parent" OA */
1227                 memcpy(&tmp, oa, sizeof(tmp));
1228                 tmp.o_id = loi->loi_id;
1229                 if (lfh)
1230                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1231                                FD_OSTDATA_SIZE);
1232                 else
1233                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1234
1235                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
1236                                 NULL, NULL);
1237                 if (err) {
1238                         if (lov->tgts[loi->loi_ost_idx].active) {
1239                                 CERROR("error: close objid "LPX64" subobj "LPX64
1240                                        " on OST idx %d: rc = %d\n", oa->o_id,
1241                                        loi->loi_id, loi->loi_ost_idx, err);
1242                         }
1243                         if (!rc)
1244                                 rc = err;
1245                 }
1246         }
1247         if (lfh != NULL) {
1248                 spin_lock(&export->exp_lov_data.led_lock);
1249                 list_del(&lfh->lfh_list);
1250                 spin_unlock(&export->exp_lov_data.led_lock);
1251                 lov_lfh_put(lfh); /* drop the reference owned by the list */
1252
1253                 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1254                 lov_lfh_destroy(lfh);
1255                 lov_lfh_put(lfh); /* balance handle2lfh above */
1256         }
1257         GOTO(out, rc);
1258  out:
1259         class_export_put(export);
1260         return rc;
1261 }
1262
/* Fallback log2() used only when no log2 macro is already defined.
 * NOTE(review): ffz(~(n)) is presumably "index of the lowest zero bit of ~n",
 * i.e. of the lowest set bit of n, which matches log2(n) only when n is a
 * power of two -- confirm against the ffz() in use. */
1263 #ifndef log2
1264 #define log2(n) ffz(~(n))
1265 #endif
1266
1267 /* we have an offset in file backed by an lov and want to find out where
1268  * that offset lands in our given stripe of the file.  for the easy
1269  * case where the offset is within the stripe, we just have to scale the
1270  * offset down to make it relative to the stripe instead of the lov.
1271  *
1272  * the harder case is what to do when the offset doesn't intersect the
1273  * stripe.  callers will want start offsets clamped ahead to the start
1274  * of the nearest stripe in the file.  end offsets similarly clamped to the
1275  * nearest ending byte of a stripe in the file:
1276  *
1277  * all this function does is move offsets to the nearest region of the
1278  * stripe, and it does its work "mod" the full length of all the stripes.
1279  * consider a file with 3 stripes:
1280  *
1281  *             S                                              E
1282  * ---------------------------------------------------------------------
1283  * |    0    |     1     |     2     |    0    |     1     |     2     |
1284  * ---------------------------------------------------------------------
1285  *
1286  * to find stripe 1's offsets for S and E, it divides by the full stripe
1287  * width and does its math in the context of a single set of stripes:
1288  *
1289  *             S         E
1290  * -----------------------------------
1291  * |    0    |     1     |     2     |
1292  * -----------------------------------
1293  *
1294  * it'll notice that E is outside stripe 1 and clamp it to the end of the
1295  * stripe, then multiply it back out by lov_off to give the real offsets in
1296  * the stripe:
1297  *
1298  *   S                   E
1299  * ---------------------------------------------------------------------
1300  * |    1    |     1     |     1     |    1    |     1     |     1     |
1301  * ---------------------------------------------------------------------
1302  *
1303  * it would have done similarly and pulled S forward to the start of a 1
1304  * stripe if, say, S had landed in a 0 stripe.
1305  *
1306  * this rounding isn't always correct.  consider an E lov offset that lands
1307  * on a 0 stripe, the "mod stripe width" math will pull it forward to the
1308  * start of a 1 stripe, when in fact it wanted to be rounded back to the end
1309  * of a previous 1 stripe.  this logic is handled by callers and this is why:
1310  *
1311  * this function returns < 0 when the offset was "before" the stripe and
1312  * was moved forward to the start of the stripe in question;  0 when it
1313  * falls in the stripe and no shifting was done; > 0 when the offset
1314  * was outside the stripe and was pulled back to its final byte. */
1315 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1316                              int stripeno, obd_off *obd_off)
1317 {
1318         unsigned long ssize  = lsm->lsm_stripe_size;
1319         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1320         unsigned long stripe_off, this_stripe;
1321         int ret = 0;
1322
1323         if (lov_off == OBD_OBJECT_EOF) {
1324                 *obd_off = OBD_OBJECT_EOF;
1325                 return 0;
1326         }
1327
1328         /* do_div(a, b) returns a % b, and a = a / b */
1329         stripe_off = do_div(lov_off, swidth);
1330
1331         this_stripe = stripeno * ssize;
1332         if (stripe_off < this_stripe) {
1333                 stripe_off = 0;
1334                 ret = -1;
1335         } else {
1336                 stripe_off -= this_stripe;
1337
1338                 if (stripe_off >= ssize) {
1339                         stripe_off = ssize;
1340                         ret = 1;
1341                 }
1342         }
1343
1344         *obd_off = lov_off * ssize + stripe_off;
1345         return ret;
1346 }
1347
1348 /* given an extent in an lov and a stripe, calculate the extent of the stripe
1349  * that is contained within the lov extent.  this returns true if the given
1350  * stripe does intersect with the lov extent. */
1351 static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
1352                                  obd_off start, obd_off end,
1353                                  obd_off *obd_start, obd_off *obd_end)
1354 {
1355         int start_side, end_side;
1356
1357         start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
1358         end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
1359
1360         CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
1361                start, end, start_side, *obd_start, *obd_end, end_side);
1362
1363         /* this stripe doesn't intersect the file extent when neither
1364          * start or the end intersected the stripe and obd_start and
1365          * obd_end got rounded up to the save value. */
1366         if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
1367                 return 0;
1368
1369         /* as mentioned in the lov_stripe_offset commentary, end
1370          * might have been shifted in the wrong direction.  This
1371          * happens when an end offset is before the stripe when viewed
1372          * through the "mod stripe size" math. we detect it being shifted
1373          * in the wrong direction and touch it up.
1374          * interestingly, this can't underflow since end must be > start
1375          * if we passed through the previous check.
1376          * (should we assert for that somewhere?) */
1377         if (end_side != 0)
1378                 (*obd_end)--;
1379
1380         return 1;
1381 }
1382
1383 /* compute which stripe number "lov_off" will be written into */
1384 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1385 {
1386         unsigned long ssize  = lsm->lsm_stripe_size;
1387         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1388         unsigned long stripe_off;
1389
1390         stripe_off = do_div(lov_off, swidth);
1391
1392         return stripe_off / ssize;
1393 }
1394
1395 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1396  * we can send this 'punch' to just the authoritative node and the nodes
1397  * that the punch will affect. */
1398 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1399                      struct lov_stripe_md *lsm,
1400                      obd_off start, obd_off end, struct obd_trans_info *oti)
1401 {
1402         struct obdo tmp;
1403         struct obd_export *export = class_conn2export(conn);
1404         struct lov_obd *lov;
1405         struct lov_oinfo *loi;
1406         struct lov_file_handles *lfh = NULL;
1407         int rc = 0, i;
1408         ENTRY;
1409
1410         if (!lsm) {
1411                 CERROR("LOV requires striping ea\n");
1412                 GOTO(out, rc = -EINVAL);
1413         }
1414
1415         if (lsm->lsm_magic != LOV_MAGIC) {
1416                 CERROR("LOV striping magic bad %#x != %#x\n",
1417                        lsm->lsm_magic, LOV_MAGIC);
1418                 GOTO(out, rc = -EINVAL);
1419         }
1420
1421         if (!export || !export->exp_obd)
1422                 GOTO(out, rc = -ENODEV);
1423
1424         if (oa->o_valid & OBD_MD_FLHANDLE)
1425                 lfh = lov_handle2lfh(obdo_handle(oa));
1426
1427         lov = &export->exp_obd->u.lov;
1428         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1429                 obd_off starti, endi;
1430                 int err;
1431
1432                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1433                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1434                         continue;
1435                 }
1436
1437                 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1438                         continue;
1439
1440                 /* create data objects with "parent" OA */
1441                 memcpy(&tmp, oa, sizeof(tmp));
1442                 tmp.o_id = loi->loi_id;
1443                 if (lfh)
1444                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1445                                FD_OSTDATA_SIZE);
1446                 else
1447                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1448
1449                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1450                                 starti, endi, NULL);
1451                 if (err) {
1452                         if (lov->tgts[loi->loi_ost_idx].active) {
1453                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1454                                        " on OST idx %d: rc = %d\n", oa->o_id,
1455                                        loi->loi_id, loi->loi_ost_idx, err);
1456                         }
1457                         if (!rc)
1458                                 rc = err;
1459                 }
1460         }
1461         if (lfh != NULL)
1462                 lov_lfh_put(lfh);
1463         GOTO(out, rc);
1464  out:
1465         class_export_put(export);
1466         return rc;
1467 }
1468
1469 static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm,
1470                          obd_count oa_bufs, struct brw_page *pga)
1471 {
1472         int i;
1473
1474         /* The caller just wants to know if there's a chance that this
1475          * I/O can succeed */
1476         for (i = 0; i < oa_bufs; i++) {
1477                 int stripe = lov_stripe_number(lsm, pga[i].off);
1478                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1479                 struct ldlm_extent ext, subext;
1480                 ext.start = pga[i].off;
1481                 ext.start = pga[i].off + pga[i].count;
1482
1483                 if (!lov_stripe_intersects(lsm, i, ext.start, ext.end,
1484                                            &subext.start, &subext.end))
1485                         continue;
1486
1487                 if (lov->tgts[ost].active == 0) {
1488                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1489                         return -EIO;
1490                 }
1491         }
1492         return 0;
1493 }
1494
1495 static int lov_brw(int cmd, struct lustre_handle *conn,
1496                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1497                    struct brw_page *pga, struct obd_trans_info *oti)
1498 {
1499         struct {
1500                 int bufct;
1501                 int index;
1502                 int subcount;
1503                 struct lov_stripe_md lsm;
1504                 int ost_idx;
1505         } *stripeinfo, *si, *si_last;
1506         struct obd_export *export = class_conn2export(conn);
1507         struct lov_obd *lov;
1508         struct brw_page *ioarr;
1509         struct lov_oinfo *loi;
1510         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1511         ENTRY;
1512
1513         if (!lsm) {
1514                 CERROR("LOV requires striping ea\n");
1515                 GOTO(out_exp, rc = -EINVAL);
1516         }
1517
1518         if (lsm->lsm_magic != LOV_MAGIC) {
1519                 CERROR("LOV striping magic bad %#x != %#x\n",
1520                        lsm->lsm_magic, LOV_MAGIC);
1521                 GOTO(out_exp, rc = -EINVAL);
1522         }
1523
1524         lov = &export->exp_obd->u.lov;
1525
1526         if (cmd == OBD_BRW_CHECK) {
1527                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1528                 GOTO(out_exp, rc);
1529         }
1530
1531         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1532         if (!stripeinfo)
1533                 GOTO(out_exp, rc = -ENOMEM);
1534
1535         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1536         if (!where)
1537                 GOTO(out_sinfo, rc = -ENOMEM);
1538
1539         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1540         if (!ioarr)
1541                 GOTO(out_where, rc = -ENOMEM);
1542
1543         for (i = 0; i < oa_bufs; i++) {
1544                 where[i] = lov_stripe_number(lsm, pga[i].off);
1545                 stripeinfo[where[i]].bufct++;
1546         }
1547
1548         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1549              i < stripe_count; i++, loi++, si_last = si, si++) {
1550                 if (i > 0)
1551                         si->index = si_last->index + si_last->bufct;
1552                 si->lsm.lsm_object_id = loi->loi_id;
1553                 si->ost_idx = loi->loi_ost_idx;
1554         }
1555
1556         for (i = 0; i < oa_bufs; i++) {
1557                 int which = where[i];
1558                 int shift;
1559
1560                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1561                 LASSERT(shift < oa_bufs);
1562                 ioarr[shift] = pga[i];
1563                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1564                 stripeinfo[which].subcount++;
1565         }
1566
1567         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1568                 int shift = si->index;
1569
1570                 if (lov->tgts[si->ost_idx].active == 0) {
1571                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1572                         GOTO(out_ioarr, rc = -EIO);
1573                 }
1574
1575                 if (si->bufct) {
1576                         LASSERT(shift < oa_bufs);
1577                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1578                                      &si->lsm, si->bufct, &ioarr[shift],
1579                                      oti);
1580                         if (rc)
1581                                 GOTO(out_ioarr, rc);
1582                 }
1583         }
1584         GOTO(out_ioarr, rc);
1585  out_ioarr:
1586         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1587  out_where:
1588         OBD_FREE(where, sizeof(*where) * oa_bufs);
1589  out_sinfo:
1590         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1591  out_exp:
1592         class_export_put(export);
1593         return rc;
1594 }
1595
1596 static int lov_brw_interpret (struct ptlrpc_request_set *set,
1597                               struct lov_brw_async_args *aa, int rc)
1598 {
1599         obd_count        oa_bufs = aa->aa_oa_bufs;
1600         struct brw_page *ioarr = aa->aa_ioarr;
1601         ENTRY;
1602
1603         OBD_FREE (ioarr, sizeof (*ioarr) * oa_bufs);
1604         RETURN (rc);
1605 }
1606
1607 static int lov_brw_async(int cmd, struct lustre_handle *conn,
1608                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1609                          struct brw_page *pga, struct ptlrpc_request_set *set,
1610                          struct obd_trans_info *oti)
1611 {
1612         struct {
1613                 int bufct;
1614                 int index;
1615                 int subcount;
1616                 struct lov_stripe_md lsm;
1617                 int ost_idx;
1618         } *stripeinfo, *si, *si_last;
1619         struct obd_export *export = class_conn2export(conn);
1620         struct lov_obd *lov;
1621         struct brw_page *ioarr;
1622         struct lov_oinfo *loi;
1623         struct lov_brw_async_args *aa;
1624         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1625         ENTRY;
1626
1627         if (!lsm) {
1628                 CERROR("LOV requires striping ea\n");
1629                 GOTO(out_exp, rc = -EINVAL);
1630         }
1631
1632         if (lsm->lsm_magic != LOV_MAGIC) {
1633                 CERROR("LOV striping magic bad %#x != %#x\n",
1634                        lsm->lsm_magic, LOV_MAGIC);
1635                 GOTO(out_exp, rc = -EINVAL);
1636         }
1637
1638         lov = &export->exp_obd->u.lov;
1639
1640         if (cmd == OBD_BRW_CHECK) {
1641                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1642                 GOTO(out_exp, rc);
1643         }
1644
1645         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1646         if (!stripeinfo)
1647                 GOTO(out_exp, rc = -ENOMEM);
1648
1649         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1650         if (!where)
1651                 GOTO(out_sinfo, rc = -ENOMEM);
1652
1653         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1654         if (!ioarr)
1655                 GOTO(out_where, rc = -ENOMEM);
1656
1657         for (i = 0; i < oa_bufs; i++) {
1658                 where[i] = lov_stripe_number(lsm, pga[i].off);
1659                 stripeinfo[where[i]].bufct++;
1660         }
1661
1662         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1663              i < stripe_count; i++, loi++, si_last = si, si++) {
1664                 if (i > 0)
1665                         si->index = si_last->index + si_last->bufct;
1666                 si->lsm.lsm_object_id = loi->loi_id;
1667                 si->ost_idx = loi->loi_ost_idx;
1668         }
1669
1670         for (i = 0; i < oa_bufs; i++) {
1671                 int which = where[i];
1672                 int shift;
1673
1674                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1675                 LASSERT(shift < oa_bufs);
1676                 ioarr[shift] = pga[i];
1677                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1678                 stripeinfo[which].subcount++;
1679         }
1680
1681         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1682                 int shift = si->index;
1683
1684                 if (si->bufct == 0)
1685                         continue;
1686
1687                 if (lov->tgts[si->ost_idx].active == 0) {
1688                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1689                         GOTO(out_ioarr, rc = -EIO);
1690                 }
1691
1692                 LASSERT(shift < oa_bufs);
1693                 rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
1694                                    &si->lsm, si->bufct, &ioarr[shift],
1695                                    set, oti);
1696                 if (rc)
1697                         GOTO(out_ioarr, rc);
1698         }
1699         LASSERT (rc == 0);
1700         LASSERT (set->set_interpret == NULL);
1701         set->set_interpret = lov_brw_interpret;
1702         LASSERT (sizeof (set->set_args) >= sizeof (struct lov_brw_async_args));
1703         aa = (struct lov_brw_async_args *)&set->set_args;
1704         aa->aa_oa_bufs = oa_bufs;
1705         aa->aa_ioarr = ioarr;
1706         GOTO(out_where, rc);
1707  out_ioarr:
1708         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1709  out_where:
1710         OBD_FREE(where, sizeof(*where) * oa_bufs);
1711  out_sinfo:
1712         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1713  out_exp:
1714         class_export_put(export);
1715         return rc;
1716 }
1717
1718 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1719                        struct lustre_handle *parent_lock,
1720                        __u32 type, void *cookie, int cookielen, __u32 mode,
1721                        int *flags, void *cb, void *data,
1722                        struct lustre_handle *lockh)
1723 {
1724         struct obd_export *export = class_conn2export(conn);
1725         struct lov_lock_handles *lov_lockh = NULL;
1726         struct lustre_handle *lov_lockhp;
1727         struct lov_obd *lov;
1728         struct lov_oinfo *loi;
1729         struct lov_stripe_md submd;
1730         ldlm_error_t rc;
1731         int i;
1732         ENTRY;
1733
1734         if (!lsm) {
1735                 CERROR("LOV requires striping ea\n");
1736                 GOTO(out_exp, rc = -EINVAL);
1737         }
1738
1739         if (lsm->lsm_magic != LOV_MAGIC) {
1740                 CERROR("LOV striping magic bad %#x != %#x\n",
1741                        lsm->lsm_magic, LOV_MAGIC);
1742                 GOTO(out_exp, rc = -EINVAL);
1743         }
1744
1745         /* we should never be asked to replay a lock this way. */
1746         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1747
1748         if (!export || !export->exp_obd)
1749                 GOTO(out_exp, rc = -ENODEV);
1750
1751         if (lsm->lsm_stripe_count > 1) {
1752                 lov_lockh = lov_llh_new(lsm);
1753                 if (lov_lockh == NULL)
1754                         GOTO(out_exp, rc = -ENOMEM);
1755
1756                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1757                 lov_lockhp = lov_lockh->llh_handles;
1758         } else {
1759                 lov_lockhp = lockh;
1760         }
1761
1762         lov = &export->exp_obd->u.lov;
1763         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1764              i++, loi++, lov_lockhp++) {
1765                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1766                 struct ldlm_extent sub_ext;
1767
1768                 *flags = 0;
1769                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1770                                            &sub_ext.start, &sub_ext.end))
1771                         continue;
1772
1773                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1774                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1775                         continue;
1776                 }
1777
1778                 /* XXX LOV STACKING: submd should be from the subobj */
1779                 submd.lsm_object_id = loi->loi_id;
1780                 submd.lsm_stripe_count = 0;
1781                 /* XXX submd is not fully initialized here */
1782                 *flags = 0;
1783                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1784                                   parent_lock, type, &sub_ext, sizeof(sub_ext),
1785                                   mode, flags, cb, data, lov_lockhp);
1786
1787                 // XXX add a lock debug statement here
1788                 if (rc != ELDLM_OK) {
1789                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1790                         if (lov->tgts[loi->loi_ost_idx].active) {
1791                                 CERROR("error: enqueue objid "LPX64" subobj "
1792                                        LPX64" on OST idx %d: rc = %d\n",
1793                                        lsm->lsm_object_id, loi->loi_id,
1794                                        loi->loi_ost_idx, rc);
1795                                 goto out_locks;
1796                         }
1797                 }
1798         }
1799         if (lsm->lsm_stripe_count > 1)
1800                 lov_llh_put(lov_lockh);
1801         GOTO(out_exp, rc = ELDLM_OK);
1802
1803  out_locks:
1804         while (loi--, lov_lockhp--, i-- > 0) {
1805                 struct lov_stripe_md submd;
1806                 int err;
1807
1808                 if (lov_lockhp->cookie == 0)
1809                         continue;
1810
1811                 /* XXX LOV STACKING: submd should be from the subobj */
1812                 submd.lsm_object_id = loi->loi_id;
1813                 submd.lsm_stripe_count = 0;
1814                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1815                                  mode, lov_lockhp);
1816                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1817                         CERROR("error: cancelling objid "LPX64" on OST "
1818                                "idx %d after enqueue error: rc = %d\n",
1819                                loi->loi_id, loi->loi_ost_idx, err);
1820                 }
1821         }
1822
1823         if (lsm->lsm_stripe_count > 1) {
1824                 lov_llh_destroy(lov_lockh);
1825                 lov_llh_put(lov_lockh);
1826         }
1827  out_exp:
1828         class_export_put(export);
1829         RETURN(rc);
1830 }
1831
1832 static int lov_match(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1833                      __u32 type, void *cookie, int cookielen, __u32 mode,
1834                      int *flags, void *data, struct lustre_handle *lockh)
1835 {
1836         struct obd_export *export = class_conn2export(conn);
1837         struct lov_lock_handles *lov_lockh = NULL;
1838         struct lustre_handle *lov_lockhp;
1839         struct lov_obd *lov;
1840         struct lov_oinfo *loi;
1841         struct lov_stripe_md submd;
1842         ldlm_error_t rc = 0;
1843         int i;
1844         ENTRY;
1845
1846         if (!lsm) {
1847                 CERROR("LOV requires striping ea\n");
1848                 GOTO(out_exp, rc = -EINVAL);
1849         }
1850
1851         if (lsm->lsm_magic != LOV_MAGIC) {
1852                 CERROR("LOV striping magic bad %#x != %#x\n",
1853                        lsm->lsm_magic, LOV_MAGIC);
1854                 GOTO(out_exp, rc = -EINVAL);
1855         }
1856
1857         if (!export || !export->exp_obd)
1858                 GOTO(out_exp, rc = -ENODEV);
1859
1860         if (lsm->lsm_stripe_count > 1) {
1861                 lov_lockh = lov_llh_new(lsm);
1862                 if (lov_lockh == NULL)
1863                         GOTO(out_exp, rc = -ENOMEM);
1864
1865                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1866                 lov_lockhp = lov_lockh->llh_handles;
1867         } else {
1868                 lov_lockhp = lockh;
1869         }
1870
1871         lov = &export->exp_obd->u.lov;
1872         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1873              i++, loi++, lov_lockhp++) {
1874                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1875                 struct ldlm_extent sub_ext;
1876                 int lov_flags;
1877
1878                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1879                                            &sub_ext.start, &sub_ext.end))
1880                         continue;
1881
1882                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1883                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1884                         rc = -EIO;
1885                         break;
1886                 }
1887
1888                 /* XXX LOV STACKING: submd should be from the subobj */
1889                 submd.lsm_object_id = loi->loi_id;
1890                 submd.lsm_stripe_count = 0;
1891                 lov_flags = *flags;
1892                 /* XXX submd is not fully initialized here */
1893                 rc = obd_match(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1894                                type, &sub_ext, sizeof(sub_ext), mode,
1895                                &lov_flags, data, lov_lockhp);
1896                 if (rc != 1)
1897                         break;
1898         }
1899         if (rc == 1) {
1900                 if (lsm->lsm_stripe_count > 1)
1901                         lov_llh_put(lov_lockh);
1902                 GOTO(out_exp, 1);
1903         }
1904
1905         while (loi--, lov_lockhp--, i-- > 0) {
1906                 struct lov_stripe_md submd;
1907                 int err;
1908
1909                 if (lov_lockhp->cookie == 0)
1910                         continue;
1911
1912                 /* XXX LOV STACKING: submd should be from the subobj */
1913                 submd.lsm_object_id = loi->loi_id;
1914                 submd.lsm_stripe_count = 0;
1915                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1916                                  mode, lov_lockhp);
1917                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1918                         CERROR("error: cancelling objid "LPX64" on OST "
1919                                "idx %d after match failure: rc = %d\n",
1920                                loi->loi_id, loi->loi_ost_idx, err);
1921                 }
1922         }
1923
1924         if (lsm->lsm_stripe_count > 1) {
1925                 lov_llh_destroy(lov_lockh);
1926                 lov_llh_put(lov_lockh);
1927         }
1928  out_exp:
1929         class_export_put(export);
1930         RETURN(rc);
1931 }
1932
1933 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1934                       __u32 mode, struct lustre_handle *lockh)
1935 {
1936         struct obd_export *export = class_conn2export(conn);
1937         struct lov_lock_handles *lov_lockh = NULL;
1938         struct lustre_handle *lov_lockhp;
1939         struct lov_obd *lov;
1940         struct lov_oinfo *loi;
1941         int rc = 0, i;
1942         ENTRY;
1943
1944         if (!lsm) {
1945                 CERROR("LOV requires striping ea\n");
1946                 GOTO(out, rc = -EINVAL);
1947         }
1948
1949         if (lsm->lsm_magic != LOV_MAGIC) {
1950                 CERROR("LOV striping magic bad %#x != %#x\n",
1951                        lsm->lsm_magic, LOV_MAGIC);
1952                 GOTO(out, rc = -EINVAL);
1953         }
1954
1955         if (!export || !export->exp_obd)
1956                 GOTO(out, rc = -ENODEV);
1957
1958         LASSERT(lockh);
1959         if (lsm->lsm_stripe_count > 1) {
1960                 lov_lockh = lov_handle2llh(lockh);
1961                 if (!lov_lockh) {
1962                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1963                         GOTO(out, rc = -EINVAL);
1964                 }
1965
1966                 lov_lockhp = lov_lockh->llh_handles;
1967         } else {
1968                 lov_lockhp = lockh;
1969         }
1970
1971         lov = &export->exp_obd->u.lov;
1972         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1973              i++, loi++, lov_lockhp++) {
1974                 struct lov_stripe_md submd;
1975                 int err;
1976
1977                 if (lov_lockhp->cookie == 0) {
1978                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
1979                                loi->loi_ost_idx, loi->loi_id);
1980                         continue;
1981                 }
1982
1983                 /* XXX LOV STACKING: submd should be from the subobj */
1984                 submd.lsm_object_id = loi->loi_id;
1985                 submd.lsm_stripe_count = 0;
1986                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1987                                  mode, lov_lockhp);
1988                 if (err) {
1989                         if (lov->tgts[loi->loi_ost_idx].active) {
1990                                 CERROR("error: cancel objid "LPX64" subobj "
1991                                        LPX64" on OST idx %d: rc = %d\n",
1992                                        lsm->lsm_object_id,
1993                                        loi->loi_id, loi->loi_ost_idx, err);
1994                                 if (!rc)
1995                                         rc = err;
1996                         }
1997                 }
1998         }
1999
2000         if (lsm->lsm_stripe_count > 1)
2001                 lov_llh_destroy(lov_lockh);
2002         if (lov_lockh != NULL)
2003                 lov_llh_put(lov_lockh);
2004         GOTO(out, rc);
2005  out:
2006         class_export_put(export);
2007         return rc;
2008 }
2009
2010 static int lov_cancel_unused(struct lustre_handle *conn,
2011                              struct lov_stripe_md *lsm, int flags, void *opaque)
2012 {
2013         struct obd_export *export = class_conn2export(conn);
2014         struct lov_obd *lov;
2015         struct lov_oinfo *loi;
2016         int rc = 0, i;
2017         ENTRY;
2018
2019         if (!lsm) {
2020                 CERROR("LOV requires striping ea for lock cancellation\n");
2021                 GOTO(out, rc = -EINVAL);
2022         }
2023
2024         if (!export || !export->exp_obd)
2025                 GOTO(out, rc = -ENODEV);
2026
2027         lov = &export->exp_obd->u.lov;
2028         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
2029                 struct lov_stripe_md submd;
2030                 int err;
2031
2032                 if (lov->tgts[loi->loi_ost_idx].active == 0)
2033                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2034
2035                 submd.lsm_object_id = loi->loi_id;
2036                 submd.lsm_stripe_count = 0;
2037                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
2038                                         &submd, flags, opaque);
2039                 if (err && lov->tgts[loi->loi_ost_idx].active) {
2040                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2041                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2042                                loi->loi_id, loi->loi_ost_idx, err);
2043                         if (!rc)
2044                                 rc = err;
2045                 }
2046         }
2047         GOTO(out, rc);
2048  out:
2049         class_export_put(export);
2050         return rc;
2051 }
2052
/* Largest value a __u64 can hold; doubles as the "overflowed" marker. */
#define LOV_U64_MAX (~(__u64)0)

/*
 * Saturating add: tot += add, except that tot is pinned at LOV_U64_MAX
 * when the unsigned sum would wrap around.  Both arguments are evaluated
 * more than once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) >= (tot)) ?                      \
                        (tot) + (add) : LOV_U64_MAX;                    \
        } while(0)
2061
2062 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2063 {
2064         struct obd_export *export = class_conn2export(conn);
2065         struct lov_obd *lov;
2066         struct obd_statfs lov_sfs;
2067         int set = 0;
2068         int rc = 0;
2069         int i;
2070         ENTRY;
2071
2072         if (!export || !export->exp_obd)
2073                 GOTO(out, rc = -ENODEV);
2074
2075         lov = &export->exp_obd->u.lov;
2076
2077         /* We only get block data from the OBD */
2078         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2079                 int err;
2080
2081                 if (!lov->tgts[i].active) {
2082                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
2083                         continue;
2084                 }
2085
2086                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
2087                 if (err) {
2088                         if (lov->tgts[i].active) {
2089                                 CERROR("error: statfs OSC %s on OST idx %d: "
2090                                        "err = %d\n",
2091                                        lov->tgts[i].uuid.uuid, i, err);
2092                                 if (!rc)
2093                                         rc = err;
2094                         }
2095                         continue;
2096                 }
2097                 if (!set) {
2098                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
2099                         set = 1;
2100                 } else {
2101                         osfs->os_bfree += lov_sfs.os_bfree;
2102                         osfs->os_bavail += lov_sfs.os_bavail;
2103                         osfs->os_blocks += lov_sfs.os_blocks;
2104                         /* XXX not sure about this one - depends on policy.
2105                          *   - could be minimum if we always stripe on all OBDs
2106                          *     (but that would be wrong for any other policy,
2107                          *     if one of the OBDs has no more objects left)
2108                          *   - could be sum if we stripe whole objects
2109                          *   - could be average, just to give a nice number
2110                          *
2111                          * To give a "reasonable" (if not wholly accurate)
2112                          * number, we divide the total number of free objects
2113                          * by expected stripe count (watch out for overflow).
2114                          */
2115                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
2116                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
2117                 }
2118         }
2119         if (set) {
2120                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
2121                                          lov->desc.ld_default_stripe_count :
2122                                          lov->desc.ld_active_tgt_count;
2123
2124                 if (osfs->os_files != LOV_U64_MAX)
2125                         do_div(osfs->os_files, expected_stripes);
2126                 if (osfs->os_ffree != LOV_U64_MAX)
2127                         do_div(osfs->os_ffree, expected_stripes);
2128         } else if (!rc)
2129                 rc = -EIO;
2130         GOTO(out, rc);
2131  out:
2132         class_export_put(export);
2133         return rc;
2134 }
2135
2136 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
2137                          void *karg, void *uarg)
2138 {
2139         struct obd_device *obddev = class_conn2obd(conn);
2140         struct lov_obd *lov = &obddev->u.lov;
2141         int i, count = lov->desc.ld_tgt_count;
2142         struct obd_uuid *uuidp;
2143         int rc;
2144
2145         ENTRY;
2146
2147         switch (cmd) {
2148         case IOC_LOV_SET_OSC_ACTIVE: {
2149                 struct obd_ioctl_data *data = karg;
2150                 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
2151                 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
2152                 break;
2153         }
2154         case OBD_IOC_LOV_GET_CONFIG: {
2155                 struct obd_ioctl_data *data = karg;
2156                 struct lov_tgt_desc *tgtdesc;
2157                 struct lov_desc *desc;
2158                 char *buf = NULL;
2159
2160                 buf = NULL;
2161                 len = 0;
2162                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2163                         RETURN(-EINVAL);
2164
2165                 data = (struct obd_ioctl_data *)buf;
2166
2167                 if (sizeof(*desc) > data->ioc_inllen1) {
2168                         OBD_FREE(buf, len);
2169                         RETURN(-EINVAL);
2170                 }
2171
2172                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2173                         OBD_FREE(buf, len);
2174                         RETURN(-EINVAL);
2175                 }
2176
2177                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2178                 memcpy(desc, &(lov->desc), sizeof(*desc));
2179
2180                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2181                 tgtdesc = lov->tgts;
2182                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
2183                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
2184
2185                 rc = copy_to_user((void *)uarg, buf, len);
2186                 if (rc)
2187                         rc = -EFAULT;
2188                 obd_ioctl_freedata(buf, len);
2189                 break;
2190         }
2191         case LL_IOC_LOV_SETSTRIPE:
2192                 rc = lov_setstripe(conn, karg, uarg);
2193                 break;
2194         case LL_IOC_LOV_GETSTRIPE:
2195                 rc = lov_getstripe(conn, karg, uarg);
2196                 break;
2197         default: {
2198                 int set = 0;
2199                 if (count == 0)
2200                         RETURN(-ENOTTY);
2201                 rc = 0;
2202                 for (i = 0; i < count; i++) {
2203                         int err;
2204
2205                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
2206                                             len, karg, uarg);
2207                         if (err) {
2208                                 if (lov->tgts[i].active) {
2209                                         CERROR("error: iocontrol OSC %s on OST"
2210                                                "idx %d: err = %d\n",
2211                                                lov->tgts[i].uuid.uuid, i, err);
2212                                         if (!rc)
2213                                                 rc = err;
2214                                 }
2215                         } else
2216                                 set = 1;
2217                 }
2218                 if (!set && !rc)
2219                         rc = -EIO;
2220         }
2221         }
2222
2223         RETURN(rc);
2224 }
2225
2226 static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
2227                         void *key, __u32 *vallen, void *val)
2228 {
2229         struct obd_device *obddev = class_conn2obd(conn);
2230         struct lov_obd *lov = &obddev->u.lov;
2231         int i;
2232         ENTRY;
2233
2234         if (!vallen || !val)
2235                 RETURN(-EFAULT);
2236
2237         if (keylen > strlen("lock_to_stripe") &&
2238             strcmp(key, "lock_to_stripe") == 0) {
2239                 struct {
2240                         char name[16];
2241                         struct ldlm_lock *lock;
2242                         struct lov_stripe_md *lsm;
2243                 } *data = key;
2244                 __u32 *stripe = val;
2245                 struct lov_oinfo *loi;
2246
2247                 if (*vallen < sizeof(*stripe))
2248                         RETURN(-EFAULT);
2249                 *vallen = sizeof(*stripe);
2250
2251                 /* XXX This is another one of those bits that will need to
2252                  * change if we ever actually support nested LOVs.  It uses
2253                  * the lock's connection to find out which stripe it is. */
2254                 for (i = 0, loi = data->lsm->lsm_oinfo;
2255                      i < data->lsm->lsm_stripe_count;
2256                      i++, loi++) {
2257                         if (lov->tgts[loi->loi_ost_idx].conn.cookie ==
2258                             data->lock->l_connh->cookie) {
2259                                 *stripe = i;
2260                                 RETURN(0);
2261                         }
2262                 }
2263                 RETURN(-ENXIO);
2264         }
2265
2266         RETURN(-EINVAL);
2267 }
2268
2269 struct obd_ops lov_obd_ops = {
2270         o_owner:       THIS_MODULE,
2271         o_attach:      lov_attach,
2272         o_detach:      lov_detach,
2273         o_setup:       lov_setup,
2274         o_connect:     lov_connect,
2275         o_disconnect:  lov_disconnect,
2276         o_statfs:      lov_statfs,
2277         o_packmd:      lov_packmd,
2278         o_unpackmd:    lov_unpackmd,
2279         o_create:      lov_create,
2280         o_destroy:     lov_destroy,
2281         o_getattr:     lov_getattr,
2282         o_getattr_async: lov_getattr_async,
2283         o_setattr:     lov_setattr,
2284         o_open:        lov_open,
2285         o_close:       lov_close,
2286         o_brw:         lov_brw,
2287         o_brw_async:   lov_brw_async,
2288         o_punch:       lov_punch,
2289         o_enqueue:     lov_enqueue,
2290         o_match:       lov_match,
2291         o_cancel:      lov_cancel,
2292         o_cancel_unused: lov_cancel_unused,
2293         o_iocontrol:   lov_iocontrol,
2294         o_get_info:    lov_get_info
2295 };
2296
2297 int __init lov_init(void)
2298 {
2299         struct lprocfs_static_vars lvars;
2300         int rc;
2301
2302         printk(KERN_INFO "Lustre Logical Object Volume driver; "
2303                "info@clusterfs.com\n");
2304         lprocfs_init_vars(&lvars);
2305         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2306                                  OBD_LOV_DEVICENAME);
2307         RETURN(rc);
2308 }
2309
2310 static void __exit lov_exit(void)
2311 {
2312         class_unregister_type(OBD_LOV_DEVICENAME);
2313 }
2314
#ifdef __KERNEL__
/* Entry and exit hooks; only wired up when built as a kernel module. */
module_init(lov_init);
module_exit(lov_exit);

/* Descriptive module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");
#endif