Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define EXPORT_SYMTAB
26 #define DEBUG_SUBSYSTEM S_LOV
27 #ifdef __KERNEL__
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/init.h>
31 #include <linux/random.h>
32 #include <linux/slab.h>
33 #include <asm/div64.h>
34 #else
35 #include <liblustre.h>
36 #endif
37
38 #include <linux/obd_support.h>
39 #include <linux/lustre_lib.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_idl.h>
42 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
43 #include <linux/lustre_mds.h>
44 #include <linux/obd_class.h>
45 #include <linux/obd_lov.h>
46 #include <linux/lprocfs_status.h>
47
48 struct lov_file_handles {
49         struct portals_handle lfh_handle;
50         atomic_t lfh_refcount;
51         struct list_head lfh_list;
52         int lfh_count;
53         struct obd_client_handle *lfh_och;
54 };
55
56 struct lov_lock_handles {
57         struct portals_handle llh_handle;
58         atomic_t llh_refcount;
59         int llh_stripe_count;
60         struct lustre_handle llh_handles[0];
61 };
62
63 /* lov_file_handles helpers */
64 static void lov_lfh_addref(void *lfhp)
65 {
66         struct lov_file_handles *lfh = lfhp;
67
68         atomic_inc(&lfh->lfh_refcount);
69         CDEBUG(D_INFO, "GETting lfh %p : new refcount %d\n", lfh,
70                atomic_read(&lfh->lfh_refcount));
71 }
72
73 static struct lov_file_handles *lov_lfh_new(void)
74 {
75         struct lov_file_handles *lfh;
76
77         OBD_ALLOC(lfh, sizeof *lfh);
78         if (lfh == NULL) {
79                 CERROR("out of memory\n");
80                 return NULL;
81         }
82
83         atomic_set(&lfh->lfh_refcount, 2);
84
85         INIT_LIST_HEAD(&lfh->lfh_handle.h_link);
86         class_handle_hash(&lfh->lfh_handle, lov_lfh_addref);
87
88         return lfh;
89 }
90
91 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
92 {
93         ENTRY;
94         LASSERT(handle != NULL);
95         RETURN(class_handle2object(handle->cookie));
96 }
97
98 static void lov_lfh_put(struct lov_file_handles *lfh)
99 {
100         CDEBUG(D_INFO, "PUTting lfh %p : new refcount %d\n", lfh,
101                atomic_read(&lfh->lfh_refcount) - 1);
102         LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
103                 atomic_read(&lfh->lfh_refcount) < 0x5a5a);
104         if (atomic_dec_and_test(&lfh->lfh_refcount)) {
105                 LASSERT(list_empty(&lfh->lfh_handle.h_link));
106                 OBD_FREE(lfh, sizeof *lfh);
107         }
108 }
109
110 static void lov_lfh_destroy(struct lov_file_handles *lfh)
111 {
112         class_handle_unhash(&lfh->lfh_handle);
113         lov_lfh_put(lfh);
114 }
115
116 static void lov_llh_addref(void *llhp)
117 {
118         struct lov_lock_handles *llh = llhp;
119
120         atomic_inc(&llh->llh_refcount);
121         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
122                atomic_read(&llh->llh_refcount));
123 }
124
125 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
126 {
127         struct lov_lock_handles *llh;
128
129         OBD_ALLOC(llh, sizeof *llh +
130                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
131         if (llh == NULL) {
132                 CERROR("out of memory\n");
133                 return NULL;
134         }
135         atomic_set(&llh->llh_refcount, 2);
136         llh->llh_stripe_count = lsm->lsm_stripe_count;
137         INIT_LIST_HEAD(&llh->llh_handle.h_link);
138         class_handle_hash(&llh->llh_handle, lov_llh_addref);
139         return llh;
140 }
141
142 static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
143 {
144         ENTRY;
145         LASSERT(handle != NULL);
146         RETURN(class_handle2object(handle->cookie));
147 }
148
149 static void lov_llh_put(struct lov_lock_handles *llh)
150 {
151         CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
152                atomic_read(&llh->llh_refcount) - 1);
153         LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
154                 atomic_read(&llh->llh_refcount) < 0x5a5a);
155         if (atomic_dec_and_test(&llh->llh_refcount)) {
156                 LASSERT(list_empty(&llh->llh_handle.h_link));
157                 OBD_FREE(llh, sizeof *llh +
158                          sizeof(*llh->llh_handles) * llh->llh_stripe_count);
159         }
160 }
161
162 static void lov_llh_destroy(struct lov_lock_handles *llh)
163 {
164         class_handle_unhash(&llh->llh_handle);
165         lov_llh_put(llh);
166 }
167
168 /* obd methods */
169 int lov_attach(struct obd_device *dev, obd_count len, void *data)
170 {
171         struct lprocfs_static_vars lvars;
172
173         lprocfs_init_vars(&lvars);
174         return lprocfs_obd_attach(dev, lvars.obd_vars);
175 }
176
/*
 * OBD detach method: the counterpart of lov_attach().  Returns whatever
 * lprocfs_obd_detach() returns.
 */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
