Whamcloud - gitweb
file ext3-largefile.diff was initially added on branch b_devel.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define EXPORT_SYMTAB
26 #define DEBUG_SUBSYSTEM S_LOV
27
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/obd_support.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_net.h>
33 #include <linux/lustre_idl.h>
34 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_lov.h>
38 #include <linux/init.h>
39 #include <linux/random.h>
40 #include <linux/slab.h>
41 #include <asm/div64.h>
42 #include <linux/lprocfs_status.h>
43
44
45 static kmem_cache_t *lov_file_cache;
46
47 struct lov_file_handles {
48         struct list_head lfh_list;
49         __u64 lfh_cookie;
50         int lfh_count;
51         struct lustre_handle *lfh_handles;
52 };
53
54 struct lov_lock_handles {
55         __u64 llh_cookie;
56         struct lustre_handle llh_handles[0];
57 };
58
/*
 * Striping-metadata helpers.  They are only declared here; their
 * definitions are not in this part of the file.
 */
extern int lov_packmd(struct lustre_handle *conn,
                      struct lov_mds_md **lmm,
                      struct lov_stripe_md *lsm);
extern int lov_unpackmd(struct lustre_handle *conn,
                        struct lov_stripe_md **lsm,
                        struct lov_mds_md *lmm);
extern int lov_setstripe(struct lustre_handle *conn,
                         struct lov_stripe_md **lsmp,
                         struct lov_mds_md *lmmu);
extern int lov_getstripe(struct lustre_handle *conn,
                         struct lov_mds_md *lmmu,
                         struct lov_stripe_md *lsm);
67
68 /* obd methods */
69 int lov_attach(struct obd_device *dev, obd_count len, void *data)
70 {
71         struct lprocfs_static_vars lvars;
72
73         lprocfs_init_vars(&lvars);
74         return lprocfs_obd_attach(dev, lvars.obd_vars);
75 }
76
/*
 * Detach method: undo lov_attach() by handing the device back to
 * lprocfs_obd_detach() and passing its result through.
 */
