Whamcloud - gitweb
port llog fixes from b1_6 into HEAD
[fs/lustre-release.git] / lustre / lov / lov_ea.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
5  *   Author: Wang Di <wangdi@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_LOV
30
31 #ifdef __KERNEL__
32 #include <asm/div64.h>
33 #include <libcfs/libcfs.h>
34 #else
35 #include <liblustre.h>
36 #endif
37
38 #include <obd_class.h>
39 #include <obd_lov.h>
40 #include <lustre/lustre_idl.h>
41 #include <lustre_log.h>
42
43 #include "lov_internal.h"
44
/*
 * Argument block passed as the opaque "data" pointer to lovea_unpack_array()
 * while a joined file's extent log is processed.
 */
struct lovea_unpack_args {
        /* stripe md whose extent array and lov_oinfo entries are filled in */
        struct lov_stripe_md *lsm;
        /* index of the extent slot the next record goes into; the callback
         * post-increments it once per record */
        int                   cursor;
};
49
50 static int lsm_lmm_verify_common(struct lov_mds_md *lmm, int lmm_bytes,
51                                  int stripe_count)
52 {
53
54         if (stripe_count == 0 || stripe_count > LOV_V1_INSANE_STRIPE_COUNT) {
55                 CERROR("bad stripe count %d\n", stripe_count);
56                 lov_dump_lmm_v1(D_WARNING, lmm);
57                 return -EINVAL;
58         }
59         
60         if (lmm->lmm_object_id == 0) {
61                 CERROR("zero object id\n");
62                 lov_dump_lmm_v1(D_WARNING, lmm);
63                 return -EINVAL;
64         }
65         
66         if (lmm->lmm_pattern != cpu_to_le32(LOV_PATTERN_RAID0)) {
67                 CERROR("bad striping pattern\n");
68                 lov_dump_lmm_v1(D_WARNING, lmm);
69                 return -EINVAL;
70         }
71
72         if (lmm->lmm_stripe_size == 0 ||
73             (stripe_count != -1 &&
74              (__u64)le32_to_cpu(lmm->lmm_stripe_size)*stripe_count >
75              0xffffffff)) {
76                 CERROR("bad stripe size %u\n",
77                        le32_to_cpu(lmm->lmm_stripe_size));
78                 lov_dump_lmm_v1(D_WARNING, lmm);
79                 return -EINVAL;
80         }
81         return 0;
82 }
83
84 struct lov_stripe_md *lsm_alloc_plain(int stripe_count, int *size)
85 {
86         struct lov_stripe_md *lsm;
87         int i, oinfo_ptrs_size;
88         struct lov_oinfo *loi;
89
90         LASSERT(stripe_count > 0);
91
92         oinfo_ptrs_size = sizeof(struct lov_oinfo *) * stripe_count;
93         *size = sizeof(struct lov_stripe_md) + oinfo_ptrs_size;
94
95         OBD_ALLOC(lsm, *size);
96         if (!lsm)
97                 return NULL;;
98
99         for (i = 0; i < stripe_count; i++) {
100                 OBD_SLAB_ALLOC(loi, lov_oinfo_slab, CFS_ALLOC_IO, sizeof(*loi));
101                 if (loi == NULL)
102                         goto err;
103                 lsm->lsm_oinfo[i] = loi;
104         }
105         lsm->lsm_stripe_count = stripe_count;
106         return lsm;
107
108 err:
109         while (--i >= 0)
110                 OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab, sizeof(*loi));
111         OBD_FREE(lsm, *size);
112         return NULL;
113 }
114
115 void lsm_free_plain(struct lov_stripe_md *lsm)
116 {
117         int stripe_count = lsm->lsm_stripe_count;
118         int i;
119
120         for (i = 0; i < stripe_count; i++)
121                 OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab,
122                               sizeof(struct lov_oinfo));
123         OBD_FREE(lsm, sizeof(struct lov_stripe_md) +
124                  stripe_count * sizeof(struct lov_oinfo *));
125 }
126
127 static void lsm_unpackmd_common(struct lov_stripe_md *lsm,
128                                 struct lov_mds_md *lmm)
129 {
130         lsm->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
131         lsm->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
132         lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
133         lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern);
134 }
135
136 static void
137 lsm_stripe_by_index_plain(struct lov_stripe_md *lsm, int *stripeno,
138                            obd_off *lov_off, unsigned long *swidth)
139 {
140         if (swidth)
141                 *swidth = (ulong)lsm->lsm_stripe_size * lsm->lsm_stripe_count;
142 }
143
144 static void
145 lsm_stripe_by_offset_plain(struct lov_stripe_md *lsm, int *stripeno,
146                            obd_off *lov_off, unsigned long *swidth)
147 {
148         if (swidth)
149                 *swidth = (ulong)lsm->lsm_stripe_size * lsm->lsm_stripe_count;
150 }
151
152 static obd_off
153 lsm_stripe_offset_by_index_plain(struct lov_stripe_md *lsm,
154                                   int stripe_index)
155 {
156         return 0;
157 }
158
159 static obd_off
160 lsm_stripe_offset_by_offset_plain(struct lov_stripe_md *lsm,
161                                   obd_off lov_off)
162 {
163         return 0;
164 }
165
166 static int
167 lsm_stripe_index_by_offset_plain(struct lov_stripe_md *lsm,
168                                   obd_off lov_off)
169 {
170         return 0;
171 }
172
/*
 * Plain-layout lsm_revalidate method: performs no work and reports
 * success (0).
 */
