Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  * Author: Phil Schwan <phil@clusterfs.com>
6  *         Peter Braam <braam@clusterfs.com>
7  *         Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define EXPORT_SYMTAB
26 #define DEBUG_SUBSYSTEM S_LOV
27 #ifdef __KERNEL__
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/init.h>
31 #include <linux/random.h>
32 #include <linux/slab.h>
33 #include <asm/div64.h>
34 #else
35 #include <liblustre.h>
36 #endif
37
38 #include <linux/obd_support.h>
39 #include <linux/lustre_lib.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_idl.h>
42 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
43 #include <linux/lustre_mds.h>
44 #include <linux/obd_class.h>
45 #include <linux/obd_lov.h>
46 #include <linux/lprocfs_status.h>
47
48 static kmem_cache_t *lov_file_cache;
49
50 struct lov_file_handles {
51         struct list_head lfh_list;
52         __u64 lfh_cookie;
53         int lfh_count;
54         char *lfh_data; /* an array of opaque data saved on behalf of
55                         * each osc, FD_OSTDATA_SIZE bytes for each */
56 };
57
58 struct lov_lock_handles {
59         __u64 llh_cookie;
60         struct lustre_handle llh_handles[0];
61 };
62
/* Striping-metadata helpers that are declared here but not defined in this
 * part of the file. */
int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
               struct lov_stripe_md *lsm);
int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
                 struct lov_mds_md *lmm);
int lov_setstripe(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                  struct lov_mds_md *lmmu);
int lov_getstripe(struct lustre_handle *conn, struct lov_mds_md *lmmu,
                  struct lov_stripe_md *lsm);