int lov_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
81
82 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
83                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
84                        ptlrpc_recovery_cb_t recover)
85 {
86         struct ptlrpc_request *req = NULL;
87         struct lov_obd *lov = &obd->u.lov;
88         struct client_obd *mdc = &lov->mdcobd->u.cli;
89         struct lov_desc *desc = &lov->desc;
90         struct obd_export *exp;
91         struct lustre_handle mdc_conn;
92         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
93         struct obd_uuid uuid;
94         char *tmp;
95         int rc, rc2, i;
96         ENTRY;
97
98         rc = class_connect(conn, obd, cluuid);
99         if (rc)
100                 RETURN(rc);
101
102         /* We don't want to actually do the underlying connections more than
103          * once, so keep track. */
104         lov->refcount++;
105         if (lov->refcount > 1)
106                 RETURN(0);
107
108         exp = class_conn2export(conn);
109         spin_lock_init(&exp->exp_lov_data.led_lock);
110         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
111
112         /* retrieve LOV metadata from MDS */
113         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid, recovd,recover);
114         if (rc) {
115                 CERROR("cannot connect to mdc: rc = %d\n", rc);
116                 GOTO(out_conn, rc);
117         }
118
119         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
120         rc2 = obd_disconnect(&mdc_conn);
121         if (rc) {
122                 CERROR("cannot get lov info %d\n", rc);
123                 GOTO(out_conn, rc);
124         }
125
126         if (rc2) {
127                 CERROR("error disconnecting from MDS %d\n", rc2);
128                 GOTO(out_conn, rc = rc2);
129         }
130
131         /* sanity... */
132         if (req->rq_repmsg->bufcount < 2 ||
133             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
134                 CERROR("LOV desc: invalid descriptor returned\n");
135                 GOTO(out_conn, rc = -EINVAL);
136         }
137
138         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
139         lov_unpackdesc(desc);
140
141         if (req->rq_repmsg->buflens[1] < sizeof(uuid.uuid)*desc->ld_tgt_count){
142                 CERROR("LOV desc: invalid uuid array returned\n");
143                 GOTO(out_conn, rc = -EINVAL);
144         }
145
146         if (memcmp(obd->obd_uuid.uuid, desc->ld_uuid.uuid,
147                    sizeof(desc->ld_uuid.uuid))) {
148                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
149                        obd->obd_uuid.uuid, desc->ld_uuid.uuid);
150                 GOTO(out_conn, rc = -EINVAL);
151         }
152
153         if (desc->ld_tgt_count > 1000) {
154                 CERROR("LOV desc: target count > 1000 (%d)\n",
155                        desc->ld_tgt_count);
156                 GOTO(out_conn, rc = -EINVAL);
157         }
158
159         /* Because of 64-bit divide/mod operations only work with a 32-bit
160          * divisor in a 32-bit kernel, we cannot support a stripe width
161          * of 4GB or larger on 32-bit CPUs.
162          */
163         if ((desc->ld_default_stripe_count ?
164              desc->ld_default_stripe_count : desc->ld_tgt_count) *
165              desc->ld_default_stripe_size > ~0UL) {
166                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
167                        desc->ld_default_stripe_size,
168                        desc->ld_default_stripe_count ?
169                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
170                 GOTO(out_conn, rc = -EINVAL);
171         }
172
173         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
174         OBD_ALLOC(lov->tgts, lov->bufsize);
175         if (!lov->tgts) {
176                 CERROR("Out of memory\n");
177                 GOTO(out_conn, rc = -ENOMEM);
178         }
179
180         tmp = lustre_msg_buf(req->rq_repmsg, 1);
181         for (i = 0; i < desc->ld_tgt_count; i++) {
182                 struct obd_device *tgt;
183                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
184
185                 strncpy(uuid.uuid, tmp, sizeof(uuid.uuid));
186                 memcpy(&lov->tgts[i].uuid, &uuid, sizeof(uuid));
187                 tgt = client_tgtuuid2obd(&uuid);
188                 tmp += sizeof(uuid.uuid);
189
190                 if (!tgt) {
191                         CERROR("Target %s not attached\n", uuid.uuid);
192                         GOTO(out_disc, rc = -EINVAL);
193                 }
194
195                 if (!(tgt->obd_flags & OBD_SET_UP)) {
196                         CERROR("Target %s not set up\n", uuid.uuid);
197                         GOTO(out_disc, rc = -EINVAL);
198                 }
199
200                 rc = obd_connect(&lov->tgts[i].conn, tgt, &lov_osc_uuid, recovd,
201                                  recover);
202
203                 if (rc) {
204                         CERROR("Target %s connect error %d\n", uuid.uuid,
205                                rc);
206                         GOTO(out_disc, rc);
207                 }
208
209                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
210                                     sizeof(struct obd_device *), obd, NULL);
211                 if (rc) {
212                         CERROR("Target %s REGISTER_LOV error %d\n",
213                                uuid.uuid, rc);
214                         GOTO(out_disc, rc);
215                 }
216
217                 desc->ld_active_tgt_count++;
218                 lov->tgts[i].active = 1;
219         }
220
221         mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
222
223  out:
224         ptlrpc_req_finished(req);
225         RETURN(rc);
226
227  out_disc:
228         i--; /* skip failed-connect OSC */
229         while (i-- > 0) {
230                 desc->ld_active_tgt_count--;
231                 lov->tgts[i].active = 0;
232                 memcpy(&uuid, &lov->tgts[i].uuid, sizeof(uuid));
233                 rc2 = obd_disconnect(&lov->tgts[i].conn);
234                 if (rc2)
235                         CERROR("error: LOV target %s disconnect on OST idx %d: "
236                                "rc = %d\n", uuid.uuid, i, rc2);
237         }
238         OBD_FREE(lov->tgts, lov->bufsize);
239  out_conn:
240         class_disconnect(conn);
241         goto out;
242 }
243
244 static int lov_disconnect(struct lustre_handle *conn)
245 {
246         struct obd_device *obd = class_conn2obd(conn);
247         struct lov_obd *lov = &obd->u.lov;
248         struct obd_export *exp;
249         struct list_head *p, *n;
250         int rc, i;
251
252         if (!lov->tgts)
253                 goto out_local;
254
255         /* Only disconnect the underlying layers on the final disconnect. */
256         lov->refcount--;
257         if (lov->refcount != 0)
258                 goto out_local;
259
260         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
261                 rc = obd_disconnect(&lov->tgts[i].conn);
262                 if (rc) {
263                         if (lov->tgts[i].active) {
264                                 CERROR("Target %s disconnect error %d\n",
265                                        lov->tgts[i].uuid.uuid, rc);
266                         }
267                         rc = 0;
268                 }
269                 if (lov->tgts[i].active) {
270                         lov->desc.ld_active_tgt_count--;
271                         lov->tgts[i].active = 0;
272                 }
273         }
274         OBD_FREE(lov->tgts, lov->bufsize);
275         lov->bufsize = 0;
276         lov->tgts = NULL;
277
278         exp = class_conn2export(conn);
279         spin_lock(&exp->exp_lov_data.led_lock);
280         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
281                 /* XXX close these, instead of just discarding them? */
282                 struct lov_file_handles *lfh;
283                 lfh = list_entry(p, typeof(*lfh), lfh_list);
284                 CERROR("discarding open LOV handle %p:"LPX64"\n",
285                        lfh, lfh->lfh_cookie);
286                 list_del(&lfh->lfh_list);
287                 OBD_FREE(lfh->lfh_handles,
288                          lfh->lfh_count * sizeof(*lfh->lfh_handles));
289                 kmem_cache_free(lov_file_cache, lfh);
290         }
291         spin_unlock(&exp->exp_lov_data.led_lock);
292
293  out_local:
294         rc = class_disconnect(conn);
295         return rc;
296 }
297
298 /* Error codes:
299  *
300  *  -EINVAL  : UUID can't be found in the LOV's target list
301  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
302  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
303  *  -EALREADY: The OSC is already marked (in)active
304  */
305 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
306                               int activate)
307 {
308         struct obd_device *obd;
309         struct lov_tgt_desc *tgt;
310         int i, rc = 0;
311         ENTRY;
312
313         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
314                lov, uuid->uuid, activate);
315
316         spin_lock(&lov->lov_lock);
317         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
318                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
319                        i, tgt->uuid.uuid, tgt->conn.addr);
320                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof(uuid->uuid)) == 0)
321                         break;
322         }
323
324         if (i == lov->desc.ld_tgt_count)
325                 GOTO(out, rc = -EINVAL);
326
327         obd = class_conn2obd(&tgt->conn);
328         if (obd == NULL) {
329                 LBUG();
330                 GOTO(out, rc = -ENOTCONN);
331         }
332
333         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
334                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
335                obd->obd_type->typ_name, i);
336         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
337                 LBUG();
338                 GOTO(out, rc = -EBADF);
339         }
340
341         if (tgt->active == activate) {
342                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
343                        activate ? "" : "in");
344                 GOTO(out, rc = -EALREADY);
345         }
346
347         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
348
349         tgt->active = activate;
350         if (activate) {
351                 /*
352                  * foreach(export)
353                  *     foreach(open_file)
354                  *         if (file_handle uses this_osc)
355                  *             if (has_no_filehandle)
356                  *                 open(file_handle, this_osc);
357                  */
358                 /* XXX reconnect? */
359                 lov->desc.ld_active_tgt_count++;
360         } else {
361                 /*
362                  * Should I invalidate filehandles that refer to this OSC, so
363                  * that I reopen them during reactivation?
364                  */
365                 /* XXX disconnect from OSC? */
366                 lov->desc.ld_active_tgt_count--;
367         }
368
369 #warning "FIXME: walk open files list for objects that need opening"
370         EXIT;
371  out:
372         spin_unlock(&lov->lov_lock);
373         return rc;
374 }
375
376 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
377 {
378         struct obd_ioctl_data *data = buf;
379         struct lov_obd *lov = &obd->u.lov;
380         struct obd_uuid uuid;
381         int rc = 0;
382         ENTRY;
383
384         if (data->ioc_inllen1 < 1) {
385                 CERROR("LOV setup requires an MDC UUID\n");
386                 RETURN(-EINVAL);
387         }
388
389         if (data->ioc_inllen1 > 37) {
390                 CERROR("mdc UUID must be 36 characters or less\n");
391                 RETURN(-EINVAL);
392         }
393
394         spin_lock_init(&lov->lov_lock);
395         obd_str2uuid(&uuid, data->ioc_inlbuf1);
396         lov->mdcobd = class_uuid2obd(&uuid);
397         if (!lov->mdcobd) {
398                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
399                        data->ioc_inlbuf1);
400                 rc = -EINVAL;
401         }
402         RETURN(rc);
403 }
404
405 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
406 {
407         struct lov_file_handles *lfh = NULL;
408
409         if (!handle || !handle->addr)
410                 RETURN(NULL);
411
412         lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
413         if (!kmem_cache_validate(lov_file_cache, lfh))
414                 RETURN(NULL);
415
416         if (lfh->lfh_cookie != handle->cookie)
417                 RETURN(NULL);
418
419         return lfh;
420 }
421
422 /* the LOV expects oa->o_id to be set to the LOV object id */
423 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
424                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
425 {
426         struct obd_export *export = class_conn2export(conn);
427         struct lov_obd *lov;
428         struct lov_stripe_md *lsm;
429         struct lov_oinfo *loi;
430         struct obdo *tmp;
431         int ost_count, ost_idx;
432         int first = 1, obj_alloc = 0;
433         int rc = 0, i;
434         ENTRY;
435
436         LASSERT(ea);
437
438         if (!export)
439                 RETURN(-EINVAL);
440
441         lov = &export->exp_obd->u.lov;
442
443         if (!lov->desc.ld_active_tgt_count)
444                 RETURN(-EIO);
445
446         tmp = obdo_alloc();
447         if (!tmp)
448                 RETURN(-ENOMEM);
449
450         lsm = *ea;
451
452         if (!lsm) {
453                 rc = obd_alloc_memmd(conn, &lsm);
454                 if (rc < 0)
455                         GOTO(out_tmp, rc);
456
457                 rc = 0;
458                 lsm->lsm_magic = LOV_MAGIC;
459         }
460
461         ost_count = lov->desc.ld_tgt_count;
462
463         LASSERT(oa->o_valid & OBD_MD_FLID);
464         lsm->lsm_object_id = oa->o_id;
465         if (!lsm->lsm_stripe_size)
466                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
467
468         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
469                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
470                 int stripe_offset = mult % ost_count;
471                 int sub_offset = (mult / ost_count);
472
473                 ost_idx = (stripe_offset + sub_offset) % ost_count;
474         } else
475                 ost_idx = lsm->lsm_stripe_offset;
476
477         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
478                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
479
480         loi = lsm->lsm_oinfo;
481         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
482                 struct lov_stripe_md obj_md;
483                 struct lov_stripe_md *obj_mdp = &obj_md;
484                 int err;
485
486                 if (lov->tgts[ost_idx].active == 0) {
487                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
488                         continue;
489                 }
490
491                 /* create data objects with "parent" OA */
492                 memcpy(tmp, oa, sizeof(*tmp));
493                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
494                 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
495                 if (err) {
496                         if (lov->tgts[ost_idx].active) {
497                                 CERROR("error creating objid "LPX64" sub-object"
498                                        " on OST idx %d/%d: rc = %d\n", oa->o_id,
499                                        ost_idx, lsm->lsm_stripe_count, err);
500                                 if (err > 0) {
501                                         CERROR("obd_create returned invalid "
502                                                "err %d\n", err);
503                                         err = -EIO;
504                                 }
505                                 if (!rc)
506                                         rc = err;
507                         }
508                         continue;
509                 }
510                 loi->loi_id = tmp->o_id;
511                 loi->loi_ost_idx = ost_idx;
512                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
513                        lsm->lsm_object_id, loi->loi_id, ost_idx);
514
515                 if (first) {
516                         lsm->lsm_stripe_offset = ost_idx;
517                         first = 0;
518                 }
519
520                 ++obj_alloc;
521                 ++loi;
522
523                 /* If we have allocated enough objects, we are OK */
524                 if (obj_alloc == lsm->lsm_stripe_count) {
525                         rc = 0;
526                         GOTO(out_done, rc);
527                 }
528         }
529
530         if (*ea)
531                 GOTO(out_cleanup, rc);
532         else {
533                 struct lov_stripe_md *lsm_new;
534                 /* XXX LOV STACKING call into osc for sizes */
535                 int size = lov_stripe_md_size(obj_alloc);
536
537                 OBD_ALLOC(lsm_new, size);
538                 if (!lsm_new)
539                         GOTO(out_cleanup, rc = -ENOMEM);
540                 memcpy(lsm_new, lsm, size);
541                 /* XXX LOV STACKING call into osc for sizes */
542                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
543                 lsm = lsm_new;
544         }
545  out_done:
546         *ea = lsm;
547
548  out_tmp:
549         obdo_free(tmp);
550         return rc;
551
552  out_cleanup:
553         while (obj_alloc-- > 0) {
554                 int err;
555
556                 --loi;
557                 /* destroy already created objects here */
558                 memcpy(tmp, oa, sizeof(*tmp));
559                 tmp->o_id = loi->loi_id;
560                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL, NULL);
561                 if (err)
562                         CERROR("Failed to uncreate objid "LPX64" subobj "
563                                LPX64" on OST idx %d: rc = %d\n",
564                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
565                                err);
566         }
567         if (!*ea)
568                 obd_free_memmd(conn, &lsm);
569         goto out_tmp;
570 }
571
572 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
573                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
574 {
575         struct obdo tmp;
576         struct obd_export *export = class_conn2export(conn);
577         struct lov_obd *lov;
578         struct lov_oinfo *loi;
579         struct lov_file_handles *lfh = NULL;
580         int rc = 0, i;
581         ENTRY;
582
583         if (!lsm) {
584                 CERROR("LOV requires striping ea for destruction\n");
585                 RETURN(-EINVAL);
586         }
587
588         if (lsm->lsm_magic != LOV_MAGIC) {
589                 CERROR("LOV striping magic bad %#x != %#x\n",
590                        lsm->lsm_magic, LOV_MAGIC);
591                 RETURN(-EINVAL);
592         }
593
594         if (!export || !export->exp_obd)
595                 RETURN(-ENODEV);
596
597         if (oa->o_valid & OBD_MD_FLHANDLE)
598                 lfh = lov_handle2lfh(obdo_handle(oa));
599
600         lov = &export->exp_obd->u.lov;
601         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
602                 int err;
603                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
604                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
605                         /* Orphan clean up will (someday) fix this up. */
606                         continue;
607                 }
608
609                 memcpy(&tmp, oa, sizeof(tmp));
610                 tmp.o_id = loi->loi_id;
611                 if (lfh)
612                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
613                                sizeof(lfh->lfh_handles[i]));
614                 else
615                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
616                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
617                                   NULL, NULL);
618                 if (err && lov->tgts[loi->loi_ost_idx].active) {
619                         CERROR("error: destroying objid "LPX64" subobj "
620                                LPX64" on OST idx %d\n: rc = %d",
621                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
622                         if (!rc)
623                                 rc = err;
624                 }
625         }
626         RETURN(rc);
627 }
628
629 /* compute object size given "stripeno" and the ost size */
630 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
631                                 int stripeno)
632 {
633         unsigned long ssize  = lsm->lsm_stripe_size;
634         unsigned long swidth = ssize * lsm->lsm_stripe_count;
635         unsigned long stripe_size;
636         obd_size lov_size;
637
638         if (ost_size == 0)
639                 return 0;
640
641         /* do_div(a, b) returns a % b, and a = a / b */
642         stripe_size = do_div(ost_size, ssize);
643
644         if (stripe_size)
645                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
646         else
647                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
648
649         return lov_size;
650 }
651
652 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
653                             struct lov_stripe_md *lsm, int stripeno, int *set)
654 {
655         if (*set) {
656                 if (valid & OBD_MD_FLSIZE) {
657                         /* this handles sparse files properly */
658                         obd_size lov_size;
659
660                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
661                         if (lov_size > tgt->o_size)
662                                 tgt->o_size = lov_size;
663                 }
664                 if (valid & OBD_MD_FLBLOCKS)
665                         tgt->o_blocks += src->o_blocks;
666                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
667                         tgt->o_ctime = src->o_ctime;
668                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
669                         tgt->o_mtime = src->o_mtime;
670         } else {
671                 obdo_cpy_md(tgt, src, valid);
672                 if (valid & OBD_MD_FLSIZE)
673                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
674                 *set = 1;
675         }
676 }
677
678 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
679                        struct lov_stripe_md *lsm)
680 {
681         struct obdo tmp;
682         struct obd_export *export = class_conn2export(conn);
683         struct lov_obd *lov;
684         struct lov_oinfo *loi;
685         struct lov_file_handles *lfh = NULL;
686         int i;
687         int set = 0;
688         ENTRY;
689
690         if (!lsm) {
691                 CERROR("LOV requires striping ea\n");
692                 RETURN(-EINVAL);
693         }
694
695         if (lsm->lsm_magic != LOV_MAGIC) {
696                 CERROR("LOV striping magic bad %#x != %#x\n",
697                        lsm->lsm_magic, LOV_MAGIC);
698                 RETURN(-EINVAL);
699         }
700
701         if (!export || !export->exp_obd)
702                 RETURN(-ENODEV);
703
704         lov = &export->exp_obd->u.lov;
705
706         if (oa->o_valid & OBD_MD_FLHANDLE)
707                 lfh = lov_handle2lfh(obdo_handle(oa));
708
709         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
710                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
711         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
712                 int err;
713
714                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
715                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
716                         continue;
717                 }
718
719                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
720                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
721                 /* create data objects with "parent" OA */
722                 memcpy(&tmp, oa, sizeof(tmp));
723                 tmp.o_id = loi->loi_id;
724                 if (lfh)
725                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
726                                sizeof(lfh->lfh_handles[i]));
727                 else
728                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
729
730                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
731                 if (err) {
732                         if (lov->tgts[loi->loi_ost_idx].active) {
733                                 CERROR("error: getattr objid "LPX64" subobj "
734                                        LPX64" on OST idx %d: rc = %d\n",
735                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
736                                        err);
737                                 RETURN(err);
738                         }
739                 } else {
740                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
741                 }
742         }
743
744         RETURN(set ? 0 : -EIO);
745 }
746
747 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
748                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
749 {
750         struct obdo *tmp;
751         struct obd_export *export = class_conn2export(conn);
752         struct lov_obd *lov;
753         struct lov_oinfo *loi;
754         struct lov_file_handles *lfh = NULL;
755         int rc = 0, i, set = 0;
756         ENTRY;
757
758         if (!lsm) {
759                 CERROR("LOV requires striping ea\n");
760                 RETURN(-EINVAL);
761         }
762
763         if (lsm->lsm_magic != LOV_MAGIC) {
764                 CERROR("LOV striping magic bad %#x != %#x\n",
765                        lsm->lsm_magic, LOV_MAGIC);
766                 RETURN(-EINVAL);
767         }
768
769         if (!export || !export->exp_obd)
770                 RETURN(-ENODEV);
771
772         /* size changes should go through punch and not setattr */
773         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
774
775         /* for now, we only expect mtime updates here */
776         LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
777
778         tmp = obdo_alloc();
779         if (!tmp)
780                 RETURN(-ENOMEM);
781
782         if (oa->o_valid & OBD_MD_FLHANDLE)
783                 lfh = lov_handle2lfh(obdo_handle(oa));
784
785         lov = &export->exp_obd->u.lov;
786         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
787                 int err;
788
789                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
790                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
791                         continue;
792                 }
793
794                 obdo_cpy_md(tmp, oa, oa->o_valid);
795
796                 if (lfh)
797                         memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
798                                sizeof(lfh->lfh_handles[i]));
799                 else
800                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
801
802                 tmp->o_id = loi->loi_id;
803
804                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
805                                   NULL, NULL);
806                 if (err) {
807                         if (lov->tgts[loi->loi_ost_idx].active) {
808                                 CERROR("error: setattr objid "LPX64" subobj "
809                                        LPX64" on OST idx %d: rc = %d\n",
810                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
811                                        err);
812                                 if (!rc)
813                                         rc = err;
814                         }
815                 } else
816                         set = 1;
817         }
818         obdo_free(tmp);
819         if (!set && !rc)
820                 rc = -EIO;
821         RETURN(rc);
822 }
823
824 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
825                     struct lov_stripe_md *lsm, struct obd_trans_info *oti)
826 {
827         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
828         struct obd_export *export = class_conn2export(conn);
829         struct lov_obd *lov;
830         struct lov_oinfo *loi;
831         struct lov_file_handles *lfh = NULL;
832         struct lustre_handle *handle;
833         int set = 0;
834         int rc = 0, i;
835         ENTRY;
836
837         if (!lsm) {
838                 CERROR("LOV requires striping ea for opening\n");
839                 RETURN(-EINVAL);
840         }
841
842         if (lsm->lsm_magic != LOV_MAGIC) {
843                 CERROR("LOV striping magic bad %#x != %#x\n",
844                        lsm->lsm_magic, LOV_MAGIC);
845                 RETURN(-EINVAL);
846         }
847
848         if (!export || !export->exp_obd)
849                 RETURN(-ENODEV);
850
851         tmp = obdo_alloc();
852         if (!tmp)
853                 RETURN(-ENOMEM);
854
855         lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
856         if (!lfh)
857                 GOTO(out_tmp, rc = -ENOMEM);
858         OBD_ALLOC(lfh->lfh_handles,
859                   lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
860         if (!lfh->lfh_handles)
861                 GOTO(out_lfh, rc = -ENOMEM);
862
863         lov = &export->exp_obd->u.lov;
864         oa->o_size = 0;
865         oa->o_blocks = 0;
866         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
867                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
868                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
869                         continue;
870                 }
871
872                 /* create data objects with "parent" OA */
873                 memcpy(tmp, oa, sizeof(*tmp));
874                 tmp->o_id = loi->loi_id;
875
876                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
877                               NULL, NULL);
878                 if (rc) {
879                         if (lov->tgts[loi->loi_ost_idx].active) {
880                                 CERROR("error: open objid "LPX64" subobj "LPX64
881                                        " on OST idx %d: rc = %d\n",
882                                        oa->o_id, lsm->lsm_oinfo[i].loi_id,
883                                        loi->loi_ost_idx, rc);
884                                 goto out_handles;
885                         }
886                         continue;
887                 }
888
889                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
890
891                 if (tmp->o_valid & OBD_MD_FLHANDLE)
892                         memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
893                                sizeof(lfh->lfh_handles[i]));
894         }
895
896         handle = obdo_handle(oa);
897
898         lfh->lfh_count = lsm->lsm_stripe_count;
899         get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
900
901         handle->addr = (__u64)(unsigned long)lfh;
902         handle->cookie = lfh->lfh_cookie;
903         oa->o_valid |= OBD_MD_FLHANDLE;
904         spin_lock(&export->exp_lov_data.led_lock);
905         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
906         spin_unlock(&export->exp_lov_data.led_lock);
907
908         if (!set && !rc)
909                 rc = -EIO;
910 out_tmp:
911         obdo_free(tmp);
912         RETURN(rc);
913
914 out_handles:
915         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
916                 int err;
917
918                 if (lov->tgts[loi->loi_ost_idx].active == 0)
919                         continue;
920
921                 memcpy(tmp, oa, sizeof(*tmp));
922                 tmp->o_id = loi->loi_id;
923                 memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
924                        sizeof(lfh->lfh_handles[i]));
925
926                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
927                                 NULL, NULL);
928                 if (err && lov->tgts[loi->loi_ost_idx].active) {
929                         CERROR("error: closing objid "LPX64" subobj "LPX64
930                                " on OST idx %d after open error: rc=%d\n",
931                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
932                 }
933         }
934
935         OBD_FREE(lfh->lfh_handles,
936                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
937 out_lfh:
938         lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
939         kmem_cache_free(lov_file_cache, lfh);
940         goto out_tmp;
941 }
942
943 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
944                      struct lov_stripe_md *lsm, struct obd_trans_info *oti)
945 {
946         struct obdo tmp;
947         struct obd_export *export = class_conn2export(conn);
948         struct lov_obd *lov;
949         struct lov_oinfo *loi;
950         struct lov_file_handles *lfh = NULL;
951         int rc = 0, i;
952         ENTRY;
953
954         if (!lsm) {
955                 CERROR("LOV requires striping ea\n");
956                 RETURN(-EINVAL);
957         }
958
959         if (lsm->lsm_magic != LOV_MAGIC) {
960                 CERROR("LOV striping magic bad %#x != %#x\n",
961                        lsm->lsm_magic, LOV_MAGIC);
962                 RETURN(-EINVAL);
963         }
964
965         if (!export || !export->exp_obd)
966                 RETURN(-ENODEV);
967
968         if (oa->o_valid & OBD_MD_FLHANDLE)
969                 lfh = lov_handle2lfh(obdo_handle(oa));
970
971         lov = &export->exp_obd->u.lov;
972         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
973                 int err;
974
975                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
976                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
977                         continue;
978                 }
979
980                 /* create data objects with "parent" OA */
981                 memcpy(&tmp, oa, sizeof(tmp));
982                 tmp.o_id = loi->loi_id;
983                 if (lfh)
984                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
985                                sizeof(lfh->lfh_handles[i]));
986                 else
987                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
988
989                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
990                                 NULL, NULL);
991                 if (err) {
992                         if (lov->tgts[loi->loi_ost_idx].active) {
993                                 CERROR("error: close objid "LPX64" subobj "LPX64
994                                        " on OST idx %d: rc = %d\n", oa->o_id,
995                                        loi->loi_id, loi->loi_ost_idx, err);
996                         }
997                         if (!rc)
998                                 rc = err;
999                 }
1000         }
1001         if (lfh) {
1002                 list_del(&lfh->lfh_list);
1003                 OBD_FREE(lfh->lfh_handles,
1004                          lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
1005                 lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
1006                 kmem_cache_free(lov_file_cache, lfh);
1007         }
1008
1009         RETURN(rc);
1010 }
1011
/* Fallback log2() built on ffz(), used only if nothing else defined one. */
#if !defined(log2)
# define log2(n) ffz(~(n))
#endif
1015
1016 #warning FIXME: merge these two functions now that they are nearly the same
1017
1018 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
1019 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1020                                  int stripeno)
1021 {
1022         unsigned long ssize  = lsm->lsm_stripe_size;
1023         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1024         unsigned long stripe_off, this_stripe;
1025
1026         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
1027                 return lov_off;
1028
1029         /* do_div(a, b) returns a % b, and a = a / b */
1030         stripe_off = do_div(lov_off, swidth);
1031
1032         this_stripe = stripeno * ssize;
1033         if (stripe_off <= this_stripe)
1034                 stripe_off = 0;
1035         else {
1036                 stripe_off -= this_stripe;
1037
1038                 if (stripe_off > ssize)
1039                         stripe_off = ssize;
1040         }
1041
1042
1043         return lov_off * ssize + stripe_off;
1044 }
1045
1046 /* compute which stripe number "lov_off" will be written into */
1047 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1048 {
1049         unsigned long ssize  = lsm->lsm_stripe_size;
1050         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1051         unsigned long stripe_off;
1052
1053         stripe_off = do_div(lov_off, swidth);
1054
1055         return stripe_off / ssize;
1056 }
1057
1058
1059 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1060  * we can send this 'punch' to just the authoritative node and the nodes
1061  * that the punch will affect. */
1062 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1063                      struct lov_stripe_md *lsm,
1064                      obd_off start, obd_off end, struct obd_trans_info *oti)
1065 {
1066         struct obdo tmp;
1067         struct obd_export *export = class_conn2export(conn);
1068         struct lov_obd *lov;
1069         struct lov_oinfo *loi;
1070         struct lov_file_handles *lfh = NULL;
1071         int rc = 0, i;
1072         ENTRY;
1073
1074         if (!lsm) {
1075                 CERROR("LOV requires striping ea\n");
1076                 RETURN(-EINVAL);
1077         }
1078
1079         if (lsm->lsm_magic != LOV_MAGIC) {
1080                 CERROR("LOV striping magic bad %#x != %#x\n",
1081                        lsm->lsm_magic, LOV_MAGIC);
1082                 RETURN(-EINVAL);
1083         }
1084
1085         if (!export || !export->exp_obd)
1086                 RETURN(-ENODEV);
1087
1088         if (oa->o_valid & OBD_MD_FLHANDLE)
1089                 lfh = lov_handle2lfh(obdo_handle(oa));
1090
1091         lov = &export->exp_obd->u.lov;
1092         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1093                 obd_off starti = lov_stripe_offset(lsm, start, i);
1094                 obd_off endi = lov_stripe_offset(lsm, end, i);
1095                 int err;
1096
1097                 if (starti == endi)
1098                         continue;
1099
1100                 /* create data objects with "parent" OA */
1101                 memcpy(&tmp, oa, sizeof(tmp));
1102                 tmp.o_id = loi->loi_id;
1103                 if (lfh)
1104                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
1105                                sizeof(lfh->lfh_handles[i]));
1106                 else
1107                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1108
1109                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1110                                 starti, endi, NULL);
1111                 if (err) {
1112                         if (lov->tgts[loi->loi_ost_idx].active) {
1113                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1114                                        " on OST idx %d: rc = %d\n", oa->o_id,
1115                                        loi->loi_id, loi->loi_ost_idx, err);
1116                         }
1117                         if (!rc)
1118                                 rc = err;
1119                 }
1120         }
1121         RETURN(rc);
1122 }
1123
1124 static inline int lov_brw(int cmd, struct lustre_handle *conn,
1125                           struct lov_stripe_md *lsm, obd_count oa_bufs,
1126                           struct brw_page *pga, struct obd_brw_set *set,
1127                           struct obd_trans_info *oti)
1128 {
1129         struct {
1130                 int bufct;
1131                 int index;
1132                 int subcount;
1133                 struct lov_stripe_md lsm;
1134                 int ost_idx;
1135         } *stripeinfo, *si, *si_last;
1136         struct obd_export *export = class_conn2export(conn);
1137         struct lov_obd *lov;
1138         struct brw_page *ioarr;
1139         struct lov_oinfo *loi;
1140         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1141         ENTRY;
1142
1143         if (!lsm) {
1144                 CERROR("LOV requires striping ea\n");
1145                 RETURN(-EINVAL);
1146         }
1147
1148         if (lsm->lsm_magic != LOV_MAGIC) {
1149                 CERROR("LOV striping magic bad %#x != %#x\n",
1150                        lsm->lsm_magic, LOV_MAGIC);
1151                 RETURN(-EINVAL);
1152         }
1153
1154         lov = &export->exp_obd->u.lov;
1155
1156         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1157         if (!stripeinfo)
1158                 GOTO(out_cbdata, rc = -ENOMEM);
1159
1160         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1161         if (!where)
1162                 GOTO(out_sinfo, rc = -ENOMEM);
1163
1164         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1165         if (!ioarr)
1166                 GOTO(out_where, rc = -ENOMEM);
1167
1168         for (i = 0; i < oa_bufs; i++) {
1169                 where[i] = lov_stripe_number(lsm, pga[i].off);
1170                 stripeinfo[where[i]].bufct++;
1171         }
1172
1173         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1174              i < stripe_count; i++, loi++, si_last = si, si++) {
1175                 if (i > 0)
1176                         si->index = si_last->index + si_last->bufct;
1177                 si->lsm.lsm_object_id = loi->loi_id;
1178                 si->ost_idx = loi->loi_ost_idx;
1179         }
1180
1181         for (i = 0; i < oa_bufs; i++) {
1182                 int which = where[i];
1183                 int shift;
1184
1185                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1186                 LASSERT(shift < oa_bufs);
1187                 ioarr[shift] = pga[i];
1188                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
1189                 stripeinfo[which].subcount++;
1190         }
1191
1192         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1193                 int shift = si->index;
1194
1195                 if (si->bufct) {
1196                         LASSERT(shift < oa_bufs);
1197                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1198                                      &si->lsm, si->bufct, &ioarr[shift],
1199                                      set, oti);
1200                         if (rc)
1201                                 GOTO(out_ioarr, rc);
1202                 }
1203         }
1204
1205  out_ioarr:
1206         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1207  out_where:
1208         OBD_FREE(where, sizeof(*where) * oa_bufs);
1209  out_sinfo:
1210         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1211  out_cbdata:
1212         RETURN(rc);
1213 }
1214
1215 static struct lov_lock_handles *lov_newlockh(struct lov_stripe_md *lsm)
1216 {
1217         struct lov_lock_handles *lov_lockh;
1218
1219         OBD_ALLOC(lov_lockh, sizeof(*lov_lockh) +
1220                   sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count);
1221         if (!lov_lockh)
1222                 return NULL;
1223
1224         get_random_bytes(&lov_lockh->llh_cookie, sizeof(lov_lockh->llh_cookie));
1225
1226         return lov_lockh;
1227 }
1228
1229 /* We are only ever passed local lock handles here, so we do not need to
1230  * validate (and we can't really because these structs are variable sized
1231  * and therefore alloced, and not from a private slab).
1232  *
1233  * We just check because we can...
1234  */
1235 static struct lov_lock_handles *lov_h2lovlockh(struct lustre_handle *handle)
1236 {
1237         struct lov_lock_handles *lov_lockh = NULL;
1238
1239         if (!handle || !handle->addr)
1240                 RETURN(NULL);
1241
1242         lov_lockh = (struct lov_lock_handles *)(unsigned long)(handle->addr);
1243         if (lov_lockh->llh_cookie != handle->cookie)
1244                 RETURN(NULL);
1245
1246         return lov_lockh;
1247 }
1248
1249 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1250                        struct lustre_handle *parent_lock,
1251                        __u32 type, void *cookie, int cookielen, __u32 mode,
1252                        int *flags, void *cb, void *data, int datalen,
1253                        struct lustre_handle *lockh)
1254 {
1255         struct obd_export *export = class_conn2export(conn);
1256         struct lov_lock_handles *lov_lockh = NULL;
1257         struct lustre_handle *lov_lockhp;
1258         struct lov_obd *lov;
1259         struct lov_oinfo *loi;
1260         struct lov_stripe_md submd;
1261         int rc = 0, i;
1262         ENTRY;
1263
1264         if (!lsm) {
1265                 CERROR("LOV requires striping ea\n");
1266                 RETURN(-EINVAL);
1267         }
1268
1269         if (lsm->lsm_magic != LOV_MAGIC) {
1270                 CERROR("LOV striping magic bad %#x != %#x\n",
1271                        lsm->lsm_magic, LOV_MAGIC);
1272                 RETURN(-EINVAL);
1273         }
1274
1275         /* we should never be asked to replay a lock. */
1276
1277         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1278
1279         if (!export || !export->exp_obd)
1280                 RETURN(-ENODEV);
1281
1282         if (lsm->lsm_stripe_count > 1) {
1283                 lov_lockh = lov_newlockh(lsm);
1284                 if (!lov_lockh)
1285                         RETURN(-ENOMEM);
1286
1287                 lockh->addr = (__u64)(unsigned long)lov_lockh;
1288                 lockh->cookie = lov_lockh->llh_cookie;
1289                 lov_lockhp = lov_lockh->llh_handles;
1290         } else
1291                 lov_lockhp = lockh;
1292
1293         lov = &export->exp_obd->u.lov;
1294         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1295              i++, loi++, lov_lockhp++) {
1296                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1297                 struct ldlm_extent sub_ext;
1298
1299                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1300                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1301                         continue;
1302                 }
1303
1304                 *flags = 0;
1305                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
1306                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
1307                 if (sub_ext.start == sub_ext.end /* || !active */)
1308                         continue;
1309
1310                 /* XXX LOV STACKING: submd should be from the subobj */
1311                 submd.lsm_object_id = loi->loi_id;
1312                 submd.lsm_stripe_count = 0;
1313                 /* XXX submd is not fully initialized here */
1314                 *flags = 0;
1315                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1316                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
1317                                  mode, flags, cb, data, datalen, lov_lockhp);
1318                 // XXX add a lock debug statement here
1319                 if (rc)
1320                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1321                 if (rc && lov->tgts[loi->loi_ost_idx].active) {
1322                         CERROR("error: enqueue objid "LPX64" subobj "LPX64
1323                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1324                                loi->loi_id, loi->loi_ost_idx, rc);
1325                         goto out_locks;
1326                 }
1327         }
1328         RETURN(0);
1329
1330 out_locks:
1331         while (loi--, lov_lockhp--, i-- > 0) {
1332                 struct lov_stripe_md submd;
1333                 int err;
1334
1335                 if (lov_lockhp->addr == 0 ||
1336                     lov->tgts[loi->loi_ost_idx].active == 0)
1337                         continue;
1338
1339                 /* XXX LOV STACKING: submd should be from the subobj */
1340                 submd.lsm_object_id = loi->loi_id;
1341                 submd.lsm_stripe_count = 0;
1342                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1343                                  mode, lov_lockhp);
1344                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1345                         CERROR("error: cancelling objid "LPX64" on OST "
1346                                "idx %d after enqueue error: rc = %d\n",
1347                                loi->loi_id, loi->loi_ost_idx, err);
1348                 }
1349         }
1350
1351         if (lsm->lsm_stripe_count > 1) {
1352                 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1353                 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1354                           sizeof(*lov_lockh->llh_handles) *
1355                           lsm->lsm_stripe_count);
1356         }
1357         lockh->addr = 0;
1358         lockh->cookie = DEAD_HANDLE_MAGIC;
1359
1360         RETURN(rc);
1361 }
1362
1363 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1364                       __u32 mode, struct lustre_handle *lockh)
1365 {
1366         struct obd_export *export = class_conn2export(conn);
1367         struct lov_lock_handles *lov_lockh = NULL;
1368         struct lustre_handle *lov_lockhp;
1369         struct lov_obd *lov;
1370         struct lov_oinfo *loi;
1371         int rc = 0, i;
1372         ENTRY;
1373
1374         if (!lsm) {
1375                 CERROR("LOV requires striping ea\n");
1376                 RETURN(-EINVAL);
1377         }
1378
1379         if (lsm->lsm_magic != LOV_MAGIC) {
1380                 CERROR("LOV striping magic bad %#x != %#x\n",
1381                        lsm->lsm_magic, LOV_MAGIC);
1382                 RETURN(-EINVAL);
1383         }
1384
1385         if (!export || !export->exp_obd)
1386                 RETURN(-ENODEV);
1387
1388         LASSERT(lockh);
1389         if (lsm->lsm_stripe_count > 1) {
1390                 lov_lockh = lov_h2lovlockh(lockh);
1391                 if (!lov_lockh) {
1392                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1393                         RETURN(-EINVAL);
1394                 }
1395
1396                 lov_lockhp = lov_lockh->llh_handles;
1397         } else
1398                 lov_lockhp = lockh;
1399
1400         lov = &export->exp_obd->u.lov;
1401         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1402              i++, loi++, lov_lockhp++ ) {
1403                 struct lov_stripe_md submd;
1404                 int err;
1405
1406                 if (lov_lockhp->addr == 0) {
1407                         CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
1408                         continue;
1409                 }
1410
1411                 /* XXX LOV STACKING: submd should be from the subobj */
1412                 submd.lsm_object_id = loi->loi_id;
1413                 submd.lsm_stripe_count = 0;
1414                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1415                                  mode, lov_lockhp);
1416                 if (err) {
1417                         if (lov->tgts[loi->loi_ost_idx].active) {
1418                                 CERROR("error: cancel objid "LPX64" subobj "
1419                                        LPX64" on OST idx %d: rc = %d\n",
1420                                        lsm->lsm_object_id,
1421                                        loi->loi_id, loi->loi_ost_idx, err);
1422                                 if (!rc)
1423                                         rc = err;
1424                         }
1425                 }
1426         }
1427
1428         if (lsm->lsm_stripe_count > 1) {
1429                 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1430                 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1431                           sizeof(*lov_lockh->llh_handles) *
1432                           lsm->lsm_stripe_count);
1433         }
1434         lockh->addr = 0;
1435         lockh->cookie = DEAD_HANDLE_MAGIC;
1436
1437         RETURN(rc);
1438 }
1439
1440 static int lov_cancel_unused(struct lustre_handle *conn,
1441                              struct lov_stripe_md *lsm, int flags)
1442 {
1443         struct obd_export *export = class_conn2export(conn);
1444         struct lov_obd *lov;
1445         struct lov_oinfo *loi;
1446         int rc = 0, i;
1447         ENTRY;
1448
1449         if (!lsm) {
1450                 CERROR("LOV requires striping ea for lock cancellation\n");
1451                 RETURN(-EINVAL);
1452         }
1453
1454         if (!export || !export->exp_obd)
1455                 RETURN(-ENODEV);
1456
1457         lov = &export->exp_obd->u.lov;
1458         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1459                 struct lov_stripe_md submd;
1460                 int err;
1461
1462                 submd.lsm_object_id = loi->loi_id;
1463                 submd.lsm_stripe_count = 0;
1464                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1465                                        &submd, flags);
1466                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1467                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1468                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1469                                loi->loi_id, loi->loi_ost_idx, err);
1470                         if (!rc)
1471                                 rc = err;
1472                 }
1473         }
1474
1475         RETURN(rc);
1476 }
1477
1478 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1479 {
1480         struct obd_export *export = class_conn2export(conn);
1481         struct lov_obd *lov;
1482         struct obd_statfs lov_sfs;
1483         int set = 0;
1484         int rc = 0;
1485         int i;
1486         ENTRY;
1487
1488         if (!export || !export->exp_obd)
1489                 RETURN(-ENODEV);
1490
1491         lov = &export->exp_obd->u.lov;
1492
1493         /* We only get block data from the OBD */
1494         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1495                 int err;
1496
1497                 if (!lov->tgts[i].active) {
1498                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1499                         continue;
1500                 }
1501
1502                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1503                 if (err) {
1504                         if (lov->tgts[i].active) {
1505                                 CERROR("error: statfs OSC %s on OST idx %d: "
1506                                        "err = %d\n",
1507                                        lov->tgts[i].uuid.uuid, i, err);
1508                                 if (!rc)
1509                                         rc = err;
1510                         }
1511                         continue;
1512                 }
1513                 if (!set) {
1514                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1515                         set = 1;
1516                 } else {
1517                         osfs->os_bfree += lov_sfs.os_bfree;
1518                         osfs->os_bavail += lov_sfs.os_bavail;
1519                         osfs->os_blocks += lov_sfs.os_blocks;
1520                         /* XXX not sure about this one - depends on policy.
1521                          *   - could be minimum if we always stripe on all OBDs
1522                          *     (but that would be wrong for any other policy,
1523                          *     if one of the OBDs has no more objects left)
1524                          *   - could be sum if we stripe whole objects
1525                          *   - could be average, just to give a nice number
1526                          *   - we just pick first OST and hope it is enough
1527                         sfs->f_ffree += lov_sfs.f_ffree;
1528                          */
1529                 }
1530         }
1531         if (!set && !rc)
1532                 rc = -EIO;
1533         RETURN(rc);
1534 }
1535
1536 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1537                          void *karg, void *uarg)
1538 {
1539         struct obd_device *obddev = class_conn2obd(conn);
1540         struct lov_obd *lov = &obddev->u.lov;
1541         int i, count = lov->desc.ld_tgt_count;
1542         struct obd_uuid *uuidp;
1543         int rc;
1544
1545         ENTRY;
1546
1547         switch (cmd) {
1548         case IOC_LOV_SET_OSC_ACTIVE: {
1549                 struct obd_ioctl_data *data = karg;
1550                 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
1551                 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
1552                 break;
1553         }
1554         case OBD_IOC_LOV_GET_CONFIG: {
1555                 struct obd_ioctl_data *data = karg;
1556                 struct lov_tgt_desc *tgtdesc;
1557                 struct lov_desc *desc;
1558                 char *buf = NULL;
1559
1560                 buf = NULL;
1561                 len = 0;
1562                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1563                         RETURN(-EINVAL);
1564
1565                 data = (struct obd_ioctl_data *)buf;
1566
1567                 if (sizeof(*desc) > data->ioc_inllen1) {
1568                         OBD_FREE(buf, len);
1569                         RETURN(-EINVAL);
1570                 }
1571
1572                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1573                         OBD_FREE(buf, len);
1574                         RETURN(-EINVAL);
1575                 }
1576
1577                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1578                 memcpy(desc, &(lov->desc), sizeof(*desc));
1579
1580                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1581                 tgtdesc = lov->tgts;
1582                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1583                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
1584
1585                 rc = copy_to_user((void *)uarg, buf, len);
1586                 if (rc)
1587                         rc = -EFAULT;
1588                 OBD_FREE(buf, len);
1589                 break;
1590         }
1591         case LL_IOC_LOV_SETSTRIPE:
1592                 rc = lov_setstripe(conn, karg, uarg);
1593                 break;
1594         case LL_IOC_LOV_GETSTRIPE:
1595                 rc = lov_getstripe(conn, karg, uarg);
1596                 break;
1597         default: {
1598                 int set = 0;
1599                 if (count == 0)
1600                         RETURN(-ENOTTY);
1601                 rc = 0;
1602                 for (i = 0; i < count; i++) {
1603                         int err;
1604
1605                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1606                                             len, karg, uarg);
1607                         if (err) {
1608                                 if (lov->tgts[i].active) {
1609                                         CERROR("error: iocontrol OSC %s on OST"
1610                                                "idx %d: err = %d\n",
1611                                                lov->tgts[i].uuid.uuid, i, err);
1612                                         if (!rc)
1613                                                 rc = err;
1614                                 }
1615                         } else
1616                                 set = 1;
1617                 }
1618                 if (!set && !rc)
1619                         rc = -EIO;
1620         }
1621         }
1622
1623         RETURN(rc);
1624 }
1625
1626 struct obd_ops lov_obd_ops = {
1627         o_owner:       THIS_MODULE,
1628         o_attach:      lov_attach,
1629         o_detach:      lov_detach,
1630         o_setup:       lov_setup,
1631         o_connect:     lov_connect,
1632         o_disconnect:  lov_disconnect,
1633         o_statfs:      lov_statfs,
1634         o_packmd:      lov_packmd,
1635         o_unpackmd:    lov_unpackmd,
1636         o_create:      lov_create,
1637         o_destroy:     lov_destroy,
1638         o_getattr:     lov_getattr,
1639         o_setattr:     lov_setattr,
1640         o_open:        lov_open,
1641         o_close:       lov_close,
1642         o_brw:         lov_brw,
1643         o_punch:       lov_punch,
1644         o_enqueue:     lov_enqueue,
1645         o_cancel:      lov_cancel,
1646         o_cancel_unused: lov_cancel_unused,
1647         o_iocontrol:   lov_iocontrol
1648 };
1649
1650 static int __init lov_init(void)
1651 {
1652         struct lprocfs_static_vars lvars;
1653         int rc;
1654
1655         printk(KERN_INFO "Lustre Logical Object Volume driver; "
1656                "info@clusterfs.com\n");
1657         lov_file_cache = kmem_cache_create("ll_lov_file_data",
1658                                            sizeof(struct lov_file_handles),
1659                                            0, 0, NULL, NULL);
1660         if (!lov_file_cache)
1661                 RETURN(-ENOMEM);
1662
1663         lprocfs_init_vars(&lvars);
1664         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
1665                                  OBD_LOV_DEVICENAME);
1666         RETURN(rc);
1667 }
1668
1669 static void __exit lov_exit(void)
1670 {
1671         if (kmem_cache_destroy(lov_file_cache))
1672                 CERROR("couldn't free LOV open cache\n");
1673         class_unregister_type(OBD_LOV_DEVICENAME);
1674 }
1675
1676 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1677 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
1678 MODULE_LICENSE("GPL");
1679
1680 module_init(lov_init);
1681 module_exit(lov_exit);