Whamcloud - gitweb
Fix LOV stripe allocation.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *
10  * This code is issued under the GNU General Public License.
11  * See the file COPYING in this distribution
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LOV
16
17 #include <linux/slab.h>
18 #include <linux/module.h>
19 #include <linux/obd_support.h>
20 #include <linux/lustre_lib.h>
21 #include <linux/lustre_net.h>
22 #include <linux/lustre_idl.h>
23 #include <linux/lustre_mds.h>
24 #include <linux/obd_class.h>
25 #include <linux/obd_lov.h>
26 #include <linux/init.h>
27 #include <asm/div64.h>
28
29 /* obd methods */
30 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
31                        obd_uuid_t cluuid, struct recovd_obd *recovd,
32                        ptlrpc_recovery_cb_t recover)
33 {
34         struct ptlrpc_request *req = NULL;
35         struct lov_obd *lov = &obd->u.lov;
36         struct client_obd *mdc = &lov->mdcobd->u.cli;
37         struct lov_desc *desc = &lov->desc;
38         struct lustre_handle mdc_conn;
39         obd_uuid_t *uuidarray;
40         int rc, rc2, i;
41
42         MOD_INC_USE_COUNT;
43         rc = class_connect(conn, obd, cluuid);
44         if (rc) {
45                 MOD_DEC_USE_COUNT;
46                 RETURN(rc);
47         }
48
49         /* We don't want to actually do the underlying connections more than
50          * once, so keep track. */
51         lov->refcount++;
52         if (lov->refcount > 1)
53                 RETURN(0);
54
55         /* retrieve LOV metadata from MDS */
56         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
57         if (rc) {
58                 CERROR("cannot connect to mdc: rc = %d\n", rc);
59                 GOTO(out_conn, rc);
60         }
61
62         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
63         rc2 = obd_disconnect(&mdc_conn);
64         if (rc) {
65                 CERROR("cannot get lov info %d\n", rc);
66                 GOTO(out_conn, rc);
67         }
68
69         if (rc2) {
70                 CERROR("error disconnecting from MDS %d\n", rc2);
71                 GOTO(out_conn, rc = rc2);
72         }
73
74         /* sanity... */
75         if (req->rq_repmsg->bufcount < 2 ||
76             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
77                 CERROR("LOV desc: invalid descriptor returned\n");
78                 GOTO(out_conn, rc = -EINVAL);
79         }
80
81         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
82         lov_unpackdesc(desc);
83
84         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
85                 CERROR("LOV desc: invalid uuid array returned\n");
86                 GOTO(out_conn, rc = -EINVAL);
87         }
88
89         mdc->cl_max_mds_easize = lov_mds_md_size(desc->ld_tgt_count);
90         mdc->cl_max_ost_easize = lov_stripe_md_size(desc->ld_tgt_count);
91
92         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
93                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
94                        obd->obd_uuid, desc->ld_uuid);
95                 GOTO(out_conn, rc = -EINVAL);
96         }
97
98         if (desc->ld_tgt_count > 1000) {
99                 CERROR("LOV desc: target count > 1000 (%d)\n",
100                        desc->ld_tgt_count);
101                 GOTO(out_conn, rc = -EINVAL);
102         }
103
104         /* Because of 64-bit divide/mod operations only work with a 32-bit
105          * divisor in a 32-bit kernel, we cannot support a stripe width
106          * of 4GB or larger on 32-bit CPUs.
107          */
108         if ((desc->ld_default_stripe_count ?
109              desc->ld_default_stripe_count : desc->ld_tgt_count) *
110              desc->ld_default_stripe_size > ~0UL) {
111                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
112                        desc->ld_default_stripe_size,
113                        desc->ld_default_stripe_count ?
114                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
115                 GOTO(out_conn, rc = -EINVAL);
116         }
117
118         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
119         OBD_ALLOC(lov->tgts, lov->bufsize);
120         if (!lov->tgts) {
121                 CERROR("Out of memory\n");
122                 GOTO(out_conn, rc = -ENOMEM);
123         }
124
125         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
126         for (i = 0; i < desc->ld_tgt_count; i++)
127                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
128
129         for (i = 0; i < desc->ld_tgt_count; i++) {
130                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
131                 if (!tgt) {
132                         CERROR("Target %s not attached\n", uuidarray[i]);
133                         GOTO(out_disc, rc = -EINVAL);
134                 }
135                 if (!(tgt->obd_flags & OBD_SET_UP)) {
136                         CERROR("Target %s not set up\n", uuidarray[i]);
137                         GOTO(out_disc, rc = -EINVAL);
138                 }
139                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
140                                  recover);
141                 if (rc) {
142                         CERROR("Target %s connect error %d\n",
143                                uuidarray[i], rc);
144                         GOTO(out_disc, rc);
145                 }
146                 desc->ld_active_tgt_count++;
147                 lov->tgts[i].active = 1;
148         }
149
150  out:
151         ptlrpc_req_finished(req);
152         return rc;
153
154  out_disc:
155         while (i-- > 0) {
156                 desc->ld_active_tgt_count--;
157                 lov->tgts[i].active = 0;
158                 rc2 = obd_disconnect(&lov->tgts[i].conn);
159                 if (rc2)
160                         CERROR("LOV Target %s disconnect error: rc = %d\n",
161                                 uuidarray[i], rc2);
162         }
163         OBD_FREE(lov->tgts, lov->bufsize);
164  out_conn:
165         class_disconnect(conn);
166         goto out;
167 }
168
169 static int lov_disconnect(struct lustre_handle *conn)
170 {
171         struct obd_device *obd = class_conn2obd(conn);
172         struct lov_obd *lov = &obd->u.lov;
173         int rc, i;
174
175         if (!lov->tgts)
176                 goto out_local;
177
178         /* Only disconnect the underlying laters on the final disconnect. */
179         lov->refcount--;
180         if (lov->refcount != 0)
181                 goto out_local;
182
183         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
184                 if (!lov->tgts[i].active) {
185                         CERROR("Skipping disconnect for inactive OSC %s\n",
186                                lov->tgts[i].uuid);
187                         continue;
188                 }
189
190                 lov->desc.ld_active_tgt_count--;
191                 lov->tgts[i].active = 0;
192                 rc = obd_disconnect(&lov->tgts[i].conn);
193                 if (rc) {
194                         CERROR("Target %s disconnect error %d\n",
195                                lov->tgts[i].uuid, rc);
196                         RETURN(rc);
197                 }
198         }
199         OBD_FREE(lov->tgts, lov->bufsize);
200         lov->bufsize = 0;
201         lov->tgts = NULL;
202
203  out_local:
204         rc = class_disconnect(conn);
205         if (!rc)
206                 MOD_DEC_USE_COUNT;
207         return rc;
208 }
209
210 /* Error codes:
211  *
212  *  -EINVAL  : UUID can't be found in the LOV's target list
213  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
214  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
215  *  -EALREADY: The OSC is already marked (in)active
216  */
217 static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
218                               int activate)
219 {
220         struct obd_device *obd;
221         int i, rc = 0;
222         ENTRY;
223
224         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
225                lov, uuid, activate);
226
227         spin_lock(&lov->lov_lock);
228         for (i = 0; i < lov->desc.ld_tgt_count; i++)
229                 if (strncmp(uuid, lov->tgts[i].uuid,
230                             sizeof(lov->tgts[i].uuid)) == 0)
231                         break;
232
233         if (i == lov->desc.ld_tgt_count)
234                 GOTO(out, rc = -EINVAL);
235
236         obd = class_conn2obd(&lov->tgts[i].conn);
237         if (obd == NULL) {
238                 LBUG();
239                 GOTO(out, rc = -ENOTCONN);
240         }
241
242         CDEBUG(D_INFO, "Found OBD %p type %s\n", obd, obd->obd_type->typ_name);
243         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
244                 LBUG();
245                 GOTO(out, rc = -EBADF);
246         }
247
248         if (lov->tgts[i].active == activate) {
249                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
250                        activate ? "" : "in");
251                 GOTO(out, rc = -EALREADY);
252         }
253
254         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
255
256         lov->tgts[i].active = activate;
257         if (activate)
258                 lov->desc.ld_active_tgt_count++;
259         else
260                 lov->desc.ld_active_tgt_count--;
261
262         EXIT;
263  out:
264         spin_unlock(&lov->lov_lock);
265         return rc;
266 }
267
268 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
269 {
270         struct obd_ioctl_data* data = buf;
271         struct lov_obd *lov = &obd->u.lov;
272         int rc = 0;
273         ENTRY;
274
275         if (data->ioc_inllen1 < 1) {
276                 CERROR("osc setup requires an MDC UUID\n");
277                 RETURN(-EINVAL);
278         }
279
280         if (data->ioc_inllen1 > 37) {
281                 CERROR("mdc UUID must be 36 characters or less\n");
282                 RETURN(-EINVAL);
283         }
284
285         spin_lock_init(&lov->lov_lock);
286         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
287         if (!lov->mdcobd) {
288                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
289                        data->ioc_inlbuf1);
290                 rc = -EINVAL;
291         }
292         RETURN(rc);
293 }
294
295
296 /* the LOV expects oa->o_id to be set to the LOV object id */
297 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
298                       struct lov_stripe_md **ea)
299 {
300         struct obd_export *export = class_conn2export(conn);
301         struct lov_obd *lov;
302         struct lov_stripe_md *lsm;
303         struct lov_oinfo *loi;
304         struct obdo *tmp;
305         int ost_count, ost_idx = 1, i, rc = 0;
306         ENTRY;
307
308         LASSERT(ea);
309
310         if (!export)
311                 RETURN(-EINVAL);
312
313         tmp = obdo_alloc();
314         if (!tmp)
315                 RETURN(-ENOMEM);
316
317         lov = &export->exp_obd->u.lov;
318
319         spin_lock(&lov->lov_lock);
320         ost_count = lov->desc.ld_tgt_count;
321         oa->o_easize = lov_stripe_md_size(ost_count);
322
323         lsm = *ea;
324         if (!lsm) {
325                 OBD_ALLOC(lsm, oa->o_easize);
326                 if (!lsm) {
327                         spin_unlock(&lov->lov_lock);
328                         GOTO(out_tmp, rc = -ENOMEM);
329                 }
330                 lsm->lsm_magic = LOV_MAGIC;
331                 lsm->lsm_mds_easize = lov_mds_md_size(ost_count);
332                 ost_idx = 0; /* if lsm->lsm_stripe_offset is set yet */
333         }
334
335         LASSERT(oa->o_valid & OBD_MD_FLID);
336         lsm->lsm_object_id = oa->o_id;
337         if (!lsm->lsm_stripe_count)
338                 lsm->lsm_stripe_count = lov->desc.ld_default_stripe_count;
339         if (!lsm->lsm_stripe_count)
340                 lsm->lsm_stripe_count = lov->desc.ld_active_tgt_count;
341         else if (lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)
342                 lsm->lsm_stripe_count = lov->desc.ld_active_tgt_count;
343
344         if (!lsm->lsm_stripe_size)
345                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
346
347         /* Because of 64-bit divide/mod operations only work with a 32-bit
348          * divisor in a 32-bit kernel, we cannot support a stripe width
349          * of 4GB or larger on 32-bit CPUs.
350          */
351         if (lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL) {
352                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
353                        lsm->lsm_stripe_size, lsm->lsm_stripe_count, ~0UL);
354                 spin_unlock(&lov->lov_lock);
355                 GOTO(out_free, rc = -EINVAL);
356         }
357
358         lsm->lsm_ost_count = ost_count;
359         if (!ost_idx || lsm->lsm_stripe_offset >= ost_count) {
360                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
361                 int stripe_offset = mult % ost_count;
362                 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
363
364                 lsm->lsm_stripe_offset = stripe_offset + sub_offset;
365         }
366
367         while (!lov->tgts[lsm->lsm_stripe_offset].active)
368                 lsm->lsm_stripe_offset = (lsm->lsm_stripe_offset+1) % ost_count;
369
370         /* Pick the OSTs before we release the lock */
371         ost_idx = lsm->lsm_stripe_offset;
372         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
373                 CDEBUG(D_INODE, "objid "LPX64"[%d] is ost_idx %d (uuid %s)\n",
374                        lsm->lsm_object_id, i, ost_idx, lov->tgts[ost_idx].uuid);
375                 loi->loi_ost_idx = ost_idx;
376                 do {
377                         ost_idx = (ost_idx + 1) % ost_count;
378                 } while (!lov->tgts[ost_idx].active);
379         }
380
381         spin_unlock(&lov->lov_lock);
382
383         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
384                lsm->lsm_stripe_count,lsm->lsm_object_id,lsm->lsm_stripe_offset);
385
386         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
387                 struct lov_stripe_md obj_md;
388                 struct lov_stripe_md *obj_mdp = &obj_md;
389
390                 ost_idx = loi->loi_ost_idx;
391
392                 /* create data objects with "parent" OA */
393                 memcpy(tmp, oa, sizeof(*tmp));
394                 tmp->o_easize = sizeof(struct lov_stripe_md);
395                 rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
396                 if (rc) {
397                         CERROR("error creating objid "LPX64" sub-object on "
398                                "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
399                         GOTO(out_cleanup, rc);
400                 }
401                 loi->loi_id = tmp->o_id;
402                 loi->loi_size = tmp->o_size;
403                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
404                        lsm->lsm_object_id, loi->loi_id, ost_idx);
405         }
406
407         *ea = lsm;
408
409  out_tmp:
410         obdo_free(tmp);
411         return rc;
412
413  out_cleanup:
414         while (i-- > 0) {
415                 int err;
416
417                 --loi;
418                 /* destroy already created objects here */
419                 memcpy(tmp, oa, sizeof(*tmp));
420                 tmp->o_id = loi->loi_id;
421                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
422                 if (err)
423                         CERROR("Failed to uncreate objid "LPX64" subobj "
424                                LPX64" on OST idx %d: rc = %d\n",
425                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
426                                err);
427         }
428  out_free:
429         OBD_FREE(lsm, oa->o_easize);
430         goto out_tmp;
431 }
432
433 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
434                        struct lov_stripe_md *lsm)
435 {
436         struct obdo tmp;
437         struct obd_export *export = class_conn2export(conn);
438         struct lov_obd *lov;
439         struct lov_oinfo *loi;
440         int rc = 0, i;
441         ENTRY;
442
443         if (!lsm) {
444                 CERROR("LOV requires striping ea for destruction\n");
445                 RETURN(-EINVAL);
446         }
447
448         if (lsm->lsm_magic != LOV_MAGIC) {
449                 CERROR("LOV striping magic bad %#lx != %#lx\n",
450                        lsm->lsm_magic, LOV_MAGIC);
451                 RETURN(-EINVAL);
452         }
453
454         if (!export || !export->exp_obd)
455                 RETURN(-ENODEV);
456
457         lov = &export->exp_obd->u.lov;
458         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
459                 /* create data objects with "parent" OA */
460                 memcpy(&tmp, oa, sizeof(tmp));
461                 tmp.o_id = loi->loi_id;
462                 rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
463                 if (rc)
464                         CERROR("Error destroying objid "LPX64" subobj "LPX64
465                                " on OST idx %d\n: rc = %d",
466                                oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
467         }
468         RETURN(rc);
469 }
470
471 /* compute object size given "stripeno" and the ost size */
472 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
473                                 int stripeno)
474 {
475         unsigned long ssize  = lsm->lsm_stripe_size;
476         unsigned long swidth = ssize * lsm->lsm_stripe_count;
477         unsigned long stripe_size;
478         obd_size lov_size;
479
480         if (ost_size == 0)
481                 return 0;
482
483         /* do_div(a, b) returns a % b, and a = a / b */
484         stripe_size = do_div(ost_size, ssize);
485
486         if (stripe_size)
487                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
488         else
489                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
490
491         return lov_size;
492 }
493
494 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
495                             struct lov_stripe_md *lsm, int stripeno, int *new)
496 {
497         if (*new) {
498                 obdo_cpy_md(tgt, src, valid);
499                 if (valid & OBD_MD_FLSIZE)
500                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
501                 *new = 0;
502         } else {
503                 if (valid & OBD_MD_FLSIZE) {
504                         /* this handles sparse files properly */
505                         obd_size lov_size;
506
507                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
508                         if (lov_size > tgt->o_size)
509                                 tgt->o_size = lov_size;
510                 }
511                 if (valid & OBD_MD_FLBLOCKS)
512                         tgt->o_blocks += src->o_blocks;
513                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
514                         tgt->o_ctime = src->o_ctime;
515                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
516                         tgt->o_mtime = src->o_mtime;
517         }
518 }
519
520 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
521                        struct lov_stripe_md *lsm)
522 {
523         struct obdo tmp;
524         struct obd_export *export = class_conn2export(conn);
525         struct lov_obd *lov;
526         struct lov_oinfo *loi;
527         int rc = 0, i;
528         int new = 1;
529         ENTRY;
530
531         if (!lsm) {
532                 CERROR("LOV requires striping ea\n");
533                 RETURN(-EINVAL);
534         }
535
536         if (lsm->lsm_magic != LOV_MAGIC) {
537                 CERROR("LOV striping magic bad %#lx != %#lx\n",
538                        lsm->lsm_magic, LOV_MAGIC);
539                 RETURN(-EINVAL);
540         }
541
542         if (!export || !export->exp_obd)
543                 RETURN(-ENODEV);
544
545         lov = &export->exp_obd->u.lov;
546         oa->o_size = 0;
547         oa->o_blocks = 0;
548         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
549                 int err;
550
551                 if (loi->loi_id == 0)
552                         continue;
553
554                 CERROR("objid "LPX64"[%d] has subobj "LPX64" at idx %u\n",
555                        oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
556                 /* create data objects with "parent" OA */
557                 memcpy(&tmp, oa, sizeof(tmp));
558                 tmp.o_id = loi->loi_id;
559
560                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
561                 if (err) {
562                         CERROR("Error getattr objid "LPX64" subobj "LPX64
563                                " on OST idx %d: rc = %d\n",
564                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
565                         if (!rc)
566                                 rc = err;
567                         continue; /* XXX or break? */
568                 }
569                 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
570         }
571         RETURN(rc);
572 }
573
574 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
575                        struct lov_stripe_md *lsm)
576 {
577         struct obdo tmp;
578         struct obd_export *export = class_conn2export(conn);
579         struct lov_obd *lov;
580         struct lov_oinfo *loi;
581         int rc = 0, i;
582         ENTRY;
583
584         /* Note that this code is currently unused, hence LBUG(), just
585          * to know when/if it is ever revived that it needs cleanups.
586          */
587         LBUG();
588
589         if (!lsm) {
590                 CERROR("LOV requires striping ea\n");
591                 RETURN(-EINVAL);
592         }
593
594         if (lsm->lsm_magic != LOV_MAGIC) {
595                 CERROR("LOV striping magic bad %#lx != %#lx\n",
596                        lsm->lsm_magic, LOV_MAGIC);
597                 RETURN(-EINVAL);
598         }
599
600         if (!export || !export->exp_obd)
601                 RETURN(-ENODEV);
602
603         /* size changes should go through punch and not setattr */
604         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
605
606         lov = &export->exp_obd->u.lov;
607         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
608                 int err;
609
610                 /* create data objects with "parent" OA */
611                 memcpy(&tmp, oa, sizeof(tmp));
612                 tmp.o_id = loi->loi_id;
613
614                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
615                 if (err) {
616                         CERROR("Error setattr objid "LPX64" subobj "LPX64
617                                " on OST idx %d: rc = %d\n",
618                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
619                         if (!rc)
620                                 rc = err;
621                 }
622         }
623         RETURN(rc);
624 }
625
626 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
627                     struct lov_stripe_md *lsm)
628 {
629         struct obdo *tmp;
630         struct obd_export *export = class_conn2export(conn);
631         struct lov_obd *lov;
632         struct lov_oinfo *loi;
633         int new = 1;
634         int rc = 0, i;
635         ENTRY;
636
637         if (!lsm) {
638                 CERROR("LOV requires striping ea for opening\n");
639                 RETURN(-EINVAL);
640         }
641
642         if (lsm->lsm_magic != LOV_MAGIC) {
643                 CERROR("LOV striping magic bad %#lx != %#lx\n",
644                        lsm->lsm_magic, LOV_MAGIC);
645                 RETURN(-EINVAL);
646         }
647
648         if (!export || !export->exp_obd)
649                 RETURN(-ENODEV);
650
651         tmp = obdo_alloc();
652         if (!tmp)
653                 RETURN(-ENOMEM);
654
655         lov = &export->exp_obd->u.lov;
656         oa->o_size = 0;
657         oa->o_blocks = 0;
658         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
659                 int err;
660
661                 /* create data objects with "parent" OA */
662                 memcpy(tmp, oa, sizeof(*tmp));
663                 tmp->o_id = loi->loi_id;
664
665                 err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
666                 if (err) {
667                         CERROR("Error open objid "LPX64" subobj "LPX64
668                                " on OST idx %d: rc = %d\n",
669                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
670                                loi->loi_ost_idx, rc);
671                         if (!rc)
672                                 rc = err;
673                 }
674
675                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
676         }
677         /* FIXME: returning an error, but having opened some objects is a bad
678          *        idea, since they will likely never be closed.  We either
679          *        need to not return an error if _some_ objects could be
680          *        opened, and leave it to read/write to return -EIO (with
681          *        hopefully partial error status) or close all opened objects
682          *        and return an error.  I think the former is preferred.
683          */
684         obdo_free(tmp);
685         RETURN(rc);
686 }
687
688 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
689                      struct lov_stripe_md *lsm)
690 {
691         struct obdo tmp;
692         struct obd_export *export = class_conn2export(conn);
693         struct lov_obd *lov;
694         struct lov_oinfo *loi;
695         int rc = 0, i;
696         ENTRY;
697
698         if (!lsm) {
699                 CERROR("LOV requires striping ea\n");
700                 RETURN(-EINVAL);
701         }
702
703         if (lsm->lsm_magic != LOV_MAGIC) {
704                 CERROR("LOV striping magic bad %#lx != %#lx\n",
705                        lsm->lsm_magic, LOV_MAGIC);
706                 RETURN(-EINVAL);
707         }
708
709         if (!export || !export->exp_obd)
710                 RETURN(-ENODEV);
711
712         lov = &export->exp_obd->u.lov;
713         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
714                 int err;
715
716                 /* create data objects with "parent" OA */
717                 memcpy(&tmp, oa, sizeof(tmp));
718                 tmp.o_id = loi->loi_id;
719
720                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
721                 if (err) {
722                         CERROR("Error close objid "LPX64" subobj "LPX64
723                                " on OST idx %d: rc = %d\n",
724                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
725                         if (!rc)
726                                 rc = err;
727                 }
728         }
729         RETURN(rc);
730 }
731
732 #ifndef log2
733 #define log2(n) ffz(~(n))
734 #endif
735
736 #warning FIXME: merge these two functions now that they are nearly the same
737
738 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
739 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
740                                  int stripeno)
741 {
742         unsigned long ssize  = lsm->lsm_stripe_size;
743         unsigned long swidth = ssize * lsm->lsm_stripe_count;
744         unsigned long stripe_off, this_stripe;
745
746         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
747                 return lov_off;
748
749         /* do_div(a, b) returns a % b, and a = a / b */
750         stripe_off = do_div(lov_off, swidth);
751
752         this_stripe = stripeno * ssize;
753         if (stripe_off <= this_stripe)
754                 stripe_off = 0;
755         else {
756                 stripe_off -= this_stripe;
757
758                 if (stripe_off > ssize)
759                         stripe_off = ssize;
760         }
761
762
763         return lov_off * ssize + stripe_off;
764 }
765
766 /* compute which stripe number "lov_off" will be written into */
767 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
768 {
769         unsigned long ssize  = lsm->lsm_stripe_size;
770         unsigned long swidth = ssize * lsm->lsm_stripe_count;
771         unsigned long stripe_off;
772
773         stripe_off = do_div(lov_off, swidth);
774
775         return stripe_off / ssize;
776 }
777
778
779 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
780  * we can send this 'punch' to just the authoritative node and the nodes
781  * that the punch will affect. */
782 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
783                      struct lov_stripe_md *lsm,
784                      obd_off start, obd_off end)
785 {
786         struct obdo tmp;
787         struct obd_export *export = class_conn2export(conn);
788         struct lov_obd *lov;
789         struct lov_oinfo *loi;
790         int rc = 0, i;
791         ENTRY;
792
793         if (!lsm) {
794                 CERROR("LOV requires striping ea\n");
795                 RETURN(-EINVAL);
796         }
797
798         if (lsm->lsm_magic != LOV_MAGIC) {
799                 CERROR("LOV striping magic bad %#lx != %#lx\n",
800                        lsm->lsm_magic, LOV_MAGIC);
801                 RETURN(-EINVAL);
802         }
803
804         if (!export || !export->exp_obd)
805                 RETURN(-ENODEV);
806
807         lov = &export->exp_obd->u.lov;
808         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
809                 obd_off starti = lov_stripe_offset(lsm, start, i);
810                 obd_off endi = lov_stripe_offset(lsm, end, i);
811                 int err;
812
813                 if (starti == endi)
814                         continue;
815                 /* create data objects with "parent" OA */
816                 memcpy(&tmp, oa, sizeof(tmp));
817                 tmp.o_id = loi->loi_id;
818
819                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
820                                 starti, endi);
821                 if (err) {
822                         CERROR("Error punch objid "LPX64" subobj "LPX64
823                                " on OST idx %d: rc = %d\n",
824                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
825                         if (!rc)
826                                 rc = err;
827                 }
828         }
829         RETURN(rc);
830 }
831
832 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
833 {
834         int ret = 0;
835         ENTRY;
836
837         if (phase == CB_PHASE_START)
838                 RETURN(0);
839
840         if (phase == CB_PHASE_FINISH) {
841                 if (err)
842                         cbd->err = err;
843                 if (atomic_dec_and_test(&cbd->refcount))
844                         ret = cbd->cb(cbd->data, cbd->err, phase);
845                 RETURN(ret);
846         }
847
848         LBUG();
849         return 0;
850 }
851
852 static inline int lov_brw(int cmd, struct lustre_handle *conn,
853                           struct lov_stripe_md *lsm, obd_count oa_bufs,
854                           struct brw_page *pga,
855                           brw_callback_t callback, struct io_cb_data *cbd)
856 {
857         int stripe_count = lsm->lsm_stripe_count;
858         struct obd_export *export = class_conn2export(conn);
859         struct lov_obd *lov;
860         struct {
861                 int bufct;
862                 int index;
863                 int subcount;
864                 struct lov_stripe_md lsm;
865                 int ost_idx;
866         } *stripeinfo, *si, *si_last;
867         struct brw_page *ioarr;
868         int rc, i;
869         struct io_cb_data *our_cb;
870         struct lov_oinfo *loi;
871         int *where;
872         ENTRY;
873
874         if (!lsm) {
875                 CERROR("LOV requires striping ea\n");
876                 RETURN(-EINVAL);
877         }
878
879         if (lsm->lsm_magic != LOV_MAGIC) {
880                 CERROR("LOV striping magic bad %#lx != %#lx\n",
881                        lsm->lsm_magic, LOV_MAGIC);
882                 RETURN(-EINVAL);
883         }
884
885         lov = &export->exp_obd->u.lov;
886
887         our_cb = ll_init_cb();
888         if (!our_cb)
889                 RETURN(-ENOMEM);
890
891         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
892         if (!stripeinfo)
893                 GOTO(out_cbdata, rc = -ENOMEM);
894
895         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
896         if (!where)
897                 GOTO(out_sinfo, rc = -ENOMEM);
898
899         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
900         if (!ioarr)
901                 GOTO(out_where, rc = -ENOMEM);
902
903         /* This is the only race-free way I can think of to get the refcount
904          * correct. -phil */
905         atomic_set(&our_cb->refcount, 0);
906         our_cb->cb = callback;
907         our_cb->data = cbd;
908
909         for (i = 0; i < oa_bufs; i++) {
910                 where[i] = lov_stripe_number(lsm, pga[i].off);
911                 if (stripeinfo[where[i]].bufct++ == 0)
912                         atomic_inc(&our_cb->refcount);
913         }
914
915         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
916              i < stripe_count; i++, loi++, si_last = si, si++) {
917                 if (i > 0)
918                         si->index = si_last->index + si_last->bufct;
919                 si->lsm.lsm_object_id = loi->loi_id;
920                 si->ost_idx = loi->loi_ost_idx;
921         }
922
923         for (i = 0; i < oa_bufs; i++) {
924                 int which = where[i];
925                 int shift;
926
927                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
928                 LASSERT(shift < oa_bufs);
929                 ioarr[shift] = pga[i];
930                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
931                 stripeinfo[which].subcount++;
932         }
933
934         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
935                 int shift = si->index;
936
937                 if (si->bufct) {
938                         LASSERT(shift < oa_bufs);
939                         /* XXX handle error returns here */
940                         obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
941                                 &si->lsm, si->bufct, &ioarr[shift],
942                                 lov_osc_brw_callback, our_cb);
943                 }
944         }
945
946         rc = callback(cbd, 0, CB_PHASE_START);
947
948         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
949  out_where:
950         OBD_FREE(where, sizeof(*where) * oa_bufs);
951  out_sinfo:
952         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
953  out_cbdata:
954         OBD_FREE(our_cb, sizeof(*our_cb));
955         RETURN(rc);
956 }
957
958 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
959                        struct lustre_handle *parent_lock,
960                        __u32 type, void *cookie, int cookielen, __u32 mode,
961                        int *flags, void *cb, void *data, int datalen,
962                        struct lustre_handle *lockhs)
963 {
964         struct obd_export *export = class_conn2export(conn);
965         struct lov_obd *lov;
966         struct lov_oinfo *loi;
967         int rc = 0, i;
968         ENTRY;
969
970         if (!lsm) {
971                 CERROR("LOV requires striping ea\n");
972                 RETURN(-EINVAL);
973         }
974
975         if (lsm->lsm_magic != LOV_MAGIC) {
976                 CERROR("LOV striping magic bad %#lx != %#lx\n",
977                        lsm->lsm_magic, LOV_MAGIC);
978                 RETURN(-EINVAL);
979         }
980
981         if (!export || !export->exp_obd)
982                 RETURN(-ENODEV);
983
984         lov = &export->exp_obd->u.lov;
985         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
986                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
987                 struct ldlm_extent sub_ext;
988                 struct lov_stripe_md submd;
989
990                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
991                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
992                 if (sub_ext.start == sub_ext.end)
993                         continue;
994
995                 submd.lsm_object_id = loi->loi_id;
996                 /* XXX submd lsm_mds_easize should be that from the subobj,
997                  *     and the subobj should get it opaquely from the LOV.
998                  */
999                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1000                 submd.lsm_stripe_count = 0;
1001                 /* XXX submd is not fully initialized here */
1002                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1003                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
1004                                  mode, flags, cb, data, datalen, &(lockhs[i]));
1005                 // XXX add a lock debug statement here
1006                 if (rc)
1007                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
1008                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1009                                loi->loi_id, loi->loi_ost_idx, rc);
1010         }
1011         RETURN(rc);
1012 }
1013
1014 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1015                       __u32 mode, struct lustre_handle *lockhs)
1016 {
1017         struct obd_export *export = class_conn2export(conn);
1018         struct lov_obd *lov;
1019         struct lov_oinfo *loi;
1020         int rc = 0, i;
1021         ENTRY;
1022
1023         if (!lsm) {
1024                 CERROR("LOV requires striping ea\n");
1025                 RETURN(-EINVAL);
1026         }
1027
1028         if (lsm->lsm_magic != LOV_MAGIC) {
1029                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1030                        lsm->lsm_magic, LOV_MAGIC);
1031                 RETURN(-EINVAL);
1032         }
1033
1034         if (!export || !export->exp_obd)
1035                 RETURN(-ENODEV);
1036
1037         lov = &export->exp_obd->u.lov;
1038         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1039                 struct lov_stripe_md submd;
1040
1041                 if (lockhs[i].addr == 0)
1042                         continue;
1043
1044                 submd.lsm_object_id = loi->loi_id;
1045                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1046                 submd.lsm_stripe_count = 0;
1047                 rc = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1048                                 mode, &lockhs[i]);
1049                 if (rc)
1050                         CERROR("Error cancel objid "LPX64" subobj "LPX64
1051                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1052                                loi->loi_id, loi->loi_ost_idx, rc);
1053         }
1054         RETURN(rc);
1055 }
1056
1057 static int lov_cancel_unused(struct lustre_handle *conn,
1058                              struct lov_stripe_md *lsm, int local_only)
1059 {
1060         struct obd_export *export = class_conn2export(conn);
1061         struct lov_obd *lov;
1062         struct lov_oinfo *loi;
1063         int rc = 0, i;
1064         ENTRY;
1065
1066         if (!lsm) {
1067                 CERROR("LOV requires striping ea for lock cancellation\n");
1068                 RETURN(-EINVAL);
1069         }
1070
1071         if (!export || !export->exp_obd)
1072                 RETURN(-ENODEV);
1073
1074         lov = &export->exp_obd->u.lov;
1075         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1076                 struct lov_stripe_md submd;
1077
1078                 submd.lsm_object_id = loi->loi_id;
1079                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1080                 submd.lsm_stripe_count = 0;
1081                 rc = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1082                                        &submd, local_only);
1083                 if (rc)
1084                         CERROR("Error cancel unused objid "LPX64" subobj "LPX64
1085                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1086                                loi->loi_id, loi->loi_ost_idx, rc);
1087         }
1088         RETURN(rc);
1089 }
1090
1091 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1092 {
1093         struct obd_export *export = class_conn2export(conn);
1094         struct lov_obd *lov;
1095         struct obd_statfs lov_sfs;
1096         int set = 0;
1097         int rc = 0;
1098         int i;
1099         ENTRY;
1100
1101         if (!export || !export->exp_obd)
1102                 RETURN(-ENODEV);
1103
1104         lov = &export->exp_obd->u.lov;
1105
1106         /* We only get block data from the OBD */
1107         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1108                 int err;
1109
1110                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1111                 if (err) {
1112                         CERROR("Error statfs OSC %s idx %d: err = %d\n",
1113                                lov->tgts[i].uuid, i, err);
1114                         if (!rc)
1115                                 rc = err;
1116                         continue; /* XXX or break? - probably OK to continue */
1117                 }
1118                 if (!set) {
1119                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1120                         set = 1;
1121                 } else {
1122                         osfs->os_bfree += lov_sfs.os_bfree;
1123                         osfs->os_bavail += lov_sfs.os_bavail;
1124                         osfs->os_blocks += lov_sfs.os_blocks;
1125                         /* XXX not sure about this one - depends on policy.
1126                          *   - could be minimum if we always stripe on all OBDs
1127                          *     (but that would be wrong for any other policy,
1128                          *     if one of the OBDs has no more objects left)
1129                          *   - could be sum if we stripe whole objects
1130                          *   - could be average, just to give a nice number
1131                          *   - we just pick first OST and hope it is enough
1132                         sfs->f_ffree += lov_sfs.f_ffree;
1133                          */
1134                 }
1135         }
1136         RETURN(rc);
1137 }
1138
1139 static int lov_iocontrol(long cmd, struct lustre_handle *conn, int len,
1140                          void *karg, void *uarg)
1141 {
1142         struct obd_device *obddev = class_conn2obd(conn);
1143         struct obd_ioctl_data *data = karg;
1144         struct lov_obd *lov = &obddev->u.lov;
1145         int rc, i;
1146         ENTRY;
1147
1148         switch (cmd) {
1149         case IOC_LOV_SET_OSC_ACTIVE:
1150                 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
1151                 break;
1152         default:
1153                 rc = -ENOTTY;
1154                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1155                         int err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1156                                                 len, data, NULL);
1157                         if (err && !rc)
1158                                 rc = err;
1159                 }
1160         }
1161
1162         RETURN(rc);
1163 }
1164
1165 struct obd_ops lov_obd_ops = {
1166         o_setup:       lov_setup,
1167         o_connect:     lov_connect,
1168         o_disconnect:  lov_disconnect,
1169         o_create:      lov_create,
1170         o_destroy:     lov_destroy,
1171         o_getattr:     lov_getattr,
1172         o_setattr:     lov_setattr,
1173         o_statfs:      lov_statfs,
1174         o_open:        lov_open,
1175         o_close:       lov_close,
1176         o_brw:         lov_brw,
1177         o_punch:       lov_punch,
1178         o_enqueue:     lov_enqueue,
1179         o_cancel:      lov_cancel,
1180         o_cancel_unused: lov_cancel_unused,
1181         o_iocontrol:   lov_iocontrol
1182 };
1183
1184
1185 #define LOV_VERSION "v0.1"
1186
1187 static int __init lov_init(void)
1188 {
1189         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
1190                ", info@clusterfs.com\n");
1191         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
1192 }
1193
1194 static void __exit lov_exit(void)
1195 {
1196         class_unregister_type(OBD_LOV_DEVICENAME);
1197 }
1198
1199 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1200 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
1201 MODULE_LICENSE("GPL");
1202
1203 module_init(lov_init);
1204 module_exit(lov_exit);