71
72 /* obd methods */
73 int lov_attach(struct obd_device *dev, obd_count len, void *data)
74 {
75         struct lprocfs_static_vars lvars;
76
77         lprocfs_init_vars(&lvars);
78         return lprocfs_obd_attach(dev, lvars.obd_vars);
79 }
80
/* Detach method: delegates to lprocfs_obd_detach() and returns its result. */
int lov_detach(struct obd_device *dev)
{
        int result = lprocfs_obd_detach(dev);

        return result;
}
85
86 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
87                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
88                        ptlrpc_recovery_cb_t recover)
89 {
90         struct ptlrpc_request *req = NULL;
91         struct lov_obd *lov = &obd->u.lov;
92         struct client_obd *mdc = &lov->mdcobd->u.cli;
93         struct lov_desc *desc = &lov->desc;
94         struct lov_tgt_desc *tgts;
95         struct obd_export *exp;
96         struct lustre_handle mdc_conn;
97         struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
98         char *tmp;
99         int rc, rc2, i;
100         ENTRY;
101
102         rc = class_connect(conn, obd, cluuid);
103         if (rc)
104                 RETURN(rc);
105
106         /* We don't want to actually do the underlying connections more than
107          * once, so keep track. */
108         lov->refcount++;
109         if (lov->refcount > 1)
110                 RETURN(0);
111
112         exp = class_conn2export(conn);
113         spin_lock_init(&exp->exp_lov_data.led_lock);
114         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
115
116         /* retrieve LOV metadata from MDS */
117         rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid, recovd,recover);
118         if (rc) {
119                 CERROR("cannot connect to mdc: rc = %d\n", rc);
120                 GOTO(out_conn, rc);
121         }
122
123         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
124         rc2 = obd_disconnect(&mdc_conn);
125         if (rc) {
126                 CERROR("cannot get lov info %d\n", rc);
127                 GOTO(out_conn, rc);
128         }
129
130         if (rc2) {
131                 CERROR("error disconnecting from MDS %d\n", rc2);
132                 GOTO(out_conn, rc = rc2);
133         }
134
135         /* sanity... */
136         if (req->rq_repmsg->bufcount < 2 ||
137             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
138                 CERROR("LOV desc: invalid descriptor returned\n");
139                 GOTO(out_conn, rc = -EINVAL);
140         }
141
142         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
143         lov_unpackdesc(desc);
144
145         if (req->rq_repmsg->buflens[1] <
146             sizeof(desc->ld_uuid.uuid) * desc->ld_tgt_count){
147                 CERROR("LOV desc: invalid uuid array returned\n");
148                 GOTO(out_conn, rc = -EINVAL);
149         }
150
151         if (memcmp(obd->obd_uuid.uuid, desc->ld_uuid.uuid,
152                    sizeof(desc->ld_uuid.uuid))) {
153                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
154                        obd->obd_uuid.uuid, desc->ld_uuid.uuid);
155                 GOTO(out_conn, rc = -EINVAL);
156         }
157
158         if (desc->ld_tgt_count > 1000) {
159                 CERROR("LOV desc: target count > 1000 (%d)\n",
160                        desc->ld_tgt_count);
161                 GOTO(out_conn, rc = -EINVAL);
162         }
163
164         /* Because of 64-bit divide/mod operations only work with a 32-bit
165          * divisor in a 32-bit kernel, we cannot support a stripe width
166          * of 4GB or larger on 32-bit CPUs.
167          */
168         if ((desc->ld_default_stripe_count ?
169              desc->ld_default_stripe_count : desc->ld_tgt_count) *
170              desc->ld_default_stripe_size > ~0UL) {
171                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
172                        desc->ld_default_stripe_size,
173                        desc->ld_default_stripe_count ?
174                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
175                 GOTO(out_conn, rc = -EINVAL);
176         }
177
178         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
179         OBD_ALLOC(lov->tgts, lov->bufsize);
180         if (!lov->tgts) {
181                 CERROR("Out of memory\n");
182                 GOTO(out_conn, rc = -ENOMEM);
183         }
184
185         tmp = lustre_msg_buf(req->rq_repmsg, 1);
186         for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
187                 struct obd_uuid *uuid = &tgts->uuid;
188                 struct obd_device *tgt_obd;
189                 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
190
191                 obd_str2uuid(uuid, tmp);
192                 tgt_obd = client_tgtuuid2obd(uuid);
193                 tmp += sizeof(uuid->uuid);
194
195                 if (!tgt_obd) {
196                         CERROR("Target %s not attached\n", uuid->uuid);
197                         GOTO(out_disc, rc = -EINVAL);
198                 }
199
200                 if (!(tgt_obd->obd_flags & OBD_SET_UP)) {
201                         CERROR("Target %s not set up\n", uuid->uuid);
202                         GOTO(out_disc, rc = -EINVAL);
203                 }
204
205                 rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid, recovd,
206                                  recover);
207
208                 if (rc) {
209                         CERROR("Target %s connect error %d\n", uuid->uuid, rc);
210                         GOTO(out_disc, rc);
211                 }
212
213                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
214                                    sizeof(struct obd_device *), obd, NULL);
215                 if (rc) {
216                         CERROR("Target %s REGISTER_LOV error %d\n",
217                                uuid->uuid, rc);
218                         obd_disconnect(&tgts->conn);
219                         GOTO(out_disc, rc);
220                 }
221
222                 desc->ld_active_tgt_count++;
223                 tgts->active = 1;
224         }
225
226         mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
227
228  out:
229         ptlrpc_req_finished(req);
230         RETURN(rc);
231
232  out_disc:
233         while (i-- > 0) {
234                 struct obd_uuid uuid;
235                 --tgts;
236                 --desc->ld_active_tgt_count;
237                 tgts->active = 0;
238                 obd_str2uuid(&uuid, tgts->uuid.uuid);
239                 rc2 = obd_disconnect(&tgts->conn);
240                 if (rc2)
241                         CERROR("error: LOV target %s disconnect on OST idx %d: "
242                                "rc = %d\n", uuid.uuid, i, rc2);
243         }
244         OBD_FREE(lov->tgts, lov->bufsize);
245  out_conn:
246         class_disconnect(conn);
247         goto out;
248 }
249
250 static int lov_disconnect(struct lustre_handle *conn)
251 {
252         struct obd_device *obd = class_conn2obd(conn);
253         struct lov_obd *lov = &obd->u.lov;
254         struct obd_export *exp;
255         struct list_head *p, *n;
256         int rc, i;
257
258         if (!lov->tgts)
259                 goto out_local;
260
261         /* Only disconnect the underlying layers on the final disconnect. */
262         lov->refcount--;
263         if (lov->refcount != 0)
264                 goto out_local;
265
266         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
267                 rc = obd_disconnect(&lov->tgts[i].conn);
268                 if (rc) {
269                         if (lov->tgts[i].active) {
270                                 CERROR("Target %s disconnect error %d\n",
271                                        lov->tgts[i].uuid.uuid, rc);
272                         }
273                         rc = 0;
274                 }
275                 if (lov->tgts[i].active) {
276                         lov->desc.ld_active_tgt_count--;
277                         lov->tgts[i].active = 0;
278                 }
279         }
280         OBD_FREE(lov->tgts, lov->bufsize);
281         lov->bufsize = 0;
282         lov->tgts = NULL;
283
284         exp = class_conn2export(conn);
285         spin_lock(&exp->exp_lov_data.led_lock);
286         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
287                 /* XXX close these, instead of just discarding them? */
288                 struct lov_file_handles *lfh;
289                 lfh = list_entry(p, typeof(*lfh), lfh_list);
290                 CERROR("discarding open LOV handle %p:"LPX64"\n",
291                        lfh, lfh->lfh_cookie);
292                 list_del(&lfh->lfh_list);
293                 OBD_FREE(lfh->lfh_data, lfh->lfh_count * FD_OSTDATA_SIZE);
294                 PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
295         }
296         spin_unlock(&exp->exp_lov_data.led_lock);
297
298  out_local:
299         rc = class_disconnect(conn);
300         return rc;
301 }
302
303 /* Error codes:
304  *
305  *  -EINVAL  : UUID can't be found in the LOV's target list
306  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
307  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
308  *  -EALREADY: The OSC is already marked (in)active
309  */
310 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
311                               int activate)
312 {
313         struct obd_device *obd;
314         struct lov_tgt_desc *tgt;
315         int i, rc = 0;
316         ENTRY;
317
318         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
319                lov, uuid->uuid, activate);
320
321         spin_lock(&lov->lov_lock);
322         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
323                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
324                        i, tgt->uuid.uuid, tgt->conn.addr);
325                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof(uuid->uuid)) == 0)
326                         break;
327         }
328
329         if (i == lov->desc.ld_tgt_count)
330                 GOTO(out, rc = -EINVAL);
331
332         obd = class_conn2obd(&tgt->conn);
333         if (obd == NULL) {
334                 LBUG();
335                 GOTO(out, rc = -ENOTCONN);
336         }
337
338         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
339                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
340                obd->obd_type->typ_name, i);
341         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
342                 LBUG();
343                 GOTO(out, rc = -EBADF);
344         }
345
346         if (tgt->active == activate) {
347                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
348                        activate ? "" : "in");
349                 GOTO(out, rc = -EALREADY);
350         }
351
352         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
353
354         tgt->active = activate;
355         if (activate) {
356                 /*
357                  * foreach(export)
358                  *     foreach(open_file)
359                  *         if (file_handle uses this_osc)
360                  *             if (has_no_filehandle)
361                  *                 open(file_handle, this_osc);
362                  */
363                 /* XXX reconnect? */
364                 lov->desc.ld_active_tgt_count++;
365         } else {
366                 /*
367                  * Should I invalidate filehandles that refer to this OSC, so
368                  * that I reopen them during reactivation?
369                  */
370                 /* XXX disconnect from OSC? */
371                 lov->desc.ld_active_tgt_count--;
372         }
373
374 #warning "FIXME: walk open files list for objects that need opening"
375         EXIT;
376  out:
377         spin_unlock(&lov->lov_lock);
378         return rc;
379 }
380
381 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
382 {
383         struct obd_ioctl_data *data = buf;
384         struct lov_obd *lov = &obd->u.lov;
385         struct obd_uuid uuid;
386         int rc = 0;
387         ENTRY;
388
389         if (data->ioc_inllen1 < 1) {
390                 CERROR("LOV setup requires an MDC UUID\n");
391                 RETURN(-EINVAL);
392         }
393
394         if (data->ioc_inllen1 > 37) {
395                 CERROR("mdc UUID must be 36 characters or less\n");
396                 RETURN(-EINVAL);
397         }
398
399         spin_lock_init(&lov->lov_lock);
400         obd_str2uuid(&uuid, data->ioc_inlbuf1);
401         lov->mdcobd = class_uuid2obd(&uuid);
402         if (!lov->mdcobd) {
403                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
404                        data->ioc_inlbuf1);
405                 rc = -EINVAL;
406         }
407         RETURN(rc);
408 }
409
410 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
411 {
412         struct lov_file_handles *lfh = NULL;
413
414         if (!handle || !handle->addr)
415                 RETURN(NULL);
416
417         lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
418         if (!kmem_cache_validate(lov_file_cache, lfh))
419                 RETURN(NULL);
420
421         if (lfh->lfh_cookie != handle->cookie)
422                 RETURN(NULL);
423
424         return lfh;
425 }
426
427 /* the LOV expects oa->o_id to be set to the LOV object id */
428 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
429                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 {
431         struct obd_export *export = class_conn2export(conn);
432         struct lov_obd *lov;
433         struct lov_stripe_md *lsm;
434         struct lov_oinfo *loi;
435         struct obdo *tmp;
436         int ost_count, ost_idx;
437         int first = 1, obj_alloc = 0;
438         int rc = 0, i;
439         ENTRY;
440
441         LASSERT(ea);
442
443         if (!export)
444                 RETURN(-EINVAL);
445
446         lov = &export->exp_obd->u.lov;
447
448         if (!lov->desc.ld_active_tgt_count)
449                 RETURN(-EIO);
450
451         tmp = obdo_alloc();
452         if (!tmp)
453                 RETURN(-ENOMEM);
454
455         lsm = *ea;
456
457         if (!lsm) {
458                 rc = obd_alloc_memmd(conn, &lsm);
459                 if (rc < 0)
460                         GOTO(out_tmp, rc);
461
462                 rc = 0;
463                 lsm->lsm_magic = LOV_MAGIC;
464         }
465
466         ost_count = lov->desc.ld_tgt_count;
467
468         LASSERT(oa->o_valid & OBD_MD_FLID);
469         lsm->lsm_object_id = oa->o_id;
470         if (!lsm->lsm_stripe_size)
471                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
472
473         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
474                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
475                 int stripe_offset = mult % ost_count;
476                 int sub_offset = (mult / ost_count);
477
478                 ost_idx = (stripe_offset + sub_offset) % ost_count;
479         } else
480                 ost_idx = lsm->lsm_stripe_offset;
481
482         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
483                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
484
485         loi = lsm->lsm_oinfo;
486         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
487                 struct lov_stripe_md obj_md;
488                 struct lov_stripe_md *obj_mdp = &obj_md;
489                 int err;
490
491                 if (lov->tgts[ost_idx].active == 0) {
492                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
493                         continue;
494                 }
495
496                 /* create data objects with "parent" OA */
497                 memcpy(tmp, oa, sizeof(*tmp));
498                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
499                 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
500                 if (err) {
501                         if (lov->tgts[ost_idx].active) {
502                                 CERROR("error creating objid "LPX64" sub-object"
503                                        " on OST idx %d/%d: rc = %d\n", oa->o_id,
504                                        ost_idx, lsm->lsm_stripe_count, err);
505                                 if (err > 0) {
506                                         CERROR("obd_create returned invalid "
507                                                "err %d\n", err);
508                                         err = -EIO;
509                                 }
510                                 if (!rc)
511                                         rc = err;
512                         }
513                         continue;
514                 }
515                 loi->loi_id = tmp->o_id;
516                 loi->loi_ost_idx = ost_idx;
517                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
518                        lsm->lsm_object_id, loi->loi_id, ost_idx);
519
520                 if (first) {
521                         lsm->lsm_stripe_offset = ost_idx;
522                         first = 0;
523                 }
524
525                 ++obj_alloc;
526                 ++loi;
527
528                 /* If we have allocated enough objects, we are OK */
529                 if (obj_alloc == lsm->lsm_stripe_count) {
530                         rc = 0;
531                         GOTO(out_done, rc);
532                 }
533         }
534
535         if (*ea)
536                 GOTO(out_cleanup, rc);
537         else {
538                 struct lov_stripe_md *lsm_new;
539                 /* XXX LOV STACKING call into osc for sizes */
540                 int size = lov_stripe_md_size(obj_alloc);
541
542                 OBD_ALLOC(lsm_new, size);
543                 if (!lsm_new)
544                         GOTO(out_cleanup, rc = -ENOMEM);
545                 memcpy(lsm_new, lsm, size);
546                 lsm_new->lsm_stripe_count = obj_alloc;
547
548                 /* XXX LOV STACKING call into osc for sizes */
549                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
550                 lsm = lsm_new;
551         }
552  out_done:
553         *ea = lsm;
554
555  out_tmp:
556         obdo_free(tmp);
557         return rc;
558
559  out_cleanup:
560         while (obj_alloc-- > 0) {
561                 int err;
562
563                 --loi;
564                 /* destroy already created objects here */
565                 memcpy(tmp, oa, sizeof(*tmp));
566                 tmp->o_id = loi->loi_id;
567                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL, NULL);
568                 if (err)
569                         CERROR("Failed to uncreate objid "LPX64" subobj "
570                                LPX64" on OST idx %d: rc = %d\n",
571                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
572                                err);
573         }
574         if (!*ea)
575                 obd_free_memmd(conn, &lsm);
576         goto out_tmp;
577 }
578
579 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
580                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
581 {
582         struct obdo tmp;
583         struct obd_export *export = class_conn2export(conn);
584         struct lov_obd *lov;
585         struct lov_oinfo *loi;
586         struct lov_file_handles *lfh = NULL;
587         int rc = 0, i;
588         ENTRY;
589
590         if (!lsm) {
591                 CERROR("LOV requires striping ea for destruction\n");
592                 RETURN(-EINVAL);
593         }
594
595         if (lsm->lsm_magic != LOV_MAGIC) {
596                 CERROR("LOV striping magic bad %#x != %#x\n",
597                        lsm->lsm_magic, LOV_MAGIC);
598                 RETURN(-EINVAL);
599         }
600
601         if (!export || !export->exp_obd)
602                 RETURN(-ENODEV);
603
604         if (oa->o_valid & OBD_MD_FLHANDLE)
605                 lfh = lov_handle2lfh(obdo_handle(oa));
606
607         lov = &export->exp_obd->u.lov;
608         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
609                 int err;
610                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
611                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
612                         /* Orphan clean up will (someday) fix this up. */
613                         continue;
614                 }
615
616                 memcpy(&tmp, oa, sizeof(tmp));
617                 tmp.o_id = loi->loi_id;
618                 if (lfh)
619                         memcpy(obdo_handle(&tmp),
620                                lfh->lfh_data + i * FD_OSTDATA_SIZE,
621                                FD_OSTDATA_SIZE);
622                 else
623                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
624                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
625                                   NULL, NULL);
626                 if (err && lov->tgts[loi->loi_ost_idx].active) {
627                         CERROR("error: destroying objid "LPX64" subobj "
628                                LPX64" on OST idx %d\n: rc = %d",
629                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
630                         if (!rc)
631                                 rc = err;
632                 }
633         }
634         RETURN(rc);
635 }
636
637 /* compute object size given "stripeno" and the ost size */
638 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
639                                 int stripeno)
640 {
641         unsigned long ssize  = lsm->lsm_stripe_size;
642         unsigned long swidth = ssize * lsm->lsm_stripe_count;
643         unsigned long stripe_size;
644         obd_size lov_size;
645
646         if (ost_size == 0)
647                 return 0;
648
649         /* do_div(a, b) returns a % b, and a = a / b */
650         stripe_size = do_div(ost_size, ssize);
651
652         if (stripe_size)
653                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
654         else
655                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
656
657         return lov_size;
658 }
659
660 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
661                             struct lov_stripe_md *lsm, int stripeno, int *set)
662 {
663         if (*set) {
664                 if (valid & OBD_MD_FLSIZE) {
665                         /* this handles sparse files properly */
666                         obd_size lov_size;
667
668                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
669                         if (lov_size > tgt->o_size)
670                                 tgt->o_size = lov_size;
671                 }
672                 if (valid & OBD_MD_FLBLOCKS)
673                         tgt->o_blocks += src->o_blocks;
674                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
675                         tgt->o_ctime = src->o_ctime;
676                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
677                         tgt->o_mtime = src->o_mtime;
678         } else {
679                 obdo_cpy_md(tgt, src, valid);
680                 if (valid & OBD_MD_FLSIZE)
681                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
682                 *set = 1;
683         }
684 }
685
686 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
687                        struct lov_stripe_md *lsm)
688 {
689         struct obdo tmp;
690         struct obd_export *export = class_conn2export(conn);
691         struct lov_obd *lov;
692         struct lov_oinfo *loi;
693         struct lov_file_handles *lfh = NULL;
694         int i;
695         int set = 0;
696         ENTRY;
697
698         if (!lsm) {
699                 CERROR("LOV requires striping ea\n");
700                 RETURN(-EINVAL);
701         }
702
703         if (lsm->lsm_magic != LOV_MAGIC) {
704                 CERROR("LOV striping magic bad %#x != %#x\n",
705                        lsm->lsm_magic, LOV_MAGIC);
706                 RETURN(-EINVAL);
707         }
708
709         if (!export || !export->exp_obd)
710                 RETURN(-ENODEV);
711
712         lov = &export->exp_obd->u.lov;
713
714         if (oa->o_valid & OBD_MD_FLHANDLE)
715                 lfh = lov_handle2lfh(obdo_handle(oa));
716
717         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
718                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
719         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
720                 int err;
721
722                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
723                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
724                         continue;
725                 }
726
727                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
728                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
729                 /* create data objects with "parent" OA */
730                 memcpy(&tmp, oa, sizeof(tmp));
731                 tmp.o_id = loi->loi_id;
732                 if (lfh)
733                         memcpy(obdo_handle(&tmp),
734                                lfh->lfh_data + i * FD_OSTDATA_SIZE,
735                                FD_OSTDATA_SIZE);
736                 else
737                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
738
739                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
740                 if (err) {
741                         if (lov->tgts[loi->loi_ost_idx].active) {
742                                 CERROR("error: getattr objid "LPX64" subobj "
743                                        LPX64" on OST idx %d: rc = %d\n",
744                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
745                                        err);
746                                 RETURN(err);
747                         }
748                 } else {
749                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
750                 }
751         }
752
753         RETURN(set ? 0 : -EIO);
754 }
755
756 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
757                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
758 {
759         struct obdo *tmp;
760         struct obd_export *export = class_conn2export(conn);
761         struct lov_obd *lov;
762         struct lov_oinfo *loi;
763         struct lov_file_handles *lfh = NULL;
764         int rc = 0, i, set = 0;
765         ENTRY;
766
767         if (!lsm) {
768                 CERROR("LOV requires striping ea\n");
769                 RETURN(-EINVAL);
770         }
771
772         if (lsm->lsm_magic != LOV_MAGIC) {
773                 CERROR("LOV striping magic bad %#x != %#x\n",
774                        lsm->lsm_magic, LOV_MAGIC);
775                 RETURN(-EINVAL);
776         }
777
778         if (!export || !export->exp_obd)
779                 RETURN(-ENODEV);
780
781         /* size changes should go through punch and not setattr */
782         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
783
784         /* for now, we only expect mtime updates here */
785         LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
786
787         tmp = obdo_alloc();
788         if (!tmp)
789                 RETURN(-ENOMEM);
790
791         if (oa->o_valid & OBD_MD_FLHANDLE)
792                 lfh = lov_handle2lfh(obdo_handle(oa));
793
794         lov = &export->exp_obd->u.lov;
795         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
796                 int err;
797
798                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
799                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
800                         continue;
801                 }
802
803                 obdo_cpy_md(tmp, oa, oa->o_valid);
804
805                 if (lfh)
806                         memcpy(obdo_handle(tmp),
807                                lfh->lfh_data + i * FD_OSTDATA_SIZE,
808                                FD_OSTDATA_SIZE);
809                 else
810                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
811
812                 tmp->o_id = loi->loi_id;
813
814                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
815                                   NULL, NULL);
816                 if (err) {
817                         if (lov->tgts[loi->loi_ost_idx].active) {
818                                 CERROR("error: setattr objid "LPX64" subobj "
819                                        LPX64" on OST idx %d: rc = %d\n",
820                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
821                                        err);
822                                 if (!rc)
823                                         rc = err;
824                         }
825                 } else
826                         set = 1;
827         }
828         obdo_free(tmp);
829         if (!set && !rc)
830                 rc = -EIO;
831         RETURN(rc);
832 }
833
834 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
835                     struct lov_stripe_md *lsm, struct obd_trans_info *oti)
836 {
837         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
838         struct obd_export *export = class_conn2export(conn);
839         struct lov_obd *lov;
840         struct lov_oinfo *loi;
841         struct lov_file_handles *lfh = NULL;
842         struct lustre_handle *handle;
843         int set = 0;
844         int rc = 0, i;
845         ENTRY;
846
847         if (!lsm) {
848                 CERROR("LOV requires striping ea for opening\n");
849                 RETURN(-EINVAL);
850         }
851
852         if (lsm->lsm_magic != LOV_MAGIC) {
853                 CERROR("LOV striping magic bad %#x != %#x\n",
854                        lsm->lsm_magic, LOV_MAGIC);
855                 RETURN(-EINVAL);
856         }
857
858         if (!export || !export->exp_obd)
859                 RETURN(-ENODEV);
860
861         tmp = obdo_alloc();
862         if (!tmp)
863                 RETURN(-ENOMEM);
864
865         PORTAL_SLAB_ALLOC(lfh, lov_file_cache, sizeof(*lfh));
866         if (!lfh)
867                 GOTO(out_tmp, rc = -ENOMEM);
868         OBD_ALLOC(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
869         if (!lfh->lfh_data)
870                 GOTO(out_lfh, rc = -ENOMEM);
871
872         lov = &export->exp_obd->u.lov;
873         oa->o_size = 0;
874         oa->o_blocks = 0;
875         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
876                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
877                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
878                         continue;
879                 }
880
881                 /* create data objects with "parent" OA */
882                 memcpy(tmp, oa, sizeof(*tmp));
883                 tmp->o_id = loi->loi_id;
884
885                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
886                               NULL, NULL);
887                 if (rc) {
888                         if (!lov->tgts[loi->loi_ost_idx].active)
889                                 continue;
890                         CERROR("error: open objid "LPX64" subobj "LPX64
891                                " on OST idx %d: rc = %d\n",
892                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
893                                loi->loi_ost_idx, rc);
894                         goto out_handles;
895                 }
896
897                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
898
899                 if (tmp->o_valid & OBD_MD_FLHANDLE)
900                         memcpy(lfh->lfh_data + i * FD_OSTDATA_SIZE,
901                                obdo_handle(tmp), FD_OSTDATA_SIZE);
902         }
903
904         handle = obdo_handle(oa);
905
906         lfh->lfh_count = lsm->lsm_stripe_count;
907         get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
908
909         handle->addr = (__u64)(unsigned long)lfh;
910         handle->cookie = lfh->lfh_cookie;
911         oa->o_valid |= OBD_MD_FLHANDLE;
912         spin_lock(&export->exp_lov_data.led_lock);
913         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
914         spin_unlock(&export->exp_lov_data.led_lock);
915
916         if (!set && !rc)
917                 rc = -EIO;
918 out_tmp:
919         obdo_free(tmp);
920         RETURN(rc);
921
922 out_handles:
923         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
924                 int err;
925
926                 if (lov->tgts[loi->loi_ost_idx].active == 0)
927                         continue;
928
929                 memcpy(tmp, oa, sizeof(*tmp));
930                 tmp->o_id = loi->loi_id;
931                 memcpy(obdo_handle(tmp), lfh->lfh_data + i * FD_OSTDATA_SIZE,
932                        FD_OSTDATA_SIZE);
933
934                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
935                                 NULL, NULL);
936                 if (err && lov->tgts[loi->loi_ost_idx].active) {
937                         CERROR("error: closing objid "LPX64" subobj "LPX64
938                                " on OST idx %d after open error: rc=%d\n",
939                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
940                 }
941         }
942
943         OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
944 out_lfh:
945         PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
946         goto out_tmp;
947 }
948
949 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
950                      struct lov_stripe_md *lsm, struct obd_trans_info *oti)
951 {
952         struct obdo tmp;
953         struct obd_export *export = class_conn2export(conn);
954         struct lov_obd *lov;
955         struct lov_oinfo *loi;
956         struct lov_file_handles *lfh = NULL;
957         int rc = 0, i;
958         ENTRY;
959
960         if (!lsm) {
961                 CERROR("LOV requires striping ea\n");
962                 RETURN(-EINVAL);
963         }
964
965         if (lsm->lsm_magic != LOV_MAGIC) {
966                 CERROR("LOV striping magic bad %#x != %#x\n",
967                        lsm->lsm_magic, LOV_MAGIC);
968                 RETURN(-EINVAL);
969         }
970
971         if (!export || !export->exp_obd)
972                 RETURN(-ENODEV);
973
974         if (oa->o_valid & OBD_MD_FLHANDLE)
975                 lfh = lov_handle2lfh(obdo_handle(oa));
976
977         lov = &export->exp_obd->u.lov;
978         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
979                 int err;
980
981                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
982                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
983                         continue;
984                 }
985
986                 /* create data objects with "parent" OA */
987                 memcpy(&tmp, oa, sizeof(tmp));
988                 tmp.o_id = loi->loi_id;
989                 if (lfh)
990                         memcpy(obdo_handle(&tmp),
991                                lfh->lfh_data + i * FD_OSTDATA_SIZE,
992                                FD_OSTDATA_SIZE);
993                 else
994                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
995
996                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
997                                 NULL, NULL);
998                 if (err) {
999                         if (lov->tgts[loi->loi_ost_idx].active) {
1000                                 CERROR("error: close objid "LPX64" subobj "LPX64
1001                                        " on OST idx %d: rc = %d\n", oa->o_id,
1002                                        loi->loi_id, loi->loi_ost_idx, err);
1003                         }
1004                         if (!rc)
1005                                 rc = err;
1006                 }
1007         }
1008         if (lfh) {
1009                 spin_lock(&export->exp_lov_data.led_lock);
1010                 list_del(&lfh->lfh_list);
1011                 spin_unlock(&export->exp_lov_data.led_lock);
1012
1013                 OBD_FREE(lfh->lfh_data, lsm->lsm_stripe_count*FD_OSTDATA_SIZE);
1014                 PORTAL_SLAB_FREE(lfh, lov_file_cache, sizeof(*lfh));
1015         }
1016
1017         RETURN(rc);
1018 }
1019
/* Fallback for platforms that do not already provide log2().
 * NOTE(review): ffz(~(n)) presumably yields the index of the lowest set bit
 * of n, which equals log2(n) only when n is a power of two - confirm. */
