Whamcloud - gitweb
Return an error from lov_create() if all OSCs are inactive.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define EXPORT_SYMTAB
26 #define DEBUG_SUBSYSTEM S_LOV
27 #ifdef __KERNEL__
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/init.h>
31 #include <linux/random.h>
32 #include <linux/slab.h>
33 #include <linux/pagemap.h>
34 #include <asm/div64.h>
35 #else
36 #include <liblustre.h>
37 #endif
38
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_net.h>
42 #include <linux/lustre_idl.h>
43 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
44 #include <linux/lustre_mds.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_lov.h>
47 #include <linux/seq_file.h>
48 #include <linux/lprocfs_status.h>
49
50 #include "lov_internal.h"
51
/* Forward declaration so that lov_create() can call this helper ahead of
 * its definition.  The caller treats a negative return value specially and
 * reads the result back through the obd_off output pointer. */
52 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
53                              int stripeno, obd_off *obd_off);
54
/* Reference-counted bookkeeping for one open LOV handle. */
55 struct lov_file_handles {
           /* registered with class_handle_hash() in lov_lfh_new() */
56         struct portals_handle lfh_handle;
           /* set to 2 at creation; the object is freed when it drops to 0 */
57         atomic_t lfh_refcount;
           /* linkage on the export's exp_lov_data.led_open_head list */
58         struct list_head lfh_list;
           /* lfh_och is freed as lfh_count * FD_OSTDATA_SIZE bytes */
59         int lfh_count;
60         struct obd_client_handle *lfh_och;
61 };
62
/* Reference-counted container for a set of lock handles. */
63 struct lov_lock_handles {
           /* registered with class_handle_hash() in lov_llh_new() */
64         struct portals_handle llh_handle;
           /* set to 2 at creation; the object is freed when it drops to 0 */
65         atomic_t llh_refcount;
           /* copied from lsm_stripe_count; sizes the trailing array */
66         int llh_stripe_count;
           /* trailing array, allocated with llh_stripe_count entries */
67         struct lustre_handle llh_handles[0];
68 };
69
70 /* lov_file_handles helpers */
71 static void lov_lfh_addref(void *lfhp)
72 {
73         struct lov_file_handles *lfh = lfhp;
74
75         atomic_inc(&lfh->lfh_refcount);
76         CDEBUG(D_MALLOC, "GETting lfh %p : new refcount %d\n", lfh,
77                atomic_read(&lfh->lfh_refcount));
78 }
79
80 static struct lov_file_handles *lov_lfh_new(void)
81 {
82         struct lov_file_handles *lfh;
83
84         OBD_ALLOC(lfh, sizeof *lfh);
85         if (lfh == NULL) {
86                 CERROR("out of memory\n");
87                 return NULL;
88         }
89
90         atomic_set(&lfh->lfh_refcount, 2);
91
92         INIT_LIST_HEAD(&lfh->lfh_handle.h_link);
93         class_handle_hash(&lfh->lfh_handle, lov_lfh_addref);
94
95         return lfh;
96 }
97
98 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
99 {
100         ENTRY;
101         LASSERT(handle != NULL);
102         RETURN(class_handle2object(handle->cookie));
103 }
104
105 static void lov_lfh_put(struct lov_file_handles *lfh)
106 {
107         CDEBUG(D_MALLOC, "PUTting lfh %p : new refcount %d\n", lfh,
108                atomic_read(&lfh->lfh_refcount) - 1);
109         LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
110                 atomic_read(&lfh->lfh_refcount) < 0x5a5a);
111         if (atomic_dec_and_test(&lfh->lfh_refcount)) {
112                 LASSERT(list_empty(&lfh->lfh_handle.h_link));
113                 OBD_FREE(lfh, sizeof *lfh);
114         }
115 }
116
117 static void lov_lfh_destroy(struct lov_file_handles *lfh)
118 {
119         class_handle_unhash(&lfh->lfh_handle);
120         lov_lfh_put(lfh);
121 }
122
123 static void lov_llh_addref(void *llhp)
124 {
125         struct lov_lock_handles *llh = llhp;
126
127         atomic_inc(&llh->llh_refcount);
128         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
129                atomic_read(&llh->llh_refcount));
130 }
131
132 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
133 {
134         struct lov_lock_handles *llh;
135
136         OBD_ALLOC(llh, sizeof *llh +
137                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
138         if (llh == NULL) {
139                 CERROR("out of memory\n");
140                 return NULL;
141         }
142         atomic_set(&llh->llh_refcount, 2);
143         llh->llh_stripe_count = lsm->lsm_stripe_count;
144         INIT_LIST_HEAD(&llh->llh_handle.h_link);
145         class_handle_hash(&llh->llh_handle, lov_llh_addref);
146         return llh;
147 }
148
149 static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
150 {
151         ENTRY;
152         LASSERT(handle != NULL);
153         RETURN(class_handle2object(handle->cookie));
154 }
155
156 static void lov_llh_put(struct lov_lock_handles *llh)
157 {
158         CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
159                atomic_read(&llh->llh_refcount) - 1);
160         LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
161                 atomic_read(&llh->llh_refcount) < 0x5a5a);
162         if (atomic_dec_and_test(&llh->llh_refcount)) {
163                 LASSERT(list_empty(&llh->llh_handle.h_link));
164                 OBD_FREE(llh, sizeof *llh +
165                          sizeof(*llh->llh_handles) * llh->llh_stripe_count);
166         }
167 }
168
169 static void lov_llh_destroy(struct lov_lock_handles *llh)
170 {
171         class_handle_unhash(&llh->llh_handle);
172         lov_llh_put(llh);
173 }
174
175 /* obd methods */
176 int lov_attach(struct obd_device *dev, obd_count len, void *data)
177 {
178         struct lprocfs_static_vars lvars;
179         struct proc_dir_entry *entry;
180         int rc;
181
182         lprocfs_init_vars(lov, &lvars);
183         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
184         if (rc)
185                 return rc;
186
187         entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
188         if (entry == NULL)
189                 RETURN(-ENOMEM);
190         entry->proc_fops = &lov_proc_target_fops;
191         entry->data = dev;
192
193         return rc;
194 }
195
/* Detach-time teardown: undo the procfs registration made by lov_attach()
 * and hand back the result of lprocfs_obd_detach(). */