static int lsm_revalidate_plain(struct lov_stripe_md *lsm,
                                struct obd_device *obd)
{
        return 0;
}
178
/*
 * Plain-layout lsm_destroy method: performs no work and reports
 * success (0).
 */
static int lsm_destroy_plain(struct lov_stripe_md *lsm, struct obdo *oa,
                             struct obd_export *md_exp)
{
        return 0;
}
184
185 static int lsm_lmm_verify_plain(struct lov_mds_md *lmm, int lmm_bytes,
186                              int *stripe_count)
187 {
188         if (lmm_bytes < sizeof(*lmm)) {
189                 CERROR("lov_mds_md too small: %d, need at least %d\n",
190                        lmm_bytes, (int)sizeof(*lmm));
191                 return -EINVAL;
192         }
193
194         *stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
195
196         if (lmm_bytes < lov_mds_md_v1_size(*stripe_count)) {
197                 CERROR("LOV EA too small: %d, need %d\n",
198                        lmm_bytes, lov_mds_md_v1_size(*stripe_count));
199                 lov_dump_lmm_v1(D_WARNING, lmm);
200                 return -EINVAL;
201         }
202
203         return lsm_lmm_verify_common(lmm, lmm_bytes, *stripe_count);
204 }
205
206 int lsm_unpackmd_plain(struct lov_obd *lov, struct lov_stripe_md *lsm,
207                     struct lov_mds_md_v1 *lmm)
208 {
209         struct lov_oinfo *loi;
210         int i;
211
212         lsm_unpackmd_common(lsm, lmm);
213
214         for (i = 0; i < lsm->lsm_stripe_count; i++) {
215                 /* XXX LOV STACKING call down to osc_unpackmd() */
216                 loi = lsm->lsm_oinfo[i];
217                 loi->loi_id = le64_to_cpu(lmm->lmm_objects[i].l_object_id);
218                 loi->loi_gr = le64_to_cpu(lmm->lmm_objects[i].l_object_gr);
219                 loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
220                 loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
221                 if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
222                         CERROR("OST index %d more than OST count %d\n",
223                                loi->loi_ost_idx, lov->desc.ld_tgt_count);
224                         lov_dump_lmm_v1(D_WARNING, lmm);
225                         return -EINVAL;
226                 }
227                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
228                         CERROR("OST index %d missing\n", loi->loi_ost_idx);
229                         lov_dump_lmm_v1(D_WARNING, lmm);
230                         return -EINVAL;
231                 }
232         }
233
234         return 0;
235 }
236
237 struct lsm_operations lsm_plain_ops = {
238         .lsm_free            = lsm_free_plain,
239         .lsm_destroy         = lsm_destroy_plain,
240         .lsm_stripe_by_index    = lsm_stripe_by_index_plain,
241         .lsm_stripe_by_offset   = lsm_stripe_by_offset_plain,
242         .lsm_revalidate         = lsm_revalidate_plain,
243         .lsm_stripe_offset_by_index  = lsm_stripe_offset_by_index_plain,
244         .lsm_stripe_offset_by_offset = lsm_stripe_offset_by_offset_plain,
245         .lsm_stripe_index_by_offset  = lsm_stripe_index_by_offset_plain,
246         .lsm_lmm_verify         = lsm_lmm_verify_plain,
247         .lsm_unpackmd           = lsm_unpackmd_plain,
248 };
249
250 struct lov_extent *lovea_off2le(struct lov_stripe_md *lsm, obd_off lov_off)
251 {
252         struct lov_array_info *lai;
253         struct lov_extent *le;
254         int i = 0;
255
256         LASSERT(lsm->lsm_array != NULL);
257         lai = lsm->lsm_array;
258         LASSERT(lai->lai_ext_count > 1);
259
260         for (le = lai->lai_ext_array, i = 0;
261              i < lai->lai_ext_count && le->le_start + le->le_len <= lov_off
262              && le->le_len != -1;
263              i ++, le ++) {
264                ; /* empty loop */
265         }
266
267         CDEBUG(D_INFO, "off "LPU64" idx %d, ext "LPU64":"LPU64" idx %d sc %d\n",
268                lov_off, i, le->le_start, le->le_len, le->le_loi_idx,
269                le->le_stripe_count);
270
271         RETURN(le);
272 }
273
274 struct lov_extent *lovea_idx2le(struct lov_stripe_md *lsm, int stripe_no)
275 {
276         struct lov_extent *le;
277         struct lov_array_info *lai;
278         int i, stripe_index;
279
280         LASSERT(lsm->lsm_array != NULL);
281         LASSERT(stripe_no >= 0 && stripe_no <= lsm->lsm_stripe_count);
282         lai = lsm->lsm_array;
283         LASSERT(lai->lai_ext_count > 1);
284
285         for (le = lai->lai_ext_array, i = 0, stripe_index = le->le_stripe_count;
286              i < lai->lai_ext_count && stripe_index <= stripe_no &&
287              le->le_len != -1; i ++, le ++,
288              stripe_index += le->le_stripe_count) {
289                 ; /* empty loop */
290         }
291
292         CDEBUG(D_INFO, "stripe %d idx %d, ext "LPU64":"LPU64" idx %d sc %d\n",
293                stripe_no, i, le->le_start, le->le_len, le->le_loi_idx,
294                le->le_stripe_count);
295         RETURN(le);
296 }
297
298 static void lovea_free_array_info(struct lov_stripe_md *lsm)
299 {
300         if (!lsm || !lsm->lsm_array)
301                 return;
302
303         if (lsm->lsm_array->lai_ext_array)
304                 OBD_FREE(lsm->lsm_array->lai_ext_array,
305                          lsm->lsm_array->lai_ext_count *
306                          sizeof(struct lov_extent));
307
308         OBD_FREE_PTR(lsm->lsm_array);
309 }
310
/*
 * Joined-layout lsm_free method: release the array info first, then the
 * lsm and its lov_oinfo entries through the plain free routine.
 */
