Whamcloud - gitweb
Branch:HEAD
[fs/lustre-release.git] / lustre / lov / lov_ea.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
5  *   Author: Wang Di <wangdi@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_LOV
30
31 #ifdef __KERNEL__
32 #include <asm/div64.h>
33 #include <libcfs/libcfs.h>
34 #else
35 #include <liblustre.h>
36 #endif
37
38 #include <obd_class.h>
39 #include <obd_lov.h>
40 #include <lustre/lustre_idl.h>
41 #include <lustre_log.h>
42
43 #include "lov_internal.h"
44
45 struct lovea_unpack_args {
46         struct lov_stripe_md *lsm;
47         int                   cursor;
48 };
49
50 static int lsm_lmm_verify_common(struct lov_mds_md *lmm, int lmm_bytes,
51                                  int stripe_count)
52 {
53
54         if (stripe_count == 0) {
55                 CERROR("bad stripe count %d\n", stripe_count);
56                 lov_dump_lmm_v1(D_WARNING, lmm);
57                 return -EINVAL;
58         }
59
60         if (lmm->lmm_object_id == 0) {
61                 CERROR("zero object id\n");
62                 lov_dump_lmm_v1(D_WARNING, lmm);
63                 return -EINVAL;
64         }
65
66         if (lmm->lmm_pattern != cpu_to_le32(LOV_PATTERN_RAID0)) {
67                 CERROR("bad striping pattern\n");
68                 lov_dump_lmm_v1(D_WARNING, lmm);
69                 return -EINVAL;
70         }
71
72         if (lmm->lmm_stripe_size == 0 ||
73             (__u64)le32_to_cpu(lmm->lmm_stripe_size)*stripe_count > 0xffffffff){
74                 CERROR("bad stripe size %u\n",
75                        le32_to_cpu(lmm->lmm_stripe_size));
76                 lov_dump_lmm_v1(D_WARNING, lmm);
77                 return -EINVAL;
78         }
79         return 0;
80 }
81
82 struct lov_stripe_md *lsm_alloc_plain(int stripe_count, int *size)
83 {
84         struct lov_stripe_md *lsm;
85         int i, oinfo_ptrs_size;
86         struct lov_oinfo *loi;
87
88         LASSERT(stripe_count > 0);
89
90         oinfo_ptrs_size = sizeof(struct lov_oinfo *) * stripe_count;
91         *size = sizeof(struct lov_stripe_md) + oinfo_ptrs_size;
92
93         OBD_ALLOC(lsm, *size);
94         if (!lsm)
95                 return NULL;;
96
97         for (i = 0; i < stripe_count; i++) {
98                 OBD_SLAB_ALLOC(loi, lov_oinfo_slab, SLAB_NOFS, sizeof(*loi));
99                 if (loi == NULL)
100                         goto err;
101                 lsm->lsm_oinfo[i] = loi;
102         }
103         lsm->lsm_stripe_count = stripe_count;
104         return lsm;
105
106 err:
107         while (--i >= 0)
108                 OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab, sizeof(*loi));
109         OBD_FREE(lsm, *size);
110         return NULL;
111 }
112
113 void lsm_free_plain(struct lov_stripe_md *lsm)
114 {
115         int stripe_count = lsm->lsm_stripe_count;
116         int i;
117
118         for (i = 0; i < stripe_count; i++)
119                 OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab,
120                               sizeof(struct lov_oinfo));
121         OBD_FREE(lsm, sizeof(struct lov_stripe_md) +
122                  stripe_count * sizeof(struct lov_oinfo *));
123 }
124
125 static void lsm_unpackmd_common(struct lov_stripe_md *lsm,
126                                 struct lov_mds_md *lmm)
127 {
128         lsm->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
129         lsm->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
130         lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
131         lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern);
132 }
133
134 static void
135 lsm_stripe_by_index_plain(struct lov_stripe_md *lsm, int *stripeno,
136                            obd_off *lov_off, unsigned long *swidth)
137 {
138         if (swidth)
139                 *swidth = (ulong)lsm->lsm_stripe_size * lsm->lsm_stripe_count;
140 }
141
142 static void
143 lsm_stripe_by_offset_plain(struct lov_stripe_md *lsm, int *stripeno,
144                            obd_off *lov_off, unsigned long *swidth)
145 {
146         if (swidth)
147                 *swidth = (ulong)lsm->lsm_stripe_size * lsm->lsm_stripe_count;
148 }
149
150 static obd_off
151 lsm_stripe_offset_by_index_plain(struct lov_stripe_md *lsm,
152                                   int stripe_index)
153 {
154         return 0;
155 }
156
157 static obd_off
158 lsm_stripe_offset_by_offset_plain(struct lov_stripe_md *lsm,
159                                   obd_off lov_off)
160 {
161         return 0;
162 }
163
164 static int
165 lsm_stripe_index_by_offset_plain(struct lov_stripe_md *lsm,
166                                   obd_off lov_off)
167 {
168         return 0;
169 }
170
171 static int lsm_revalidate_plain(struct lov_stripe_md *lsm,
172                                 struct obd_device *obd)
173 {
174         return 0;
175 }
176
177 static int lsm_destroy_plain(struct lov_stripe_md *lsm, struct obdo *oa,
178                              struct obd_export *md_exp)
179 {
180         return 0;
181 }
182
183 static int lsm_lmm_verify_plain(struct lov_mds_md *lmm, int lmm_bytes,
184                              int *stripe_count)
185 {
186         if (lmm_bytes < sizeof(*lmm)) {
187                 CERROR("lov_mds_md too small: %d, need at least %d\n",
188                        lmm_bytes, (int)sizeof(*lmm));
189                 return -EINVAL;
190         }
191
192         *stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
193
194         if (lmm_bytes < lov_mds_md_v1_size(*stripe_count)) {
195                 CERROR("LOV EA too small: %d, need %d\n",
196                        lmm_bytes, lov_mds_md_v1_size(*stripe_count));
197                 lov_dump_lmm_v1(D_WARNING, lmm);
198                 return -EINVAL;
199         }
200
201         return lsm_lmm_verify_common(lmm, lmm_bytes, *stripe_count);
202 }
203
204 int lsm_unpackmd_plain(struct lov_obd *lov, struct lov_stripe_md *lsm,
205                     struct lov_mds_md_v1 *lmm)
206 {
207         struct lov_oinfo *loi;
208         int i;
209
210         lsm_unpackmd_common(lsm, lmm);
211
212         for (i = 0; i < lsm->lsm_stripe_count; i++) {
213                 /* XXX LOV STACKING call down to osc_unpackmd() */
214                 loi = lsm->lsm_oinfo[i];
215                 loi->loi_id = le64_to_cpu(lmm->lmm_objects[i].l_object_id);
216                 loi->loi_gr = le64_to_cpu(lmm->lmm_objects[i].l_object_gr);
217                 loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
218                 loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
219                 if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
220                         CERROR("OST index %d more than OST count %d\n",
221                                loi->loi_ost_idx, lov->desc.ld_tgt_count);
222                         lov_dump_lmm_v1(D_WARNING, lmm);
223                         return -EINVAL;
224                 }
225                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
226                         CERROR("OST index %d missing\n", loi->loi_ost_idx);
227                         lov_dump_lmm_v1(D_WARNING, lmm);
228                         return -EINVAL;
229                 }
230         }
231
232         return 0;
233 }
234
235 struct lsm_operations lsm_plain_ops = {
236         .lsm_free            = lsm_free_plain,
237         .lsm_destroy         = lsm_destroy_plain,
238         .lsm_stripe_by_index    = lsm_stripe_by_index_plain,
239         .lsm_stripe_by_offset   = lsm_stripe_by_offset_plain,
240         .lsm_revalidate         = lsm_revalidate_plain,
241         .lsm_stripe_offset_by_index  = lsm_stripe_offset_by_index_plain,
242         .lsm_stripe_offset_by_offset = lsm_stripe_offset_by_offset_plain,
243         .lsm_stripe_index_by_offset  = lsm_stripe_index_by_offset_plain,
244         .lsm_lmm_verify         = lsm_lmm_verify_plain,
245         .lsm_unpackmd           = lsm_unpackmd_plain,
246 };
247
248 struct lov_extent *lovea_off2le(struct lov_stripe_md *lsm, obd_off lov_off)
249 {
250         struct lov_array_info *lai;
251         struct lov_extent *le;
252         int i = 0;
253
254         LASSERT(lsm->lsm_array != NULL);
255         lai = lsm->lsm_array;
256         LASSERT(lai->lai_ext_count > 1);
257
258         for (le = lai->lai_ext_array, i = 0;
259              i < lai->lai_ext_count && le->le_start + le->le_len <= lov_off
260              && le->le_len != -1;
261              i ++, le ++) {
262                ; /* empty loop */
263         }
264
265         CDEBUG(D_INFO, "off "LPU64" idx %d, ext "LPU64":"LPU64" idx %d sc %d\n",
266                lov_off, i, le->le_start, le->le_len, le->le_loi_idx,
267                le->le_stripe_count);
268
269         RETURN(le);
270 }
271
272 struct lov_extent *lovea_idx2le(struct lov_stripe_md *lsm, int stripe_no)
273 {
274         struct lov_extent *le;
275         struct lov_array_info *lai;
276         int i, stripe_index;
277
278         LASSERT(lsm->lsm_array != NULL);
279         LASSERT(stripe_no >= 0 && stripe_no <= lsm->lsm_stripe_count);
280         lai = lsm->lsm_array;
281         LASSERT(lai->lai_ext_count > 1);
282
283         for (le = lai->lai_ext_array, i = 0, stripe_index = le->le_stripe_count;
284              i < lai->lai_ext_count && stripe_index <= stripe_no &&
285              le->le_len != -1; i ++, le ++,
286              stripe_index += le->le_stripe_count) {
287                 ; /* empty loop */
288         }
289
290         CDEBUG(D_INFO, "stripe %d idx %d, ext "LPU64":"LPU64" idx %d sc %d\n",
291                stripe_no, i, le->le_start, le->le_len, le->le_loi_idx,
292                le->le_stripe_count);
293         RETURN(le);
294 }
295
296 static void lovea_free_array_info(struct lov_stripe_md *lsm)
297 {
298         if (!lsm || !lsm->lsm_array)
299                 return;
300
301         if (lsm->lsm_array->lai_ext_array)
302                 OBD_FREE(lsm->lsm_array->lai_ext_array,
303                          lsm->lsm_array->lai_ext_count *
304                          sizeof(struct lov_extent));
305
306         OBD_FREE_PTR(lsm->lsm_array);
307 }
308
309 static void lsm_free_join(struct lov_stripe_md *lsm)
310 {
311         lovea_free_array_info(lsm);
312         lsm_free_plain(lsm);
313 }
314
315 static void
316 lsm_stripe_by_index_join(struct lov_stripe_md *lsm, int *stripeno,
317                            obd_off *lov_off, unsigned long *swidth)
318 {
319         struct lov_extent *le;
320
321         LASSERT(stripeno != NULL);
322
323         le = lovea_idx2le(lsm, *stripeno);
324
325         LASSERT(le != NULL && le->le_stripe_count != 0);
326
327         *stripeno -= le->le_loi_idx;
328
329         if (swidth)
330                 *swidth = (ulong)lsm->lsm_stripe_size * le->le_stripe_count;
331
332         if (lov_off) {
333                 struct lov_extent *lov_le = lovea_off2le(lsm, *lov_off);
334                 if (lov_le == le) {
335                         *lov_off = (*lov_off > le->le_start) ?
336                                    (*lov_off - le->le_start) : 0;
337                 } else {
338                         *lov_off = (*lov_off > le->le_start) ?
339                                    le->le_len : 0;
340                         LASSERT(*lov_off != -1);
341                 }
342         }
343 }
344
345 static void
346 lsm_stripe_by_offset_join(struct lov_stripe_md *lsm, int *stripeno,
347                            obd_off *lov_off, unsigned long *swidth)
348 {
349         struct lov_extent *le;
350
351         LASSERT(lov_off != NULL);
352
353         le = lovea_off2le(lsm, *lov_off);
354
355         LASSERT(le != NULL && le->le_stripe_count != 0);
356
357         *lov_off = (*lov_off > le->le_start) ? (*lov_off - le->le_start) : 0;
358
359         if (stripeno)
360                 *stripeno -= le->le_loi_idx;
361
362         if (swidth)
363                 *swidth = (ulong)lsm->lsm_stripe_size * le->le_stripe_count;
364 }
365
366 static obd_off
367 lsm_stripe_offset_by_index_join(struct lov_stripe_md *lsm,
368                                  int stripe_index)
369 {
370         struct lov_extent *le;
371
372         le = lovea_idx2le(lsm, stripe_index);
373
374         return le ? le->le_start : 0;
375 }
376
377 static obd_off
378 lsm_stripe_offset_by_offset_join(struct lov_stripe_md *lsm,
379                                  obd_off lov_off)
380 {
381         struct lov_extent *le;
382
383         le = lovea_off2le(lsm, lov_off);
384
385         return le ? le->le_start : 0;
386 }
387
388 static int
389 lsm_stripe_index_by_offset_join(struct lov_stripe_md *lsm,
390                                  obd_off lov_off)
391 {
392         struct lov_extent *le = NULL;
393
394         le = lovea_off2le(lsm, lov_off);
395
396         return le ? le->le_loi_idx : 0;
397 }
398
399 static int lovea_unpack_array(struct llog_handle *handle,
400                               struct llog_rec_hdr *rec, void *data)
401 {
402         struct lovea_unpack_args *args = (struct lovea_unpack_args *)data;
403         struct llog_array_rec *la_rec = (struct llog_array_rec*)rec;
404         struct mds_extent_desc *med = &la_rec->lmr_med;
405         struct lov_stripe_md *lsm = args->lsm;
406         int cursor = args->cursor++;
407         struct lov_mds_md *lmm;
408         struct lov_array_info *lai;
409         struct lov_oinfo * loi;
410         int i, loi_index;
411         ENTRY;
412
413         /* sanity check */
414         LASSERT(lsm->lsm_stripe_count != 0);
415         lmm = &med->med_lmm;
416         LASSERT(lsm->lsm_array != NULL);
417
418         lai = lsm->lsm_array;
419
420         if (cursor == 0) {
421                lai->lai_ext_array[cursor].le_loi_idx = 0;
422         } else {
423                int next_loi_index = lai->lai_ext_array[cursor - 1].le_loi_idx +
424                                  lai->lai_ext_array[cursor - 1].le_stripe_count;
425                lai->lai_ext_array[cursor].le_loi_idx = next_loi_index;
426         }
427         /* insert extent desc into lsm extent array  */
428         lai->lai_ext_array[cursor].le_start = le64_to_cpu(med->med_start);
429         lai->lai_ext_array[cursor].le_len   = le64_to_cpu(med->med_len);
430         lai->lai_ext_array[cursor].le_stripe_count = lmm->lmm_stripe_count;
431
432         /* unpack extent's lmm to lov_oinfo array */
433         loi_index = lai->lai_ext_array[cursor].le_loi_idx;
434         CDEBUG(D_INFO, "lovea upackmd cursor %d, loi_index %d extent "
435                         LPU64":"LPU64"\n", cursor, loi_index, med->med_start,
436                         med->med_len);
437
438         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i ++, loi_index++) {
439                 /* XXX LOV STACKING call down to osc_unpackmd() */
440                 loi = lsm->lsm_oinfo[loi_index];
441                 loi->loi_id = le64_to_cpu(lmm->lmm_objects[i].l_object_id);
442                 loi->loi_gr = le64_to_cpu(lmm->lmm_objects[i].l_object_gr);
443                 loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
444                 loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
445         }
446
447         RETURN(0);
448 }
449
450 static int lsm_revalidate_join(struct lov_stripe_md *lsm,
451                                struct obd_device *obd)
452 {
453         struct llog_handle *llh;
454         struct llog_ctxt *ctxt;
455         struct lovea_unpack_args args;
456         int rc, rc2;
457         ENTRY;
458
459         LASSERT(lsm->lsm_array != NULL);
460
461         /*Revalidate lsm might be called from client or MDS server.
462          *So the ctxt might be in different position
463          */
464         ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
465         if (!ctxt)
466                 ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
467
468         LASSERT(ctxt);
469
470         if (lsm->lsm_array && lsm->lsm_array->lai_ext_array)
471                 RETURN(0);
472
473         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
474                lsm->lsm_array->lai_array_id.lgl_oid,
475                lsm->lsm_array->lai_array_id.lgl_ogr);
476         OBD_ALLOC(lsm->lsm_array->lai_ext_array,lsm->lsm_array->lai_ext_count *
477                                                 sizeof (struct lov_extent));
478         if (!lsm->lsm_array->lai_ext_array)
479                 RETURN(-ENOMEM);
480
481         CDEBUG(D_INFO, "get lsm logid: "LPU64":"LPU64"\n",
482                lsm->lsm_array->lai_array_id.lgl_oid,
483                lsm->lsm_array->lai_array_id.lgl_ogr);
484
485         rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id, NULL);
486         if (rc)
487                 GOTO(out, rc);
488
489         args.lsm = lsm;
490         args.cursor = 0;
491         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
492         if (rc == 0)
493                 rc = llog_process(llh, lovea_unpack_array, &args, NULL);
494         rc2 = llog_close(llh);
495         if (rc == 0)
496                 rc = rc2;
497 out:
498         if (rc)
499                 lovea_free_array_info(lsm);
500         RETURN(rc);
501 }
502
503 int lsm_destroy_join(struct lov_stripe_md *lsm, struct obdo *oa, 
504                       struct obd_export *md_exp)
505 {
506         struct llog_ctxt *ctxt;
507         struct llog_handle *llh;
508         int rc = 0;
509         ENTRY;
510
511         LASSERT(md_exp != NULL);
512         ctxt = llog_get_context(md_exp->exp_obd, LLOG_LOVEA_REPL_CTXT);
513         if (!ctxt)
514                 GOTO(out, rc = -EINVAL);
515
516         LASSERT(lsm->lsm_array != NULL);
517         /*for those orphan inode, we should keep array id*/
518         if (!(oa->o_valid & OBD_MD_FLCOOKIE))
519                 RETURN(0);
520
521         LASSERT(ctxt != NULL);
522         rc = llog_create(ctxt, &llh, &lsm->lsm_array->lai_array_id,
523                          NULL);
524         if (rc)
525                 GOTO(out, rc);
526
527         rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
528         if (rc == 0) {
529                 rc = llog_destroy(llh);
530         }
531         llog_free_handle(llh);
532 out:
533         RETURN(rc);
534 }
535
536 static int lsm_lmm_verify_join(struct lov_mds_md *lmm, int lmm_bytes,
537                                int *stripe_count)
538 {
539         struct lov_mds_md_join *lmmj = (struct lov_mds_md_join *)lmm;
540
541         if (lmm_bytes < sizeof(*lmmj)) {
542                 CERROR("lov_mds_md too small: %d, need at least %d\n",
543                        lmm_bytes, (int)sizeof(*lmmj));
544                 return -EINVAL;
545         }
546
547         if (lmmj->lmmj_array_id.lgl_oid == 0) {
548                 CERROR("zero array object id\n");
549                 return -EINVAL;
550         }
551
552         *stripe_count = le32_to_cpu(lmmj->lmmj_md.lmm_stripe_count);
553
554         return lsm_lmm_verify_common(&lmmj->lmmj_md, lmm_bytes, *stripe_count);
555 }
556
557 static int lovea_init_array_info(struct lov_stripe_md *lsm,
558                                  struct llog_logid *logid,
559                                  __u32 extent_count)
560 {
561         struct lov_array_info *lai;
562         ENTRY;
563
564         OBD_ALLOC_PTR(lai);
565         if (!lai)
566                 RETURN(-ENOMEM);
567
568         lai->lai_array_id = *logid;
569         lai->lai_ext_count = extent_count;
570         lsm->lsm_array = lai;
571         RETURN(0);
572 }
573
574 static int lsm_unpackmd_join(struct lov_obd *lov, struct lov_stripe_md *lsm,
575                       struct lov_mds_md *lmm)
576 {
577         struct lov_mds_md_join *lmmj = (struct lov_mds_md_join*)lmm;
578         int    rc;
579         ENTRY;
580
581         lsm_unpackmd_common(lsm, &lmmj->lmmj_md);
582
583         rc = lovea_init_array_info(lsm, &lmmj->lmmj_array_id,
584                                    lmmj->lmmj_extent_count);
585         if (rc) {
586                 CERROR("Init joined lsm id"LPU64" arrary error %d",
587                         lsm->lsm_object_id, rc);
588                 GOTO(out, rc);
589         }
590 out:
591         RETURN(rc);
592 }
593
594 struct lsm_operations lsm_join_ops = {
595         .lsm_free             = lsm_free_join,
596         .lsm_destroy          = lsm_destroy_join,
597         .lsm_stripe_by_index  = lsm_stripe_by_index_join,
598         .lsm_stripe_by_offset = lsm_stripe_by_offset_join,
599         .lsm_revalidate       = lsm_revalidate_join,
600         .lsm_stripe_offset_by_index  = lsm_stripe_offset_by_index_join,
601         .lsm_stripe_offset_by_offset = lsm_stripe_offset_by_offset_join,
602         .lsm_stripe_index_by_offset  = lsm_stripe_index_by_offset_join,
603         .lsm_lmm_verify         = lsm_lmm_verify_join,
604         .lsm_unpackmd           = lsm_unpackmd_join,
605 };
606
607