int lov_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
200
201 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
202                        struct obd_uuid *cluuid)
203 {
204         struct ptlrpc_request *req = NULL;
205         struct lov_obd *lov = &obd->u.lov;
206         struct client_obd *mdc = &lov->mdcobd->u.cli;
207         struct lov_desc *desc = &lov->desc;
208         struct lov_desc *mdesc;
209         struct lov_tgt_desc *tgts;
210         struct obd_export *exp;
211         struct lustre_handle mdc_conn;
212         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
213         struct obd_uuid *uuids;
214         int rc, rc2, i;
215         ENTRY;
216
217         rc = class_connect(conn, obd, cluuid);
218         if (rc)
219                 RETURN(rc);
220
221         exp = class_conn2export(conn);
222         spin_lock_init(&exp->exp_lov_data.led_lock);
223         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
224
225         /* We don't want to actually do the underlying connections more than
226          * once, so keep track. */
227         lov->refcount++;
228         if (lov->refcount > 1) {
229                 class_export_put(exp);
230                 RETURN(0);
231         }
232
233         /* retrieve LOV metadata from MDS */
234         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
235         if (rc) {
236                 CERROR("cannot connect to mdc: rc = %d\n", rc);
237                 GOTO(out_conn, rc);
238         }
239
240         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
241         rc2 = obd_disconnect(&mdc_conn, 0);
242         if (rc) {
243                 CERROR("cannot get lov info %d\n", rc);
244                 GOTO(out_conn, rc);
245         }
246
247         if (rc2) {
248                 CERROR("error disconnecting from MDS %d\n", rc2);
249                 GOTO(out_req, rc = rc2);
250         }
251
252         /* mdc_getlovinfo() has checked and swabbed the reply.  It has also
253          * done some simple checks (e.g. #uuids consistent with desc, uuid
254          * array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
255          * terminated), but I still need to verify it makes overall
256          * sense */
257         mdesc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*mdesc));
258         LASSERT(mdesc != NULL);
259         LASSERT_REPSWABBED(req, 0);
260
261         *desc = *mdesc;
262
263         /* XXX We need a separate LOV 'service' UUID from the client device
264          *     UUID so that we can mount more than once on a client */
265         if (!obd_uuid_equals(&obd->obd_uuid, &desc->ld_uuid)) {
266                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
267                        obd->obd_uuid.uuid, desc->ld_uuid.uuid);
268                 GOTO(out_req, rc = -EINVAL);
269         }
270
271         /* Because of 64-bit divide/mod operations only work with a 32-bit
272          * divisor in a 32-bit kernel, we cannot support a stripe width
273          * of 4GB or larger on 32-bit CPUs.
274          */
275         if ((desc->ld_default_stripe_count ?
276              desc->ld_default_stripe_count : desc->ld_tgt_count) *
277              desc->ld_default_stripe_size > ~0UL) {
278                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
279                        desc->ld_default_stripe_size,
280                        desc->ld_default_stripe_count ?
281                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
282                 GOTO(out_req, rc = -EINVAL);
283         }
284
285         /* We know ld_tgt_count is reasonable (the array of UUIDS fits in
286          * the maximum buffer size, so we won't be making outrageous
287          * demands on memory here. */
288         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
289         OBD_ALLOC(lov->tgts, lov->bufsize);
290         if (lov->tgts == NULL) {
291                 CERROR("Out of memory\n");
292                 GOTO(out_req, rc = -ENOMEM);
293         }
294
295         uuids = lustre_msg_buf(req->rq_repmsg, 1,
296                                sizeof(*uuids) * desc->ld_tgt_count);
297         LASSERT(uuids != NULL);
298         LASSERT_REPSWABBED(req, 1);
299
300         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
301                 struct obd_uuid *uuid = &tgts->uuid;
302                 struct obd_device *tgt_obd;
303                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
304
305                 /* NULL termination already checked */
306                 *uuid = uuids[i];
307
308                 tgt_obd = client_tgtuuid2obd(uuid);
309
310                 if (!tgt_obd) {
311                         CERROR("Target %s not attached\n", uuid->uuid);
312                         GOTO(out_disc, rc = -EINVAL);
313                 }
314
315                 if (!tgt_obd->obd_set_up) {
316                         CERROR("Target %s not set up\n", uuid->uuid);
317                         GOTO(out_disc, rc = -EINVAL);
318                 }
319
320                 rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid);
321
322                 if (rc) {
323                         CERROR("Target %s connect error %d\n", uuid->uuid, rc);
324                         GOTO(out_disc, rc);
325                 }
326
327                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
328                                    sizeof(struct obd_device *), obd, NULL);
329                 if (rc) {
330                         CERROR("Target %s REGISTER_LOV error %d\n",
331                                uuid->uuid, rc);
332                         obd_disconnect(&tgts->conn, 0);
333                         GOTO(out_disc, rc);
334                 }
335
336                 desc->ld_active_tgt_count++;
337                 tgts->active = 1;
338         }
339
340         mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
341         mdc->cl_max_mds_cookiesize = desc->ld_tgt_count *
342                 sizeof(struct llog_cookie);
343         ptlrpc_req_finished(req);
344         class_export_put(exp);
345         RETURN (0);
346
347  out_disc:
348         while (i-- > 0) {
349                 struct obd_uuid uuid;
350                 --tgts;
351                 --desc->ld_active_tgt_count;
352                 tgts->active = 0;
353                 /* save for CERROR below; (we know it's terminated) */
354                 uuid = tgts->uuid;
355                 rc2 = obd_disconnect(&tgts->conn, 0);
356                 if (rc2)
357                         CERROR("error: LOV target %s disconnect on OST idx %d: "
358                                "rc = %d\n", uuid.uuid, i, rc2);
359         }
360         OBD_FREE(lov->tgts, lov->bufsize);
361  out_req:
362         ptlrpc_req_finished (req);
363  out_conn:
364         class_export_put(exp);
365         class_disconnect(conn, 0);
366         RETURN (rc);
367 }
368
369 static int lov_disconnect(struct lustre_handle *conn, int flags)
370 {
371         struct obd_device *obd = class_conn2obd(conn);
372         struct lov_obd *lov = &obd->u.lov;
373         struct obd_export *exp;
374         struct list_head *p, *n;
375         int rc, i;
376         ENTRY;
377
378         if (!lov->tgts)
379                 goto out_local;
380
381         /* Only disconnect the underlying layers on the final disconnect. */
382         lov->refcount--;
383         if (lov->refcount != 0)
384                 goto out_local;
385
386         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
387                 if (obd->obd_no_recov) {
388                         /* Pass it on to our clients.
389                          * XXX This should be an argument to disconnect,
390                          * XXX not a back-door flag on the OBD.  Ah well.
391                          */
392                         struct obd_device *osc_obd =
393                                 class_conn2obd(&lov->tgts[i].conn);
394                         osc_obd->obd_no_recov = 1;
395                 }
396                 rc = obd_disconnect(&lov->tgts[i].conn, flags);
397                 if (rc) {
398                         if (lov->tgts[i].active) {
399                                 CERROR("Target %s disconnect error %d\n",
400                                        lov->tgts[i].uuid.uuid, rc);
401                         }
402                         rc = 0;
403                 }
404                 if (lov->tgts[i].active) {
405                         lov->desc.ld_active_tgt_count--;
406                         lov->tgts[i].active = 0;
407                 }
408         }
409         OBD_FREE(lov->tgts, lov->bufsize);
410         lov->bufsize = 0;
411         lov->tgts = NULL;
412
413  out_local:
414         exp = class_conn2export(conn);
415         if (exp == NULL) {
416                 CERROR("export handle "LPU64" invalid!  If you can reproduce, "
417                        "please send a full debug log to phik\n", conn->cookie);
418                 RETURN(0);
419         }
420         spin_lock(&exp->exp_lov_data.led_lock);
421         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
422                 /* XXX close these, instead of just discarding them? */
423                 struct lov_file_handles *lfh;
424                 lfh = list_entry(p, typeof(*lfh), lfh_list);
425                 CERROR("discarding open LOV handle %p:"LPX64"\n",
426                        lfh, lfh->lfh_handle.h_cookie);
427                 list_del(&lfh->lfh_list);
428                 OBD_FREE(lfh->lfh_och, lfh->lfh_count * FD_OSTDATA_SIZE);
429                 lov_lfh_destroy(lfh);
430                 lov_lfh_put(lfh);
431         }
432         spin_unlock(&exp->exp_lov_data.led_lock);
433         class_export_put(exp);
434
435         rc = class_disconnect(conn, 0);
436         RETURN(rc);
437 }
438
439 /* Error codes:
440  *
441  *  -EINVAL  : UUID can't be found in the LOV's target list
442  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
443  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
444  */
445 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
446                               int activate)
447 {
448         struct obd_device *obd;
449         struct lov_tgt_desc *tgt;
450         int i, rc = 0;
451         ENTRY;
452
453         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
454                lov, uuid->uuid, activate);
455
456         spin_lock(&lov->lov_lock);
457         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
458                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
459                        i, tgt->uuid.uuid, tgt->conn.cookie);
460                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
461                         break;
462         }
463
464         if (i == lov->desc.ld_tgt_count)
465                 GOTO(out, rc = -EINVAL);
466
467         obd = class_conn2obd(&tgt->conn);
468         if (obd == NULL) {
469                 /* This can happen if OST failure races with node shutdown */
470                 GOTO(out, rc = -ENOTCONN);
471         }
472
473         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
474                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
475                obd->obd_type->typ_name, i);
476         LASSERT(strcmp(obd->obd_type->typ_name, "osc") == 0);
477
478         if (tgt->active == activate) {
479                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
480                        activate ? "" : "in");
481                 GOTO(out, rc);
482         }
483
484         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
485
486         tgt->active = activate;
487         if (activate) {
488                 /*
489                  * foreach(export)
490                  *     foreach(open_file)
491                  *         if (file_handle uses this_osc)
492                  *             if (has_no_filehandle)
493                  *                 open(file_handle, this_osc);
494                  */
495                 /* XXX reconnect? */
496                 lov->desc.ld_active_tgt_count++;
497         } else {
498                 /*
499                  * Should I invalidate filehandles that refer to this OSC, so
500                  * that I reopen them during reactivation?
501                  */
502                 /* XXX disconnect from OSC? */
503                 lov->desc.ld_active_tgt_count--;
504         }
505
506 #warning "FIXME: walk open files list for objects that need opening"
507         EXIT;
508  out:
509         spin_unlock(&lov->lov_lock);
510         return rc;
511 }
512
513 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
514 {
515         struct obd_ioctl_data *data = buf;
516         struct lov_obd *lov = &obd->u.lov;
517         int rc = 0;
518         ENTRY;
519
520         if (data->ioc_inllen1 < 1) {
521                 CERROR("LOV setup requires an MDC name\n");
522                 RETURN(-EINVAL);
523         }
524
525         spin_lock_init(&lov->lov_lock);
526         lov->mdcobd = class_name2obd(data->ioc_inlbuf1);
527         if (!lov->mdcobd) {
528                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
529                        data->ioc_inlbuf1);
530                 rc = -EINVAL;
531         }
532         RETURN(rc);
533 }
534
535 /* compute object size given "stripeno" and the ost size */
536 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
537                                 int stripeno)
538 {
539         unsigned long ssize  = lsm->lsm_stripe_size;
540         unsigned long swidth = ssize * lsm->lsm_stripe_count;
541         unsigned long stripe_size;
542         obd_size lov_size;
543
544         if (ost_size == 0)
545                 return 0;
546
547         /* do_div(a, b) returns a % b, and a = a / b */
548         stripe_size = do_div(ost_size, ssize);
549
550         if (stripe_size)
551                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
552         else
553                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
554
555         return lov_size;
556 }
557
558 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
559                             struct lov_stripe_md *lsm, int stripeno, int *set)
560 {
561         valid &= src->o_valid;
562
563         if (*set) {
564                 if (valid & OBD_MD_FLSIZE) {
565                         /* this handles sparse files properly */
566                         obd_size lov_size;
567
568                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
569                         if (lov_size > tgt->o_size)
570                                 tgt->o_size = lov_size;
571                 }
572                 if (valid & OBD_MD_FLBLOCKS)
573                         tgt->o_blocks += src->o_blocks;
574                 if (valid & OBD_MD_FLBLKSZ)
575                         tgt->o_blksize += src->o_blksize;
576                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
577                         tgt->o_ctime = src->o_ctime;
578                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
579                         tgt->o_mtime = src->o_mtime;
580         } else {
581                 memcpy(tgt, src, sizeof(*tgt));
582                 tgt->o_id = lsm->lsm_object_id;
583                 if (valid & OBD_MD_FLSIZE)
584                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
585                 *set = 1;
586         }
587 }
588
/* Fallback log2() for builds where none is defined yet: the index of the
 * first zero bit in ~x.
 * NOTE(review): that equals log2(x) only when x is a power of two --
 * confirm callers only pass such values. */