static void lsm_free_join(struct lov_stripe_md *lsm)
{
        /* order matters: lsm_free_plain() frees the lsm itself */
        lovea_free_array_info(lsm);
        lsm_free_plain(lsm);
}
316
317 static void
318 lsm_stripe_by_index_join(struct lov_stripe_md *lsm, int *stripeno,
319                            obd_off *lov_off, unsigned long *swidth)
320 {
321         struct lov_extent *le;
322
323         LASSERT(stripeno != NULL);
324
325         le = lovea_idx2le(lsm, *stripeno);
326
327         LASSERT(le != NULL && le->le_stripe_count != 0);
328
329         *stripeno -= le->le_loi_idx;
330
331         if (swidth)
332                 *swidth = (ulong)lsm->lsm_stripe_size * le->le_stripe_count;
333
334         if (lov_off) {
335                 struct lov_extent *lov_le = lovea_off2le(lsm, *lov_off);
336                 if (lov_le == le) {
337                         *lov_off = (*lov_off > le->le_start) ?
338                                    (*lov_off - le->le_start) : 0;
339                 } else {
340                         *lov_off = (*lov_off > le->le_start) ?
341                                    le->le_len : 0;
342                         LASSERT(*lov_off != -1);
343                 }
344         }
345 }
346
347 static void
348 lsm_stripe_by_offset_join(struct lov_stripe_md *lsm, int *stripeno,
349                            obd_off *lov_off, unsigned long *swidth)
350 {
351         struct lov_extent *le;
352
353         LASSERT(lov_off != NULL);
354
355         le = lovea_off2le(lsm, *lov_off);
356
357         LASSERT(le != NULL && le->le_stripe_count != 0);
358
359         *lov_off = (*lov_off > le->le_start) ? (*lov_off - le->le_start) : 0;
360
361         if (stripeno)
362                 *stripeno -= le->le_loi_idx;
363
364         if (swidth)
365                 *swidth = (ulong)lsm->lsm_stripe_size * le->le_stripe_count;
366 }
367
368 static obd_off
369 lsm_stripe_offset_by_index_join(struct lov_stripe_md *lsm,
370                                  int stripe_index)
371 {
372         struct lov_extent *le;
373
374         le = lovea_idx2le(lsm, stripe_index);
375
376         return le ? le->le_start : 0;
377 }
378
379 static obd_off
380 lsm_stripe_offset_by_offset_join(struct lov_stripe_md *lsm,
381                                  obd_off lov_off)
382 {
383         struct lov_extent *le;
384
385         le = lovea_off2le(lsm, lov_off);
386
387         return le ? le->le_start : 0;
388 }
389
390 static int
391 lsm_stripe_index_by_offset_join(struct lov_stripe_md *lsm,
392                                  obd_off lov_off)
393 {
394         struct lov_extent *le = NULL;
395
396         le = lovea_off2le(lsm, lov_off);
397
398         return le ? le->le_loi_idx : 0;
399 }
400
401 static int lovea_unpack_array(struct llog_handle *handle,
402                               struct llog_rec_hdr *rec, void *data)
403 {
404         struct lovea_unpack_args *args = (struct lovea_unpack_args *)data;
405         struct llog_array_rec *la_rec = (struct llog_array_rec*)rec;
406         struct mds_extent_desc *med = &la_rec->lmr_med;
407         struct lov_stripe_md *lsm = args->lsm;
408         int cursor = args->cursor++;
409         struct lov_mds_md *lmm;
410         struct lov_array_info *lai;
411         struct lov_oinfo * loi;
412         int i, loi_index;
413         ENTRY;
414
415         /* sanity check */
416         LASSERT(lsm->lsm_stripe_count != 0);
417         lmm = &med->med_lmm;
418         LASSERT(lsm->lsm_array != NULL);
419
420         lai = lsm->lsm_array;
421
422         if (cursor == 0) {
423                lai->lai_ext_array[cursor].le_loi_idx = 0;
424         } else {
425                int next_loi_index = lai->lai_ext_array[cursor - 1].le_loi_idx +
426                                  lai->lai_ext_array[cursor - 1].le_stripe_count;
427                lai->lai_ext_array[cursor].le_loi_idx = next_loi_index;
428         }
429         /* insert extent desc into lsm extent array  */
430         lai->lai_ext_array[cursor].le_start = le64_to_cpu(med->med_start);
431         lai->lai_ext_array[cursor].le_len   = le64_to_cpu(med->med_len);
432         lai->lai_ext_array[cursor].le_stripe_count = lmm->lmm_stripe_count;
433
434         /* unpack extent's lmm to lov_oinfo array */
435         loi_index = lai->lai_ext_array[cursor].le_loi_idx;
436         CDEBUG(D_INFO, "lovea upackmd cursor %d, loi_index %d extent "
437                         LPU64":"LPU64"\n", cursor, loi_index, med->med_start,
438                         med->med_len);
439
440         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i ++, loi_index++) {
441                 /* XXX LOV STACKING call down to osc_unpackmd() */
442                 loi = lsm->lsm_oinfo[loi_index];
443                 loi->loi_id = le64_to_cpu(lmm->lmm_objects[i].l_object_id);
444                 loi->loi_gr = le64_to_cpu(lmm->lmm_objects[i].l_object_gr);
445                 loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
446                 loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
447         }
448
449         RETURN(0);
450 }
451
452 static int lsm_revalidate_join(struct lov_stripe_md *lsm,
453                                struct obd_device *obd)
454 {
455         struct llog_handle *llh;
456         struct llog_ctxt *ctxt;
457         struct lovea_unpack_args args;
458         int rc, rc2;
459         ENTRY;
460
461         LASSERT(lsm->lsm_array != NULL);
462
463         /*Revalidate lsm might be called from client or MDS server.
464          *So the ctxt might be in different position
465          */
466         ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
467         if (!ctxt)
468                 ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
469
470         LASSERT(ctxt);
471
472         if (lsm->lsm_array && lsm->lsm_array->lai_ext_array)
473                 GOTO(release_ctxt, rc = 0);
474
475         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
476                lsm->lsm_array->lai_array_id.lgl_oid,
477                lsm->lsm_array->lai_array_id.lgl_ogr);
478         OBD_ALLOC(lsm->lsm_array->lai_ext_array,lsm->lsm_array->lai_ext_count *
479                                                 sizeof (struct lov_extent));
480         if (!lsm->lsm_array->lai_ext_array)
481                 GOTO(release_ctxt, rc = -ENOMEM);        
482
483         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
484                lsm->lsm_array->lai_array_id.lgl_oid,
485                lsm->lsm_array->lai_array_id.lgl_ogr);
486
487         rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id, NULL);
488         if (rc)
489                 GOTO(out, rc);
490
491         args.lsm = lsm;
492         args.cursor = 0;
493         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
494         if (rc == 0)
495                 rc = llog_process(llh, lovea_unpack_array, &args, NULL);
496         rc2 = llog_close(llh);
497         if (rc == 0)
498                 rc = rc2;
499 out:
500         if (rc)
501                 lovea_free_array_info(lsm);
502 release_ctxt:
503         llog_ctxt_put(ctxt);
504         RETURN(rc);
505 }
506
507 int lsm_destroy_join(struct lov_stripe_md *lsm, struct obdo *oa, 
508                       struct obd_export *md_exp)
509 {
510         struct llog_ctxt *ctxt;
511         struct llog_handle *llh;
512         int rc = 0;
513         ENTRY;
514
515         LASSERT(md_exp != NULL);
516         /*for those orphan inode, we should keep array id*/
517         if (!(oa->o_valid & OBD_MD_FLCOOKIE))
518                 RETURN(rc);
519
520         ctxt = llog_get_context(md_exp->exp_obd, LLOG_LOVEA_REPL_CTXT);
521         if (!ctxt)
522                 RETURN(-EINVAL);
523
524         LASSERT(lsm->lsm_array != NULL);
525         rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id,
526                          NULL);
527         if (rc)
528                 GOTO(out, rc);
529
530         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
531         if (rc == 0) {
532                 rc = llog_destroy(llh);
533         }
534         llog_free_handle(llh);
535 out:
536         llog_ctxt_put(ctxt);
537         RETURN(rc);
538 }
539
540 static int lsm_lmm_verify_join(struct lov_mds_md *lmm, int lmm_bytes,
541                                int *stripe_count)
542 {
543         struct lov_mds_md_join *lmmj = (struct lov_mds_md_join *)lmm;
544
545         if (lmm_bytes < sizeof(*lmmj)) {
546                 CERROR("lov_mds_md too small: %d, need at least %d\n",
547                        lmm_bytes, (int)sizeof(*lmmj));
548                 return -EINVAL;
549         }
550
551         if (lmmj->lmmj_array_id.lgl_oid == 0) {
552                 CERROR("zero array object id\n");
553                 return -EINVAL;
554         }
555
556         *stripe_count = le32_to_cpu(lmmj->lmmj_md.lmm_stripe_count);
557
558         return lsm_lmm_verify_common(&lmmj->lmmj_md, lmm_bytes, *stripe_count);
559 }
560
561 static int lovea_init_array_info(struct lov_stripe_md *lsm,
562                                  struct llog_logid *logid,
563                                  __u32 extent_count)
564 {
565         struct lov_array_info *lai;
566         ENTRY;
567
568         OBD_ALLOC_PTR(lai);
569         if (!lai)
570                 RETURN(-ENOMEM);
571
572         lai->lai_array_id = *logid;
573         lai->lai_ext_count = extent_count;
574         lsm->lsm_array = lai;
575         RETURN(0);
576 }
577
578 static int lsm_unpackmd_join(struct lov_obd *lov, struct lov_stripe_md *lsm,
579                       struct lov_mds_md *lmm)
580 {
581         struct lov_mds_md_join *lmmj = (struct lov_mds_md_join*)lmm;
582         int    rc;
583         ENTRY;
584
585         lsm_unpackmd_common(lsm, &lmmj->lmmj_md);
586
587         rc = lovea_init_array_info(lsm, &lmmj->lmmj_array_id,
588                                    lmmj->lmmj_extent_count);
589         if (rc) {
590                 CERROR("Init joined lsm id"LPU64" arrary error %d",
591                         lsm->lsm_object_id, rc);
592                 GOTO(out, rc);
593         }
594 out:
595         RETURN(rc);
596 }
597
598 struct lsm_operations lsm_join_ops = {
599         .lsm_free             = lsm_free_join,
600         .lsm_destroy          = lsm_destroy_join,
601         .lsm_stripe_by_index  = lsm_stripe_by_index_join,
602         .lsm_stripe_by_offset = lsm_stripe_by_offset_join,
603         .lsm_revalidate       = lsm_revalidate_join,
604         .lsm_stripe_offset_by_index  = lsm_stripe_offset_by_index_join,
605         .lsm_stripe_offset_by_offset = lsm_stripe_offset_by_offset_join,
606         .lsm_stripe_index_by_offset  = lsm_stripe_index_by_offset_join,
607         .lsm_lmm_verify         = lsm_lmm_verify_join,
608         .lsm_unpackmd           = lsm_unpackmd_join,
609 };
610
611