181
182 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
183                        struct obd_uuid *cluuid)
184 {
185         struct ptlrpc_request *req = NULL;
186         struct lov_obd *lov = &obd->u.lov;
187         struct client_obd *mdc = &lov->mdcobd->u.cli;
188         struct lov_desc *desc = &lov->desc;
189         struct lov_desc *mdesc;
190         struct lov_tgt_desc *tgts;
191         struct obd_export *exp;
192         struct lustre_handle mdc_conn;
193         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
194         struct obd_uuid *uuids;
195         int rc, rc2, i;
196         ENTRY;
197
198         rc = class_connect(conn, obd, cluuid);
199         if (rc)
200                 RETURN(rc);
201
202         /* We don't want to actually do the underlying connections more than
203          * once, so keep track. */
204         lov->refcount++;
205         if (lov->refcount > 1)
206                 RETURN(0);
207
208         exp = class_conn2export(conn);
209         spin_lock_init(&exp->exp_lov_data.led_lock);
210         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
211
212         /* retrieve LOV metadata from MDS */
213         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
214         if (rc) {
215                 CERROR("cannot connect to mdc: rc = %d\n", rc);
216                 GOTO(out_conn, rc);
217         }
218
219         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
220         rc2 = obd_disconnect(&mdc_conn, 0);
221         if (rc) {
222                 CERROR("cannot get lov info %d\n", rc);
223                 GOTO(out_conn, rc);
224         }
225
226         if (rc2) {
227                 CERROR("error disconnecting from MDS %d\n", rc2);
228                 GOTO(out_req, rc = rc2);
229         }
230
231         /* mdc_getlovinfo() has checked and swabbed the reply.  It has also
232          * done some simple checks (e.g. #uuids consistent with desc, uuid
233          * array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
234          * terminated), but I still need to verify it makes overall
235          * sense */
236         mdesc = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*mdesc));
237         LASSERT (mdesc != NULL);
238         LASSERT_REPSWABBED (req, 0);
239
240         *desc = *mdesc;
241
242         if (!obd_uuid_equals(&obd->obd_uuid, &desc->ld_uuid)) {
243                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
244                        obd->obd_uuid.uuid, desc->ld_uuid.uuid);
245                 GOTO(out_req, rc = -EINVAL);
246         }
247
248         /* Because of 64-bit divide/mod operations only work with a 32-bit
249          * divisor in a 32-bit kernel, we cannot support a stripe width
250          * of 4GB or larger on 32-bit CPUs.
251          */
252         if ((desc->ld_default_stripe_count ?
253              desc->ld_default_stripe_count : desc->ld_tgt_count) *
254              desc->ld_default_stripe_size > ~0UL) {
255                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
256                        desc->ld_default_stripe_size,
257                        desc->ld_default_stripe_count ?
258                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
259                 GOTO(out_req, rc = -EINVAL);
260         }
261
262         /* We know ld_tgt_count is reasonable (the array of UUIDS fits in
263          * the maximum buffer size, so we won't be making outrageous
264          * demands on memory here. */
265         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
266         OBD_ALLOC(lov->tgts, lov->bufsize);
267         if (!lov->tgts) {
268                 CERROR("Out of memory\n");
269                 GOTO(out_req, rc = -ENOMEM);
270         }
271
272         uuids = lustre_msg_buf(req->rq_repmsg, 1,
273                                sizeof(*uuids) * desc->ld_tgt_count);
274         LASSERT (uuids != NULL);
275         LASSERT_REPSWABBED (req, 1);
276
277         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
278                 struct obd_uuid *uuid = &tgts->uuid;
279                 struct obd_device *tgt_obd;
280                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
281
282                 /* NULL termination already checked */
283                 *uuid = uuids[i];
284
285                 tgt_obd = client_tgtuuid2obd(uuid);
286
287                 if (!tgt_obd) {
288                         CERROR("Target %s not attached\n", uuid->uuid);
289                         GOTO(out_disc, rc = -EINVAL);
290                 }
291
292                 if (!tgt_obd->obd_set_up) {
293                         CERROR("Target %s not set up\n", uuid->uuid);
294                         GOTO(out_disc, rc = -EINVAL);
295                 }
296
297                 rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid);
298
299                 if (rc) {
300                         CERROR("Target %s connect error %d\n", uuid->uuid, rc);
301                         GOTO(out_disc, rc);
302                 }
303
304                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
305                                    sizeof(struct obd_device *), obd, NULL);
306                 if (rc) {
307                         CERROR("Target %s REGISTER_LOV error %d\n",
308                                uuid->uuid, rc);
309                         obd_disconnect(&tgts->conn, 0);
310                         GOTO(out_disc, rc);
311                 }
312
313                 desc->ld_active_tgt_count++;
314                 tgts->active = 1;
315         }
316
317         mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
318         ptlrpc_req_finished (req);
319         class_export_put(exp);
320         RETURN (0);
321
322  out_disc:
323         while (i-- > 0) {
324                 struct obd_uuid uuid;
325                 --tgts;
326                 --desc->ld_active_tgt_count;
327                 tgts->active = 0;
328                 /* save for CERROR below; (we know it's terminated) */
329                 uuid = tgts->uuid;
330                 rc2 = obd_disconnect(&tgts->conn, 0);
331                 if (rc2)
332                         CERROR("error: LOV target %s disconnect on OST idx %d: "
333                                "rc = %d\n", uuid.uuid, i, rc2);
334         }
335         OBD_FREE(lov->tgts, lov->bufsize);
336  out_req:
337         ptlrpc_req_finished (req);
338  out_conn:
339         class_export_put(exp);
340         class_disconnect(conn, 0);
341         RETURN (rc);
342 }
343
344 static int lov_disconnect(struct lustre_handle *conn, int failover)
345 {
346         struct obd_device *obd = class_conn2obd(conn);
347         struct lov_obd *lov = &obd->u.lov;
348         struct obd_export *exp;
349         struct list_head *p, *n;
350         int rc, i;
351         ENTRY;
352
353         if (!lov->tgts)
354                 goto out_local;
355
356         /* Only disconnect the underlying layers on the final disconnect. */
357         lov->refcount--;
358         if (lov->refcount != 0)
359                 goto out_local;
360
361         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
362                 if (obd->obd_no_recov) {
363                         /* Pass it on to our clients.
364                          * XXX This should be an argument to disconnect,
365                          * XXX not a back-door flag on the OBD.  Ah well.
366                          */
367                         struct obd_device *osc_obd =
368                                 class_conn2obd(&lov->tgts[i].conn);
369                         osc_obd->obd_no_recov = 1;
370                 }
371                 rc = obd_disconnect(&lov->tgts[i].conn, failover);
372                 if (rc) {
373                         if (lov->tgts[i].active) {
374                                 CERROR("Target %s disconnect error %d\n",
375                                        lov->tgts[i].uuid.uuid, rc);
376                         }
377                         rc = 0;
378                 }
379                 if (lov->tgts[i].active) {
380                         lov->desc.ld_active_tgt_count--;
381                         lov->tgts[i].active = 0;
382                 }
383         }
384         OBD_FREE(lov->tgts, lov->bufsize);
385         lov->bufsize = 0;
386         lov->tgts = NULL;
387
388         exp = class_conn2export(conn);
389         if (exp == NULL) {
390                 CERROR("export handle "LPU64" invalid!  If you can reproduce, "
391                        "please send a full debug log to phik\n", conn->cookie);
392                 RETURN(0);
393         }
394         spin_lock(&exp->exp_lov_data.led_lock);
395         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
396                 /* XXX close these, instead of just discarding them? */
397                 struct lov_file_handles *lfh;
398                 lfh = list_entry(p, typeof(*lfh), lfh_list);
399                 CERROR("discarding open LOV handle %p:"LPX64"\n",
400                        lfh, lfh->lfh_handle.h_cookie);
401                 list_del(&lfh->lfh_list);
402                 OBD_FREE(lfh->lfh_och, lfh->lfh_count * FD_OSTDATA_SIZE);
403                 lov_lfh_destroy(lfh);
404                 lov_lfh_put(lfh);
405         }
406         spin_unlock(&exp->exp_lov_data.led_lock);
407         class_export_put(exp);
408
409  out_local:
410         rc = class_disconnect(conn, 0);
411         RETURN(rc);
412 }
413
414 /* Error codes:
415  *
416  *  -EINVAL  : UUID can't be found in the LOV's target list
417  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
418  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
419  */
420 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
421                               int activate)
422 {
423         struct obd_device *obd;
424         struct lov_tgt_desc *tgt;
425         int i, rc = 0;
426         ENTRY;
427
428         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
429                lov, uuid->uuid, activate);
430
431         spin_lock(&lov->lov_lock);
432         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
433                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
434                        i, tgt->uuid.uuid, tgt->conn.cookie);
435                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
436                         break;
437         }
438
439         if (i == lov->desc.ld_tgt_count)
440                 GOTO(out, rc = -EINVAL);
441
442         obd = class_conn2obd(&tgt->conn);
443         if (obd == NULL) {
444                 /* This can happen if OST failure races with node shutdown */
445                 GOTO(out, rc = -ENOTCONN);
446         }
447
448         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
449                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
450                obd->obd_type->typ_name, i);
451         LASSERT(strcmp(obd->obd_type->typ_name, "osc") == 0);
452
453         if (tgt->active == activate) {
454                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
455                        activate ? "" : "in");
456                 GOTO(out, rc);
457         }
458
459         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
460
461         tgt->active = activate;
462         if (activate) {
463                 /*
464                  * foreach(export)
465                  *     foreach(open_file)
466                  *         if (file_handle uses this_osc)
467                  *             if (has_no_filehandle)
468                  *                 open(file_handle, this_osc);
469                  */
470                 /* XXX reconnect? */
471                 lov->desc.ld_active_tgt_count++;
472         } else {
473                 /*
474                  * Should I invalidate filehandles that refer to this OSC, so
475                  * that I reopen them during reactivation?
476                  */
477                 /* XXX disconnect from OSC? */
478                 lov->desc.ld_active_tgt_count--;
479         }
480
481 #warning "FIXME: walk open files list for objects that need opening"
482         EXIT;
483  out:
484         spin_unlock(&lov->lov_lock);
485         return rc;
486 }
487
488 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
489 {
490         struct obd_ioctl_data *data = buf;
491         struct lov_obd *lov = &obd->u.lov;
492         struct obd_uuid uuid;
493         int rc = 0;
494         ENTRY;
495
496         if (data->ioc_inllen1 < 1) {
497                 CERROR("LOV setup requires an MDC UUID\n");
498                 RETURN(-EINVAL);
499         }
500
501         if (data->ioc_inllen1 > 37) {
502                 CERROR("mdc UUID must be 36 characters or less\n");
503                 RETURN(-EINVAL);
504         }
505
506         spin_lock_init(&lov->lov_lock);
507         obd_str2uuid(&uuid, data->ioc_inlbuf1);
508         lov->mdcobd = class_uuid2obd(&uuid);
509         if (!lov->mdcobd) {
510                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
511                        data->ioc_inlbuf1);
512                 rc = -EINVAL;
513         }
514         RETURN(rc);
515 }
516
517 /* compute object size given "stripeno" and the ost size */
518 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
519                                 int stripeno)
520 {
521         unsigned long ssize  = lsm->lsm_stripe_size;
522         unsigned long swidth = ssize * lsm->lsm_stripe_count;
523         unsigned long stripe_size;
524         obd_size lov_size;
525
526         if (ost_size == 0)
527                 return 0;
528
529         /* do_div(a, b) returns a % b, and a = a / b */
530         stripe_size = do_div(ost_size, ssize);
531
532         if (stripe_size)
533                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
534         else
535                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
536
537         return lov_size;
538 }
539
540 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
541                             struct lov_stripe_md *lsm, int stripeno, int *set)
542 {
543         if (*set) {
544                 if (valid & OBD_MD_FLSIZE) {
545                         /* this handles sparse files properly */
546                         obd_size lov_size;
547
548                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
549                         if (lov_size > tgt->o_size)
550                                 tgt->o_size = lov_size;
551                 }
552                 if (valid & OBD_MD_FLBLOCKS)
553                         tgt->o_blocks += src->o_blocks;
554                 if (valid & OBD_MD_FLBLKSZ)
555                         tgt->o_blksize += src->o_blksize;
556                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
557                         tgt->o_ctime = src->o_ctime;
558                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
559                         tgt->o_mtime = src->o_mtime;
560         } else {
561                 obdo_cpy_md(tgt, src, valid);
562                 if (valid & OBD_MD_FLSIZE)
563                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
564                 *set = 1;
565         }
566 }
567
568 /* the LOV expects oa->o_id to be set to the LOV object id */
569 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
570                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
571 {
572         struct obd_export *export = class_conn2export(conn);
573         struct lov_obd *lov;
574         struct lov_stripe_md *lsm;
575         struct lov_oinfo *loi;
576         struct obdo *tmp;
577         unsigned ost_count, ost_idx;
578         int set = 0, obj_alloc = 0;
579         int rc = 0, i;
580         ENTRY;
581
582         LASSERT(ea);
583
584         if (!export)
585                 GOTO(out_exp, rc = -EINVAL);
586
587         lov = &export->exp_obd->u.lov;
588
589         if (!lov->desc.ld_active_tgt_count)
590                 GOTO(out_exp, rc = -EIO);
591
592         tmp = obdo_alloc();
593         if (!tmp)
594                 GOTO(out_exp, rc = -ENOMEM);
595
596         lsm = *ea;
597
598         if (!lsm) {
599                 rc = obd_alloc_memmd(conn, &lsm);
600                 if (rc < 0)
601                         GOTO(out_tmp, rc);
602
603                 rc = 0;
604                 lsm->lsm_magic = LOV_MAGIC;
605         }
606
607         ost_count = lov->desc.ld_tgt_count;
608
609         LASSERT(oa->o_valid & OBD_MD_FLID);
610         lsm->lsm_object_id = oa->o_id;
611         if (!lsm->lsm_stripe_size)
612                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
613
614         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
615                 get_random_bytes(&ost_idx, 2);
616                 ost_idx %= ost_count;
617         } else
618                 ost_idx = lsm->lsm_stripe_offset;
619
620         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
621                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
622
623         loi = lsm->lsm_oinfo;
624         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
625                 struct lov_stripe_md obj_md;
626                 struct lov_stripe_md *obj_mdp = &obj_md;
627                 int err;
628
629                 if (lov->tgts[ost_idx].active == 0) {
630                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
631                         continue;
632                 }
633
634                 /* create data objects with "parent" OA */
635                 memcpy(tmp, oa, sizeof(*tmp));
636                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
637                 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
638                 if (err) {
639                         if (lov->tgts[ost_idx].active) {
640                                 CERROR("error creating objid "LPX64" sub-object"
641                                        " on OST idx %d/%d: rc = %d\n", oa->o_id,
642                                        ost_idx, lsm->lsm_stripe_count, err);
643                                 if (err > 0) {
644                                         CERROR("obd_create returned invalid "
645                                                "err %d\n", err);
646                                         err = -EIO;
647                                 }
648                                 if (!rc)
649                                         rc = err;
650                         }
651                         continue;
652                 }
653                 loi->loi_id = tmp->o_id;
654                 loi->loi_ost_idx = ost_idx;
655                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
656                        lsm->lsm_object_id, loi->loi_id, ost_idx);
657
658                 if (!set)
659                         lsm->lsm_stripe_offset = ost_idx;
660                 lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
661
662                 ++obj_alloc;
663                 ++loi;
664
665                 /* If we have allocated enough objects, we are OK */
666                 if (obj_alloc == lsm->lsm_stripe_count) {
667                         rc = 0;
668                         GOTO(out_done, rc);
669                 }
670         }
671
672         if (*ea != NULL) {
673                 GOTO(out_cleanup, rc);
674         } else {
675                 struct lov_stripe_md *lsm_new;
676                 /* XXX LOV STACKING call into osc for sizes */
677                 unsigned size = lov_stripe_md_size(obj_alloc);
678
679                 CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
680                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count);
681                 OBD_ALLOC(lsm_new, size);
682                 if (!lsm_new)
683                         GOTO(out_cleanup, rc = -ENOMEM);
684                 memcpy(lsm_new, lsm, size);
685                 lsm_new->lsm_stripe_count = obj_alloc;
686
687                 /* XXX LOV STACKING call into osc for sizes */
688                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
689                 lsm = lsm_new;
690         }
691  out_done:
692         *ea = lsm;
693
694  out_tmp:
695         obdo_free(tmp);
696  out_exp:
697         class_export_put(export);
698         return rc;
699
700  out_cleanup:
701         while (obj_alloc-- > 0) {
702                 int err;
703
704                 --loi;
705                 /* destroy already created objects here */
706                 memcpy(tmp, oa, sizeof(*tmp));
707                 tmp->o_id = loi->loi_id;
708                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL,
709                                   NULL);
710                 if (err)
711                         CERROR("Failed to uncreate objid "LPX64" subobj "
712                                LPX64" on OST idx %d: rc = %d\n",
713                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
714                                err);
715         }
716         if (*ea == NULL)
717                 obd_free_memmd(conn, &lsm);
718         goto out_tmp;
719 }
720
721 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
722                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
723 {
724         struct obdo tmp;
725         struct obd_export *export = class_conn2export(conn);
726         struct lov_obd *lov;
727         struct lov_oinfo *loi;
728         struct lov_file_handles *lfh = NULL;
729         int rc = 0, i;
730         ENTRY;
731
732         if (!lsm) {
733                 CERROR("LOV requires striping ea for destruction\n");
734                 GOTO(out, rc = -EINVAL);
735         }
736
737         if (lsm->lsm_magic != LOV_MAGIC) {
738                 CERROR("LOV striping magic bad %#x != %#x\n",
739                        lsm->lsm_magic, LOV_MAGIC);
740                 GOTO(out, rc = -EINVAL);
741         }
742
743         if (!export || !export->exp_obd)
744                 GOTO(out, rc = -ENODEV);
745
746         if (oa->o_valid & OBD_MD_FLHANDLE)
747                 lfh = lov_handle2lfh(obdo_handle(oa));
748
749         lov = &export->exp_obd->u.lov;
750         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
751                 int err;
752                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
753                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
754                         /* Orphan clean up will (someday) fix this up. */
755                         continue;
756                 }
757
758                 memcpy(&tmp, oa, sizeof(tmp));
759                 tmp.o_id = loi->loi_id;
760                 if (lfh)
761                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
762                                FD_OSTDATA_SIZE);
763                 else
764                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
765                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
766                                   NULL, NULL);
767                 if (err && lov->tgts[loi->loi_ost_idx].active) {
768                         CERROR("error: destroying objid "LPX64" subobj "
769                                LPX64" on OST idx %d: rc = %d\n",
770                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
771                         if (!rc)
772                                 rc = err;
773                 }
774         }
775         if (lfh != NULL)
776                 lov_lfh_put(lfh);
777         EXIT;
778  out:
779         class_export_put(export);
780         return rc;
781 }
782
783 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
784                        struct lov_stripe_md *lsm)
785 {
786         struct obdo tmp;
787         struct obd_export *export = class_conn2export(conn);
788         struct lov_obd *lov;
789         struct lov_oinfo *loi;
790         struct lov_file_handles *lfh = NULL;
791         int i, rc = 0, set = 0;
792         ENTRY;
793
794         if (!lsm) {
795                 CERROR("LOV requires striping ea\n");
796                 GOTO(out, rc = -EINVAL);
797         }
798
799         if (lsm->lsm_magic != LOV_MAGIC) {
800                 CERROR("LOV striping magic bad %#x != %#x\n",
801                        lsm->lsm_magic, LOV_MAGIC);
802                 GOTO(out, rc = -EINVAL);
803         }
804
805         if (!export || !export->exp_obd)
806                 GOTO(out, rc = -ENODEV);
807
808         lov = &export->exp_obd->u.lov;
809
810         if (oa->o_valid & OBD_MD_FLHANDLE)
811                 lfh = lov_handle2lfh(obdo_handle(oa));
812
813         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
814                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
815         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
816                 int err;
817
818                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
819                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
820                         continue;
821                 }
822
823                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
824                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
825                 /* create data objects with "parent" OA */
826                 memcpy(&tmp, oa, sizeof(tmp));
827                 tmp.o_id = loi->loi_id;
828                 if (lfh)
829                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
830                                FD_OSTDATA_SIZE);
831                 else
832                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
833
834                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
835                 if (err) {
836                         if (lov->tgts[loi->loi_ost_idx].active) {
837                                 CERROR("error: getattr objid "LPX64" subobj "
838                                        LPX64" on OST idx %d: rc = %d\n",
839                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
840                                        err);
841                                 GOTO(out, rc = err);
842                         }
843                 } else {
844                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
845                 }
846         }
847         if (!set)
848                 rc = -EIO;
849         GOTO(out, rc);
850  out:
851         if (lfh != NULL)
852                 lov_lfh_put(lfh);
853         class_export_put(export);
854         return rc;
855 }
856
857 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
858                                  struct lov_getattr_async_args *aa, int rc)
859 {
860         struct lov_stripe_md *lsm = aa->aa_lsm;
861         struct obdo          *oa = aa->aa_oa;
862         struct obdo          *obdos = aa->aa_stripe_oas;
863         struct lov_oinfo     *loi;
864         int                   i;
865         int                   set = 0;
866         ENTRY;
867
868         if (rc == 0) {
869                 /* NB all stripe requests succeeded to get here */
870
871                 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
872                      i++,loi++) {
873                         if (obdos[i].o_valid == 0)      /* inactive stripe */
874                                 continue;
875
876                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
877                                         i, &set);
878                 }
879
880                 if (!set) {
881                         CERROR ("No stripes had valid attrs\n");
882                         rc = -EIO;
883                 }
884         }
885
886         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
887         RETURN (rc);
888 }
889
890 static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
891                               struct lov_stripe_md *lsm,
892                               struct ptlrpc_request_set *rqset)
893 {
894         struct obdo *obdos;
895         struct obd_export *export = class_conn2export(conn);
896         struct lov_obd *lov;
897         struct lov_oinfo *loi;
898         struct lov_file_handles *lfh = NULL;
899         struct lov_getattr_async_args *aa;
900         int i;
901         int set = 0;
902         int rc = 0;
903         ENTRY;
904
905         if (!lsm) {
906                 CERROR("LOV requires striping ea\n");
907                 GOTO(out, rc = -EINVAL);
908         }
909
910         if (lsm->lsm_magic != LOV_MAGIC) {
911                 CERROR("LOV striping magic bad %#x != %#x\n",
912                        lsm->lsm_magic, LOV_MAGIC);
913                 GOTO(out, rc = -EINVAL);
914         }
915
916         if (!export || !export->exp_obd)
917                 GOTO(out, rc = -ENODEV);
918
919         lov = &export->exp_obd->u.lov;
920
921         OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
922         if (obdos == NULL)
923                 GOTO (out, rc = -ENOMEM);
924
925         if (oa->o_valid & OBD_MD_FLHANDLE)
926                 lfh = lov_handle2lfh(obdo_handle(oa));
927
928         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
929                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
930         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
931                 int err;
932
933                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
934                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
935                         /* leaves obdos[i].obd_valid unset */
936                         continue;
937                 }
938
939                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
940                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
941                 /* create data objects with "parent" OA */
942                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
943                 obdos[i].o_id = loi->loi_id;
944                 if (lfh)
945                         memcpy(obdo_handle(&obdos[i]), lfh->lfh_och + i,
946                                FD_OSTDATA_SIZE);
947                 else
948                         obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
949
950                 err = obd_getattr_async (&lov->tgts[loi->loi_ost_idx].conn,
951                                          &obdos[i], NULL, rqset);
952                 if (err) {
953                         CERROR("error: getattr objid "LPX64" subobj "
954                                LPX64" on OST idx %d: rc = %d\n",
955                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
956                                err);
957                         GOTO(out_obdos, rc = err);
958                 }
959                 set = 1;
960         }
961         if (!set)
962                 GOTO (out_obdos, rc = -EIO);
963
964         LASSERT (rqset->set_interpret == NULL);
965         rqset->set_interpret = lov_getattr_interpret;
966         LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
967         aa = (struct lov_getattr_async_args *)&rqset->set_args;
968         aa->aa_lsm = lsm;
969         aa->aa_oa = oa;
970         aa->aa_stripe_oas = obdos;
971         GOTO (out, rc = 0);
972
973  out_obdos:
974         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
975  out:
976         if (lfh != NULL)
977                 lov_lfh_put(lfh);
978         class_export_put(export);
979         RETURN (rc);
980 }
981
982 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
983                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
984 {
985         struct obdo *tmp;
986         struct obd_export *export = class_conn2export(conn);
987         struct lov_obd *lov;
988         struct lov_oinfo *loi;
989         struct lov_file_handles *lfh = NULL;
990         int rc = 0, i, set = 0;
991         ENTRY;
992
993         if (!lsm) {
994                 CERROR("LOV requires striping ea\n");
995                 GOTO(out, rc = -EINVAL);
996         }
997
998         if (lsm->lsm_magic != LOV_MAGIC) {
999                 CERROR("LOV striping magic bad %#x != %#x\n",
1000                        lsm->lsm_magic, LOV_MAGIC);
1001                 GOTO(out, rc = -EINVAL);
1002         }
1003
1004         if (!export || !export->exp_obd)
1005                 GOTO(out, rc = -ENODEV);
1006
1007         /* size changes should go through punch and not setattr */
1008         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
1009
1010         /* for now, we only expect mtime updates here */
1011         LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
1012
1013         tmp = obdo_alloc();
1014         if (!tmp)
1015                 GOTO(out, rc = -ENOMEM);
1016
1017         if (oa->o_valid & OBD_MD_FLHANDLE)
1018                 lfh = lov_handle2lfh(obdo_handle(oa));
1019
1020         lov = &export->exp_obd->u.lov;
1021         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1022                 int err;
1023
1024                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1025                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1026                         continue;
1027                 }
1028
1029                 obdo_cpy_md(tmp, oa, oa->o_valid);
1030
1031                 if (lfh)
1032                         memcpy(obdo_handle(tmp), lfh->lfh_och + i,
1033                                FD_OSTDATA_SIZE);
1034                 else
1035                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
1036
1037                 tmp->o_id = loi->loi_id;
1038
1039                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1040                                   NULL, NULL);
1041                 if (err) {
1042                         if (lov->tgts[loi->loi_ost_idx].active) {
1043                                 CERROR("error: setattr objid "LPX64" subobj "
1044                                        LPX64" on OST idx %d: rc = %d\n",
1045                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
1046                                        err);
1047                                 if (!rc)
1048                                         rc = err;
1049                         }
1050                 } else
1051                         set = 1;
1052         }
1053         obdo_free(tmp);
1054         if (!set && !rc)
1055                 rc = -EIO;
1056         if (lfh != NULL)
1057                 lov_lfh_put(lfh);
1058         GOTO(out, rc);
1059  out:
1060         class_export_put(export);
1061         return rc;
1062 }
1063
1064 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
1065                     struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1066                     struct obd_client_handle *och)
1067 {
1068         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
1069         struct obd_export *export = class_conn2export(conn);
1070         struct lov_obd *lov;
1071         struct lov_oinfo *loi;
1072         struct lov_file_handles *lfh = NULL;
1073         int set = 0, rc = 0, i;
1074         ENTRY;
1075         LASSERT(och != NULL);
1076
1077         if (!lsm) {
1078                 CERROR("LOV requires striping ea for opening\n");
1079                 GOTO(out_exp, rc = -EINVAL);
1080         }
1081
1082         if (lsm->lsm_magic != LOV_MAGIC) {
1083                 CERROR("LOV striping magic bad %#x != %#x\n",
1084                        lsm->lsm_magic, LOV_MAGIC);
1085                 GOTO(out_exp, rc = -EINVAL);
1086         }
1087
1088         if (!export || !export->exp_obd)
1089                 GOTO(out_exp, rc = -ENODEV);
1090
1091         tmp = obdo_alloc();
1092         if (!tmp)
1093                 GOTO(out_exp, rc = -ENOMEM);
1094
1095         lfh = lov_lfh_new();
1096         if (lfh == NULL)
1097                 GOTO(out_tmp, rc = -ENOMEM);
1098         OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof *och);
1099         if (!lfh->lfh_och)
1100                 GOTO(out_lfh, rc = -ENOMEM);
1101
1102         lov = &export->exp_obd->u.lov;
1103         oa->o_size = 0;
1104         oa->o_blocks = 0;
1105         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1106                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1107                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1108                         continue;
1109                 }
1110
1111                 /* create data objects with "parent" OA */
1112                 memcpy(tmp, oa, sizeof(*tmp));
1113                 tmp->o_id = loi->loi_id;
1114
1115                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1116                               NULL, NULL, lfh->lfh_och + i);
1117                 if (rc) {
1118                         if (!lov->tgts[loi->loi_ost_idx].active) {
1119                                 rc = 0;
1120                                 continue;
1121                         }
1122                         CERROR("error: open objid "LPX64" subobj "LPX64
1123                                " on OST idx %d: rc = %d\n",
1124                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
1125                                loi->loi_ost_idx, rc);
1126                         goto out_handles;
1127                 }
1128
1129                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
1130         }
1131
1132         lfh->lfh_count = lsm->lsm_stripe_count;
1133         och->och_fh.cookie = lfh->lfh_handle.h_cookie;
1134         obdo_handle(oa)->cookie = lfh->lfh_handle.h_cookie;
1135         oa->o_valid |= OBD_MD_FLHANDLE;
1136
1137         /* llfh refcount transfers to list */
1138         spin_lock(&export->exp_lov_data.led_lock);
1139         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
1140         spin_unlock(&export->exp_lov_data.led_lock);
1141
1142         GOTO(out_tmp, rc);
1143  out_tmp:
1144         obdo_free(tmp);
1145  out_exp:
1146         class_export_put(export);
1147         return rc;
1148
1149  out_handles:
1150         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
1151                 int err;
1152
1153                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1154                         continue;
1155
1156                 memcpy(tmp, oa, sizeof(*tmp));
1157                 tmp->o_id = loi->loi_id;
1158                 memcpy(obdo_handle(tmp), lfh->lfh_och + i, FD_OSTDATA_SIZE);
1159
1160                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1161                                 NULL, NULL);
1162                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1163                         CERROR("error: closing objid "LPX64" subobj "LPX64
1164                                " on OST idx %d after open error: rc=%d\n",
1165                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
1166                 }
1167         }
1168
1169         OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1170  out_lfh:
1171         lov_lfh_destroy(lfh);
1172         lov_lfh_put(lfh);
1173         goto out_tmp;
1174 }
1175
1176 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
1177                      struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1178 {
1179         struct obdo tmp;
1180         struct obd_export *export = class_conn2export(conn);
1181         struct lov_obd *lov;
1182         struct lov_oinfo *loi;
1183         struct lov_file_handles *lfh = NULL;
1184         int rc = 0, i;
1185         ENTRY;
1186
1187         if (!lsm) {
1188                 CERROR("LOV requires striping ea\n");
1189                 GOTO(out, rc = -EINVAL);
1190         }
1191
1192         if (lsm->lsm_magic != LOV_MAGIC) {
1193                 CERROR("LOV striping magic bad %#x != %#x\n",
1194                        lsm->lsm_magic, LOV_MAGIC);
1195                 GOTO(out, rc = -EINVAL);
1196         }
1197
1198         if (!export || !export->exp_obd)
1199                 GOTO(out, rc = -ENODEV);
1200
1201         if (oa->o_valid & OBD_MD_FLHANDLE)
1202                 lfh = lov_handle2lfh(obdo_handle(oa));
1203
1204         lov = &export->exp_obd->u.lov;
1205         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1206                 int err;
1207
1208                 /* create data objects with "parent" OA */
1209                 memcpy(&tmp, oa, sizeof(tmp));
1210                 tmp.o_id = loi->loi_id;
1211                 if (lfh)
1212                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1213                                FD_OSTDATA_SIZE);
1214                 else
1215                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1216
1217                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
1218                                 NULL, NULL);
1219                 if (err) {
1220                         if (lov->tgts[loi->loi_ost_idx].active) {
1221                                 CERROR("error: close objid "LPX64" subobj "LPX64
1222                                        " on OST idx %d: rc = %d\n", oa->o_id,
1223                                        loi->loi_id, loi->loi_ost_idx, err);
1224                         }
1225                         if (!rc)
1226                                 rc = err;
1227                 }
1228         }
1229         if (lfh != NULL) {
1230                 spin_lock(&export->exp_lov_data.led_lock);
1231                 list_del(&lfh->lfh_list);
1232                 spin_unlock(&export->exp_lov_data.led_lock);
1233                 lov_lfh_put(lfh); /* drop the reference owned by the list */
1234
1235                 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1236                 lov_lfh_destroy(lfh);
1237                 lov_lfh_put(lfh); /* balance handle2lfh above */
1238         }
1239         GOTO(out, rc);
1240  out:
1241         class_export_put(export);
1242         return rc;
1243 }
1244
/* Fallback definition, used only when nothing earlier has defined log2.
 * NOTE(review): presumably ffz(~(n)) yields the index of the lowest set
 * bit of n, which matches log2(n) only when n is a power of two -- confirm
 * against the ffz() implementation in use. */