#if !defined(log2)
#define log2(x) ffz(~(x))
#endif
592
593 /* the LOV expects oa->o_id to be set to the LOV object id */
594 static int lov_create(struct lustre_handle *conn, struct obdo *src_oa,
595                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
596 {
597         struct obd_export *export = class_conn2export(conn);
598         struct lov_obd *lov;
599         struct lov_stripe_md *lsm;
600         struct lov_oinfo *loi = NULL;
601         struct obdo *tmp_oa, *ret_oa;
602         struct llog_cookie *cookies = NULL;
603         unsigned ost_count, ost_idx;
604         int set = 0, obj_alloc = 0, cookie_sent = 0, rc = 0, i;
605         ENTRY;
606
607         LASSERT(ea);
608
609         if (!export)
610                 RETURN(-EINVAL);
611
612         lov = &export->exp_obd->u.lov;
613
614         if (!lov->desc.ld_active_tgt_count)
615                 GOTO(out_exp, rc = -EIO);
616
617         ret_oa = obdo_alloc();
618         if (!ret_oa)
619                 GOTO(out_exp, rc = -ENOMEM);
620
621         tmp_oa = obdo_alloc();
622         if (!tmp_oa)
623                 GOTO(out_oa, rc = -ENOMEM);
624
625         lsm = *ea;
626
627         if (!lsm) {
628                 int stripes;
629                 ost_count = lov_get_stripecnt(lov, 0);
630
631                 /* If the MDS file was truncated up to some size, stripe over
632                  * enough OSTs to allow the file to be created at that size.
633                  */
634                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
635                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
636                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
637
638                         if (stripes > lov->desc.ld_active_tgt_count)
639                                 GOTO(out_exp, rc = -EFBIG);
640                         if (stripes < ost_count)
641                                 stripes = ost_count;
642                 } else
643                         stripes = ost_count;
644
645                 rc = lov_alloc_memmd(&lsm, stripes);
646                 if (rc < 0)
647                         GOTO(out_tmp, rc);
648
649                 rc = 0;
650         }
651
652         ost_count = lov->desc.ld_tgt_count;
653
654         LASSERT(src_oa->o_valid & OBD_MD_FLID);
655         lsm->lsm_object_id = src_oa->o_id;
656         if (!lsm->lsm_stripe_size)
657                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
658
659         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
660                 get_random_bytes(&ost_idx, 2);
661                 ost_idx %= ost_count;
662         } else {
663                 ost_idx = lsm->lsm_stripe_offset;
664         }
665
666         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
667                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
668
669         /* XXX LOV STACKING: need to figure out how many real OSCs */
670         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
671                 oti_alloc_cookies(oti, lsm->lsm_stripe_count);
672                 if (!oti->oti_logcookies)
673                         GOTO(out_cleanup, rc = -ENOMEM);
674                 cookies = oti->oti_logcookies;
675         }
676
677         loi = lsm->lsm_oinfo;
678         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
679                 struct lov_stripe_md obj_md;
680                 struct lov_stripe_md *obj_mdp = &obj_md;
681                 int err;
682
683                 if (lov->tgts[ost_idx].active == 0) {
684                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
685                         continue;
686                 }
687
688                 /* create data objects with "parent" OA */
689                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
690
691                 /* XXX When we start creating objects on demand, we need to
692                  *     make sure that we always create the object on the
693                  *     stripe which holds the existing file size.
694                  */
695                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
696                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
697                                               &tmp_oa->o_size) < 0 &&
698                             tmp_oa->o_size)
699                                 tmp_oa->o_size--;
700
701                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
702                                i, tmp_oa->o_size, src_oa->o_size);
703                 }
704
705                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
706                 err = obd_create(&lov->tgts[ost_idx].conn, tmp_oa,&obj_mdp,oti);
707                 if (err) {
708                         if (lov->tgts[ost_idx].active) {
709                                 CERROR("error creating objid "LPX64" sub-object"
710                                        " on OST idx %d/%d: rc = %d\n",
711                                        src_oa->o_id, ost_idx,
712                                        lsm->lsm_stripe_count, err);
713                                 if (err > 0) {
714                                         CERROR("obd_create returned invalid "
715                                                "err %d\n", err);
716                                         err = -EIO;
717                                 }
718                         }
719                         if (!rc)
720                                 rc = err;
721                         continue;
722                 }
723                 loi->loi_id = tmp_oa->o_id;
724                 loi->loi_ost_idx = ost_idx;
725                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
726                        lsm->lsm_object_id, loi->loi_id, ost_idx);
727
728                 if (set == 0)
729                         lsm->lsm_stripe_offset = ost_idx;
730                 lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
731                                 obj_alloc, &set);
732                 loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
733                 ot_init(loi->loi_dirty_ot);
734
735                 if (cookies)
736                         ++oti->oti_logcookies;
737                 if (tmp_oa->o_valid & OBD_MD_FLCOOKIE)
738                         ++cookie_sent;
739                 ++obj_alloc;
740                 ++loi;
741
742                 /* If we have allocated enough objects, we are OK */
743                 if (obj_alloc == lsm->lsm_stripe_count)
744                         GOTO(out_done, rc = 0);
745         }
746
747         if (obj_alloc == 0) {
748                 if (rc == 0)
749                         rc = -EIO;
750                 GOTO(out_cleanup, rc);
751         }
752
753         /* If we were passed specific striping params, then a failure to
754          * meet those requirements is an error, since we can't reallocate
755          * that memory (it might be part of a larger array or something).
756          *
757          * We can only get here if lsm_stripe_count was originally > 1.
758          */
759         if (*ea != NULL) {
760                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
761                        lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
762                 if (rc == 0)
763                         rc = -EFBIG;
764                 GOTO(out_cleanup, rc);
765         } else {
766                 struct lov_stripe_md *lsm_new;
767                 /* XXX LOV STACKING call into osc for sizes */
768                 unsigned oldsize, newsize;
769
770                 if (oti && cookies && cookie_sent) {
771                         oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
772                         newsize = obj_alloc * sizeof(*cookies);
773
774                         oti_alloc_cookies(oti, obj_alloc);
775                         if (oti->oti_logcookies) {
776                                 memcpy(oti->oti_logcookies, cookies, newsize);
777                                 OBD_FREE(cookies, oldsize);
778                                 cookies = oti->oti_logcookies;
779                         } else {
780                                 CWARN("'leaking' %d bytes\n", oldsize-newsize);
781                         }
782                 }
783
784                 CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
785                        lsm->lsm_object_id, lsm->lsm_stripe_count, obj_alloc);
786                 oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
787                 newsize = lov_stripe_md_size(obj_alloc);
788                 OBD_ALLOC(lsm_new, newsize);
789                 if (lsm_new != NULL) {
790                         memcpy(lsm_new, lsm, newsize);
791                         lsm_new->lsm_stripe_count = obj_alloc;
792                         OBD_FREE(lsm, newsize);
793                         lsm = lsm_new;
794                 } else {
795                         CWARN("'leaking' %d bytes\n", oldsize - newsize);
796                 }
797                 rc = 0;
798         }
799         EXIT;
800  out_done:
801         *ea = lsm;
802         if (src_oa->o_valid & OBD_MD_FLSIZE &&
803             ret_oa->o_size != src_oa->o_size) {
804                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
805                        src_oa->o_size, ret_oa->o_size);
806                 LBUG();
807         }
808         ret_oa->o_id = src_oa->o_id;
809         memcpy(src_oa, ret_oa, sizeof(*src_oa));
810
811  out_tmp:
812         obdo_free(tmp_oa);
813  out_oa:
814         obdo_free(ret_oa);
815         if (oti && cookies) {
816                 oti->oti_logcookies = cookies;
817                 if (!cookie_sent) {
818                         oti_free_cookies(oti);
819                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
820                 } else {
821                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
822                 }
823         }
824  out_exp:
825         class_export_put(export);
826         return rc;
827
828  out_cleanup:
829         while (obj_alloc-- > 0) {
830                 int err;
831
832                 --loi;
833                 /* destroy already created objects here */
834                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
835                 tmp_oa->o_id = loi->loi_id;
836
837                 if (oti && cookie_sent) {
838                         err = obd_log_cancel(&lov->tgts[loi->loi_ost_idx].conn,
839                                              NULL, 1, --oti->oti_logcookies,
840                                              OBD_LLOG_FL_SENDNOW);
841                         if (err)
842                                 CERROR("Failed to cancel objid "LPX64" subobj "
843                                        LPX64" cookie on OST idx %d: rc = %d\n",
844                                        src_oa->o_id, loi->loi_id,
845                                        loi->loi_ost_idx, err);
846                 }
847
848                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
849                                   NULL, oti);
850                 if (err)
851                         CERROR("Failed to uncreate objid "LPX64" subobj "LPX64
852                                " on OST idx %d: rc = %d\n", src_oa->o_id,
853                                loi->loi_id, loi->loi_ost_idx, err);
854         }
855         if (*ea == NULL)
856                 obd_free_memmd(conn, &lsm);
857         goto out_tmp;
858 }
859
/* Sanity-check a striping descriptor before it is used.
 *
 * Evaluates to 1 (after logging a CERROR) when LSMP is NULL or when its
 * lsm_magic field is not LOV_MAGIC, and to 0 otherwise.  LSMP is evaluated
 * exactly once. */