#if !defined(log2)
#define log2(n) ffz(~(n))
#endif
1023
1024 #warning FIXME: merge these two functions now that they are nearly the same
1025
1026 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
1027 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1028                                  int stripeno)
1029 {
1030         unsigned long ssize  = lsm->lsm_stripe_size;
1031         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1032         unsigned long stripe_off, this_stripe;
1033
1034         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
1035                 return lov_off;
1036
1037         /* do_div(a, b) returns a % b, and a = a / b */
1038         stripe_off = do_div(lov_off, swidth);
1039
1040         this_stripe = stripeno * ssize;
1041         if (stripe_off <= this_stripe)
1042                 stripe_off = 0;
1043         else {
1044                 stripe_off -= this_stripe;
1045
1046                 if (stripe_off > ssize)
1047                         stripe_off = ssize;
1048         }
1049
1050
1051         return lov_off * ssize + stripe_off;
1052 }
1053
1054 /* compute which stripe number "lov_off" will be written into */
1055 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1056 {
1057         unsigned long ssize  = lsm->lsm_stripe_size;
1058         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1059         unsigned long stripe_off;
1060
1061         stripe_off = do_div(lov_off, swidth);
1062
1063         return stripe_off / ssize;
1064 }
1065
1066
1067 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1068  * we can send this 'punch' to just the authoritative node and the nodes
1069  * that the punch will affect. */
1070 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1071                      struct lov_stripe_md *lsm,
1072                      obd_off start, obd_off end, struct obd_trans_info *oti)
1073 {
1074         struct obdo tmp;
1075         struct obd_export *export = class_conn2export(conn);
1076         struct lov_obd *lov;
1077         struct lov_oinfo *loi;
1078         struct lov_file_handles *lfh = NULL;
1079         int rc = 0, i;
1080         ENTRY;
1081
1082         if (!lsm) {
1083                 CERROR("LOV requires striping ea\n");
1084                 RETURN(-EINVAL);
1085         }
1086
1087         if (lsm->lsm_magic != LOV_MAGIC) {
1088                 CERROR("LOV striping magic bad %#x != %#x\n",
1089                        lsm->lsm_magic, LOV_MAGIC);
1090                 RETURN(-EINVAL);
1091         }
1092
1093         if (!export || !export->exp_obd)
1094                 RETURN(-ENODEV);
1095
1096         if (oa->o_valid & OBD_MD_FLHANDLE)
1097                 lfh = lov_handle2lfh(obdo_handle(oa));
1098
1099         lov = &export->exp_obd->u.lov;
1100         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1101                 obd_off starti = lov_stripe_offset(lsm, start, i);
1102                 obd_off endi = lov_stripe_offset(lsm, end, i);
1103                 int err;
1104
1105                 if (starti == endi)
1106                         continue;
1107
1108                 /* create data objects with "parent" OA */
1109                 memcpy(&tmp, oa, sizeof(tmp));
1110                 tmp.o_id = loi->loi_id;
1111                 if (lfh)
1112                         memcpy(obdo_handle(&tmp),
1113                                lfh->lfh_data + i * FD_OSTDATA_SIZE,
1114                                FD_OSTDATA_SIZE);
1115                 else
1116                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1117
1118                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1119                                 starti, endi, NULL);
1120                 if (err) {
1121                         if (lov->tgts[loi->loi_ost_idx].active) {
1122                                 CERROR("error: punch objid "LPX64" subobj "LPX64
1123                                        " on OST idx %d: rc = %d\n", oa->o_id,
1124                                        loi->loi_id, loi->loi_ost_idx, err);
1125                         }
1126                         if (!rc)
1127                                 rc = err;
1128                 }
1129         }
1130         RETURN(rc);
1131 }
1132
1133 static inline int lov_brw(int cmd, struct lustre_handle *conn,
1134                           struct lov_stripe_md *lsm, obd_count oa_bufs,
1135                           struct brw_page *pga, struct obd_brw_set *set,
1136                           struct obd_trans_info *oti)
1137 {
1138         struct {
1139                 int bufct;
1140                 int index;
1141                 int subcount;
1142                 struct lov_stripe_md lsm;
1143                 int ost_idx;
1144         } *stripeinfo, *si, *si_last;
1145         struct obd_export *export = class_conn2export(conn);
1146         struct lov_obd *lov;
1147         struct brw_page *ioarr;
1148         struct lov_oinfo *loi;
1149         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1150         ENTRY;
1151
1152         if (!lsm) {
1153                 CERROR("LOV requires striping ea\n");
1154                 RETURN(-EINVAL);
1155         }
1156
1157         if (lsm->lsm_magic != LOV_MAGIC) {
1158                 CERROR("LOV striping magic bad %#x != %#x\n",
1159                        lsm->lsm_magic, LOV_MAGIC);
1160                 RETURN(-EINVAL);
1161         }
1162
1163         lov = &export->exp_obd->u.lov;
1164
1165         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1166         if (!stripeinfo)
1167                 GOTO(out_cbdata, rc = -ENOMEM);
1168
1169         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1170         if (!where)
1171                 GOTO(out_sinfo, rc = -ENOMEM);
1172
1173         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1174         if (!ioarr)
1175                 GOTO(out_where, rc = -ENOMEM);
1176
1177         for (i = 0; i < oa_bufs; i++) {
1178                 where[i] = lov_stripe_number(lsm, pga[i].off);
1179                 stripeinfo[where[i]].bufct++;
1180         }
1181
1182         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1183              i < stripe_count; i++, loi++, si_last = si, si++) {
1184                 if (i > 0)
1185                         si->index = si_last->index + si_last->bufct;
1186                 si->lsm.lsm_object_id = loi->loi_id;
1187                 si->ost_idx = loi->loi_ost_idx;
1188         }
1189
1190         for (i = 0; i < oa_bufs; i++) {
1191                 int which = where[i];
1192                 int shift;
1193
1194                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1195                 LASSERT(shift < oa_bufs);
1196                 ioarr[shift] = pga[i];
1197                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
1198                 stripeinfo[which].subcount++;
1199         }
1200
1201         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1202                 int shift = si->index;
1203
1204                 if (si->bufct) {
1205                         LASSERT(shift < oa_bufs);
1206                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1207                                      &si->lsm, si->bufct, &ioarr[shift],
1208                                      set, oti);
1209                         if (rc)
1210                                 GOTO(out_ioarr, rc);
1211                 }
1212         }
1213
1214  out_ioarr:
1215         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1216  out_where:
1217         OBD_FREE(where, sizeof(*where) * oa_bufs);
1218  out_sinfo:
1219         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1220  out_cbdata:
1221         RETURN(rc);
1222 }
1223
1224 static struct lov_lock_handles *lov_newlockh(struct lov_stripe_md *lsm)
1225 {
1226         struct lov_lock_handles *lov_lockh;
1227
1228         OBD_ALLOC(lov_lockh, sizeof(*lov_lockh) +
1229                   sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count);
1230         if (!lov_lockh)
1231                 return NULL;
1232
1233         get_random_bytes(&lov_lockh->llh_cookie, sizeof(lov_lockh->llh_cookie));
1234
1235         return lov_lockh;
1236 }
1237
1238 /* We are only ever passed local lock handles here, so we do not need to
1239  * validate (and we can't really because these structs are variable sized
1240  * and therefore alloced, and not from a private slab).
1241  *
1242  * We just check because we can...
1243  */
1244 static struct lov_lock_handles *lov_h2lovlockh(struct lustre_handle *handle)
1245 {
1246         struct lov_lock_handles *lov_lockh = NULL;
1247
1248         if (!handle || !handle->addr)
1249                 RETURN(NULL);
1250
1251         lov_lockh = (struct lov_lock_handles *)(unsigned long)(handle->addr);
1252         if (lov_lockh->llh_cookie != handle->cookie)
1253                 RETURN(NULL);
1254
1255         return lov_lockh;
1256 }
1257
1258 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1259                        struct lustre_handle *parent_lock,
1260                        __u32 type, void *cookie, int cookielen, __u32 mode,
1261                        int *flags, void *cb, void *data, int datalen,
1262                        struct lustre_handle *lockh)
1263 {
1264         struct obd_export *export = class_conn2export(conn);
1265         struct lov_lock_handles *lov_lockh = NULL;
1266         struct lustre_handle *lov_lockhp;
1267         struct lov_obd *lov;
1268         struct lov_oinfo *loi;
1269         struct lov_stripe_md submd;
1270         ldlm_error_t rc = ELDLM_LOCK_MATCHED, err;
1271         int i;
1272         ENTRY;
1273
1274         if (!lsm) {
1275                 CERROR("LOV requires striping ea\n");
1276                 RETURN(-EINVAL);
1277         }
1278
1279         if (lsm->lsm_magic != LOV_MAGIC) {
1280                 CERROR("LOV striping magic bad %#x != %#x\n",
1281                        lsm->lsm_magic, LOV_MAGIC);
1282                 RETURN(-EINVAL);
1283         }
1284
1285         /* we should never be asked to replay a lock. */
1286
1287         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1288
1289         if (!export || !export->exp_obd)
1290                 RETURN(-ENODEV);
1291
1292         if (lsm->lsm_stripe_count > 1) {
1293                 lov_lockh = lov_newlockh(lsm);
1294                 if (!lov_lockh)
1295                         RETURN(-ENOMEM);
1296
1297                 lockh->addr = (__u64)(unsigned long)lov_lockh;
1298                 lockh->cookie = lov_lockh->llh_cookie;
1299                 lov_lockhp = lov_lockh->llh_handles;
1300         } else {
1301                 lov_lockhp = lockh;
1302         }
1303
1304         lov = &export->exp_obd->u.lov;
1305         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1306              i++, loi++, lov_lockhp++) {
1307                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1308                 struct ldlm_extent sub_ext;
1309
1310                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1311                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1312                         continue;
1313                 }
1314
1315                 *flags = 0;
1316                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
1317                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
1318                 if (sub_ext.start == sub_ext.end /* || !active */)
1319                         continue;
1320
1321                 /* XXX LOV STACKING: submd should be from the subobj */
1322                 submd.lsm_object_id = loi->loi_id;
1323                 submd.lsm_stripe_count = 0;
1324                 /* XXX submd is not fully initialized here */
1325                 *flags = 0;
1326                 err = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1327                                   parent_lock, type, &sub_ext, sizeof(sub_ext),
1328                                   mode, flags, cb, data, datalen, lov_lockhp);
1329
1330                 // XXX add a lock debug statement here
1331                 /* return _MATCHED only when all locks matched.. */
1332                 if (err == ELDLM_OK) {
1333                         rc = ELDLM_OK;
1334                 } else if (err != ELDLM_LOCK_MATCHED) {
1335                         rc = err;
1336                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1337                         if (lov->tgts[loi->loi_ost_idx].active) {
1338                                 CERROR("error: enqueue objid "LPX64" subobj "
1339                                        LPX64" on OST idx %d: rc = %d\n",
1340                                        lsm->lsm_object_id, loi->loi_id,
1341                                        loi->loi_ost_idx, rc);
1342                                 goto out_locks;
1343                         }
1344                 }
1345         }
1346         RETURN(rc);
1347
1348 out_locks:
1349         while (loi--, lov_lockhp--, i-- > 0) {
1350                 struct lov_stripe_md submd;
1351                 int err;
1352
1353                 if (lov_lockhp->cookie == 0 ||
1354                     lov->tgts[loi->loi_ost_idx].active == 0)
1355                         continue;
1356
1357                 /* XXX LOV STACKING: submd should be from the subobj */
1358                 submd.lsm_object_id = loi->loi_id;
1359                 submd.lsm_stripe_count = 0;
1360                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1361                                  mode, lov_lockhp);
1362                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1363                         CERROR("error: cancelling objid "LPX64" on OST "
1364                                "idx %d after enqueue error: rc = %d\n",
1365                                loi->loi_id, loi->loi_ost_idx, err);
1366                 }
1367         }
1368
1369         if (lsm->lsm_stripe_count > 1) {
1370                 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1371                 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1372                           sizeof(*lov_lockh->llh_handles) *
1373                           lsm->lsm_stripe_count);
1374         }
1375         lockh->cookie = DEAD_HANDLE_MAGIC;
1376
1377         RETURN(rc);
1378 }
1379
1380 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1381                       __u32 mode, struct lustre_handle *lockh)
1382 {
1383         struct obd_export *export = class_conn2export(conn);
1384         struct lov_lock_handles *lov_lockh = NULL;
1385         struct lustre_handle *lov_lockhp;
1386         struct lov_obd *lov;
1387         struct lov_oinfo *loi;
1388         int rc = 0, i;
1389         ENTRY;
1390
1391         if (!lsm) {
1392                 CERROR("LOV requires striping ea\n");
1393                 RETURN(-EINVAL);
1394         }
1395
1396         if (lsm->lsm_magic != LOV_MAGIC) {
1397                 CERROR("LOV striping magic bad %#x != %#x\n",
1398                        lsm->lsm_magic, LOV_MAGIC);
1399                 RETURN(-EINVAL);
1400         }
1401
1402         if (!export || !export->exp_obd)
1403                 RETURN(-ENODEV);
1404
1405         LASSERT(lockh);
1406         if (lsm->lsm_stripe_count > 1) {
1407                 lov_lockh = lov_h2lovlockh(lockh);
1408                 if (!lov_lockh) {
1409                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1410                         RETURN(-EINVAL);
1411                 }
1412
1413                 lov_lockhp = lov_lockh->llh_handles;
1414         } else
1415                 lov_lockhp = lockh;
1416
1417         lov = &export->exp_obd->u.lov;
1418         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1419              i++, loi++, lov_lockhp++) {
1420                 struct lov_stripe_md submd;
1421                 int err;
1422
1423                 if (lov_lockhp->cookie == 0) {
1424                         CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
1425                         continue;
1426                 }
1427
1428                 /* XXX LOV STACKING: submd should be from the subobj */
1429                 submd.lsm_object_id = loi->loi_id;
1430                 submd.lsm_stripe_count = 0;
1431                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1432                                  mode, lov_lockhp);
1433                 if (err) {
1434                         if (lov->tgts[loi->loi_ost_idx].active) {
1435                                 CERROR("error: cancel objid "LPX64" subobj "
1436                                        LPX64" on OST idx %d: rc = %d\n",
1437                                        lsm->lsm_object_id,
1438                                        loi->loi_id, loi->loi_ost_idx, err);
1439                                 if (!rc)
1440                                         rc = err;
1441                         }
1442                 }
1443         }
1444
1445         if (lsm->lsm_stripe_count > 1) {
1446                 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1447                 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1448                           sizeof(*lov_lockh->llh_handles) *
1449                           lsm->lsm_stripe_count);
1450         }
1451         lockh->cookie = DEAD_HANDLE_MAGIC;
1452
1453         RETURN(rc);
1454 }
1455
1456 static int lov_cancel_unused(struct lustre_handle *conn,
1457                              struct lov_stripe_md *lsm, int flags)
1458 {
1459         struct obd_export *export = class_conn2export(conn);
1460         struct lov_obd *lov;
1461         struct lov_oinfo *loi;
1462         int rc = 0, i;
1463         ENTRY;
1464
1465         if (!lsm) {
1466                 CERROR("LOV requires striping ea for lock cancellation\n");
1467                 RETURN(-EINVAL);
1468         }
1469
1470         if (!export || !export->exp_obd)
1471                 RETURN(-ENODEV);
1472
1473         lov = &export->exp_obd->u.lov;
1474         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1475                 struct lov_stripe_md submd;
1476                 int err;
1477
1478                 submd.lsm_object_id = loi->loi_id;
1479                 submd.lsm_stripe_count = 0;
1480                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1481                                        &submd, flags);
1482                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1483                         CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1484                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1485                                loi->loi_id, loi->loi_ost_idx, err);
1486                         if (!rc)
1487                                 rc = err;
1488                 }
1489         }
1490
1491         RETURN(rc);
1492 }
1493
1494 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1495 {
1496         struct obd_export *export = class_conn2export(conn);
1497         struct lov_obd *lov;
1498         struct obd_statfs lov_sfs;
1499         int set = 0;
1500         int rc = 0;
1501         int i;
1502         ENTRY;
1503
1504         if (!export || !export->exp_obd)
1505                 RETURN(-ENODEV);
1506
1507         lov = &export->exp_obd->u.lov;
1508
1509         /* We only get block data from the OBD */
1510         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1511                 int err;
1512
1513                 if (!lov->tgts[i].active) {
1514                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1515                         continue;
1516                 }
1517
1518                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1519                 if (err) {
1520                         if (lov->tgts[i].active) {
1521                                 CERROR("error: statfs OSC %s on OST idx %d: "
1522                                        "err = %d\n",
1523                                        lov->tgts[i].uuid.uuid, i, err);
1524                                 if (!rc)
1525                                         rc = err;
1526                         }
1527                         continue;
1528                 }
1529                 if (!set) {
1530                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1531                         set = 1;
1532                 } else {
1533                         osfs->os_bfree += lov_sfs.os_bfree;
1534                         osfs->os_bavail += lov_sfs.os_bavail;
1535                         osfs->os_blocks += lov_sfs.os_blocks;
1536                         /* XXX not sure about this one - depends on policy.
1537                          *   - could be minimum if we always stripe on all OBDs
1538                          *     (but that would be wrong for any other policy,
1539                          *     if one of the OBDs has no more objects left)
1540                          *   - could be sum if we stripe whole objects
1541                          *   - could be average, just to give a nice number
1542                          *   - we just pick first OST and hope it is enough
1543                         sfs->f_ffree += lov_sfs.f_ffree;
1544                          */
1545                 }
1546         }
1547         if (!set && !rc)
1548                 rc = -EIO;
1549         RETURN(rc);
1550 }
1551
1552 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1553                          void *karg, void *uarg)
1554 {
1555         struct obd_device *obddev = class_conn2obd(conn);
1556         struct lov_obd *lov = &obddev->u.lov;
1557         int i, count = lov->desc.ld_tgt_count;
1558         struct obd_uuid *uuidp;
1559         int rc;
1560
1561         ENTRY;
1562
1563         switch (cmd) {
1564         case IOC_LOV_SET_OSC_ACTIVE: {
1565                 struct obd_ioctl_data *data = karg;
1566                 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
1567                 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
1568                 break;
1569         }
1570         case OBD_IOC_LOV_GET_CONFIG: {
1571                 struct obd_ioctl_data *data = karg;
1572                 struct lov_tgt_desc *tgtdesc;
1573                 struct lov_desc *desc;
1574                 char *buf = NULL;
1575
1576                 buf = NULL;
1577                 len = 0;
1578                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1579                         RETURN(-EINVAL);
1580
1581                 data = (struct obd_ioctl_data *)buf;
1582
1583                 if (sizeof(*desc) > data->ioc_inllen1) {
1584                         OBD_FREE(buf, len);
1585                         RETURN(-EINVAL);
1586                 }
1587
1588                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1589                         OBD_FREE(buf, len);
1590                         RETURN(-EINVAL);
1591                 }
1592
1593                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1594                 memcpy(desc, &(lov->desc), sizeof(*desc));
1595
1596                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1597                 tgtdesc = lov->tgts;
1598                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1599                         obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
1600
1601                 rc = copy_to_user((void *)uarg, buf, len);
1602                 if (rc)
1603                         rc = -EFAULT;
1604                 OBD_FREE(buf, len);
1605                 break;
1606         }
1607         case LL_IOC_LOV_SETSTRIPE:
1608                 rc = lov_setstripe(conn, karg, uarg);
1609                 break;
1610         case LL_IOC_LOV_GETSTRIPE:
1611                 rc = lov_getstripe(conn, karg, uarg);
1612                 break;
1613         default: {
1614                 int set = 0;
1615                 if (count == 0)
1616                         RETURN(-ENOTTY);
1617                 rc = 0;
1618                 for (i = 0; i < count; i++) {
1619                         int err;
1620
1621                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1622                                             len, karg, uarg);
1623                         if (err) {
1624                                 if (lov->tgts[i].active) {
1625                                         CERROR("error: iocontrol OSC %s on OST"
1626                                                "idx %d: err = %d\n",
1627                                                lov->tgts[i].uuid.uuid, i, err);
1628                                         if (!rc)
1629                                                 rc = err;
1630                                 }
1631                         } else
1632                                 set = 1;
1633                 }
1634                 if (!set && !rc)
1635                         rc = -EIO;
1636         }
1637         }
1638
1639         RETURN(rc);
1640 }
1641
1642 struct obd_ops lov_obd_ops = {
1643         o_owner:       THIS_MODULE,
1644         o_attach:      lov_attach,
1645         o_detach:      lov_detach,
1646         o_setup:       lov_setup,
1647         o_connect:     lov_connect,
1648         o_disconnect:  lov_disconnect,
1649         o_statfs:      lov_statfs,
1650         o_packmd:      lov_packmd,
1651         o_unpackmd:    lov_unpackmd,
1652         o_create:      lov_create,
1653         o_destroy:     lov_destroy,
1654         o_getattr:     lov_getattr,
1655         o_setattr:     lov_setattr,
1656         o_open:        lov_open,
1657         o_close:       lov_close,
1658         o_brw:         lov_brw,
1659         o_punch:       lov_punch,
1660         o_enqueue:     lov_enqueue,
1661         o_cancel:      lov_cancel,
1662         o_cancel_unused: lov_cancel_unused,
1663         o_iocontrol:   lov_iocontrol
1664 };
1665
1666 int __init lov_init(void)
1667 {
1668         struct lprocfs_static_vars lvars;
1669         int rc;
1670
1671         printk(KERN_INFO "Lustre Logical Object Volume driver; "
1672                "info@clusterfs.com\n");
1673         lov_file_cache = kmem_cache_create("ll_lov_file_data",
1674                                            sizeof(struct lov_file_handles),
1675                                            0, 0, NULL, NULL);
1676         if (!lov_file_cache)
1677                 RETURN(-ENOMEM);
1678
1679         lprocfs_init_vars(&lvars);
1680         rc = class_register_type(&lov_obd_ops, lvars.module_vars,
1681                                  OBD_LOV_DEVICENAME);
1682         RETURN(rc);
1683 }
1684
1685 static void __exit lov_exit(void)
1686 {
1687         if (kmem_cache_destroy(lov_file_cache))
1688                 CERROR("couldn't free LOV open cache\n");
1689         class_unregister_type(OBD_LOV_DEVICENAME);
1690 }
1691
/*
 * Kernel-only module boilerplate: author/description/license metadata and
 * registration of lov_init() and lov_exit() through module_init() and
 * module_exit().  None of this is compiled when __KERNEL__ is undefined.
 */
1692 #ifdef __KERNEL__
1693 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1694 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
1695 MODULE_LICENSE("GPL");
1696
1697 module_init(lov_init);
1698 module_exit(lov_exit);
1699 #endif