Whamcloud - gitweb
Some random double pointer error..
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *
10  * This code is issued under the GNU General Public License.
11  * See the file COPYING in this distribution
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LOV
16
17 #include <linux/slab.h>
18 #include <linux/module.h>
19 #include <linux/obd_support.h>
20 #include <linux/lustre_lib.h>
21 #include <linux/lustre_net.h>
22 #include <linux/lustre_idl.h>
23 #include <linux/lustre_mds.h>
24 #include <linux/obd_class.h>
25 #include <linux/obd_lov.h>
26 #include <linux/init.h>
27 #include <asm/div64.h>
28
29 /* obd methods */
30 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
31                        obd_uuid_t cluuid, struct recovd_obd *recovd,
32                        ptlrpc_recovery_cb_t recover)
33 {
34         struct ptlrpc_request *req = NULL;
35         struct lov_obd *lov = &obd->u.lov;
36         struct client_obd *mdc = &lov->mdcobd->u.cli;
37         struct lov_desc *desc = &lov->desc;
38         struct lustre_handle mdc_conn;
39         obd_uuid_t *uuidarray;
40         int rc, rc2, i;
41
42         MOD_INC_USE_COUNT;
43         rc = class_connect(conn, obd, cluuid);
44         if (rc) {
45                 MOD_DEC_USE_COUNT;
46                 RETURN(rc);
47         }
48
49         /* We don't want to actually do the underlying connections more than
50          * once, so keep track. */
51         lov->refcount++;
52         if (lov->refcount > 1)
53                 RETURN(0);
54
55         /* retrieve LOV metadata from MDS */
56         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
57         if (rc) {
58                 CERROR("cannot connect to mdc: rc = %d\n", rc);
59                 GOTO(out_conn, rc);
60         }
61
62         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
63         rc2 = obd_disconnect(&mdc_conn);
64         if (rc) {
65                 CERROR("cannot get lov info %d\n", rc);
66                 GOTO(out_conn, rc);
67         }
68
69         if (rc2) {
70                 CERROR("error disconnecting from MDS %d\n", rc2);
71                 GOTO(out_conn, rc = rc2);
72         }
73
74         /* sanity... */
75         if (req->rq_repmsg->bufcount < 2 ||
76             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
77                 CERROR("LOV desc: invalid descriptor returned\n");
78                 GOTO(out_conn, rc = -EINVAL);
79         }
80
81         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
82         lov_unpackdesc(desc);
83
84         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
85                 CERROR("LOV desc: invalid uuid array returned\n");
86                 GOTO(out_conn, rc = -EINVAL);
87         }
88
89         mdc->cl_max_mds_easize = lov_mds_md_size(desc->ld_tgt_count);
90         mdc->cl_max_ost_easize = lov_stripe_md_size(desc->ld_tgt_count);
91
92         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
93                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
94                        obd->obd_uuid, desc->ld_uuid);
95                 GOTO(out_conn, rc = -EINVAL);
96         }
97
98         if (desc->ld_tgt_count > 1000) {
99                 CERROR("LOV desc: target count > 1000 (%d)\n",
100                        desc->ld_tgt_count);
101                 GOTO(out_conn, rc = -EINVAL);
102         }
103
104         /* Because of 64-bit divide/mod operations only work with a 32-bit
105          * divisor in a 32-bit kernel, we cannot support a stripe width
106          * of 4GB or larger on 32-bit CPUs.
107          */
108         if ((desc->ld_default_stripe_count ?
109              desc->ld_default_stripe_count : desc->ld_tgt_count) *
110              desc->ld_default_stripe_size > ~0UL) {
111                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
112                        desc->ld_default_stripe_size,
113                        desc->ld_default_stripe_count ?
114                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
115                 GOTO(out_conn, rc = -EINVAL);
116         }
117
118         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
119         OBD_ALLOC(lov->tgts, lov->bufsize);
120         if (!lov->tgts) {
121                 CERROR("Out of memory\n");
122                 GOTO(out_conn, rc = -ENOMEM);
123         }
124
125         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
126         for (i = 0; i < desc->ld_tgt_count; i++)
127                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
128
129         for (i = 0; i < desc->ld_tgt_count; i++) {
130                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
131                 if (!tgt) {
132                         CERROR("Target %s not attached\n", uuidarray[i]);
133                         GOTO(out_disc, rc = -EINVAL);
134                 }
135                 if (!(tgt->obd_flags & OBD_SET_UP)) {
136                         CERROR("Target %s not set up\n", uuidarray[i]);
137                         GOTO(out_disc, rc = -EINVAL);
138                 }
139                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
140                                  recover);
141                 if (rc) {
142                         CERROR("Target %s connect error %d\n",
143                                uuidarray[i], rc);
144                         GOTO(out_disc, rc);
145                 }
146                 desc->ld_active_tgt_count++;
147                 lov->tgts[i].active = 1;
148         }
149
150  out:
151         ptlrpc_req_finished(req);
152         return rc;
153
154  out_disc:
155         while (i-- > 0) {
156                 desc->ld_active_tgt_count--;
157                 lov->tgts[i].active = 0;
158                 rc2 = obd_disconnect(&lov->tgts[i].conn);
159                 if (rc2)
160                         CERROR("LOV Target %s disconnect error: rc = %d\n",
161                                 uuidarray[i], rc2);
162         }
163         OBD_FREE(lov->tgts, lov->bufsize);
164  out_conn:
165         class_disconnect(conn);
166         goto out;
167 }
168
169 static int lov_disconnect(struct lustre_handle *conn)
170 {
171         struct obd_device *obd = class_conn2obd(conn);
172         struct lov_obd *lov = &obd->u.lov;
173         int rc, i;
174
175         if (!lov->tgts)
176                 goto out_local;
177
178         /* Only disconnect the underlying laters on the final disconnect. */
179         lov->refcount--;
180         if (lov->refcount != 0)
181                 goto out_local;
182
183         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
184                 if (!lov->tgts[i].active) {
185                         CERROR("Skipping disconnect for inactive OSC %s\n",
186                                lov->tgts[i].uuid);
187                         continue;
188                 }
189
190                 lov->desc.ld_active_tgt_count--;
191                 lov->tgts[i].active = 0;
192                 rc = obd_disconnect(&lov->tgts[i].conn);
193                 if (rc) {
194                         CERROR("Target %s disconnect error %d\n",
195                                lov->tgts[i].uuid, rc);
196                         RETURN(rc);
197                 }
198         }
199         OBD_FREE(lov->tgts, lov->bufsize);
200         lov->bufsize = 0;
201         lov->tgts = NULL;
202
203  out_local:
204         rc = class_disconnect(conn);
205         if (!rc)
206                 MOD_DEC_USE_COUNT;
207         return rc;
208 }
209
210 /* Error codes:
211  *
212  *  -EINVAL  : UUID can't be found in the LOV's target list
213  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
214  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
215  *  -EALREADY: The OSC is already marked (in)active
216  */
217 static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
218                               int activate)
219 {
220         struct obd_device *obd;
221         int i, rc = 0;
222         ENTRY;
223
224         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
225                lov, uuid, activate);
226
227         spin_lock(&lov->lov_lock);
228         for (i = 0; i < lov->desc.ld_tgt_count; i++)
229                 if (strncmp(uuid, lov->tgts[i].uuid,
230                             sizeof(lov->tgts[i].uuid)) == 0)
231                         break;
232
233         if (i == lov->desc.ld_tgt_count)
234                 GOTO(out, rc = -EINVAL);
235
236         obd = class_conn2obd(&lov->tgts[i].conn);
237         if (obd == NULL) {
238                 LBUG();
239                 GOTO(out, rc = -ENOTCONN);
240         }
241
242         CDEBUG(D_INFO, "Found OBD %p type %s\n", obd, obd->obd_type->typ_name);
243         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
244                 LBUG();
245                 GOTO(out, rc = -EBADF);
246         }
247
248         if (lov->tgts[i].active == activate) {
249                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
250                        activate ? "" : "in");
251                 GOTO(out, rc = -EALREADY);
252         }
253
254         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
255
256         lov->tgts[i].active = activate;
257         if (activate)
258                 lov->desc.ld_active_tgt_count++;
259         else
260                 lov->desc.ld_active_tgt_count--;
261
262         EXIT;
263  out:
264         spin_unlock(&lov->lov_lock);
265         return rc;
266 }
267
268 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
269 {
270         struct obd_ioctl_data* data = buf;
271         struct lov_obd *lov = &obd->u.lov;
272         int rc = 0;
273         ENTRY;
274
275         if (data->ioc_inllen1 < 1) {
276                 CERROR("osc setup requires an MDC UUID\n");
277                 RETURN(-EINVAL);
278         }
279
280         if (data->ioc_inllen1 > 37) {
281                 CERROR("mdc UUID must be 36 characters or less\n");
282                 RETURN(-EINVAL);
283         }
284
285         spin_lock_init(&lov->lov_lock);
286         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
287         if (!lov->mdcobd) {
288                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
289                        data->ioc_inlbuf1);
290                 rc = -EINVAL;
291         }
292         RETURN(rc);
293 }
294
295
296 /* the LOV expects oa->o_id to be set to the LOV object id */
297 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
298                       struct lov_stripe_md **ea)
299 {
300         struct obd_export *export = class_conn2export(conn);
301         struct lov_obd *lov;
302         struct lov_stripe_md *lsm;
303         struct lov_oinfo *loi;
304         struct obdo *tmp;
305         int ost_count, ost_idx, i, rc = 0;
306         ENTRY;
307
308         LASSERT(ea);
309
310         if (!export)
311                 RETURN(-EINVAL);
312
313         tmp = obdo_alloc();
314         if (!tmp)
315                 RETURN(-ENOMEM);
316
317         lov = &export->exp_obd->u.lov;
318
319         spin_lock(&lov->lov_lock);
320         ost_count = lov->desc.ld_tgt_count;
321         oa->o_easize = lov_stripe_md_size(ost_count);
322
323         lsm = *ea;
324         if (!lsm) {
325                 OBD_ALLOC(lsm, oa->o_easize);
326                 if (!lsm) {
327                         spin_unlock(&lov->lov_lock);
328                         GOTO(out_tmp, rc = -ENOMEM);
329                 }
330         }
331
332         LASSERT(oa->o_valid & OBD_MD_FLID);
333         lsm->lsm_magic = LOV_MAGIC;
334         lsm->lsm_mds_easize = lov_mds_md_size(ost_count);
335         lsm->lsm_object_id = oa->o_id;
336         if (!lsm->lsm_stripe_count)
337                 lsm->lsm_stripe_count = lov->desc.ld_default_stripe_count;
338         if (!lsm->lsm_stripe_count)
339                 lsm->lsm_stripe_count = lov->desc.ld_active_tgt_count;
340         else if (lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)
341                 lsm->lsm_stripe_count = lov->desc.ld_active_tgt_count;
342
343         if (!lsm->lsm_stripe_size)
344                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
345
346         /* Because of 64-bit divide/mod operations only work with a 32-bit
347          * divisor in a 32-bit kernel, we cannot support a stripe width
348          * of 4GB or larger on 32-bit CPUs.
349          */
350         if (lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL) {
351                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
352                        lsm->lsm_stripe_size, lsm->lsm_stripe_count, ~0UL);
353                 spin_unlock(&lov->lov_lock);
354                 GOTO(out_free, rc = -EINVAL);
355         }
356
357         lsm->lsm_ost_count = ost_count;
358         if (!lsm->lsm_stripe_offset) {
359                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
360                 int stripe_offset = mult % ost_count;
361                 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
362
363                 lsm->lsm_stripe_offset = stripe_offset + sub_offset;
364         }
365
366         /* Pick the OSTs before we release the lock */
367         ost_idx = lsm->lsm_stripe_offset;
368         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
369                 do {
370                         ost_idx = (ost_idx + 1) % ost_count;
371                 } while (!lov->tgts[ost_idx].active);
372                 CDEBUG(D_INFO, "Using ost_idx %d (uuid %s)\n", ost_idx,
373                        lov->tgts[ost_idx].uuid);
374                 loi->loi_ost_idx = ost_idx;
375         }
376
377         spin_unlock(&lov->lov_lock);
378
379         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
380                lsm->lsm_stripe_count,lsm->lsm_object_id,lsm->lsm_stripe_offset);
381
382         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
383                 struct lov_stripe_md obj_md;
384                 struct lov_stripe_md *obj_mdp = &obj_md;
385
386                 ost_idx = loi->loi_ost_idx;
387
388                 /* create data objects with "parent" OA */
389                 memcpy(tmp, oa, sizeof(*tmp));
390                 tmp->o_easize = sizeof(struct lov_stripe_md);
391                 rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
392                 if (rc) {
393                         CERROR("error creating objid "LPX64" sub-object on "
394                                "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
395                         GOTO(out_cleanup, rc);
396                 }
397                 loi->loi_id = tmp->o_id;
398                 loi->loi_size = tmp->o_size;
399                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
400                        lsm->lsm_object_id, loi->loi_id, ost_idx);
401         }
402
403         *ea = lsm;
404
405  out_tmp:
406         obdo_free(tmp);
407         return rc;
408
409  out_cleanup:
410         while (i-- > 0) {
411                 int err;
412
413                 --loi;
414                 /* destroy already created objects here */
415                 memcpy(tmp, oa, sizeof(*tmp));
416                 tmp->o_id = loi->loi_id;
417                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
418                 if (err)
419                         CERROR("Failed to uncreate objid "LPX64" subobj "
420                                LPX64" on OST idx %d: rc = %d\n",
421                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
422                                err);
423         }
424  out_free:
425         OBD_FREE(lsm, oa->o_easize);
426         goto out_tmp;
427 }
428
429 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
430                        struct lov_stripe_md *lsm)
431 {
432         struct obdo tmp;
433         struct obd_export *export = class_conn2export(conn);
434         struct lov_obd *lov;
435         struct lov_oinfo *loi;
436         int rc = 0, i;
437         ENTRY;
438
439         if (!lsm) {
440                 CERROR("LOV requires striping ea for destruction\n");
441                 RETURN(-EINVAL);
442         }
443
444         if (lsm->lsm_magic != LOV_MAGIC) {
445                 CERROR("LOV striping magic bad %#lx != %#lx\n",
446                        lsm->lsm_magic, LOV_MAGIC);
447                 RETURN(-EINVAL);
448         }
449
450         if (!export || !export->exp_obd)
451                 RETURN(-ENODEV);
452
453         lov = &export->exp_obd->u.lov;
454         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
455                 /* create data objects with "parent" OA */
456                 memcpy(&tmp, oa, sizeof(tmp));
457                 tmp.o_id = loi->loi_id;
458                 rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
459                 if (rc)
460                         CERROR("Error destroying objid "LPX64" subobj "LPX64
461                                " on OST idx %d\n: rc = %d",
462                                oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
463         }
464         RETURN(rc);
465 }
466
467 /* compute object size given "stripeno" and the ost size */
468 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
469                                 int stripeno)
470 {
471         unsigned long ssize  = lsm->lsm_stripe_size;
472         unsigned long swidth = ssize * lsm->lsm_stripe_count;
473         unsigned long stripe_size;
474         obd_size lov_size;
475
476         if (ost_size == 0)
477                 return 0;
478
479         /* do_div(a, b) returns a % b, and a = a / b */
480         stripe_size = do_div(ost_size, ssize);
481
482         if (stripe_size)
483                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
484         else
485                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
486
487         return lov_size;
488 }
489
490 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
491                             struct lov_stripe_md *lsm, int stripeno, int *new)
492 {
493         if (*new) {
494                 obdo_cpy_md(tgt, src, valid);
495                 if (valid & OBD_MD_FLSIZE)
496                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
497                 *new = 0;
498         } else {
499                 if (valid & OBD_MD_FLSIZE) {
500                         /* this handles sparse files properly */
501                         obd_size lov_size;
502
503                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
504                         if (lov_size > tgt->o_size)
505                                 tgt->o_size = lov_size;
506                 }
507                 if (valid & OBD_MD_FLBLOCKS)
508                         tgt->o_blocks += src->o_blocks;
509                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
510                         tgt->o_ctime = src->o_ctime;
511                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
512                         tgt->o_mtime = src->o_mtime;
513         }
514 }
515
516 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
517                        struct lov_stripe_md *lsm)
518 {
519         struct obdo tmp;
520         struct obd_export *export = class_conn2export(conn);
521         struct lov_obd *lov;
522         struct lov_oinfo *loi;
523         int rc = 0, i;
524         int new = 1;
525         ENTRY;
526
527         if (!lsm) {
528                 CERROR("LOV requires striping ea\n");
529                 RETURN(-EINVAL);
530         }
531
532         if (lsm->lsm_magic != LOV_MAGIC) {
533                 CERROR("LOV striping magic bad %#lx != %#lx\n",
534                        lsm->lsm_magic, LOV_MAGIC);
535                 RETURN(-EINVAL);
536         }
537
538         if (!export || !export->exp_obd)
539                 RETURN(-ENODEV);
540
541         lov = &export->exp_obd->u.lov;
542         oa->o_size = 0;
543         oa->o_blocks = 0;
544         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
545                 int err;
546
547                 if (loi->loi_id == 0)
548                         continue;
549
550                 /* create data objects with "parent" OA */
551                 memcpy(&tmp, oa, sizeof(tmp));
552                 tmp.o_id = loi->loi_id;
553
554                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
555                 if (err) {
556                         CERROR("Error getattr objid "LPX64" subobj "LPX64
557                                " on OST idx %d: rc = %d\n",
558                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
559                         if (!rc)
560                                 rc = err;
561                         continue; /* XXX or break? */
562                 }
563                 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
564         }
565         RETURN(rc);
566 }
567
568 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
569                        struct lov_stripe_md *lsm)
570 {
571         struct obdo tmp;
572         struct obd_export *export = class_conn2export(conn);
573         struct lov_obd *lov;
574         struct lov_oinfo *loi;
575         int rc = 0, i;
576         ENTRY;
577
578         /* Note that this code is currently unused, hence LBUG(), just
579          * to know when/if it is ever revived that it needs cleanups.
580          */
581         LBUG();
582
583         if (!lsm) {
584                 CERROR("LOV requires striping ea\n");
585                 RETURN(-EINVAL);
586         }
587
588         if (lsm->lsm_magic != LOV_MAGIC) {
589                 CERROR("LOV striping magic bad %#lx != %#lx\n",
590                        lsm->lsm_magic, LOV_MAGIC);
591                 RETURN(-EINVAL);
592         }
593
594         if (!export || !export->exp_obd)
595                 RETURN(-ENODEV);
596
597         /* size changes should go through punch and not setattr */
598         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
599
600         lov = &export->exp_obd->u.lov;
601         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
602                 int err;
603
604                 /* create data objects with "parent" OA */
605                 memcpy(&tmp, oa, sizeof(tmp));
606                 tmp.o_id = loi->loi_id;
607
608                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
609                 if (err) {
610                         CERROR("Error setattr objid "LPX64" subobj "LPX64
611                                " on OST idx %d: rc = %d\n",
612                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
613                         if (!rc)
614                                 rc = err;
615                 }
616         }
617         RETURN(rc);
618 }
619
620 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
621                     struct lov_stripe_md *lsm)
622 {
623         struct obdo *tmp;
624         struct obd_export *export = class_conn2export(conn);
625         struct lov_obd *lov;
626         struct lov_oinfo *loi;
627         int new = 1;
628         int rc = 0, i;
629         ENTRY;
630
631         if (!lsm) {
632                 CERROR("LOV requires striping ea for opening\n");
633                 RETURN(-EINVAL);
634         }
635
636         if (lsm->lsm_magic != LOV_MAGIC) {
637                 CERROR("LOV striping magic bad %#lx != %#lx\n",
638                        lsm->lsm_magic, LOV_MAGIC);
639                 RETURN(-EINVAL);
640         }
641
642         if (!export || !export->exp_obd)
643                 RETURN(-ENODEV);
644
645         tmp = obdo_alloc();
646         if (!tmp)
647                 RETURN(-ENOMEM);
648
649         lov = &export->exp_obd->u.lov;
650         oa->o_size = 0;
651         oa->o_blocks = 0;
652         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
653                 int err;
654
655                 /* create data objects with "parent" OA */
656                 memcpy(tmp, oa, sizeof(*tmp));
657                 tmp->o_id = loi->loi_id;
658
659                 err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
660                 if (err) {
661                         CERROR("Error open objid "LPX64" subobj "LPX64
662                                " on OST idx %d: rc = %d\n",
663                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
664                                loi->loi_ost_idx, rc);
665                         if (!rc)
666                                 rc = err;
667                 }
668
669                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
670         }
671         /* FIXME: returning an error, but having opened some objects is a bad
672          *        idea, since they will likely never be closed.  We either
673          *        need to not return an error if _some_ objects could be
674          *        opened, and leave it to read/write to return -EIO (with
675          *        hopefully partial error status) or close all opened objects
676          *        and return an error.  I think the former is preferred.
677          */
678         obdo_free(tmp);
679         RETURN(rc);
680 }
681
682 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
683                      struct lov_stripe_md *lsm)
684 {
685         struct obdo tmp;
686         struct obd_export *export = class_conn2export(conn);
687         struct lov_obd *lov;
688         struct lov_oinfo *loi;
689         int rc = 0, i;
690         ENTRY;
691
692         if (!lsm) {
693                 CERROR("LOV requires striping ea\n");
694                 RETURN(-EINVAL);
695         }
696
697         if (lsm->lsm_magic != LOV_MAGIC) {
698                 CERROR("LOV striping magic bad %#lx != %#lx\n",
699                        lsm->lsm_magic, LOV_MAGIC);
700                 RETURN(-EINVAL);
701         }
702
703         if (!export || !export->exp_obd)
704                 RETURN(-ENODEV);
705
706         lov = &export->exp_obd->u.lov;
707         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
708                 int err;
709
710                 /* create data objects with "parent" OA */
711                 memcpy(&tmp, oa, sizeof(tmp));
712                 tmp.o_id = loi->loi_id;
713
714                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
715                 if (err) {
716                         CERROR("Error close objid "LPX64" subobj "LPX64
717                                " on OST idx %d: rc = %d\n",
718                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
719                         if (!rc)
720                                 rc = err;
721                 }
722         }
723         RETURN(rc);
724 }
725
726 #ifndef log2
727 #define log2(n) ffz(~(n))
728 #endif
729
730 #warning FIXME: merge these two functions now that they are nearly the same
731
732 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
733 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
734                                  int stripeno)
735 {
736         unsigned long ssize  = lsm->lsm_stripe_size;
737         unsigned long swidth = ssize * lsm->lsm_stripe_count;
738         unsigned long stripe_off, this_stripe;
739
740         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
741                 return lov_off;
742
743         /* do_div(a, b) returns a % b, and a = a / b */
744         stripe_off = do_div(lov_off, swidth);
745
746         this_stripe = stripeno * ssize;
747         if (stripe_off <= this_stripe)
748                 stripe_off = 0;
749         else {
750                 stripe_off -= this_stripe;
751
752                 if (stripe_off > ssize)
753                         stripe_off = ssize;
754         }
755
756
757         return lov_off * ssize + stripe_off;
758 }
759
760 /* compute which stripe number "lov_off" will be written into */
761 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
762 {
763         unsigned long ssize  = lsm->lsm_stripe_size;
764         unsigned long swidth = ssize * lsm->lsm_stripe_count;
765         unsigned long stripe_off;
766
767         stripe_off = do_div(lov_off, swidth);
768
769         return stripe_off / ssize;
770 }
771
772
773 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
774  * we can send this 'punch' to just the authoritative node and the nodes
775  * that the punch will affect. */
776 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
777                      struct lov_stripe_md *lsm,
778                      obd_off start, obd_off end)
779 {
780         struct obdo tmp;
781         struct obd_export *export = class_conn2export(conn);
782         struct lov_obd *lov;
783         struct lov_oinfo *loi;
784         int rc = 0, i;
785         ENTRY;
786
787         if (!lsm) {
788                 CERROR("LOV requires striping ea\n");
789                 RETURN(-EINVAL);
790         }
791
792         if (lsm->lsm_magic != LOV_MAGIC) {
793                 CERROR("LOV striping magic bad %#lx != %#lx\n",
794                        lsm->lsm_magic, LOV_MAGIC);
795                 RETURN(-EINVAL);
796         }
797
798         if (!export || !export->exp_obd)
799                 RETURN(-ENODEV);
800
801         lov = &export->exp_obd->u.lov;
802         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
803                 obd_off starti = lov_stripe_offset(lsm, start, i);
804                 obd_off endi = lov_stripe_offset(lsm, end, i);
805                 int err;
806
807                 if (starti == endi)
808                         continue;
809                 /* create data objects with "parent" OA */
810                 memcpy(&tmp, oa, sizeof(tmp));
811                 tmp.o_id = loi->loi_id;
812
813                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
814                                 starti, endi);
815                 if (err) {
816                         CERROR("Error punch objid "LPX64" subobj "LPX64
817                                " on OST idx %d: rc = %d\n",
818                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
819                         if (!rc)
820                                 rc = err;
821                 }
822         }
823         RETURN(rc);
824 }
825
826 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
827 {
828         int ret = 0;
829         ENTRY;
830
831         if (phase == CB_PHASE_START)
832                 RETURN(0);
833
834         if (phase == CB_PHASE_FINISH) {
835                 if (err)
836                         cbd->err = err;
837                 if (atomic_dec_and_test(&cbd->refcount))
838                         ret = cbd->cb(cbd->data, cbd->err, phase);
839                 RETURN(ret);
840         }
841
842         LBUG();
843         return 0;
844 }
845
846 static inline int lov_brw(int cmd, struct lustre_handle *conn,
847                           struct lov_stripe_md *lsm, obd_count oa_bufs,
848                           struct brw_page *pga,
849                           brw_callback_t callback, struct io_cb_data *cbd)
850 {
851         int stripe_count = lsm->lsm_stripe_count;
852         struct obd_export *export = class_conn2export(conn);
853         struct lov_obd *lov;
854         struct {
855                 int bufct;
856                 int index;
857                 int subcount;
858                 struct lov_stripe_md lsm;
859                 int ost_idx;
860         } *stripeinfo, *si, *si_last;
861         struct brw_page *ioarr;
862         int rc, i;
863         struct io_cb_data *our_cb;
864         struct lov_oinfo *loi;
865         int *where;
866         ENTRY;
867
868         if (!lsm) {
869                 CERROR("LOV requires striping ea\n");
870                 RETURN(-EINVAL);
871         }
872
873         if (lsm->lsm_magic != LOV_MAGIC) {
874                 CERROR("LOV striping magic bad %#lx != %#lx\n",
875                        lsm->lsm_magic, LOV_MAGIC);
876                 RETURN(-EINVAL);
877         }
878
879         lov = &export->exp_obd->u.lov;
880
881         our_cb = ll_init_cb();
882         if (!our_cb)
883                 RETURN(-ENOMEM);
884
885         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
886         if (!stripeinfo)
887                 GOTO(out_cbdata, rc = -ENOMEM);
888
889         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
890         if (!where)
891                 GOTO(out_sinfo, rc = -ENOMEM);
892
893         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
894         if (!ioarr)
895                 GOTO(out_where, rc = -ENOMEM);
896
897         /* This is the only race-free way I can think of to get the refcount
898          * correct. -phil */
899         atomic_set(&our_cb->refcount, 0);
900         our_cb->cb = callback;
901         our_cb->data = cbd;
902
903         for (i = 0; i < oa_bufs; i++) {
904                 where[i] = lov_stripe_number(lsm, pga[i].off);
905                 if (stripeinfo[where[i]].bufct++ == 0)
906                         atomic_inc(&our_cb->refcount);
907         }
908
909         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
910              i < stripe_count; i++, loi++, si_last = si, si++) {
911                 if (i > 0)
912                         si->index = si_last->index + si_last->bufct;
913                 si->lsm.lsm_object_id = loi->loi_id;
914                 si->ost_idx = loi->loi_ost_idx;
915         }
916
917         for (i = 0; i < oa_bufs; i++) {
918                 int which = where[i];
919                 int shift;
920
921                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
922                 LASSERT(shift < oa_bufs);
923                 ioarr[shift] = pga[i];
924                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
925                 stripeinfo[which].subcount++;
926         }
927
928         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
929                 int shift = si->index;
930
931                 if (si->bufct) {
932                         LASSERT(shift < oa_bufs);
933                         /* XXX handle error returns here */
934                         obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
935                                 &si->lsm, si->bufct, &ioarr[shift],
936                                 lov_osc_brw_callback, our_cb);
937                 }
938         }
939
940         rc = callback(cbd, 0, CB_PHASE_START);
941
942         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
943  out_where:
944         OBD_FREE(where, sizeof(*where) * oa_bufs);
945  out_sinfo:
946         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
947  out_cbdata:
948         OBD_FREE(our_cb, sizeof(*our_cb));
949         RETURN(rc);
950 }
951
952 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
953                        struct lustre_handle *parent_lock,
954                        __u32 type, void *cookie, int cookielen, __u32 mode,
955                        int *flags, void *cb, void *data, int datalen,
956                        struct lustre_handle *lockhs)
957 {
958         struct obd_export *export = class_conn2export(conn);
959         struct lov_obd *lov;
960         struct lov_oinfo *loi;
961         int rc = 0, i;
962         ENTRY;
963
964         if (!lsm) {
965                 CERROR("LOV requires striping ea\n");
966                 RETURN(-EINVAL);
967         }
968
969         if (lsm->lsm_magic != LOV_MAGIC) {
970                 CERROR("LOV striping magic bad %#lx != %#lx\n",
971                        lsm->lsm_magic, LOV_MAGIC);
972                 RETURN(-EINVAL);
973         }
974
975         if (!export || !export->exp_obd)
976                 RETURN(-ENODEV);
977
978         lov = &export->exp_obd->u.lov;
979         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
980                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
981                 struct ldlm_extent sub_ext;
982                 struct lov_stripe_md submd;
983
984                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
985                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
986                 if (sub_ext.start == sub_ext.end)
987                         continue;
988
989                 submd.lsm_object_id = loi->loi_id;
990                 /* XXX submd lsm_mds_easize should be that from the subobj,
991                  *     and the subobj should get it opaquely from the LOV.
992                  */
993                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
994                 submd.lsm_stripe_count = 0;
995                 /* XXX submd is not fully initialized here */
996                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
997                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
998                                  mode, flags, cb, data, datalen, &(lockhs[i]));
999                 // XXX add a lock debug statement here
1000                 if (rc)
1001                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
1002                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1003                                loi->loi_id, loi->loi_ost_idx, rc);
1004         }
1005         RETURN(rc);
1006 }
1007
1008 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1009                       __u32 mode, struct lustre_handle *lockhs)
1010 {
1011         struct obd_export *export = class_conn2export(conn);
1012         struct lov_obd *lov;
1013         struct lov_oinfo *loi;
1014         int rc = 0, i;
1015         ENTRY;
1016
1017         if (!lsm) {
1018                 CERROR("LOV requires striping ea\n");
1019                 RETURN(-EINVAL);
1020         }
1021
1022         if (lsm->lsm_magic != LOV_MAGIC) {
1023                 CERROR("LOV striping magic bad %#lx != %#lx\n",
1024                        lsm->lsm_magic, LOV_MAGIC);
1025                 RETURN(-EINVAL);
1026         }
1027
1028         if (!export || !export->exp_obd)
1029                 RETURN(-ENODEV);
1030
1031         lov = &export->exp_obd->u.lov;
1032         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1033                 struct lov_stripe_md submd;
1034
1035                 if (lockhs[i].addr == 0)
1036                         continue;
1037
1038                 submd.lsm_object_id = loi->loi_id;
1039                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1040                 submd.lsm_stripe_count = 0;
1041                 rc = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1042                                 mode, &lockhs[i]);
1043                 if (rc)
1044                         CERROR("Error cancel objid "LPX64" subobj "LPX64
1045                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1046                                loi->loi_id, loi->loi_ost_idx, rc);
1047         }
1048         RETURN(rc);
1049 }
1050
1051 static int lov_cancel_unused(struct lustre_handle *conn,
1052                              struct lov_stripe_md *lsm, int local_only)
1053 {
1054         struct obd_export *export = class_conn2export(conn);
1055         struct lov_obd *lov;
1056         struct lov_oinfo *loi;
1057         int rc = 0, i;
1058         ENTRY;
1059
1060         if (!lsm) {
1061                 CERROR("LOV requires striping ea for lock cancellation\n");
1062                 RETURN(-EINVAL);
1063         }
1064
1065         if (!export || !export->exp_obd)
1066                 RETURN(-ENODEV);
1067
1068         lov = &export->exp_obd->u.lov;
1069         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1070                 struct lov_stripe_md submd;
1071
1072                 submd.lsm_object_id = loi->loi_id;
1073                 submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count);
1074                 submd.lsm_stripe_count = 0;
1075                 rc = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1076                                        &submd, local_only);
1077                 if (rc)
1078                         CERROR("Error cancel unused objid "LPX64" subobj "LPX64
1079                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1080                                loi->loi_id, loi->loi_ost_idx, rc);
1081         }
1082         RETURN(rc);
1083 }
1084
1085 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1086 {
1087         struct obd_export *export = class_conn2export(conn);
1088         struct lov_obd *lov;
1089         struct obd_statfs lov_sfs;
1090         int set = 0;
1091         int rc = 0;
1092         int i;
1093         ENTRY;
1094
1095         if (!export || !export->exp_obd)
1096                 RETURN(-ENODEV);
1097
1098         lov = &export->exp_obd->u.lov;
1099
1100         /* We only get block data from the OBD */
1101         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1102                 int err;
1103
1104                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1105                 if (err) {
1106                         CERROR("Error statfs OSC %s idx %d: err = %d\n",
1107                                lov->tgts[i].uuid, i, err);
1108                         if (!rc)
1109                                 rc = err;
1110                         continue; /* XXX or break? - probably OK to continue */
1111                 }
1112                 if (!set) {
1113                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1114                         set = 1;
1115                 } else {
1116                         osfs->os_bfree += lov_sfs.os_bfree;
1117                         osfs->os_bavail += lov_sfs.os_bavail;
1118                         osfs->os_blocks += lov_sfs.os_blocks;
1119                         /* XXX not sure about this one - depends on policy.
1120                          *   - could be minimum if we always stripe on all OBDs
1121                          *     (but that would be wrong for any other policy,
1122                          *     if one of the OBDs has no more objects left)
1123                          *   - could be sum if we stripe whole objects
1124                          *   - could be average, just to give a nice number
1125                          *   - we just pick first OST and hope it is enough
1126                         sfs->f_ffree += lov_sfs.f_ffree;
1127                          */
1128                 }
1129         }
1130         RETURN(rc);
1131 }
1132
1133 static int lov_iocontrol(long cmd, struct lustre_handle *conn, int len,
1134                          void *karg, void *uarg)
1135 {
1136         struct obd_device *obddev = class_conn2obd(conn);
1137         struct obd_ioctl_data *data = karg;
1138         struct lov_obd *lov = &obddev->u.lov;
1139         int rc, i;
1140         ENTRY;
1141
1142         switch (cmd) {
1143         case IOC_LOV_SET_OSC_ACTIVE:
1144                 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
1145                 break;
1146         default:
1147                 rc = -ENOTTY;
1148                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1149                         int err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1150                                                 len, data, NULL);
1151                         if (err && !rc)
1152                                 rc = err;
1153                 }
1154         }
1155
1156         RETURN(rc);
1157 }
1158
1159 struct obd_ops lov_obd_ops = {
1160         o_setup:       lov_setup,
1161         o_connect:     lov_connect,
1162         o_disconnect:  lov_disconnect,
1163         o_create:      lov_create,
1164         o_destroy:     lov_destroy,
1165         o_getattr:     lov_getattr,
1166         o_setattr:     lov_setattr,
1167         o_statfs:      lov_statfs,
1168         o_open:        lov_open,
1169         o_close:       lov_close,
1170         o_brw:         lov_brw,
1171         o_punch:       lov_punch,
1172         o_enqueue:     lov_enqueue,
1173         o_cancel:      lov_cancel,
1174         o_cancel_unused: lov_cancel_unused,
1175         o_iocontrol:   lov_iocontrol
1176 };
1177
1178
1179 #define LOV_VERSION "v0.1"
1180
1181 static int __init lov_init(void)
1182 {
1183         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
1184                ", info@clusterfs.com\n");
1185         return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
1186 }
1187
1188 static void __exit lov_exit(void)
1189 {
1190         class_unregister_type(OBD_LOV_DEVICENAME);
1191 }
1192
1193 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1194 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
1195 MODULE_LICENSE("GPL");
1196
1197 module_init(lov_init);
1198 module_exit(lov_exit);