#define lsm_bad_magic(LSMP)                                     \
({                                                              \
        struct lov_stripe_md *_lsm__ = (LSMP);                  \
        int _ret__ = 1;                                         \
        if (_lsm__ == NULL) {                                   \
                CERROR("LOV requires striping ea\n");           \
        } else if (_lsm__->lsm_magic != LOV_MAGIC) {            \
                CERROR("LOV striping magic bad %#x != %#x\n",   \
                       _lsm__->lsm_magic, LOV_MAGIC);           \
        } else {                                                \
                _ret__ = 0;                                     \
        }                                                       \
        _ret__;                                                 \
})
874
875 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
876                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
877 {
878         struct obdo tmp;
879         struct obd_export *export = class_conn2export(conn);
880         struct lov_obd *lov;
881         struct lov_oinfo *loi;
882         struct lov_file_handles *lfh = NULL;
883         int rc = 0, i;
884         ENTRY;
885
886         if (lsm_bad_magic(lsm))
887                 GOTO(out, rc = -EINVAL);
888
889         if (!export || !export->exp_obd)
890                 GOTO(out, rc = -ENODEV);
891
892         if (oa->o_valid & OBD_MD_FLHANDLE)
893                 lfh = lov_handle2lfh(obdo_handle(oa));
894
895         lov = &export->exp_obd->u.lov;
896         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
897                 int err;
898                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
899                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
900                         /* Orphan clean up will (someday) fix this up. */
901                         continue;
902                 }
903
904                 memcpy(&tmp, oa, sizeof(tmp));
905                 tmp.o_id = loi->loi_id;
906                 if (lfh)
907                         memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
908                                sizeof(lfh->lfh_och[i].och_fh));
909                 else
910                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
911                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
912                                   NULL, oti);
913                 if (err && lov->tgts[loi->loi_ost_idx].active) {
914                         CERROR("error: destroying objid "LPX64" subobj "
915                                LPX64" on OST idx %d: rc = %d\n",
916                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
917                         if (!rc)
918                                 rc = err;
919                 }
920         }
921         if (lfh != NULL)
922                 lov_lfh_put(lfh);
923         EXIT;
924  out:
925         class_export_put(export);
926         return rc;
927 }
928
929 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
930                        struct lov_stripe_md *lsm)
931 {
932         struct obdo tmp;
933         struct obd_export *export = class_conn2export(conn);
934         struct lov_obd *lov;
935         struct lov_oinfo *loi;
936         struct lov_file_handles *lfh = NULL;
937         int i, rc = 0, set = 0;
938         ENTRY;
939
940         if (lsm_bad_magic(lsm))
941                 GOTO(out, rc = -EINVAL);
942
943         if (!export || !export->exp_obd)
944                 GOTO(out, rc = -ENODEV);
945
946         lov = &export->exp_obd->u.lov;
947
948         if (oa->o_valid & OBD_MD_FLHANDLE)
949                 lfh = lov_handle2lfh(obdo_handle(oa));
950
951         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
952                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
953         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
954                 int err;
955
956                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
957                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
958                         continue;
959                 }
960
961                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
962                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
963                 /* create data objects with "parent" OA */
964                 memcpy(&tmp, oa, sizeof(tmp));
965                 tmp.o_id = loi->loi_id;
966                 if (lfh)
967                         memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
968                                sizeof(lfh->lfh_och[i].och_fh));
969                 else
970                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
971
972                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
973                 if (err) {
974                         if (lov->tgts[loi->loi_ost_idx].active) {
975                                 CERROR("error: getattr objid "LPX64" subobj "
976                                        LPX64" on OST idx %d: rc = %d\n",
977                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
978                                        err);
979                                 GOTO(out, rc = err);
980                         }
981                 } else {
982                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
983                 }
984         }
985         if (!set)
986                 rc = -EIO;
987         GOTO(out, rc);
988  out:
989         if (lfh != NULL)
990                 lov_lfh_put(lfh);
991         class_export_put(export);
992         return rc;
993 }
994
995 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, 
996                                  int rc)
997 {
998         struct lov_getattr_async_args *aa = data;
999         struct lov_stripe_md *lsm = aa->aa_lsm;
1000         struct obdo          *oa = aa->aa_oa;
1001         struct obdo          *obdos = aa->aa_obdos;
1002         struct lov_oinfo     *loi;
1003         int                   i;
1004         int                   set = 0;
1005         ENTRY;
1006
1007         if (rc == 0) {
1008                 /* NB all stripe requests succeeded to get here */
1009
1010                 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1011                      i++, loi++) {
1012                         if (obdos[i].o_valid == 0)      /* inactive stripe */
1013                                 continue;
1014
1015                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
1016                                         i, &set);
1017                 }
1018
1019                 if (!set) {
1020                         CERROR ("No stripes had valid attrs\n");
1021                         rc = -EIO;
1022                 }
1023         }
1024
1025         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
1026         RETURN (rc);
1027 }
1028
1029 static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
1030                               struct lov_stripe_md *lsm,
1031                               struct ptlrpc_request_set *rqset)
1032 {
1033         struct obdo *obdos;
1034         struct obd_export *export = class_conn2export(conn);
1035         struct lov_obd *lov;
1036         struct lov_oinfo *loi;
1037         struct lov_file_handles *lfh = NULL;
1038         struct lov_getattr_async_args *aa;
1039         int i;
1040         int set = 0;
1041         int rc = 0;
1042         ENTRY;
1043
1044         if (!lsm) {
1045                 CERROR("LOV requires striping ea\n");
1046                 GOTO(out, rc = -EINVAL);
1047         }
1048
1049         if (lsm->lsm_magic != LOV_MAGIC) {
1050                 CERROR("LOV striping magic bad %#x != %#x\n",
1051                        lsm->lsm_magic, LOV_MAGIC);
1052                 GOTO(out, rc = -EINVAL);
1053         }
1054
1055         if (!export || !export->exp_obd)
1056                 GOTO(out, rc = -ENODEV);
1057
1058         lov = &export->exp_obd->u.lov;
1059
1060         OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
1061         if (obdos == NULL)
1062                 GOTO (out, rc = -ENOMEM);
1063
1064         if (oa->o_valid & OBD_MD_FLHANDLE)
1065                 lfh = lov_handle2lfh(obdo_handle(oa));
1066
1067         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
1068                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
1069         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1070                 int err;
1071
1072                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1073                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1074                         /* leaves obdos[i].obd_valid unset */
1075                         continue;
1076                 }
1077
1078                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
1079                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
1080                 /* create data objects with "parent" OA */
1081                 memcpy(&obdos[i], oa, sizeof(obdos[i]));
1082                 obdos[i].o_id = loi->loi_id;
1083                 if (lfh)
1084                         memcpy(obdo_handle(&obdos[i]), &lfh->lfh_och[i].och_fh,
1085                                sizeof(lfh->lfh_och[i].och_fh));
1086                 else
1087                         obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
1088
1089                 err = obd_getattr_async (&lov->tgts[loi->loi_ost_idx].conn,
1090                                          &obdos[i], NULL, rqset);
1091                 if (err) {
1092                         CERROR("error: getattr objid "LPX64" subobj "
1093                                LPX64" on OST idx %d: rc = %d\n",
1094                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
1095                                err);
1096                         GOTO(out_obdos, rc = err);
1097                 }
1098                 set = 1;
1099         }
1100         if (!set)
1101                 GOTO (out_obdos, rc = -EIO);
1102
1103         LASSERT (rqset->set_interpret == NULL);
1104         rqset->set_interpret = lov_getattr_interpret;
1105         LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
1106         aa = (struct lov_getattr_async_args *)&rqset->set_args;
1107         aa->aa_lsm = lsm;
1108         aa->aa_oa = oa;
1109         aa->aa_obdos = obdos;
1110         GOTO (out, rc = 0);
1111
1112  out_obdos:
1113         OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
1114  out:
1115         if (lfh != NULL)
1116                 lov_lfh_put(lfh);
1117         class_export_put(export);
1118         RETURN (rc);
1119 }
1120
1121 static int lov_setattr(struct lustre_handle *conn, struct obdo *src_oa,
1122                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1123 {
1124         struct obdo *tmp_oa, *ret_oa;
1125         struct obd_export *export = class_conn2export(conn);
1126         struct lov_obd *lov;
1127         struct lov_oinfo *loi;
1128         struct lov_file_handles *lfh = NULL;
1129         int rc = 0, i, set = 0;
1130         ENTRY;
1131
1132         if (lsm_bad_magic(lsm))
1133                 GOTO(out, rc = -EINVAL);
1134
1135         if (!export || !export->exp_obd)
1136                 GOTO(out, rc = -ENODEV);
1137
1138         /* for now, we only expect time updates here */
1139         LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE|OBD_MD_FLMODE|
1140                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
1141                                       OBD_MD_FLCTIME)));
1142         ret_oa = obdo_alloc();
1143         if (!ret_oa)
1144                 GOTO(out, rc = -ENOMEM);
1145
1146         tmp_oa = obdo_alloc();
1147         if (!tmp_oa)
1148                 GOTO(out_oa, rc = -ENOMEM);
1149
1150         lov = &export->exp_obd->u.lov;
1151         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1152                 int err;
1153
1154                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1155                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1156                         continue;
1157                 }
1158
1159                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1160
1161                 if (lfh)
1162                         memcpy(obdo_handle(tmp_oa), &lfh->lfh_och[i].och_fh,
1163                                sizeof(lfh->lfh_och[i].och_fh));
1164                 else
1165                         tmp_oa->o_valid &= ~OBD_MD_FLHANDLE;
1166
1167                 tmp_oa->o_id = loi->loi_id;
1168
1169                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
1170                                   NULL, NULL);
1171                 if (err) {
1172                         if (lov->tgts[loi->loi_ost_idx].active) {
1173                                 CERROR("error: setattr objid "LPX64" subobj "
1174                                        LPX64" on OST idx %d: rc = %d\n",
1175                                        src_oa->o_id, loi->loi_id,
1176                                        loi->loi_ost_idx, err);
1177                                 if (!rc)
1178                                         rc = err;
1179                         }
1180                         continue;
1181                 }
1182
1183                 lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
1184         }
1185         if (!set && !rc)
1186                 rc = -EIO;
1187         if (lfh != NULL)
1188                 lov_lfh_put(lfh);
1189
1190         ret_oa->o_id = src_oa->o_id;
1191         memcpy(src_oa, ret_oa, sizeof(*src_oa));
1192         GOTO(out_tmp, rc);
1193 out_tmp:
1194         obdo_free(tmp_oa);
1195 out_oa:
1196         obdo_free(ret_oa);
1197 out:
1198         class_export_put(export);
1199         return rc;
1200 }
1201
1202 static int lov_open(struct lustre_handle *conn, struct obdo *src_oa,
1203                     struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1204                     struct obd_client_handle *och)
1205 {
1206         struct obdo *tmp_oa, *ret_oa;
1207         struct obd_export *export = class_conn2export(conn);
1208         struct lov_obd *lov;
1209         struct lov_oinfo *loi;
1210         struct lov_file_handles *lfh = NULL;
1211         int set = 0, rc = 0, i;
1212         ENTRY;
1213         LASSERT(och != NULL);
1214
1215         if (lsm_bad_magic(lsm))
1216                 GOTO(out_exp, rc = -EINVAL);
1217
1218         if (!export || !export->exp_obd)
1219                 GOTO(out_exp, rc = -ENODEV);
1220
1221         ret_oa = obdo_alloc();
1222         if (!ret_oa)
1223                 GOTO(out_exp, rc = -ENOMEM);
1224
1225         tmp_oa = obdo_alloc();
1226         if (!tmp_oa)
1227                 GOTO(out_oa, rc = -ENOMEM);
1228
1229         lfh = lov_lfh_new();
1230         if (lfh == NULL)
1231                 GOTO(out_tmp, rc = -ENOMEM);
1232         OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof(*och));
1233         if (!lfh->lfh_och)
1234                 GOTO(out_lfh, rc = -ENOMEM);
1235
1236         lov = &export->exp_obd->u.lov;
1237         src_oa->o_size = 0;
1238         src_oa->o_blocks = 0;
1239         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1240                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1241                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1242                         continue;
1243                 }
1244
1245                 /* create data objects with "parent" OA */
1246                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1247                 tmp_oa->o_id = loi->loi_id;
1248
1249                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
1250                               NULL, NULL, &lfh->lfh_och[i]);
1251                 if (rc) {
1252                         if (!lov->tgts[loi->loi_ost_idx].active) {
1253                                 rc = 0;
1254                                 continue;
1255                         }
1256                         CERROR("error: open objid "LPX64" subobj "LPX64
1257                                " on OST idx %d: rc = %d\n",
1258                                src_oa->o_id, lsm->lsm_oinfo[i].loi_id,
1259                                loi->loi_ost_idx, rc);
1260                         goto out_handles;
1261                 }
1262
1263                 lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm, i, &set);
1264         }
1265
1266         lfh->lfh_count = lsm->lsm_stripe_count;
1267         och->och_fh.cookie = lfh->lfh_handle.h_cookie;
1268         obdo_handle(ret_oa)->cookie = lfh->lfh_handle.h_cookie;
1269         ret_oa->o_valid |= OBD_MD_FLHANDLE;
1270         ret_oa->o_id = src_oa->o_id;
1271         memcpy(src_oa, ret_oa, sizeof(*src_oa));
1272
1273         /* lfh refcount transfers to list */
1274         spin_lock(&export->exp_lov_data.led_lock);
1275         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
1276         spin_unlock(&export->exp_lov_data.led_lock);
1277
1278         GOTO(out_tmp, rc);
1279  out_tmp:
1280         obdo_free(tmp_oa);
1281  out_oa:
1282         obdo_free(ret_oa);
1283  out_exp:
1284         class_export_put(export);
1285         return rc;
1286
1287  out_handles:
1288         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
1289                 int err;
1290
1291                 if (lov->tgts[loi->loi_ost_idx].active == 0)
1292                         continue;
1293
1294                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1295                 tmp_oa->o_id = loi->loi_id;
1296                 memcpy(obdo_handle(tmp_oa), &lfh->lfh_och[i], FD_OSTDATA_SIZE);
1297
1298                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp_oa,
1299                                 NULL, NULL);
1300                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1301                         CERROR("error: closing objid "LPX64" subobj "LPX64
1302                                " on OST idx %d after open error: rc=%d\n",
1303                                src_oa->o_id, loi->loi_id, loi->loi_ost_idx,err);
1304                 }
1305         }
1306
1307         OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1308  out_lfh:
1309         lov_lfh_destroy(lfh);
1310         lov_lfh_put(lfh);
1311         goto out_tmp;
1312 }
1313
1314 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
1315                      struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1316 {
1317         struct obdo tmp;
1318         struct obd_export *export = class_conn2export(conn);
1319         struct lov_obd *lov;
1320         struct lov_oinfo *loi;
1321         struct lov_file_handles *lfh = NULL;
1322         int rc = 0, i;
1323         ENTRY;
1324
1325         if (lsm_bad_magic(lsm))
1326                 GOTO(out, rc = -EINVAL);
1327
1328         if (!export || !export->exp_obd)
1329                 GOTO(out, rc = -ENODEV);
1330
1331         if (oa->o_valid & OBD_MD_FLHANDLE)
1332                 lfh = lov_handle2lfh(obdo_handle(oa));
1333         if (!lfh)
1334                 LBUG();
1335
1336         lov = &export->exp_obd->u.lov;
1337         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1338                 int err;
1339
1340                 /* create data objects with "parent" OA */
1341                 memcpy(&tmp, oa, sizeof(tmp));
1342                 tmp.o_id = loi->loi_id;
1343                 if (lfh)
1344                         memcpy(obdo_handle(&tmp), &lfh->lfh_och[i],
1345                                FD_OSTDATA_SIZE);
1346                 else
1347                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1348
1349                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
1350                                 NULL, NULL);
1351                 if (err) {
1352                         if (lov->tgts[loi->loi_ost_idx].active) {
1353                                 CERROR("error: close objid "LPX64" subobj "LPX64
1354                                        " on OST idx %d: rc = %d\n", oa->o_id,
1355                                        loi->loi_id, loi->loi_ost_idx, err);
1356                         }
1357                         if (!rc)
1358                                 rc = err;
1359                 }
1360         }
1361         if (lfh != NULL) {
1362                 spin_lock(&export->exp_lov_data.led_lock);
1363                 list_del(&lfh->lfh_list);
1364                 spin_unlock(&export->exp_lov_data.led_lock);
1365                 lov_lfh_put(lfh); /* drop the reference owned by the list */
1366
1367                 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1368                 lov_lfh_destroy(lfh);
1369                 LASSERT(atomic_read(&lfh->lfh_refcount) == 1);
1370                 lov_lfh_put(lfh); /* balance handle2lfh above */
1371         } else
1372                 LBUG();
1373         GOTO(out, rc);
1374  out:
1375         class_export_put(export);
1376         return rc;
1377 }
1378
1379 /* we have an offset in file backed by an lov and want to find out where
1380  * that offset lands in our given stripe of the file.  for the easy
1381  * case where the offset is within the stripe, we just have to scale the
1382  * offset down to make it relative to the stripe instead of the lov.
1383  *
1384  * the harder case is what to do when the offset doesn't intersect the
1385  * stripe.  callers will want start offsets clamped ahead to the start
1386  * of the nearest stripe in the file.  end offsets similarly clamped to the
1387  * nearest ending byte of a stripe in the file:
1388  *
1389  * all this function does is move offsets to the nearest region of the
1390  * stripe, and it does its work "mod" the full length of all the stripes.
1391  * consider a file with 3 stripes:
1392  *
1393  *             S                                              E
1394  * ---------------------------------------------------------------------
1395  * |    0    |     1     |     2     |    0    |     1     |     2     |
1396  * ---------------------------------------------------------------------
1397  *
1398  * to find stripe 1's offsets for S and E, it divides by the full stripe
1399  * width and does its math in the context of a single set of stripes:
1400  *
1401  *             S         E
1402  * -----------------------------------
1403  * |    0    |     1     |     2     |
1404  * -----------------------------------
1405  *
1406  * it'll notice that E is outside stripe 1 and clamp it to the end of the
1407  * stripe, then multiply it back out by lov_off to give the real offsets in
1408  * the stripe:
1409  *
1410  *   S                   E
1411  * ---------------------------------------------------------------------
1412  * |    1    |     1     |     1     |    1    |     1     |     1     |
1413  * ---------------------------------------------------------------------
1414  *
1415  * it would have done similarly and pulled S forward to the start of a 1
1416  * stripe if, say, S had landed in a 0 stripe.
1417  *
1418  * this rounding isn't always correct.  consider an E lov offset that lands
1419  * on a 0 stripe, the "mod stripe width" math will pull it forward to the
1420  * start of a 1 stripe, when in fact it wanted to be rounded back to the end
1421  * of a previous 1 stripe.  this logic is handled by callers and this is why:
1422  *
1423  * this function returns < 0 when the offset was "before" the stripe and
1424  * was moved forward to the start of the stripe in question;  0 when it
1425  * falls in the stripe and no shifting was done; > 0 when the offset
1426  * was outside the stripe and was pulled back to its final byte. */
1427 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1428                              int stripeno, obd_off *obd_off)
1429 {
1430         unsigned long ssize  = lsm->lsm_stripe_size;
1431         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1432         unsigned long stripe_off, this_stripe;
1433         int ret = 0;
1434
1435         if (lov_off == OBD_OBJECT_EOF) {
1436                 *obd_off = OBD_OBJECT_EOF;
1437                 return 0;
1438         }
1439
1440         /* do_div(a, b) returns a % b, and a = a / b */
1441         stripe_off = do_div(lov_off, swidth);
1442
1443         this_stripe = stripeno * ssize;
1444         if (stripe_off < this_stripe) {
1445                 stripe_off = 0;
1446                 ret = -1;
1447         } else {
1448                 stripe_off -= this_stripe;
1449
1450                 if (stripe_off >= ssize) {
1451                         stripe_off = ssize;
1452                         ret = 1;
1453                 }
1454         }
1455
1456         *obd_off = lov_off * ssize + stripe_off;
1457         return ret;
1458 }
1459
1460 /* given an extent in an lov and a stripe, calculate the extent of the stripe
1461  * that is contained within the lov extent.  this returns true if the given
1462  * stripe does intersect with the lov extent. */
1463 static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
1464                                  obd_off start, obd_off end,
1465                                  obd_off *obd_start, obd_off *obd_end)
1466 {
1467         int start_side, end_side;
1468
1469         start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
1470         end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
1471
1472         CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
1473                start, end, start_side, *obd_start, *obd_end, end_side);
1474
1475         /* this stripe doesn't intersect the file extent when neither
1476          * start or the end intersected the stripe and obd_start and
1477          * obd_end got rounded up to the save value. */
1478         if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
1479                 return 0;
1480
1481         /* as mentioned in the lov_stripe_offset commentary, end
1482          * might have been shifted in the wrong direction.  This
1483          * happens when an end offset is before the stripe when viewed
1484          * through the "mod stripe size" math. we detect it being shifted
1485          * in the wrong direction and touch it up.
1486          * interestingly, this can't underflow since end must be > start
1487          * if we passed through the previous check.
1488          * (should we assert for that somewhere?) */
1489         if (end_side != 0)
1490                 (*obd_end)--;
1491
1492         return 1;
1493 }
1494
1495 /* compute which stripe number "lov_off" will be written into */
1496 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1497 {
1498         unsigned long ssize  = lsm->lsm_stripe_size;
1499         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1500         unsigned long stripe_off;
1501
1502         stripe_off = do_div(lov_off, swidth);
1503
1504         return stripe_off / ssize;
1505 }
1506
1507 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1508  * we can send this 'punch' to just the authoritative node and the nodes
1509  * that the punch will affect. */
1510 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1511                      struct lov_stripe_md *lsm,
1512                      obd_off start, obd_off end, struct obd_trans_info *oti)
1513 {
1514         struct obdo tmp;
1515         struct obd_export *export = class_conn2export(conn);
1516         struct lov_obd *lov;
1517         struct lov_oinfo *loi;
1518         struct lov_file_handles *lfh = NULL;
1519         int rc = 0, i;
1520         ENTRY;
1521
1522         if (lsm_bad_magic(lsm))
1523                 GOTO(out, rc = -EINVAL);
1524
1525         if (!export || !export->exp_obd)
1526                 GOTO(out, rc = -ENODEV);
1527
1528         if (oa->o_valid & OBD_MD_FLHANDLE)
1529                 lfh = lov_handle2lfh(obdo_handle(oa));
1530
1531         lov = &export->exp_obd->u.lov;
1532         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1533                 obd_off starti, endi;
1534                 int err;
1535
1536                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1537                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1538                         continue;
1539                 }
1540
1541                 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1542                         continue;
1543
1544                 /* create data objects with "parent" OA */
1545                 memcpy(&tmp, oa, sizeof(tmp));
1546                 tmp.o_id = loi->loi_id;
1547                 if (lfh)
1548                         memcpy(obdo_handle(&tmp), &lfh->lfh_och[i].och_fh,
1549                                sizeof(lfh->lfh_och[i].och_fh));
1550                 else
1551                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1552
1553                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1554                                 starti, endi, NULL);
1555                 if (err) {
1556                         if (lov->tgts[loi->loi_ost_idx].active) {
1557                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1558                                        " on OST idx %d: rc = %d\n", oa->o_id,
1559                                        loi->loi_id, loi->loi_ost_idx, err);
1560                         }
1561                         if (!rc)
1562                                 rc = err;
1563                 }
1564         }
1565         if (lfh != NULL)
1566                 lov_lfh_put(lfh);
1567         GOTO(out, rc);
1568  out:
1569         class_export_put(export);
1570         return rc;
1571 }
1572
1573 static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm,
1574                          obd_count oa_bufs, struct brw_page *pga)
1575 {
1576         int i;
1577
1578         /* The caller just wants to know if there's a chance that this
1579          * I/O can succeed */
1580         for (i = 0; i < oa_bufs; i++) {
1581                 int stripe = lov_stripe_number(lsm, pga[i].off);
1582                 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1583                 struct ldlm_extent ext, subext;
1584                 ext.start = pga[i].off;
1585                 ext.start = pga[i].off + pga[i].count;
1586
1587                 if (!lov_stripe_intersects(lsm, i, ext.start, ext.end,
1588                                            &subext.start, &subext.end))
1589                         continue;
1590
1591                 if (lov->tgts[ost].active == 0) {
1592                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1593                         return -EIO;
1594                 }
1595         }
1596         return 0;
1597 }
1598
1599 static int lov_brw(int cmd, struct lustre_handle *conn, struct obdo *src_oa,
1600                    struct lov_stripe_md *lsm, obd_count oa_bufs,
1601                    struct brw_page *pga, struct obd_trans_info *oti)
1602 {
1603         struct {
1604                 int bufct;
1605                 int index;
1606                 int subcount;
1607                 struct lov_stripe_md lsm;
1608                 int ost_idx;
1609         } *stripeinfo, *si, *si_last;
1610         struct obd_export *export = class_conn2export(conn);
1611         struct obdo *ret_oa = NULL, *tmp_oa = NULL;
1612         struct lov_file_handles *lfh = NULL;
1613         struct lov_obd *lov;
1614         struct brw_page *ioarr;
1615         struct lov_oinfo *loi;
1616         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count, set = 0;
1617         ENTRY;
1618
1619         if (lsm_bad_magic(lsm))
1620                 GOTO(out_exp, rc = -EINVAL);
1621
1622         lov = &export->exp_obd->u.lov;
1623
1624         if (cmd == OBD_BRW_CHECK) {
1625                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1626                 GOTO(out_exp, rc);
1627         }
1628
1629         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1630         if (!stripeinfo)
1631                 GOTO(out_exp, rc = -ENOMEM);
1632
1633         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1634         if (!where)
1635                 GOTO(out_sinfo, rc = -ENOMEM);
1636
1637         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1638         if (!ioarr)
1639                 GOTO(out_where, rc = -ENOMEM);
1640
1641         if (src_oa) {
1642                 ret_oa = obdo_alloc();
1643                 if (!ret_oa)
1644                         GOTO(out_ioarr, rc = -ENOMEM);
1645
1646                 tmp_oa = obdo_alloc();
1647                 if (!tmp_oa)
1648                         GOTO(out_oa, rc = -ENOMEM);
1649
1650                 if (src_oa->o_valid & OBD_MD_FLHANDLE)
1651                         lfh = lov_handle2lfh(obdo_handle(src_oa));
1652                 else
1653                         src_oa->o_valid &= ~OBD_MD_FLHANDLE;
1654         }
1655
1656         for (i = 0; i < oa_bufs; i++) {
1657                 where[i] = lov_stripe_number(lsm, pga[i].off);
1658                 stripeinfo[where[i]].bufct++;
1659         }
1660
1661         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1662              i < stripe_count; i++, loi++, si_last = si, si++) {
1663                 if (i > 0)
1664                         si->index = si_last->index + si_last->bufct;
1665                 si->lsm.lsm_object_id = loi->loi_id;
1666                 si->ost_idx = loi->loi_ost_idx;
1667         }
1668
1669         for (i = 0; i < oa_bufs; i++) {
1670                 int which = where[i];
1671                 int shift;
1672
1673                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1674                 LASSERT(shift < oa_bufs);
1675                 ioarr[shift] = pga[i];
1676                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1677                 stripeinfo[which].subcount++;
1678         }
1679
1680         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1681                 int shift = si->index;
1682
1683                 if (lov->tgts[si->ost_idx].active == 0) {
1684                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1685                         GOTO(out_oa, rc = -EIO);
1686                 }
1687
1688                 if (si->bufct) {
1689                         LASSERT(shift < oa_bufs);
1690                         if (src_oa) {
1691                                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
1692                                 if (lfh)
1693                                         memcpy(obdo_handle(tmp_oa),
1694                                                &lfh->lfh_och[i].och_fh,
1695                                                sizeof(lfh->lfh_och[i].och_fh));
1696                         }
1697
1698                         tmp_oa->o_id = si->lsm.lsm_object_id;
1699                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn, tmp_oa,
1700                                      &si->lsm, si->bufct, &ioarr[shift],
1701                                      oti);
1702                         if (rc)
1703                                 GOTO(out_ioarr, rc);
1704
1705                         lov_merge_attrs(ret_oa, tmp_oa, tmp_oa->o_valid, lsm,
1706                                         i, &set);
1707                 }
1708         }
1709
1710         ret_oa->o_id = src_oa->o_id;
1711         memcpy(src_oa, ret_oa, sizeof(*src_oa));
1712
1713         GOTO(out_oa, rc);
1714  out_oa:
1715         if (tmp_oa)
1716                 obdo_free(tmp_oa);
1717         if (ret_oa)
1718                 obdo_free(ret_oa);
1719  out_ioarr:
1720         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1721  out_where:
1722         OBD_FREE(where, sizeof(*where) * oa_bufs);
1723         if (lfh)
1724                 lov_lfh_put(lfh);
1725  out_sinfo:
1726         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1727  out_exp:
1728         class_export_put(export);
1729         return rc;
1730 }
1731
1732 static int lov_brw_interpret(struct ptlrpc_request_set *rqset,
1733                              struct lov_brw_async_args *aa, int rc)
1734 {
1735         struct lov_stripe_md *lsm = aa->aa_lsm;
1736         obd_count             oa_bufs = aa->aa_oa_bufs;
1737         struct obdo          *oa = aa->aa_oa;
1738         struct obdo          *obdos = aa->aa_obdos;
1739         struct brw_page      *ioarr = aa->aa_ioarr;
1740         struct lov_oinfo     *loi;
1741         int i, set = 0;
1742         ENTRY;
1743
1744         if (rc == 0) {
1745                 /* NB all stripe requests succeeded to get here */
1746
1747                 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1748                      i++, loi++) {
1749                         if (obdos[i].o_valid == 0)      /* inactive stripe */
1750                                 continue;
1751
1752                         lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
1753                                         i, &set);
1754                 }
1755
1756                 if (!set) {
1757                         CERROR("No stripes had valid attrs\n");
1758                         rc = -EIO;
1759                 }
1760         }
1761         oa->o_id = lsm->lsm_object_id;
1762
1763         OBD_FREE(obdos, lsm->lsm_stripe_count * sizeof(*obdos));
1764         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1765         RETURN(rc);
1766 }
1767
1768 static int lov_brw_async(int cmd, struct lustre_handle *conn, struct obdo *oa,
1769                          struct lov_stripe_md *lsm, obd_count oa_bufs,
1770                          struct brw_page *pga, struct ptlrpc_request_set *set,
1771                          struct obd_trans_info *oti)
1772 {
1773         struct {
1774                 int bufct;
1775                 int index;
1776                 int subcount;
1777                 struct lov_stripe_md lsm;
1778                 int ost_idx;
1779         } *stripeinfo, *si, *si_last;
1780         struct obd_export *export = class_conn2export(conn);
1781         struct lov_obd *lov;
1782         struct lov_file_handles *lfh = NULL;
1783         struct brw_page *ioarr;
1784         struct obdo *obdos = NULL;
1785         struct lov_oinfo *loi;
1786         struct lov_brw_async_args *aa;
1787         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1788         ENTRY;
1789
1790         if (lsm_bad_magic(lsm))
1791                 GOTO(out_exp, rc = -EINVAL);
1792
1793         lov = &export->exp_obd->u.lov;
1794
1795         if (cmd == OBD_BRW_CHECK) {
1796                 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
1797                 GOTO(out_exp, rc);
1798         }
1799
1800         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1801         if (!stripeinfo)
1802                 GOTO(out_exp, rc = -ENOMEM);
1803
1804         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1805         if (!where)
1806                 GOTO(out_sinfo, rc = -ENOMEM);
1807
1808         if (oa) {
1809                 OBD_ALLOC(obdos, sizeof(*obdos) * stripe_count);
1810                 if (!obdos)
1811                         GOTO(out_where, rc = -ENOMEM);
1812
1813                 if (oa->o_valid & OBD_MD_FLHANDLE)
1814                         lfh = lov_handle2lfh(obdo_handle(oa));
1815                 else
1816                         oa->o_valid &= ~OBD_MD_FLHANDLE;
1817         }
1818
1819         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1820         if (!ioarr)
1821                 GOTO(out_obdos, rc = -ENOMEM);
1822
1823         for (i = 0; i < oa_bufs; i++) {
1824                 where[i] = lov_stripe_number(lsm, pga[i].off);
1825                 stripeinfo[where[i]].bufct++;
1826         }
1827
1828         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1829              i < stripe_count; i++, loi++, si_last = si, si++) {
1830                 if (i > 0)
1831                         si->index = si_last->index + si_last->bufct;
1832                 si->lsm.lsm_object_id = loi->loi_id;
1833                 si->ost_idx = loi->loi_ost_idx;
1834
1835                 if (oa) {
1836                         memcpy(&obdos[i], oa, sizeof(*obdos));
1837                         obdos[i].o_id = si->lsm.lsm_object_id;
1838                         if (lfh)
1839                                 memcpy(obdo_handle(&obdos[i]),
1840                                        &lfh->lfh_och[i].och_fh,
1841                                        sizeof(lfh->lfh_och[i].och_fh));
1842                 }
1843         }
1844
1845         for (i = 0; i < oa_bufs; i++) {
1846                 int which = where[i];
1847                 int shift;
1848
1849                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1850                 LASSERT(shift < oa_bufs);
1851                 ioarr[shift] = pga[i];
1852                 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1853                 stripeinfo[which].subcount++;
1854         }
1855
1856         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1857                 int shift = si->index;
1858
1859                 if (si->bufct == 0)
1860                         continue;
1861
1862                 if (lov->tgts[si->ost_idx].active == 0) {
1863                         CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1864                         GOTO(out_ioarr, rc = -EIO);
1865                 }
1866
1867                 LASSERT(shift < oa_bufs);
1868
1869                 rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
1870                                    &obdos[i], &si->lsm, si->bufct,
1871                                    &ioarr[shift], set, oti);
1872                 if (rc)
1873                         GOTO(out_ioarr, rc);
1874         }
1875         LASSERT(rc == 0);
1876         LASSERT(set->set_interpret == NULL);
1877         set->set_interpret = (set_interpreter_func)lov_brw_interpret;
1878         LASSERT(sizeof(set->set_args) >= sizeof(struct lov_brw_async_args));
1879         aa = (struct lov_brw_async_args *)&set->set_args;
1880         aa->aa_lsm = lsm;
1881         aa->aa_obdos = obdos;
1882         aa->aa_oa = oa;
1883         aa->aa_ioarr = ioarr;
1884         aa->aa_oa_bufs = oa_bufs;
1885
1886         /* Don't free ioarr or obdos - that's done in lov_brw_interpret */
1887         GOTO(out_where, rc);
1888
1889  out_ioarr:
1890         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1891  out_obdos:
1892         OBD_FREE(obdos, stripe_count * sizeof(*obdos));
1893  out_where:
1894         OBD_FREE(where, sizeof(*where) * oa_bufs);
1895         if (lfh)
1896                 lov_lfh_put(lfh);
1897  out_sinfo:
1898         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1899  out_exp:
1900         class_export_put(export);
1901         return rc;
1902 }
1903
1904 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1905                        struct lustre_handle *parent_lock,
1906                        __u32 type, void *cookie, int cookielen, __u32 mode,
1907                        int *flags, void *cb, void *data,
1908                        struct lustre_handle *lockh)
1909 {
1910         struct obd_export *export = class_conn2export(conn);
1911         struct lov_lock_handles *lov_lockh = NULL;
1912         struct lustre_handle *lov_lockhp;
1913         struct lov_obd *lov;
1914         struct lov_oinfo *loi;
1915         struct lov_stripe_md submd;
1916         ldlm_error_t rc;
1917         int i;
1918         ENTRY;
1919
1920         if (lsm_bad_magic(lsm))
1921                 GOTO(out_exp, rc = -EINVAL);
1922
1923         /* we should never be asked to replay a lock this way. */
1924         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1925
1926         if (!export || !export->exp_obd)
1927                 GOTO(out_exp, rc = -ENODEV);
1928
1929         if (lsm->lsm_stripe_count > 1) {
1930                 lov_lockh = lov_llh_new(lsm);
1931                 if (lov_lockh == NULL)
1932                         GOTO(out_exp, rc = -ENOMEM);
1933
1934                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1935                 lov_lockhp = lov_lockh->llh_handles;
1936         } else {
1937                 lov_lockhp = lockh;
1938         }
1939
1940         lov = &export->exp_obd->u.lov;
1941         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1942              i++, loi++, lov_lockhp++) {
1943                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1944                 struct ldlm_extent sub_ext;
1945
1946                 *flags = 0;
1947                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1948                                            &sub_ext.start, &sub_ext.end))
1949                         continue;
1950
1951                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1952                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1953                         continue;
1954                 }
1955
1956                 /* XXX LOV STACKING: submd should be from the subobj */
1957                 submd.lsm_object_id = loi->loi_id;
1958                 submd.lsm_stripe_count = 0;
1959                 /* XXX submd is not fully initialized here */
1960                 *flags = 0;
1961                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1962                                   parent_lock, type, &sub_ext, sizeof(sub_ext),
1963                                   mode, flags, cb, data, lov_lockhp);
1964
1965                 // XXX add a lock debug statement here
1966                 if (rc != ELDLM_OK) {
1967                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1968                         if (lov->tgts[loi->loi_ost_idx].active) {
1969                                 CERROR("error: enqueue objid "LPX64" subobj "
1970                                        LPX64" on OST idx %d: rc = %d\n",
1971                                        lsm->lsm_object_id, loi->loi_id,
1972                                        loi->loi_ost_idx, rc);
1973                                 GOTO(out_locks, rc);
1974                         }
1975                 }
1976         }
1977         if (lsm->lsm_stripe_count > 1)
1978                 lov_llh_put(lov_lockh);
1979         GOTO(out_exp, rc = ELDLM_OK);
1980
1981  out_locks:
1982         while (loi--, lov_lockhp--, i-- > 0) {
1983                 struct lov_stripe_md submd;
1984                 int err;
1985
1986                 if (lov_lockhp->cookie == 0)
1987                         continue;
1988
1989                 /* XXX LOV STACKING: submd should be from the subobj */
1990                 submd.lsm_object_id = loi->loi_id;
1991                 submd.lsm_stripe_count = 0;
1992                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1993                                  mode, lov_lockhp);
1994                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1995                         CERROR("error: cancelling objid "LPX64" on OST "
1996                                "idx %d after enqueue error: rc = %d\n",
1997                                loi->loi_id, loi->loi_ost_idx, err);
1998                 }
1999         }
2000
2001         if (lsm->lsm_stripe_count > 1) {
2002                 lov_llh_destroy(lov_lockh);
2003                 lov_llh_put(lov_lockh);
2004         }
2005  out_exp:
2006         class_export_put(export);
2007         return(rc);
2008 }
2009
2010 static int lov_match(struct lustre_handle *conn, struct lov_stripe_md *lsm,
2011                      __u32 type, void *cookie, int cookielen, __u32 mode,
2012                      int *flags, void *data, struct lustre_handle *lockh)
2013 {
2014         struct obd_export *export = class_conn2export(conn);
2015         struct lov_lock_handles *lov_lockh = NULL;
2016         struct lustre_handle *lov_lockhp;
2017         struct lov_obd *lov;
2018         struct lov_oinfo *loi;
2019         struct lov_stripe_md submd;
2020         ldlm_error_t rc = 0;
2021         int i;
2022         ENTRY;
2023
2024         if (lsm_bad_magic(lsm))
2025                 GOTO(out_exp, rc = -EINVAL);
2026
2027         if (!export || !export->exp_obd)
2028                 GOTO(out_exp, rc = -ENODEV);
2029
2030         if (lsm->lsm_stripe_count > 1) {
2031                 lov_lockh = lov_llh_new(lsm);
2032                 if (lov_lockh == NULL)
2033                         GOTO(out_exp, rc = -ENOMEM);
2034
2035                 lockh->cookie = lov_lockh->llh_handle.h_cookie;
2036                 lov_lockhp = lov_lockh->llh_handles;
2037         } else {
2038                 lov_lockhp = lockh;
2039         }
2040
2041         lov = &export->exp_obd->u.lov;
2042         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2043              i++, loi++, lov_lockhp++) {
2044                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
2045                 struct ldlm_extent sub_ext;
2046                 int lov_flags;
2047
2048                 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
2049                                            &sub_ext.start, &sub_ext.end))
2050                         continue;
2051
2052                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
2053                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2054                         rc = -EIO;
2055                         break;
2056                 }
2057
2058                 /* XXX LOV STACKING: submd should be from the subobj */
2059                 submd.lsm_object_id = loi->loi_id;
2060                 submd.lsm_stripe_count = 0;
2061                 lov_flags = *flags;
2062                 /* XXX submd is not fully initialized here */
2063                 rc = obd_match(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
2064                                type, &sub_ext, sizeof(sub_ext), mode,
2065                                &lov_flags, data, lov_lockhp);
2066                 if (rc != 1)
2067                         break;
2068         }
2069         if (rc == 1) {
2070                 if (lsm->lsm_stripe_count > 1)
2071                         lov_llh_put(lov_lockh);
2072                 GOTO(out_exp, 1);
2073         }
2074
2075         while (loi--, lov_lockhp--, i-- > 0) {
2076                 struct lov_stripe_md submd;
2077                 int err;
2078
2079                 if (lov_lockhp->cookie == 0)
2080                         continue;
2081
2082                 /* XXX LOV STACKING: submd should be from the subobj */
2083                 submd.lsm_object_id = loi->loi_id;
2084                 submd.lsm_stripe_count = 0;
2085                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
2086                                  mode, lov_lockhp);
2087                 if (err && lov->tgts[loi->loi_ost_idx].active) {
2088                         CERROR("error: cancelling objid "LPX64" on OST "
2089                                "idx %d after match failure: rc = %d\n",
2090                                loi->loi_id, loi->loi_ost_idx, err);
2091                 }
2092         }
2093
2094         if (lsm->lsm_stripe_count > 1) {
2095                 lov_llh_destroy(lov_lockh);
2096                 lov_llh_put(lov_lockh);
2097         }
2098  out_exp:
2099         class_export_put(export);
2100         RETURN(rc);
2101 }
2102
2103 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
2104                       __u32 mode, struct lustre_handle *lockh)
2105 {
2106         struct obd_export *export = class_conn2export(conn);
2107         struct lov_lock_handles *lov_lockh = NULL;
2108         struct lustre_handle *lov_lockhp;
2109         struct lov_obd *lov;
2110         struct lov_oinfo *loi;
2111         int rc = 0, i;
2112         ENTRY;
2113
2114         if (lsm_bad_magic(lsm))
2115                 GOTO(out, rc = -EINVAL);
2116
2117         if (!export || !export->exp_obd)
2118                 GOTO(out, rc = -ENODEV);
2119
2120         LASSERT(lockh);
2121         if (lsm->lsm_stripe_count > 1) {
2122                 lov_lockh = lov_handle2llh(lockh);
2123                 if (!lov_lockh) {
2124                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
2125                         GOTO(out, rc = -EINVAL);
2126                 }
2127
2128                 lov_lockhp = lov_lockh->llh_handles;
2129         } else {
2130                 lov_lockhp = lockh;
2131         }
2132
2133         lov = &export->exp_obd->u.lov;
2134         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
2135              i++, loi++, lov_lockhp++) {
2136                 struct lov_stripe_md submd;
2137                 int err;
2138
2139                 if (lov_lockhp->cookie == 0) {
2140                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
2141                                loi->loi_ost_idx, loi->loi_id);
2142                         continue;
2143                 }
2144
2145                 /* XXX LOV STACKING: submd should be from the subobj */
2146                 submd.lsm_object_id = loi->loi_id;
2147                 submd.lsm_stripe_count = 0;
2148                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
2149                                  mode, lov_lockhp);
2150                 if (err) {
2151                         if (lov->tgts[loi->loi_ost_idx].active) {
2152                                 CERROR("error: cancel objid "LPX64" subobj "
2153                                        LPX64" on OST idx %d: rc = %d\n",
2154                                        lsm->lsm_object_id,
2155                                        loi->loi_id, loi->loi_ost_idx, err);
2156                                 if (!rc)
2157                                         rc = err;
2158                         }
2159                 }
2160         }
2161
2162         if (lsm->lsm_stripe_count > 1)
2163                 lov_llh_destroy(lov_lockh);
2164         if (lov_lockh != NULL)
2165                 lov_llh_put(lov_lockh);
2166         GOTO(out, rc);
2167  out:
2168         class_export_put(export);
2169         return rc;
2170 }
2171
2172 static int lov_cancel_unused(struct lustre_handle *conn,
2173                              struct lov_stripe_md *lsm, int flags, void *opaque)
2174 {
2175         struct obd_export *export = class_conn2export(conn);
2176         struct lov_obd *lov;
2177         struct lov_oinfo *loi;
2178         int rc = 0, i;
2179         ENTRY;
2180
2181         if (lsm_bad_magic(lsm))
2182                 GOTO(out, rc = -EINVAL);
2183
2184         if (!export || !export->exp_obd)
2185                 GOTO(out, rc = -ENODEV);
2186
2187         lov = &export->exp_obd->u.lov;
2188         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
2189                 struct lov_stripe_md submd;
2190                 int err;
2191
2192                 if (lov->tgts[loi->loi_ost_idx].active == 0)
2193                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2194
2195                 submd.lsm_object_id = loi->loi_id;
2196                 submd.lsm_stripe_count = 0;
2197                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
2198                                         &submd, flags, opaque);
2199                 if (err && lov->tgts[loi->loi_ost_idx].active) {
2200                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
2201                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2202                                loi->loi_id, loi->loi_ost_idx, err);
2203                         if (!rc)
2204                                 rc = err;
2205                 }
2206         }
2207         GOTO(out, rc);
2208  out:
2209         class_export_put(export);
2210         return rc;
2211 }
2212
/* All-ones __u64 value; LOV_SUM_MAX() stores it in a total whose sum
 * wrapped around. */