1245 #ifndef log2
1246 #define log2(n) ffz(~(n))
1247 #endif
1248
1249 /* we have an offset in file backed by an lov and want to find out where
1250  * that offset lands in our given stripe of the file.  for the easy
1251  * case where the offset is within the stripe, we just have to scale the
1252  * offset down to make it relative to the stripe instead of the lov.
1253  *
1254  * the harder case is what to do when the offset doesn't intersect the
1255  * stripe.  callers will want start offsets clamped ahead to the start
1256  * of the nearest stripe in the file.  end offsets similarly clamped to the
1257  * nearest ending byte of a stripe in the file:
1258  *
1259  * all this function does is move offsets to the nearest region of the
1260  * stripe, and it does its work "mod" the full length of all the stripes.
1261  * consider a file with 3 stripes:
1262  *
1263  *             S                                              E
1264  * ---------------------------------------------------------------------
1265  * |    0    |     1     |     2     |    0    |     1     |     2     |
1266  * ---------------------------------------------------------------------
1267  *
1268  * to find stripe 1's offsets for S and E, it divides by the full stripe
1269  * width and does its math in the context of a single set of stripes:
1270  *
1271  *             S         E
1272  * -----------------------------------
1273  * |    0    |     1     |     2     |
1274  * -----------------------------------
1275  *
1276  * it'll notice that E is outside stripe 1 and clamp it to the end of the
1277  * stripe, then multiply it back out by lov_off to give the real offsets in
1278  * the stripe:
1279  *
1280  *   S                   E
1281  * ---------------------------------------------------------------------
1282  * |    1    |     1     |     1     |    1    |     1     |     1     |
1283  * ---------------------------------------------------------------------
1284  *
1285  * it would have done similarly and pulled S forward to the start of a 1
1286  * stripe if, say, S had landed in a 0 stripe.
1287  *
1288  * this rounding isn't always correct.  consider an E lov offset that lands
1289  * on a 0 stripe, the "mod stripe width" math will pull it forward to the
1290  * start of a 1 stripe, when in fact it wanted to be rounded back to the end
1291  * of a previous 1 stripe.  this logic is handled by callers and this is why:
1292  *
1293  * this function returns < 0 when the offset was "before" the stripe and
1294  * was moved forward to the start of the stripe in question;  0 when it
1295  * falls in the stripe and no shifting was done; > 0 when the offset
1296  * was outside the stripe and was pulled back to its final byte. */
1297 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1298                              int stripeno, obd_off *obd_off)
1299 {
1300         unsigned long ssize  = lsm->lsm_stripe_size;
1301         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1302         unsigned long stripe_off, this_stripe;
1303         int ret = 0;
1304
1305         if (lov_off == OBD_OBJECT_EOF) {
1306                 *obd_off = OBD_OBJECT_EOF;
1307                 return 0;
1308         }
1309
1310         /* do_div(a, b) returns a % b, and a = a / b */
1311         stripe_off = do_div(lov_off, swidth);
1312
1313         this_stripe = stripeno * ssize;
1314         if (stripe_off < this_stripe) {
1315                 stripe_off = 0;
1316                 ret = -1;
1317         } else {
1318                 stripe_off -= this_stripe;
1319
1320                 if (stripe_off >= ssize) {
1321                         stripe_off = ssize;
1322                         ret = 1;
1323                 }
1324         }
1325
1326         *obd_off = lov_off * ssize + stripe_off;
1327         return ret;
1328 }
1329
1330 /* given an extent in an lov and a stripe, calculate the extent of the stripe
1331  * that is contained within the lov extent.  this returns true if the given
1332  * stripe does intersect with the lov extent. */
1333 static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
1334                                  obd_off start, obd_off end,
1335                                  obd_off *obd_start, obd_off *obd_end)
1336 {
1337         int start_side, end_side;
1338
1339         start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
1340         end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
1341
1342         CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
1343                start, end, start_side, *obd_start, *obd_end, end_side);
1344
1345         /* this stripe doesn't intersect the file extent when neither
1346          * start or the end intersected the stripe and obd_start and
1347          * obd_end got rounded up to the save value. */
1348         if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
1349                 return 0;
1350
1351         /* as mentioned in the lov_stripe_offset commentary, end
1352          * might have been shifted in the wrong direction.  This
1353          * happens when an end offset is before the stripe when viewed
1354          * through the "mod stripe size" math. we detect it being shifted
1355          * in the wrong direction and touch it up.
1356          * interestingly, this can't underflow since end must be > start
1357          * if we passed through the previous check.
1358          * (should we assert for that somewhere?) */
1359         if (end_side != 0)
1360                 (*obd_end)--;
1361
1362         return 1;
1363 }
1364
1365 /* compute which stripe number "lov_off" will be written into */
1366 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1367 {
1368         unsigned long ssize  = lsm->lsm_stripe_size;
1369         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1370         unsigned long stripe_off;
1371
1372         stripe_off = do_div(lov_off, swidth);
1373
1374         return stripe_off / ssize;
1375 }
1376
1377 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1378  * we can send this 'punch' to just the authoritative node and the nodes
1379  * that the punch will affect. */
1380 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1381                      struct lov_stripe_md *lsm,
1382                      obd_off start, obd_off end, struct obd_trans_info *oti)
1383 {
1384         struct obdo tmp;
1385         struct obd_export *export = class_conn2export(conn);
1386         struct lov_obd *lov;
1387         struct lov_oinfo *loi;
1388         struct lov_file_handles *lfh = NULL;
1389         int rc = 0, i;
1390         ENTRY;
1391
1392         if (!lsm) {
1393                 CERROR("LOV requires striping ea\n");
1394                 GOTO(out, rc = -EINVAL);
1395         }
1396
1397         if (lsm->lsm_magic != LOV_MAGIC) {
1398                 CERROR("LOV striping magic bad %#x != %#x\n",
1399                        lsm->lsm_magic, LOV_MAGIC);
1400                 GOTO(out, rc = -EINVAL);
1401         }
1402
1403         if (!export || !export->exp_obd)
1404                 GOTO(out, rc = -ENODEV);
1405
1406         if (oa->o_valid & OBD_MD_FLHANDLE)
1407                 lfh = lov_handle2lfh(obdo_handle(oa));
1408
1409         lov = &export->exp_obd->u.lov;
1410         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1411                 obd_off starti, endi;
1412                 int err;
1413
1414                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1415                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1416                         continue;
1417                 }
1418
1419                 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1420                         continue;
1421
1422                 /* create data objects with "parent" OA */
1423                 memcpy(&tmp, oa, sizeof(tmp));
1424                 tmp.o_id = loi->loi_id;
1425                 if (lfh)
1426                         memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1427                                FD_OSTDATA_SIZE);
1428                 else
1429                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1430
1431                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1432                                 starti, endi, NULL);
1433                 if (err) {
1434                         if (lov->tgts[loi->loi_ost_idx].active) {
1435                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1436                                        " on OST idx %d: rc = %d\n", oa->o_id,
1437                                        loi->loi_id, loi->loi_ost_idx, err);
1438                         }
1439                         if (!rc)
1440                                 rc = err;
1441                 }
1442         }
1443         if (lfh != NULL)
1444                 lov_lfh_put(lfh);
1445         GOTO(out, rc);
1446  out:
1447         class_export_put(export);
1448         return rc;
1449 }
1450
1451 static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm,
1452                          obd_count oa_bufs, struct brw_page *pga)
1453 {
1454         int i;
1455
1456         /* The caller just wants to know if there's a chance that this
1457          * I/O can succeed */
1458         for (i = 0; i < oa_bufs; i++) {
1459                 int stripe = lov_stripe_number(lsm, pga[i].off);
1460                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1461                 struct ldlm_extent ext, subext;
1462                 ext.start = pga[i].off;
1463                 ext.start = pga[i].off + pga[i].count;
1464
1465                 if (!lov_stripe_intersects(lsm, i, ext.start, ext.end,
1466                                            &subext.start, &subext.end))
1467                         continue;
1468
1469                 if (lov->tgts[ost].active == 0) {
1470                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1471                         return -EIO;
1472                 }
1473         }
1474         return 0;
1475 }
1476
1477 static int lov_brw(int cmd, struct lustre_handle *conn,
1478                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1479                    struct brw_page *pga, struct obd_trans_info *oti)
1480 {
1481         struct {
1482                 int bufct;
1483                 int index;
1484                 int subcount;
1485                 struct lov_stripe_md lsm;
1486                 int ost_idx;
1487         } *stripeinfo, *si, *si_last;
1488         struct obd_export *export = class_conn2export(conn);
1489         struct lov_obd *lov;
1490         struct brw_page *ioarr;
1491         struct lov_oinfo *loi;
1492         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1493         ENTRY;
1494
1495         if (!lsm) {
1496                 CERROR("LOV requires striping ea\n");
1497                 GOTO(out_exp, rc = -EINVAL);
1498         }
1499
1500         if (lsm->lsm_magic != LOV_MAGIC) {
1501                 CERROR("LOV striping magic bad %#x != %#x\n",
1502                        lsm->lsm_magic, LOV_MAGIC);
1503                 GOTO(out_exp, rc = -EINVAL);
1504         }
1505
1506         lov = &export->exp_obd->u.lov;
1507
1508         if (cmd == OBD_BRW_CHECK) {
1509                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1510                 GOTO(out_exp, rc);
1511         }
1512
1513         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1514         if (!stripeinfo)
1515                 GOTO(out_exp, rc = -ENOMEM);
1516
1517         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1518         if (!where)
1519                 GOTO(out_sinfo, rc = -ENOMEM);
1520
1521         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1522         if (!ioarr)
1523                 GOTO(out_where, rc = -ENOMEM);
1524
1525         for (i = 0; i < oa_bufs; i++) {
1526                 where[i] = lov_stripe_number(lsm, pga[i].off);
1527                 stripeinfo[where[i]].bufct++;
1528         }
1529
1530         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1531              i < stripe_count; i++, loi++, si_last = si, si++) {
1532                 if (i > 0)
1533                         si->index = si_last->index + si_last->bufct;
1534                 si->lsm.lsm_object_id = loi->loi_id;
1535                 si->ost_idx = loi->loi_ost_idx;
1536         }
1537
1538         for (i = 0; i < oa_bufs; i++) {
1539                 int which = where[i];
1540                 int shift;
1541
1542                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1543                 LASSERT(shift < oa_bufs);
1544                 ioarr[shift] = pga[i];
1545                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1546                 stripeinfo[which].subcount++;
1547         }
1548
1549         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1550                 int shift = si->index;
1551
1552                 if (lov->tgts[si->ost_idx].active == 0) {
1553                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1554                         GOTO(out_ioarr, rc = -EIO);
1555                 }
1556
1557                 if (si->bufct) {
1558                         LASSERT(shift < oa_bufs);
1559                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1560                                      &si->lsm, si->bufct, &ioarr[shift],
1561                                      oti);
1562                         if (rc)
1563                                 GOTO(out_ioarr, rc);
1564                 }
1565         }
1566         GOTO(out_ioarr, rc);
1567  out_ioarr:
1568         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1569  out_where:
1570         OBD_FREE(where, sizeof(*where) * oa_bufs);
1571  out_sinfo:
1572         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1573  out_exp:
1574         class_export_put(export);
1575         return rc;
1576 }
1577
1578 static int lov_brw_interpret (struct ptlrpc_request_set *set,
1579                               struct lov_brw_async_args *aa, int rc)
1580 {
1581         obd_count        oa_bufs = aa->aa_oa_bufs;
1582         struct brw_page *ioarr = aa->aa_ioarr;
1583         ENTRY;
1584
1585         OBD_FREE (ioarr, sizeof (*ioarr) * oa_bufs);
1586         RETURN (rc);
1587 }
1588
1589 static int lov_brw_async(int cmd, struct lustre_handle *conn,
1590                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1591                          struct brw_page *pga, struct ptlrpc_request_set *set,
1592                          struct obd_trans_info *oti)
1593 {
1594         struct {
1595                 int bufct;
1596                 int index;
1597                 int subcount;
1598                 struct lov_stripe_md lsm;
1599                 int ost_idx;
1600         } *stripeinfo, *si, *si_last;
1601         struct obd_export *export = class_conn2export(conn);
1602         struct lov_obd *lov;
1603         struct brw_page *ioarr;
1604         struct lov_oinfo *loi;
1605         struct lov_brw_async_args *aa;
1606         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1607         ENTRY;
1608
1609         if (!lsm) {
1610                 CERROR("LOV requires striping ea\n");
1611                 GOTO(out_exp, rc = -EINVAL);
1612         }
1613
1614         if (lsm->lsm_magic != LOV_MAGIC) {
1615                 CERROR("LOV striping magic bad %#x != %#x\n",
1616                        lsm->lsm_magic, LOV_MAGIC);
1617                 GOTO(out_exp, rc = -EINVAL);
1618         }
1619
1620         lov = &export->exp_obd->u.lov;
1621
1622         if (cmd == OBD_BRW_CHECK) {
1623                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1624                 GOTO(out_exp, rc);
1625         }
1626
1627         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1628         if (!stripeinfo)
1629                 GOTO(out_exp, rc = -ENOMEM);
1630
1631         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1632         if (!where)
1633                 GOTO(out_sinfo, rc = -ENOMEM);
1634
1635         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1636         if (!ioarr)
1637                 GOTO(out_where, rc = -ENOMEM);
1638
1639         for (i = 0; i < oa_bufs; i++) {
1640                 where[i] = lov_stripe_number(lsm, pga[i].off);
1641                 stripeinfo[where[i]].bufct++;
1642         }
1643
1644         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1645              i < stripe_count; i++, loi++, si_last = si, si++) {
1646                 if (i > 0)
1647                         si->index = si_last->index + si_last->bufct;
1648                 si->lsm.lsm_object_id = loi->loi_id;
1649                 si->ost_idx = loi->loi_ost_idx;
1650         }
1651
1652         for (i = 0; i < oa_bufs; i++) {
1653                 int which = where[i];
1654                 int shift;
1655
1656                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1657                 LASSERT(shift < oa_bufs);
1658                 ioarr[shift] = pga[i];
1659                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1660                 stripeinfo[which].subcount++;
1661         }
1662
1663         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1664                 int shift = si->index;
1665
1666                 if (si->bufct == 0)
1667                         continue;
1668
1669                 if (lov->tgts[si->ost_idx].active == 0) {
1670                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1671                         GOTO(out_ioarr, rc = -EIO);
1672                 }
1673
1674                 LASSERT(shift < oa_bufs);
1675                 rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
1676                                    &si->lsm, si->bufct, &ioarr[shift],
1677                                    set, oti);
1678                 if (rc)
1679                         GOTO(out_ioarr, rc);
1680         }
1681         LASSERT (rc == 0);
1682         LASSERT (set->set_interpret == NULL);
1683         set->set_interpret = lov_brw_interpret;
1684         LASSERT (sizeof (set->set_args) >= sizeof (struct lov_brw_async_args));
1685         aa = (struct lov_brw_async_args *)&set->set_args;
1686         aa->aa_oa_bufs = oa_bufs;
1687         aa->aa_ioarr = ioarr;
1688         GOTO(out_where, rc);
1689  out_ioarr:
1690         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1691  out_where:
1692         OBD_FREE(where, sizeof(*where) * oa_bufs);
1693  out_sinfo:
1694         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1695  out_exp:
1696         class_export_put(export);
1697         return rc;
1698 }
1699
1700 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1701                        struct lustre_handle *parent_lock,
1702                        __u32 type, void *cookie, int cookielen, __u32 mode,
1703                        int *flags, void *cb, void *data, int datalen,
1704                        struct lustre_handle *lockh)
1705 {
1706         struct obd_export *export = class_conn2export(conn);
1707         struct lov_lock_handles *lov_lockh = NULL;
1708         struct lustre_handle *lov_lockhp;
1709         struct lov_obd *lov;
1710         struct lov_oinfo *loi;
1711         struct lov_stripe_md submd;
1712         ldlm_error_t rc;
1713         int i;
1714         ENTRY;
1715
1716         if (!lsm) {
1717                 CERROR("LOV requires striping ea\n");
1718                 GOTO(out_exp, rc = -EINVAL);
1719         }
1720
1721         if (lsm->lsm_magic != LOV_MAGIC) {
1722                 CERROR("LOV striping magic bad %#x != %#x\n",
1723                        lsm->lsm_magic, LOV_MAGIC);
1724                 GOTO(out_exp, rc = -EINVAL);
1725         }
1726
1727         /* we should never be asked to replay a lock this way. */
1728         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1729
1730         if (!export || !export->exp_obd)
1731                 GOTO(out_exp, rc = -ENODEV);
1732
1733         if (lsm->lsm_stripe_count > 1) {
1734                 lov_lockh = lov_llh_new(lsm);
1735                 if (lov_lockh == NULL)
1736                         GOTO(out_exp, rc = -ENOMEM);
1737
1738                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1739                 lov_lockhp = lov_lockh->llh_handles;
1740         } else {
1741                 lov_lockhp = lockh;
1742         }
1743
1744         lov = &export->exp_obd->u.lov;
1745         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1746              i++, loi++, lov_lockhp++) {
1747                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1748                 struct ldlm_extent sub_ext;
1749
1750                 *flags = 0;
1751                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1752                                            &sub_ext.start, &sub_ext.end))
1753                         continue;
1754
1755                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1756                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1757                         continue;
1758                 }
1759
1760                 /* XXX LOV STACKING: submd should be from the subobj */
1761                 submd.lsm_object_id = loi->loi_id;
1762                 submd.lsm_stripe_count = 0;
1763                 /* XXX submd is not fully initialized here */
1764                 *flags = 0;
1765                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1766                                   parent_lock, type, &sub_ext, sizeof(sub_ext),
1767                                   mode, flags, cb, data, datalen, lov_lockhp);
1768
1769                 // XXX add a lock debug statement here
1770                 if (rc != ELDLM_OK) {
1771                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1772                         if (lov->tgts[loi->loi_ost_idx].active) {
1773                                 CERROR("error: enqueue objid "LPX64" subobj "
1774                                        LPX64" on OST idx %d: rc = %d\n",
1775                                        lsm->lsm_object_id, loi->loi_id,
1776                                        loi->loi_ost_idx, rc);
1777                                 goto out_locks;
1778                         }
1779                 }
1780         }
1781         if (lsm->lsm_stripe_count > 1)
1782                 lov_llh_put(lov_lockh);
1783         GOTO(out_exp, rc = ELDLM_OK);
1784
1785  out_locks:
1786         while (loi--, lov_lockhp--, i-- > 0) {
1787                 struct lov_stripe_md submd;
1788                 int err;
1789
1790                 if (lov_lockhp->cookie == 0)
1791                         continue;
1792
1793                 /* XXX LOV STACKING: submd should be from the subobj */
1794                 submd.lsm_object_id = loi->loi_id;
1795                 submd.lsm_stripe_count = 0;
1796                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1797                                  mode, lov_lockhp);
1798                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1799                         CERROR("error: cancelling objid "LPX64" on OST "
1800                                "idx %d after enqueue error: rc = %d\n",
1801                                loi->loi_id, loi->loi_ost_idx, err);
1802                 }
1803         }
1804
1805         if (lsm->lsm_stripe_count > 1) {
1806                 lov_llh_destroy(lov_lockh);
1807                 lov_llh_put(lov_lockh);
1808         }
1809  out_exp:
1810         class_export_put(export);
1811         RETURN(rc);
1812 }
1813
1814 static int lov_match(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1815                        __u32 type, void *cookie, int cookielen, __u32 mode,
1816                        int *flags, struct lustre_handle *lockh)
1817 {
1818         struct obd_export *export = class_conn2export(conn);
1819         struct lov_lock_handles *lov_lockh = NULL;
1820         struct lustre_handle *lov_lockhp;
1821         struct lov_obd *lov;
1822         struct lov_oinfo *loi;
1823         struct lov_stripe_md submd;
1824         ldlm_error_t rc = 0;
1825         int i;
1826         ENTRY;
1827
1828         if (!lsm) {
1829                 CERROR("LOV requires striping ea\n");
1830                 GOTO(out_exp, rc = -EINVAL);
1831         }
1832
1833         if (lsm->lsm_magic != LOV_MAGIC) {
1834                 CERROR("LOV striping magic bad %#x != %#x\n",
1835                        lsm->lsm_magic, LOV_MAGIC);
1836                 GOTO(out_exp, rc = -EINVAL);
1837         }
1838
1839         if (!export || !export->exp_obd)
1840                 GOTO(out_exp, rc = -ENODEV);
1841
1842         if (lsm->lsm_stripe_count > 1) {
1843                 lov_lockh = lov_llh_new(lsm);
1844                 if (lov_lockh == NULL)
1845                         GOTO(out_exp, rc = -ENOMEM);
1846
1847                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1848                 lov_lockhp = lov_lockh->llh_handles;
1849         } else {
1850                 lov_lockhp = lockh;
1851         }
1852
1853         lov = &export->exp_obd->u.lov;
1854         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1855              i++, loi++, lov_lockhp++) {
1856                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1857                 struct ldlm_extent sub_ext;
1858                 int lov_flags;
1859
1860                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1861                                            &sub_ext.start, &sub_ext.end))
1862                         continue;
1863
1864                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1865                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1866                         rc = -EIO;
1867                         break;
1868                 }
1869
1870                 /* XXX LOV STACKING: submd should be from the subobj */
1871                 submd.lsm_object_id = loi->loi_id;
1872                 submd.lsm_stripe_count = 0;
1873                 lov_flags = *flags;
1874                 /* XXX submd is not fully initialized here */
1875                 rc = obd_match(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1876                                type, &sub_ext, sizeof(sub_ext), mode,
1877                                &lov_flags, lov_lockhp);
1878                 if (rc != 1)
1879                         break;
1880         }
1881         if (rc == 1) {
1882                 if (lsm->lsm_stripe_count > 1)
1883                         lov_llh_put(lov_lockh);
1884                 GOTO(out_exp, 1);
1885         }
1886
1887         while (loi--, lov_lockhp--, i-- > 0) {
1888                 struct lov_stripe_md submd;
1889                 int err;
1890
1891                 if (lov_lockhp->cookie == 0)
1892                         continue;
1893
1894                 /* XXX LOV STACKING: submd should be from the subobj */
1895                 submd.lsm_object_id = loi->loi_id;
1896                 submd.lsm_stripe_count = 0;
1897                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1898                                  mode, lov_lockhp);
1899                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1900                         CERROR("error: cancelling objid "LPX64" on OST "
1901                                "idx %d after match failure: rc = %d\n",
1902                                loi->loi_id, loi->loi_ost_idx, err);
1903                 }
1904         }
1905
1906         if (lsm->lsm_stripe_count > 1) {
1907                 lov_llh_destroy(lov_lockh);
1908                 lov_llh_put(lov_lockh);
1909         }
1910  out_exp:
1911         class_export_put(export);
1912         RETURN(rc);
1913 }
1914
1915 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1916                       __u32 mode, struct lustre_handle *lockh)
1917 {
1918         struct obd_export *export = class_conn2export(conn);
1919         struct lov_lock_handles *lov_lockh = NULL;
1920         struct lustre_handle *lov_lockhp;
1921         struct lov_obd *lov;
1922         struct lov_oinfo *loi;
1923         int rc = 0, i;
1924         ENTRY;
1925
1926         if (!lsm) {
1927                 CERROR("LOV requires striping ea\n");
1928                 GOTO(out, rc = -EINVAL);
1929         }
1930
1931         if (lsm->lsm_magic != LOV_MAGIC) {
1932                 CERROR("LOV striping magic bad %#x != %#x\n",
1933                        lsm->lsm_magic, LOV_MAGIC);
1934                 GOTO(out, rc = -EINVAL);
1935         }
1936
1937         if (!export || !export->exp_obd)
1938                 GOTO(out, rc = -ENODEV);
1939
1940         LASSERT(lockh);
1941         if (lsm->lsm_stripe_count > 1) {
1942                 lov_lockh = lov_handle2llh(lockh);
1943                 if (!lov_lockh) {
1944                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1945                         GOTO(out, rc = -EINVAL);
1946                 }
1947
1948                 lov_lockhp = lov_lockh->llh_handles;
1949         } else {
1950                 lov_lockhp = lockh;
1951         }
1952
1953         lov = &export->exp_obd->u.lov;
1954         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1955              i++, loi++, lov_lockhp++) {
1956                 struct lov_stripe_md submd;
1957                 int err;
1958
1959                 if (lov_lockhp->cookie == 0) {
1960                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
1961                                loi->loi_ost_idx, loi->loi_id);
1962                         continue;
1963                 }
1964
1965                 /* XXX LOV STACKING: submd should be from the subobj */
1966                 submd.lsm_object_id = loi->loi_id;
1967                 submd.lsm_stripe_count = 0;
1968                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1969                                  mode, lov_lockhp);
1970                 if (err) {
1971                         if (lov->tgts[loi->loi_ost_idx].active) {
1972                                 CERROR("error: cancel objid "LPX64" subobj "
1973                                        LPX64" on OST idx %d: rc = %d\n",
1974                                        lsm->lsm_object_id,
1975                                        loi->loi_id, loi->loi_ost_idx, err);
1976                                 if (!rc)
1977                                         rc = err;
1978                         }
1979                 }
1980         }
1981
1982         if (lsm->lsm_stripe_count > 1)
1983                 lov_llh_destroy(lov_lockh);
1984         if (lov_lockh != NULL)
1985                 lov_llh_put(lov_lockh);
1986         GOTO(out, rc);
1987  out:
1988         class_export_put(export);
1989         return rc;
1990 }
1991
1992 static int lov_cancel_unused(struct lustre_handle *conn,
1993                              struct lov_stripe_md *lsm, int flags, void *opaque)
1994 {
1995         struct obd_export *export = class_conn2export(conn);
1996         struct lov_obd *lov;
1997         struct lov_oinfo *loi;
1998         int rc = 0, i;
1999         ENTRY;
2000
2001         if (!lsm) {
2002                 CERROR("LOV requires striping ea for lock cancellation\n");
2003                 GOTO(out, rc = -EINVAL);
2004         }
2005
2006         if (!export || !export->exp_obd)
2007                 GOTO(out, rc = -ENODEV);
2008
2009         lov = &export->exp_obd->u.lov;
2010         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
2011                 struct lov_stripe_md submd;
2012                 int err;
2013
2014                 if (lov->tgts[loi->loi_ost_idx].active == 0)
2015                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2016
2017                 submd.lsm_object_id = loi->loi_id;
2018                 submd.lsm_stripe_count = 0;
2019                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
2020                                         &submd, flags, opaque);
2021                 if (err && lov->tgts[loi->loi_ost_idx].active) {
2022                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2023                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2024                                loi->loi_id, loi->loi_ost_idx, err);
2025                         if (!rc)
2026                                 rc = err;
2027                 }
2028         }
2029         GOTO(out, rc);
2030  out:
2031         class_export_put(export);
2032         return rc;
2033 }
2034
/* All-ones __u64: the value a saturated counter is pinned to. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: add 'add' to 'tot', but if the unsigned sum wraps
 * around (i.e. comes out smaller than 'tot') store LOV_U64_MAX instead.
 * Both arguments are evaluated more than once.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while(0)
2043
2044 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
2045 {
2046         struct obd_export *export = class_conn2export(conn);
2047         struct lov_obd *lov;
2048         struct obd_statfs lov_sfs;
2049         int set = 0;
2050         int rc = 0;
2051         int i;
2052         ENTRY;
2053
2054         if (!export || !export->exp_obd)
2055                 GOTO(out, rc = -ENODEV);
2056
2057         lov = &export->exp_obd->u.lov;
2058
2059         /* We only get block data from the OBD */
2060         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2061                 int err;
2062
2063                 if (!lov->tgts[i].active) {
2064                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
2065                         continue;
2066                 }
2067
2068                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
2069                 if (err) {
2070                         if (lov->tgts[i].active) {
2071                                 CERROR("error: statfs OSC %s on OST idx %d: "
2072                                        "err = %d\n",
2073                                        lov->tgts[i].uuid.uuid, i, err);
2074                                 if (!rc)
2075                                         rc = err;
2076                         }
2077                         continue;
2078                 }
2079                 if (!set) {
2080                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
2081                         set = 1;
2082                 } else {
2083                         osfs->os_bfree += lov_sfs.os_bfree;
2084                         osfs->os_bavail += lov_sfs.os_bavail;
2085                         osfs->os_blocks += lov_sfs.os_blocks;
2086                         /* XXX not sure about this one - depends on policy.
2087                          *   - could be minimum if we always stripe on all OBDs
2088                          *     (but that would be wrong for any other policy,
2089                          *     if one of the OBDs has no more objects left)
2090                          *   - could be sum if we stripe whole objects
2091                          *   - could be average, just to give a nice number
2092                          *
2093                          * To give a "reasonable" (if not wholly accurate)
2094                          * number, we divide the total number of free objects
2095                          * by expected stripe count (watch out for overflow).
2096                          */
2097                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
2098                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
2099                 }
2100         }
2101         if (set) {
2102                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
2103                                          lov->desc.ld_default_stripe_count :
2104                                          lov->desc.ld_active_tgt_count;
2105
2106                 if (osfs->os_files != LOV_U64_MAX)
2107                         do_div(osfs->os_files, expected_stripes);
2108                 if (osfs->os_ffree != LOV_U64_MAX)
2109                         do_div(osfs->os_ffree, expected_stripes);
2110         } else if (!rc)
2111                 rc = -EIO;
2112         GOTO(out, rc);
2113  out:
2114         class_export_put(export);
2115         return rc;
2116 }
2117
2118 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
2119                          void *karg, void *uarg)
2120 {
2121         struct obd_device *obddev = class_conn2obd(conn);
2122         struct lov_obd *lov = &obddev->u.lov;
2123         int i, count = lov->desc.ld_tgt_count;
2124         struct obd_uuid *uuidp;
2125         int rc;
2126
2127         ENTRY;
2128
2129         switch (cmd) {
2130         case IOC_LOV_SET_OSC_ACTIVE: {
2131                 struct obd_ioctl_data *data = karg;
2132                 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
2133                 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
2134                 break;
2135         }
2136         case OBD_IOC_LOV_GET_CONFIG: {
2137                 struct obd_ioctl_data *data = karg;
2138                 struct lov_tgt_desc *tgtdesc;
2139                 struct lov_desc *desc;
2140                 char *buf = NULL;
2141
2142                 buf = NULL;
2143                 len = 0;
2144                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2145                         RETURN(-EINVAL);
2146
2147                 data = (struct obd_ioctl_data *)buf;
2148
2149                 if (sizeof(*desc) > data->ioc_inllen1) {
2150                         OBD_FREE(buf, len);
2151                         RETURN(-EINVAL);
2152                 }
2153
2154                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2155                         OBD_FREE(buf, len);
2156                         RETURN(-EINVAL);
2157                 }
2158
2159                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2160                 memcpy(desc, &(lov->desc), sizeof(*desc));
2161
2162                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2163                 tgtdesc = lov->tgts;
2164                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
2165                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
2166
2167                 rc = copy_to_user((void *)uarg, buf, len);
2168                 if (rc)
2169                         rc = -EFAULT;
2170                 obd_ioctl_freedata(buf, len);
2171                 break;
2172         }
2173         case LL_IOC_LOV_SETSTRIPE:
2174                 rc = lov_setstripe(conn, karg, uarg);
2175                 break;
2176         case LL_IOC_LOV_GETSTRIPE:
2177                 rc = lov_getstripe(conn, karg, uarg);
2178                 break;
2179         default: {
2180                 int set = 0;
2181                 if (count == 0)
2182                         RETURN(-ENOTTY);
2183                 rc = 0;
2184                 for (i = 0; i < count; i++) {
2185                         int err;
2186
2187                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
2188                                             len, karg, uarg);
2189                         if (err) {
2190                                 if (lov->tgts[i].active) {
2191                                         CERROR("error: iocontrol OSC %s on OST"
2192                                                "idx %d: err = %d\n",
2193                                                lov->tgts[i].uuid.uuid, i, err);
2194                                         if (!rc)
2195                                                 rc = err;
2196                                 }
2197                         } else
2198                                 set = 1;
2199                 }
2200                 if (!set && !rc)
2201                         rc = -EIO;
2202         }
2203         }
2204
2205         RETURN(rc);
2206 }
2207
2208 static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
2209                         void *key, __u32 *vallen, void *val)
2210 {
2211         struct obd_device *obddev = class_conn2obd(conn);
2212         struct lov_obd *lov = &obddev->u.lov;
2213         int i;
2214         ENTRY;
2215
2216         if (!vallen || !val)
2217                 RETURN(-EFAULT);
2218
2219         if (keylen > strlen("lock_to_stripe") &&
2220             strcmp(key, "lock_to_stripe") == 0) {
2221                 struct {
2222                         char name[16];
2223                         struct ldlm_lock *lock;
2224                         struct lov_stripe_md *lsm;
2225                 } *data = key;
2226                 __u32 *stripe = val;
2227                 struct lov_oinfo *loi;
2228
2229                 if (*vallen < sizeof(*stripe))
2230                         RETURN(-EFAULT);
2231                 *vallen = sizeof(*stripe);
2232
2233                 /* XXX This is another one of those bits that will need to
2234                  * change if we ever actually support nested LOVs.  It uses
2235                  * the lock's connection to find out which stripe it is. */
2236                 for (i = 0, loi = data->lsm->lsm_oinfo;
2237                      i < data->lsm->lsm_stripe_count;
2238                      i++, loi++) {
2239                         if (lov->tgts[loi->loi_ost_idx].conn.cookie ==
2240                             data->lock->l_connh->cookie) {
2241                                 *stripe = i;
2242                                 RETURN(0);
2243                         }
2244                 }
2245                 RETURN(-ENXIO);
2246         }
2247
2248         RETURN(-EINVAL);
2249 }
2250
2251 struct obd_ops lov_obd_ops = {
2252         o_owner:       THIS_MODULE,
2253         o_attach:      lov_attach,
2254         o_detach:      lov_detach,
2255         o_setup:       lov_setup,
2256         o_connect:     lov_connect,
2257         o_disconnect:  lov_disconnect,
2258         o_statfs:      lov_statfs,
2259         o_packmd:      lov_packmd,
2260         o_unpackmd:    lov_unpackmd,
2261         o_create:      lov_create,
2262         o_destroy:     lov_destroy,
2263         o_getattr:     lov_getattr,
2264         o_getattr_async: lov_getattr_async,
2265         o_setattr:     lov_setattr,
2266         o_open:        lov_open,
2267         o_close:       lov_close,
2268         o_brw:         lov_brw,
2269         o_brw_async:   lov_brw_async,
2270         o_punch:       lov_punch,
2271         o_enqueue:     lov_enqueue,
2272         o_match:       lov_match,
2273         o_cancel:      lov_cancel,
2274         o_cancel_unused: lov_cancel_unused,
2275         o_iocontrol:   lov_iocontrol,
2276         o_get_info:    lov_get_info
2277 };
2278
2279 int __init lov_init(void)
2280 {
2281         struct lprocfs_static_vars lvars;
2282         int rc;
2283
2284         printk(KERN_INFO "Lustre Logical Object Volume driver; "
2285                "info@clusterfs.com\n");
2286         lprocfs_init_vars(&lvars);
2287         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2288                                  OBD_LOV_DEVICENAME);
2289         RETURN(rc);
2290 }
2291
2292 static void __exit lov_exit(void)
2293 {
2294         class_unregister_type(OBD_LOV_DEVICENAME);
2295 }
2296
/* Module metadata and entry/exit hooks; compiled for kernel builds only. */
#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lov_init);
module_exit(lov_exit);
#endif /* __KERNEL__ */