#define LOV_U64_MAX ((__u64)~0ULL)
/* Add @add to @tot in place.  If the sum compares lower than @tot (i.e. it
 * wrapped around, for unsigned operands) @tot is set to LOV_U64_MAX
 * instead.  @tot must be an lvalue, and both arguments are evaluated more
 * than once, so they must be free of side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) < (tot))                              \
                        (tot) = LOV_U64_MAX;                            \
                else                                                    \
                        (tot) += (add);                                 \
        } while(0)
2221
2222 static int lov_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2223                       unsigned long max_age)
2224 {
2225         struct lov_obd *lov = &obd->u.lov;
2226         struct obd_statfs lov_sfs;
2227         int set = 0;
2228         int rc = 0;
2229         int i;
2230         ENTRY;
2231
2232
2233         /* We only get block data from the OBD */
2234         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2235                 int err;
2236
2237                 if (!lov->tgts[i].active) {
2238                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
2239                         continue;
2240                 }
2241
2242                 err = obd_statfs(class_conn2obd(&lov->tgts[i].conn), &lov_sfs,
2243                                  max_age);
2244                 if (err) {
2245                         if (lov->tgts[i].active) {
2246                                 CERROR("error: statfs OSC %s on OST idx %d: "
2247                                        "err = %d\n",
2248                                        lov->tgts[i].uuid.uuid, i, err);
2249                                 if (!rc)
2250                                         rc = err;
2251                         }
2252                         continue;
2253                 }
2254
2255                 if (!set) {
2256                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
2257                         set = 1;
2258                 } else {
2259                         osfs->os_bfree += lov_sfs.os_bfree;
2260                         osfs->os_bavail += lov_sfs.os_bavail;
2261                         osfs->os_blocks += lov_sfs.os_blocks;
2262                         /* XXX not sure about this one - depends on policy.
2263                          *   - could be minimum if we always stripe on all OBDs
2264                          *     (but that would be wrong for any other policy,
2265                          *     if one of the OBDs has no more objects left)
2266                          *   - could be sum if we stripe whole objects
2267                          *   - could be average, just to give a nice number
2268                          *
2269                          * To give a "reasonable" (if not wholly accurate)
2270                          * number, we divide the total number of free objects
2271                          * by expected stripe count (watch out for overflow).
2272                          */
2273                         LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
2274                         LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
2275                 }
2276         }
2277
2278         if (set) {
2279                 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
2280                                          lov->desc.ld_default_stripe_count :
2281                                          lov->desc.ld_active_tgt_count;
2282
2283                 if (osfs->os_files != LOV_U64_MAX)
2284                         do_div(osfs->os_files, expected_stripes);
2285                 if (osfs->os_ffree != LOV_U64_MAX)
2286                         do_div(osfs->os_ffree, expected_stripes);
2287         } else if (!rc)
2288                 rc = -EIO;
2289
2290         RETURN(rc);
2291 }
2292
2293 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
2294                          void *karg, void *uarg)
2295 {
2296         struct obd_device *obddev = class_conn2obd(conn);
2297         struct lov_obd *lov = &obddev->u.lov;
2298         int i, count = lov->desc.ld_tgt_count;
2299         struct obd_uuid *uuidp;
2300         int rc;
2301
2302         ENTRY;
2303
2304         switch (cmd) {
2305         case IOC_LOV_SET_OSC_ACTIVE: {
2306                 struct obd_ioctl_data *data = karg;
2307                 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
2308                 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
2309                 break;
2310         }
2311         case OBD_IOC_LOV_GET_CONFIG: {
2312                 struct obd_ioctl_data *data = karg;
2313                 struct lov_tgt_desc *tgtdesc;
2314                 struct lov_desc *desc;
2315                 char *buf = NULL;
2316
2317                 buf = NULL;
2318                 len = 0;
2319                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2320                         RETURN(-EINVAL);
2321
2322                 data = (struct obd_ioctl_data *)buf;
2323
2324                 if (sizeof(*desc) > data->ioc_inllen1) {
2325                         OBD_FREE(buf, len);
2326                         RETURN(-EINVAL);
2327                 }
2328
2329                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2330                         OBD_FREE(buf, len);
2331                         RETURN(-EINVAL);
2332                 }
2333
2334                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2335                 memcpy(desc, &(lov->desc), sizeof(*desc));
2336
2337                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2338                 tgtdesc = lov->tgts;
2339                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
2340                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
2341
2342                 rc = copy_to_user((void *)uarg, buf, len);
2343                 if (rc)
2344                         rc = -EFAULT;
2345                 obd_ioctl_freedata(buf, len);
2346                 break;
2347         }
2348         case LL_IOC_LOV_SETSTRIPE:
2349                 rc = lov_setstripe(conn, karg, uarg);
2350                 break;
2351         case LL_IOC_LOV_GETSTRIPE:
2352                 rc = lov_getstripe(conn, karg, uarg);
2353                 break;
2354         default: {
2355                 int set = 0;
2356                 if (count == 0)
2357                         RETURN(-ENOTTY);
2358                 rc = 0;
2359                 for (i = 0; i < count; i++) {
2360                         int err;
2361
2362                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
2363                                             len, karg, uarg);
2364                         if (err) {
2365                                 if (lov->tgts[i].active) {
2366                                         CERROR("error: iocontrol OSC %s on OST"
2367                                                "idx %d: err = %d\n",
2368                                                lov->tgts[i].uuid.uuid, i, err);
2369                                         if (!rc)
2370                                                 rc = err;
2371                                 }
2372                         } else
2373                                 set = 1;
2374                 }
2375                 if (!set && !rc)
2376                         rc = -EIO;
2377         }
2378         }
2379
2380         RETURN(rc);
2381 }
2382
2383 static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
2384                         void *key, __u32 *vallen, void *val)
2385 {
2386         struct obd_device *obddev = class_conn2obd(conn);
2387         struct lov_obd *lov = &obddev->u.lov;
2388         int i;
2389         ENTRY;
2390
2391         if (!vallen || !val)
2392                 RETURN(-EFAULT);
2393
2394         if (keylen > strlen("lock_to_stripe") &&
2395             strcmp(key, "lock_to_stripe") == 0) {
2396                 struct {
2397                         char name[16];
2398                         struct ldlm_lock *lock;
2399                         struct lov_stripe_md *lsm;
2400                 } *data = key;
2401                 __u32 *stripe = val;
2402                 struct lov_oinfo *loi;
2403
2404                 if (*vallen < sizeof(*stripe))
2405                         RETURN(-EFAULT);
2406                 *vallen = sizeof(*stripe);
2407
2408                 /* XXX This is another one of those bits that will need to
2409                  * change if we ever actually support nested LOVs.  It uses
2410                  * the lock's connection to find out which stripe it is. */
2411                 for (i = 0, loi = data->lsm->lsm_oinfo;
2412                      i < data->lsm->lsm_stripe_count;
2413                      i++, loi++) {
2414                         if (lov->tgts[loi->loi_ost_idx].conn.cookie ==
2415                             data->lock->l_connh->cookie) {
2416                                 *stripe = i;
2417                                 RETURN(0);
2418                         }
2419                 }
2420                 RETURN(-ENXIO);
2421         }
2422
2423         RETURN(-EINVAL);
2424 }
2425
2426 static int lov_set_info(struct lustre_handle *conn, obd_count keylen,
2427                         void *key, obd_count vallen, void *val)
2428 {
2429         struct obd_device *obddev = class_conn2obd(conn);
2430         struct lov_obd *lov = &obddev->u.lov;
2431         int i, rc = 0;
2432         ENTRY;
2433
2434         if (keylen < strlen("mds_conn") ||
2435             memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
2436                 RETURN(-EINVAL);
2437
2438         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2439                 int er;
2440                 er = obd_set_info(&lov->tgts[i].conn, keylen, key, vallen, val);
2441                 if (!rc)
2442                         rc = er;
2443         }
2444         RETURN(rc);
2445 }
2446
2447 static int lov_mark_page_dirty(struct lustre_handle *conn,
2448                                struct lov_stripe_md *lsm, unsigned long offset)
2449 {
2450         struct lov_obd *lov = &class_conn2obd(conn)->u.lov;
2451         struct lov_oinfo *loi;
2452         struct lov_stripe_md *submd;
2453         int stripe, rc;
2454         obd_off off;
2455         ENTRY;
2456
2457         if (lsm_bad_magic(lsm))
2458                 RETURN(-EINVAL);
2459
2460         OBD_ALLOC(submd, lov_stripe_md_size(1));
2461         if (submd == NULL)
2462                 RETURN(-ENOMEM);
2463
2464         stripe = lov_stripe_number(lsm, (obd_off)offset << PAGE_CACHE_SHIFT);
2465         lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe,
2466                           &off);
2467         off >>= PAGE_CACHE_SHIFT;
2468
2469         loi = &lsm->lsm_oinfo[stripe];
2470         CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset,
2471                (unsigned long)off, stripe);
2472         submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2473
2474         rc = obd_mark_page_dirty(&lov->tgts[loi->loi_ost_idx].conn, submd, off);
2475         OBD_FREE(submd, lov_stripe_md_size(1));
2476         RETURN(rc);
2477 }
2478
2479 static int lov_clear_dirty_pages(struct lustre_handle *conn,
2480                                  struct lov_stripe_md *lsm, unsigned long start,
2481                                  unsigned long end, unsigned long *cleared)
2482
2483 {
2484         struct obd_export *export = class_conn2export(conn);
2485         __u64 start_off = (__u64)start << PAGE_CACHE_SHIFT;
2486         __u64 end_off = (__u64)end << PAGE_CACHE_SHIFT;
2487         __u64 obd_start, obd_end;
2488         struct lov_stripe_md *submd = NULL;
2489         struct lov_obd *lov;
2490         struct lov_oinfo *loi;
2491         int i, rc;
2492         unsigned long osc_cleared;
2493         ENTRY;
2494
2495         *cleared = 0;
2496
2497         if (lsm_bad_magic(lsm))
2498                 GOTO(out_exp, rc = -EINVAL);
2499
2500         if (!export || !export->exp_obd)
2501                 GOTO(out_exp, rc = -ENODEV);
2502
2503         OBD_ALLOC(submd, lov_stripe_md_size(1));
2504         if (submd == NULL)
2505                 GOTO(out_exp, rc = -ENOMEM);
2506
2507         lov = &export->exp_obd->u.lov;
2508         rc = 0;
2509         for (i = 0, loi = lsm->lsm_oinfo;
2510              i < lsm->lsm_stripe_count;
2511              i++, loi++) {
2512                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
2513                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
2514                         continue;
2515                 }
2516
2517                 if(!lov_stripe_intersects(lsm, i, start_off, end_off,
2518                                           &obd_start, &obd_end))
2519                         continue;
2520                 obd_start >>= PAGE_CACHE_SHIFT;
2521                 obd_end >>= PAGE_CACHE_SHIFT;
2522
2523                 CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n",
2524                        start, end, (unsigned long)obd_start,
2525                        (unsigned long)obd_end, loi->loi_ost_idx);
2526                 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2527                 rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn,
2528                                            submd, obd_start, obd_end,
2529                                            &osc_cleared);
2530                 if (rc)
2531                         break;
2532                 *cleared += osc_cleared;
2533         }
2534 out_exp:
2535         if (submd)
2536                 OBD_FREE(submd, lov_stripe_md_size(1));
2537         class_export_put(export);
2538         RETURN(rc);
2539 }
2540
2541 static int lov_last_dirty_offset(struct lustre_handle *conn,
2542                                  struct lov_stripe_md *lsm,
2543                                  unsigned long *offset)
2544 {
2545         struct obd_export *export = class_conn2export(conn);
2546         struct lov_stripe_md *submd = NULL;
2547         struct lov_obd *lov;
2548         struct lov_oinfo *loi;
2549         unsigned long tmp, count, skip;
2550         int err, i, rc;
2551         ENTRY;
2552
2553         if (lsm_bad_magic(lsm))
2554                 GOTO(out_exp, rc = -EINVAL);
2555
2556         if (!export || !export->exp_obd)
2557                 GOTO(out_exp, rc = -ENODEV);
2558
2559         OBD_ALLOC(submd, lov_stripe_md_size(1));
2560         if (submd == NULL)
2561                 GOTO(out_exp, rc = -ENOMEM);
2562
2563         *offset = 0;
2564         lov = &export->exp_obd->u.lov;
2565         rc = -ENOENT;
2566
2567         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++){
2568                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
2569                 skip = (lsm->lsm_stripe_count - 1) * count;
2570
2571                 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2572
2573                 err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn,
2574                                             submd, &tmp);
2575                 if (err == -ENOENT)
2576                         continue;
2577                 if (err)
2578                         GOTO(out_exp, rc = err);
2579
2580                 rc = 0;
2581                 if (tmp != ~0)
2582                         tmp += (tmp/count * skip) + (i * count);
2583                 if (tmp > *offset)
2584                         *offset = tmp;
2585         }
2586 out_exp:
2587         if (submd)
2588                 OBD_FREE(submd, lov_stripe_md_size(1));
2589         class_export_put(export);
2590         RETURN(rc);
2591 }
2592
2593 /* For LOV catalogs, we "nest" catalogs from the parent catalog.  What this
2594  * means is that the parent catalog has a bunch of log cookies that are
2595  * pointing at one catalog for each OSC.  The OSC catalogs in turn hold
2596  * cookies for actual log files. */
2597 static int lov_get_catalogs(struct lov_obd *lov, struct llog_handle *cathandle)
2598 {
2599         int i, rc;
2600
2601         ENTRY;
2602         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2603                 lov->tgts[i].ltd_cathandle = llog_new_log(cathandle,
2604                                                           &lov->tgts[i].uuid);
2605                 if (IS_ERR(lov->tgts[i].ltd_cathandle))
2606                         continue;
2607                 rc = llog_init_catalog(cathandle, &lov->tgts[i].uuid);
2608                 if (rc)
2609                         GOTO(err_logs, rc);
2610         }
2611         lov->lo_catalog_loaded = 1;
2612         RETURN(0);
2613 err_logs:
2614         while (i-- > 0) {
2615                 llog_delete_log(cathandle, lov->tgts[i].ltd_cathandle);
2616                 llog_close_log(cathandle, lov->tgts[i].ltd_cathandle);
2617         }
2618         return rc;
2619 }
2620
2621 /* Add log records for each OSC that this object is striped over, and return
2622  * cookies for each one.  We _would_ have nice abstraction here, except that
2623  * we need to keep cookies in stripe order, even if some are NULL, so that
2624  * the right cookies are passed back to the right OSTs at the client side.
2625  * Unset cookies should be all-zero (which will never occur naturally). */
2626 static int lov_log_add(struct lustre_handle *conn,
2627                        struct llog_handle *cathandle,
2628                        struct llog_trans_hdr *rec, struct lov_stripe_md *lsm,
2629                        struct llog_cookie *logcookies, int numcookies)
2630 {
2631         struct obd_device *obd = class_conn2obd(conn);
2632         struct lov_obd *lov = &obd->u.lov;
2633         struct lov_oinfo *loi;
2634         int i, rc = 0;
2635         ENTRY;
2636
2637         LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count);
2638
2639         if (unlikely(!lov->lo_catalog_loaded))
2640                 lov_get_catalogs(lov, cathandle);
2641
2642         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
2643                 rc += obd_log_add(&lov->tgts[loi->loi_ost_idx].conn,
2644                                   lov->tgts[loi->loi_ost_idx].ltd_cathandle,
2645                                   rec, NULL, logcookies + rc, numcookies - rc);
2646         }
2647
2648         RETURN(rc);
2649 }
2650
2651 static int lov_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
2652                           int count, struct llog_cookie *cookies, int flags)
2653 {
2654         struct obd_export *export = class_conn2export(conn);
2655         struct lov_obd *lov;
2656         struct lov_oinfo *loi;
2657         int rc = 0, i;
2658         ENTRY;
2659
2660         LASSERT(lsm != NULL);
2661         if (export == NULL || export->exp_obd == NULL)
2662                 GOTO(out, rc = -ENODEV);
2663
2664         LASSERT(count == lsm->lsm_stripe_count);
2665
2666         loi = lsm->lsm_oinfo;
2667         lov = &export->exp_obd->u.lov;
2668         for (i = 0; i < count; i++, cookies++, loi++) {
2669                 int err;
2670
2671                 err = obd_log_cancel(&lov->tgts[loi->loi_ost_idx].conn,
2672                                      NULL, 1, cookies, flags);
2673                 if (err && lov->tgts[loi->loi_ost_idx].active) {
2674                         CERROR("error: objid "LPX64" subobj "LPX64
2675                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
2676                                loi->loi_id, loi->loi_ost_idx, err);
2677                         if (!rc)
2678                                 rc = err;
2679                 }
2680         }
2681         GOTO(out, rc);
2682  out:
2683         class_export_put(export);
2684         return rc;
2685 }
2686
2687 struct obd_ops lov_obd_ops = {
2688         o_owner:       THIS_MODULE,
2689         o_attach:      lov_attach,
2690         o_detach:      lov_detach,
2691         o_setup:       lov_setup,
2692         o_connect:     lov_connect,
2693         o_disconnect:  lov_disconnect,
2694         o_statfs:      lov_statfs,
2695         o_packmd:      lov_packmd,
2696         o_unpackmd:    lov_unpackmd,
2697         o_create:      lov_create,
2698         o_destroy:     lov_destroy,
2699         o_getattr:     lov_getattr,
2700         o_getattr_async: lov_getattr_async,
2701         o_setattr:     lov_setattr,
2702         o_open:        lov_open,
2703         o_close:       lov_close,
2704         o_brw:         lov_brw,
2705         o_brw_async:   lov_brw_async,
2706         o_punch:       lov_punch,
2707         o_enqueue:     lov_enqueue,
2708         o_match:       lov_match,
2709         o_cancel:      lov_cancel,
2710         o_cancel_unused: lov_cancel_unused,
2711         o_iocontrol:   lov_iocontrol,
2712         o_get_info:    lov_get_info,
2713         o_set_info:    lov_set_info,
2714         o_log_add:     lov_log_add,
2715         o_log_cancel:  lov_log_cancel,
2716         o_mark_page_dirty:   lov_mark_page_dirty,
2717         o_clear_dirty_pages: lov_clear_dirty_pages,
2718         o_last_dirty_offset: lov_last_dirty_offset,
2719 };
2720
2721 int __init lov_init(void)
2722 {
2723         struct lprocfs_static_vars lvars;
2724         int rc;
2725
2726         lprocfs_init_vars(lov, &lvars);
2727         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2728                                  OBD_LOV_DEVICENAME);
2729         RETURN(rc);
2730 }
2731
2732 static void /*__exit*/ lov_exit(void)
2733 {
2734         class_unregister_type(OBD_LOV_DEVICENAME);
2735 }
2736
2737 #ifdef __KERNEL__
2738 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2739 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
2740 MODULE_LICENSE("GPL");
2741
2742 module_init(lov_init);
2743 module_exit(lov_exit);
2744 #endif