Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / lov / lov_ea.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/lov/lov_ea.c
32  *
33  * Author: Wang Di <wangdi@clusterfs.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_LOV
37
38 #include <linux/math64.h>
39 #include <linux/sort.h>
40 #include <libcfs/libcfs.h>
41
42 #include <obd_class.h>
43 #include "lov_internal.h"
44
45 static inline void
46 lu_extent_le_to_cpu(struct lu_extent *dst, const struct lu_extent *src)
47 {
48         dst->e_start = le64_to_cpu(src->e_start);
49         dst->e_end = le64_to_cpu(src->e_end);
50 }
51
52 /*
53  * Find minimum stripe maxbytes value.  For inactive or
54  * reconnecting targets use LUSTRE_EXT3_STRIPE_MAXBYTES.
55  */
56 static loff_t lov_tgt_maxbytes(struct lov_tgt_desc *tgt)
57 {
58         struct obd_import *imp;
59         loff_t maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
60
61         if (!tgt->ltd_active)
62                 return maxbytes;
63
64         imp = tgt->ltd_obd->u.cli.cl_import;
65         if (!imp)
66                 return maxbytes;
67
68         spin_lock(&imp->imp_lock);
69         if ((imp->imp_state == LUSTRE_IMP_FULL ||
70             imp->imp_state == LUSTRE_IMP_IDLE) &&
71             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES) &&
72             imp->imp_connect_data.ocd_maxbytes > 0)
73                 maxbytes = imp->imp_connect_data.ocd_maxbytes;
74
75         spin_unlock(&imp->imp_lock);
76
77         return maxbytes;
78 }
79
80 static int lsm_lmm_verify_v1v3(struct lov_mds_md *lmm, size_t lmm_size,
81                                u16 stripe_count)
82 {
83         u32 pattern = le32_to_cpu(lmm->lmm_pattern);
84         int rc = 0;
85
86         if (stripe_count > LOV_V1_INSANE_STRIPE_COUNT) {
87                 rc = -EINVAL;
88                 CERROR("lov: bad stripe count %d: rc = %d\n",
89                        stripe_count, rc);
90                 lov_dump_lmm_common(D_WARNING, lmm);
91                 goto out;
92         }
93
94         if (lmm_oi_id(&lmm->lmm_oi) == 0) {
95                 rc = -EINVAL;
96                 CERROR("lov: zero object id: rc = %d\n", rc);
97                 lov_dump_lmm_common(D_WARNING, lmm);
98                 goto out;
99         }
100
101         if (!lov_pattern_supported(lov_pattern(pattern))) {
102                 static int nr;
103                 static ktime_t time2_clear_nr;
104                 ktime_t now = ktime_get();
105
106                 /* limit this message 20 times within 24h */
107                 if (ktime_after(now, time2_clear_nr)) {
108                         nr = 0;
109                         time2_clear_nr = ktime_add_ms(now,
110                                                       24 * 3600 * MSEC_PER_SEC);
111                 }
112                 if (nr++ < 20) {
113                         CWARN("lov: unrecognized striping pattern: rc = %d\n",
114                               rc);
115                         lov_dump_lmm_common(D_WARNING, lmm);
116                 }
117                 goto out;
118         }
119
120         if (lmm->lmm_stripe_size == 0 ||
121             (le32_to_cpu(lmm->lmm_stripe_size)&(LOV_MIN_STRIPE_SIZE-1)) != 0) {
122                 rc = -EINVAL;
123                 CERROR("lov: bad stripe size %u: rc = %d\n",
124                        le32_to_cpu(lmm->lmm_stripe_size), rc);
125                 lov_dump_lmm_common(D_WARNING, lmm);
126                 goto out;
127         }
128
129 out:
130         return rc;
131 }
132
133 static void lsme_free(struct lov_stripe_md_entry *lsme)
134 {
135         unsigned int stripe_count;
136         unsigned int i;
137         size_t lsme_size;
138
139         if (lsme->lsme_magic == LOV_MAGIC_FOREIGN) {
140                 /*
141                  * TODO: In addition to HSM foreign layout, It needs to add
142                  * support for other kinds of foreign layout types such as
143                  * DAOS, S3. When add these supports, it will use non-inline
144                  * @lov_hsm_base to store layout information, and need to
145                  * free extra allocated buffer.
146                  */
147                 OBD_FREE_LARGE(lsme, sizeof(*lsme));
148                 return;
149         }
150
151         stripe_count = lsme->lsme_stripe_count;
152         if (!lsme_inited(lsme) ||
153             lsme->lsme_pattern & LOV_PATTERN_F_RELEASED ||
154             !lov_supported_comp_magic(lsme->lsme_magic) ||
155             !lov_pattern_supported(lov_pattern(lsme->lsme_pattern)))
156                 stripe_count = 0;
157         for (i = 0; i < stripe_count; i++)
158                 OBD_SLAB_FREE_PTR(lsme->lsme_oinfo[i], lov_oinfo_slab);
159
160         lsme_size = offsetof(typeof(*lsme), lsme_oinfo[stripe_count]);
161         OBD_FREE_LARGE(lsme, lsme_size);
162 }
163
164 void lsm_free(struct lov_stripe_md *lsm)
165 {
166         unsigned int entry_count = lsm->lsm_entry_count;
167         unsigned int i;
168         size_t lsm_size;
169
170         if (lsm->lsm_magic == LOV_MAGIC_FOREIGN) {
171                 OBD_FREE_LARGE(lsm_foreign(lsm), lsm->lsm_foreign_size);
172         } else {
173                 for (i = 0; i < entry_count; i++)
174                         lsme_free(lsm->lsm_entries[i]);
175         }
176
177         lsm_size = lsm->lsm_magic == LOV_MAGIC_FOREIGN ?
178                    offsetof(typeof(*lsm), lsm_entries[1]) :
179                    offsetof(typeof(*lsm), lsm_entries[entry_count]);
180         OBD_FREE(lsm, lsm_size);
181 }
182
183 /**
184  * Unpack a struct lov_mds_md into a struct lov_stripe_md_entry.
185  *
186  * The caller should set id and extent.
187  */
188 static struct lov_stripe_md_entry *
189 lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
190             const char *pool_name, bool inited, struct lov_ost_data_v1 *objects,
191             loff_t *maxbytes)
192 {
193         struct lov_stripe_md_entry *lsme;
194         size_t lsme_size;
195         loff_t min_stripe_maxbytes = 0;
196         loff_t lov_bytes;
197         u32 magic;
198         u32 pattern;
199         time64_t retry_limit = 0;
200         unsigned int stripe_count;
201         unsigned int i;
202         int rc;
203
204         magic = le32_to_cpu(lmm->lmm_magic);
205         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
206                 RETURN(ERR_PTR(-EINVAL));
207
208         pattern = le32_to_cpu(lmm->lmm_pattern);
209         if (pattern & LOV_PATTERN_F_RELEASED || !inited ||
210             !lov_pattern_supported(lov_pattern(pattern)))
211                 stripe_count = 0;
212         else
213                 stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
214
215         if (buf_size < lov_mds_md_size(stripe_count, magic)) {
216                 CERROR("LOV EA %s too small: %zu, need %u\n",
217                        magic == LOV_MAGIC_V1 ? "V1" : "V3", buf_size,
218                        lov_mds_md_size(stripe_count, magic == LOV_MAGIC_V1 ?
219                                        LOV_MAGIC_V1 : LOV_MAGIC_V3));
220                 lov_dump_lmm_common(D_WARNING, lmm);
221                 return ERR_PTR(-EINVAL);
222         }
223
224         rc = lsm_lmm_verify_v1v3(lmm, buf_size, stripe_count);
225         if (rc < 0)
226                 return ERR_PTR(rc);
227
228         lsme_size = offsetof(typeof(*lsme), lsme_oinfo[stripe_count]);
229         OBD_ALLOC_LARGE(lsme, lsme_size);
230         if (!lsme)
231                 RETURN(ERR_PTR(-ENOMEM));
232
233         lsme->lsme_magic = magic;
234         lsme->lsme_pattern = pattern;
235         lsme->lsme_flags = 0;
236         lsme->lsme_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
237         /* preserve the possible -1 stripe count for uninstantiated component */
238         lsme->lsme_stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
239         lsme->lsme_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
240
241         if (pool_name) {
242                 size_t pool_name_len;
243
244                 pool_name_len = strscpy(lsme->lsme_pool_name, pool_name,
245                                         sizeof(lsme->lsme_pool_name));
246                 if (pool_name_len < 0)
247                         GOTO(out_lsme, rc = pool_name_len);
248         }
249
250         /* with Data-on-MDT set maxbytes to stripe size */
251         if (lsme_is_dom(lsme)) {
252                 if (maxbytes) {
253                         lov_bytes = lsme->lsme_stripe_size;
254                         goto out_dom1;
255                 } else {
256                         goto out_dom2;
257                 }
258         }
259
260         for (i = 0; i < stripe_count; i++) {
261                 struct lov_oinfo *loi;
262                 struct lov_tgt_desc *ltd = NULL;
263                 static time64_t next_print;
264                 unsigned int level;
265
266                 OBD_SLAB_ALLOC_PTR_GFP(loi, lov_oinfo_slab, GFP_NOFS);
267                 if (!loi)
268                         GOTO(out_lsme, rc = -ENOMEM);
269
270                 lsme->lsme_oinfo[i] = loi;
271
272                 ostid_le_to_cpu(&objects[i].l_ost_oi, &loi->loi_oi);
273                 loi->loi_ost_idx = le32_to_cpu(objects[i].l_ost_idx);
274                 loi->loi_ost_gen = le32_to_cpu(objects[i].l_ost_gen);
275                 if (lov_oinfo_is_dummy(loi))
276                         continue;
277
278 retry_new_ost:
279                 if (unlikely((u32)loi->loi_ost_idx >= lov->desc.ld_tgt_count ||
280                              !(ltd = lov->lov_tgts[loi->loi_ost_idx]))) {
281                         time64_t now = ktime_get_seconds();
282
283                         /* print message on the first hit, error if giving up */
284                         if (retry_limit == 0) {
285                                 level = now > next_print ? D_WARNING : D_INFO;
286                                 retry_limit = now + RECONNECT_DELAY_MAX;
287                         } else if (now > retry_limit) {
288                                 level = D_ERROR;
289                         } else {
290                                 level = D_INFO;
291                         }
292
293                         /* log debug every loop, just to see it is trying */
294                         CDEBUG_LIMIT(level,
295                                 (u32)loi->loi_ost_idx < lov->desc.ld_tgt_count ?
296                                 "%s: FID "DOSTID" OST index %d/%u missing\n" :
297                                 "%s: FID "DOSTID" OST index %d more than OST count %u\n",
298                                 lov->desc.ld_uuid.uuid, POSTID(&loi->loi_oi),
299                                 loi->loi_ost_idx, lov->desc.ld_tgt_count);
300
301                         if ((u32)loi->loi_ost_idx >= LOV_V1_INSANE_STRIPE_COUNT)
302                                 GOTO(out_lsme, rc = -EINVAL);
303
304                         if (now > next_print) {
305                                 LCONSOLE_INFO("%s: wait %ds while client connects to new OST\n",
306                                               lov->desc.ld_uuid.uuid,
307                                               (int)(retry_limit - now));
308                                 next_print = retry_limit + 600;
309                         }
310                         if (now < retry_limit) {
311                                 rc = schedule_timeout_interruptible(cfs_time_seconds(1));
312                                 if (rc == 0)
313                                         goto retry_new_ost;
314                         }
315                         lov_dump_lmm_v1(D_WARNING, lmm);
316                         GOTO(out_lsme, rc = -EINVAL);
317                 }
318
319                 lov_bytes = lov_tgt_maxbytes(ltd);
320                 if (min_stripe_maxbytes == 0 || lov_bytes < min_stripe_maxbytes)
321                         min_stripe_maxbytes = lov_bytes;
322         }
323
324         if (maxbytes) {
325                 if (min_stripe_maxbytes == 0)
326                         min_stripe_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
327
328                 if (stripe_count == 0)
329                         stripe_count = lov->desc.ld_tgt_count;
330
331                 if (min_stripe_maxbytes <= LLONG_MAX / stripe_count)
332                         lov_bytes = min_stripe_maxbytes * stripe_count;
333                 else
334                         lov_bytes = MAX_LFS_FILESIZE;
335 out_dom1:
336                 *maxbytes = min_t(loff_t, lov_bytes, MAX_LFS_FILESIZE);
337         }
338 out_dom2:
339
340         return lsme;
341
342 out_lsme:
343         for (i = 0; i < stripe_count; i++) {
344                 struct lov_oinfo *loi = lsme->lsme_oinfo[i];
345
346                 if (loi)
347                         OBD_SLAB_FREE_PTR(lsme->lsme_oinfo[i], lov_oinfo_slab);
348         }
349         OBD_FREE_LARGE(lsme, lsme_size);
350
351         return ERR_PTR(rc);
352 }
353
354 static struct
355 lov_stripe_md *lsm_unpackmd_v1v3(struct lov_obd *lov, struct lov_mds_md *lmm,
356                                  size_t buf_size, const char *pool_name,
357                                  struct lov_ost_data_v1 *objects)
358 {
359         struct lov_stripe_md *lsm;
360         struct lov_stripe_md_entry *lsme;
361         size_t lsm_size;
362         loff_t maxbytes;
363         u32 pattern;
364         int rc;
365
366         pattern = le32_to_cpu(lmm->lmm_pattern);
367
368         lsme = lsme_unpack(lov, lmm, buf_size, pool_name, true, objects,
369                            &maxbytes);
370         if (IS_ERR(lsme))
371                 RETURN(ERR_CAST(lsme));
372
373         lsme->lsme_flags = LCME_FL_INIT;
374         lsme->lsme_extent.e_start = 0;
375         lsme->lsme_extent.e_end = LUSTRE_EOF;
376
377         lsm_size = offsetof(typeof(*lsm), lsm_entries[1]);
378         OBD_ALLOC(lsm, lsm_size);
379         if (!lsm)
380                 GOTO(out_lsme, rc = -ENOMEM);
381
382         atomic_set(&lsm->lsm_refc, 1);
383         spin_lock_init(&lsm->lsm_lock);
384         lsm->lsm_maxbytes = maxbytes;
385         lmm_oi_le_to_cpu(&lsm->lsm_oi, &lmm->lmm_oi);
386         lsm->lsm_magic = le32_to_cpu(lmm->lmm_magic);
387         lsm->lsm_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
388         lsm->lsm_entry_count = 1;
389         lsm->lsm_is_released = pattern & LOV_PATTERN_F_RELEASED;
390         lsm->lsm_entries[0] = lsme;
391
392         return lsm;
393
394 out_lsme:
395         lsme_free(lsme);
396
397         return ERR_PTR(rc);
398 }
399
400 static struct lov_stripe_md *
401 lsm_unpackmd_v1(struct lov_obd *lov, void *buf, size_t buf_size)
402 {
403         struct lov_mds_md_v1 *lmm = buf;
404
405         return lsm_unpackmd_v1v3(lov, buf, buf_size, NULL, lmm->lmm_objects);
406 }
407
408 static const struct lsm_operations lsm_v1_ops = {
409         .lsm_unpackmd           = lsm_unpackmd_v1,
410 };
411
412 static struct lov_stripe_md *
413 lsm_unpackmd_v3(struct lov_obd *lov, void *buf, size_t buf_size)
414 {
415         struct lov_mds_md_v3 *lmm = buf;
416
417         return lsm_unpackmd_v1v3(lov, buf, buf_size, lmm->lmm_pool_name,
418                                  lmm->lmm_objects);
419 }
420
421 static const struct lsm_operations lsm_v3_ops = {
422         .lsm_unpackmd           = lsm_unpackmd_v3,
423 };
424
425 static int lsm_verify_comp_md_v1(struct lov_comp_md_v1 *lcm,
426                                  size_t lcm_buf_size)
427 {
428         unsigned int entry_count;
429         unsigned int i;
430         size_t lcm_size;
431
432         lcm_size = le32_to_cpu(lcm->lcm_size);
433         if (lcm_buf_size < lcm_size) {
434                 CERROR("bad LCM buffer size %zu, expected %zu\n",
435                        lcm_buf_size, lcm_size);
436                 RETURN(-EINVAL);
437         }
438
439         entry_count = le16_to_cpu(lcm->lcm_entry_count);
440         for (i = 0; i < entry_count; i++) {
441                 struct lov_comp_md_entry_v1 *lcme = &lcm->lcm_entries[i];
442                 size_t blob_offset;
443                 size_t blob_size;
444
445                 blob_offset = le32_to_cpu(lcme->lcme_offset);
446                 blob_size = le32_to_cpu(lcme->lcme_size);
447
448                 if (lcm_size < blob_offset || lcm_size < blob_size ||
449                     lcm_size < blob_offset + blob_size) {
450                         CERROR("LCM entry %u has invalid blob: "
451                                "LCM size = %zu, offset = %zu, size = %zu\n",
452                                le32_to_cpu(lcme->lcme_id),
453                                lcm_size, blob_offset, blob_size);
454                         RETURN(-EINVAL);
455                 }
456         }
457
458         return 0;
459 }
460
461 static struct lov_stripe_md_entry *
462 lsme_unpack_foreign(struct lov_obd *lov, void *buf, size_t buf_size,
463                     bool inited, loff_t *maxbytes)
464 {
465         struct lov_stripe_md_entry *lsme;
466         struct lov_foreign_md *lfm = buf;
467         size_t length;
468         __u32 magic;
469         __u32 type;
470
471         ENTRY;
472
473         magic = le32_to_cpu(lfm->lfm_magic);
474         if (magic != LOV_MAGIC_FOREIGN)
475                 RETURN(ERR_PTR(-EINVAL));
476
477         type = le32_to_cpu(lfm->lfm_type);
478         if (!lov_foreign_type_supported(type)) {
479                 CDEBUG(D_LAYOUT, "Unsupported foreign type: %u\n", type);
480                 RETURN(ERR_PTR(-EINVAL));
481         }
482
483         length = le32_to_cpu(lfm->lfm_length);
484         if (lov_foreign_size_le(lfm) > buf_size) {
485                 CDEBUG(D_LAYOUT, "LOV EA HSM too small: %zu, need %zu\n",
486                        buf_size, lov_foreign_size_le(lfm));
487                 RETURN(ERR_PTR(-EINVAL));
488         }
489
490         if (lov_hsm_type_supported(type) &&
491             length < sizeof(struct lov_hsm_base)) {
492                 CDEBUG(D_LAYOUT,
493                        "Invalid LOV HSM len: %zu, should be larger than %zu\n",
494                        length, sizeof(struct lov_hsm_base));
495                 RETURN(ERR_PTR(-EINVAL));
496         }
497
498         OBD_ALLOC_LARGE(lsme, sizeof(*lsme));
499         if (!lsme)
500                 RETURN(ERR_PTR(-ENOMEM));
501
502         lsme->lsme_magic = magic;
503         lsme->lsme_pattern = LOV_PATTERN_FOREIGN;
504         lsme->lsme_flags = 0;
505         lsme->lsme_length = length;
506         lsme->lsme_type = type;
507         lsme->lsme_foreign_flags = le32_to_cpu(lfm->lfm_flags);
508
509         /* TODO: Initialize for other kind of foreign layout such as DAOS. */
510         if (lov_hsm_type_supported(type))
511                 lov_foreign_hsm_to_cpu(&lsme->lsme_hsm, lfm);
512
513         if (maxbytes)
514                 *maxbytes = MAX_LFS_FILESIZE;
515
516         RETURN(lsme);
517 }
518
519 static struct lov_stripe_md_entry *
520 lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
521                  size_t lmm_buf_size, bool inited, loff_t *maxbytes)
522 {
523         unsigned int magic;
524
525         magic = le32_to_cpu(lmm->lmm_magic);
526         if (!lov_supported_comp_magic(magic)) {
527                 struct lov_stripe_md_entry *lsme;
528
529                 /* allocate a lsme holder for invalid magic lmm */
530                 OBD_ALLOC_LARGE(lsme, offsetof(typeof(*lsme), lsme_oinfo[0]));
531                 lsme->lsme_magic = magic;
532                 lsme->lsme_pattern = le32_to_cpu(lmm->lmm_pattern);
533
534                 return lsme;
535         }
536
537         if (magic != LOV_MAGIC_FOREIGN &&
538             le16_to_cpu(lmm->lmm_stripe_count) == 0 &&
539             !(lov_pattern(le32_to_cpu(lmm->lmm_pattern)) & LOV_PATTERN_MDT))
540                 RETURN(ERR_PTR(-EINVAL));
541
542         if (magic == LOV_MAGIC_FOREIGN) {
543                 return lsme_unpack_foreign(lov, lmm, lmm_buf_size,
544                                            inited, maxbytes);
545         } else if (magic == LOV_MAGIC_V1) {
546                 return lsme_unpack(lov, lmm, lmm_buf_size, NULL,
547                                    inited, lmm->lmm_objects, maxbytes);
548         } else if (magic == LOV_MAGIC_V3) {
549                 struct lov_mds_md_v3 *lmm3 = (struct lov_mds_md_v3 *)lmm;
550
551                 return lsme_unpack(lov, lmm, lmm_buf_size, lmm3->lmm_pool_name,
552                                    inited, lmm3->lmm_objects, maxbytes);
553         } else { /* LOV_MAGIC_FOREIGN */
554                 return lsme_unpack_foreign(lov, lmm, lmm_buf_size,
555                                            inited, maxbytes);
556         }
557 }
558
559 static struct lov_stripe_md *
560 lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size)
561 {
562         struct lov_comp_md_v1 *lcm = buf;
563         struct lov_stripe_md *lsm;
564         size_t lsm_size;
565         unsigned int entry_count = 0;
566         unsigned int i;
567         loff_t maxbytes;
568         int rc;
569
570         rc = lsm_verify_comp_md_v1(buf, buf_size);
571         if (rc < 0)
572                 return ERR_PTR(rc);
573
574         entry_count = le16_to_cpu(lcm->lcm_entry_count);
575
576         lsm_size = offsetof(typeof(*lsm), lsm_entries[entry_count]);
577         OBD_ALLOC(lsm, lsm_size);
578         if (!lsm)
579                 return ERR_PTR(-ENOMEM);
580
581         atomic_set(&lsm->lsm_refc, 1);
582         spin_lock_init(&lsm->lsm_lock);
583         lsm->lsm_magic = le32_to_cpu(lcm->lcm_magic);
584         lsm->lsm_layout_gen = le32_to_cpu(lcm->lcm_layout_gen);
585         lsm->lsm_entry_count = entry_count;
586         lsm->lsm_mirror_count = le16_to_cpu(lcm->lcm_mirror_count);
587         lsm->lsm_flags = le16_to_cpu(lcm->lcm_flags);
588         lsm->lsm_is_rdonly = lsm->lsm_flags & LCM_FL_PCC_RDONLY;
589         lsm->lsm_is_released = true;
590         lsm->lsm_maxbytes = LLONG_MIN;
591
592         for (i = 0; i < entry_count; i++) {
593                 struct lov_comp_md_entry_v1 *lcme = &lcm->lcm_entries[i];
594                 struct lov_stripe_md_entry *lsme;
595                 size_t blob_offset;
596                 size_t blob_size;
597                 void *blob;
598
599                 blob_offset = le32_to_cpu(lcme->lcme_offset);
600                 blob_size = le32_to_cpu(lcme->lcme_size);
601                 blob = (char *)lcm + blob_offset;
602
603                 if (unlikely(CFS_FAIL_CHECK(OBD_FAIL_LOV_COMP_MAGIC) &&
604                              (cfs_fail_val == i + 1)))
605                         ((struct lov_mds_md *)blob)->lmm_magic = LOV_MAGIC_BAD;
606
607                 if (unlikely(CFS_FAIL_CHECK(OBD_FAIL_LOV_COMP_PATTERN) &&
608                              (cfs_fail_val == i + 1))) {
609                         ((struct lov_mds_md *)blob)->lmm_pattern =
610                                                                 LOV_PATTERN_BAD;
611                 }
612
613                 lsme = lsme_unpack_comp(lov, blob, blob_size,
614                                         le32_to_cpu(lcme->lcme_flags) &
615                                         LCME_FL_INIT,
616                                         (i == entry_count - 1) ? &maxbytes :
617                                                                  NULL);
618                 if (IS_ERR(lsme)) {
619                         OBD_ALLOC_LARGE(lsme, sizeof(*lsme));
620                         if (!lsme)
621                                 GOTO(out_lsm, rc = -ENOMEM);
622
623                         lsme->lsme_magic = LOV_MAGIC_FOREIGN;
624                         lsme->lsme_pattern = LOV_PATTERN_FOREIGN;
625                         lsme->lsme_flags = LCME_FL_OFFLINE;
626                 }
627
628                 /**
629                  * pressume that unrecognized magic component also has valid
630                  * lsme_id/lsme_flags/lsme_extent
631                  */
632                 if (!(lsme->lsme_magic == LOV_MAGIC_FOREIGN) &&
633                     !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
634                         lsm->lsm_is_released = false;
635
636                 lsm->lsm_entries[i] = lsme;
637                 lsme->lsme_id = le32_to_cpu(lcme->lcme_id);
638                 lsme->lsme_flags = le32_to_cpu(lcme->lcme_flags);
639                 if (lsme->lsme_flags & LCME_FL_NOSYNC)
640                         lsme->lsme_timestamp =
641                                 le64_to_cpu(lcme->lcme_timestamp);
642                 lu_extent_le_to_cpu(&lsme->lsme_extent, &lcme->lcme_extent);
643
644                 if (i == entry_count - 1) {
645                         lsm->lsm_maxbytes = (loff_t)lsme->lsme_extent.e_start +
646                                             maxbytes;
647                         /*
648                          * the last component hasn't been defined, or
649                          * lsm_maxbytes overflowed.
650                          */
651                         if (!lsme_is_dom(lsme) &&
652                             (lsme->lsme_extent.e_end != LUSTRE_EOF ||
653                              lsm->lsm_maxbytes <
654                              (loff_t)lsme->lsme_extent.e_start))
655                                 lsm->lsm_maxbytes = MAX_LFS_FILESIZE;
656                 }
657         }
658
659         RETURN(lsm);
660
661 out_lsm:
662         for (i = 0; i < entry_count; i++)
663                 if (lsm->lsm_entries[i])
664                         lsme_free(lsm->lsm_entries[i]);
665
666         OBD_FREE(lsm, lsm_size);
667
668         RETURN(ERR_PTR(rc));
669 }
670
671 static const struct lsm_operations lsm_comp_md_v1_ops = {
672         .lsm_unpackmd           = lsm_unpackmd_comp_md_v1,
673 };
674
675 static struct
676 lov_stripe_md *lsm_unpackmd_foreign(struct lov_obd *lov, void *buf,
677                                     size_t buf_size)
678 {
679         struct lov_foreign_md *lfm = buf;
680         struct lov_stripe_md *lsm;
681         size_t lsm_size;
682         struct lov_stripe_md_entry *lsme;
683
684         lsm_size = offsetof(typeof(*lsm), lsm_entries[1]);
685         OBD_ALLOC(lsm, lsm_size);
686         if (lsm == NULL)
687                 RETURN(ERR_PTR(-ENOMEM));
688
689         atomic_set(&lsm->lsm_refc, 1);
690         spin_lock_init(&lsm->lsm_lock);
691         lsm->lsm_magic = le32_to_cpu(lfm->lfm_magic);
692         lsm->lsm_foreign_size = lov_foreign_size_le(lfm);
693
694         /* alloc for full foreign EA including format fields */
695         OBD_ALLOC_LARGE(lsme, lsm->lsm_foreign_size);
696         if (lsme == NULL) {
697                 OBD_FREE(lsm, lsm_size);
698                 RETURN(ERR_PTR(-ENOMEM));
699         }
700
701         /* copy full foreign EA including format fields */
702         memcpy(lsme, buf, lsm->lsm_foreign_size);
703
704         lsm_foreign(lsm) = lsme;
705
706         return lsm;
707 }
708
709 static const struct lsm_operations lsm_foreign_ops = {
710         .lsm_unpackmd           = lsm_unpackmd_foreign,
711 };
712
713 const struct lsm_operations *lsm_op_find(int magic)
714 {
715         switch (magic) {
716         case LOV_MAGIC_V1:
717                 return &lsm_v1_ops;
718         case LOV_MAGIC_V3:
719                 return &lsm_v3_ops;
720         case LOV_MAGIC_COMP_V1:
721                 return &lsm_comp_md_v1_ops;
722         case LOV_MAGIC_FOREIGN:
723                 return &lsm_foreign_ops;
724         default:
725                 CERROR("unrecognized lsm_magic %08x\n", magic);
726                 return NULL;
727         }
728 }
729
730 void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
731 {
732         int i, j;
733
734         CDEBUG_LIMIT(level,
735                      "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, refc: %d, entry: %u, mirror: %u, flags: %u,layout_gen %u\n",
736                lsm, POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic,
737                atomic_read(&lsm->lsm_refc), lsm->lsm_entry_count,
738                lsm->lsm_mirror_count, lsm->lsm_flags, lsm->lsm_layout_gen);
739
740         if (lsm->lsm_magic == LOV_MAGIC_FOREIGN) {
741                 struct lov_foreign_md *lfm = (void *)lsm_foreign(lsm);
742
743                 CDEBUG_LIMIT(level,
744                              "foreign LOV EA, magic %x, length %u, type %x, flags %x, value '%.*s'\n",
745                        lfm->lfm_magic, lfm->lfm_length, lfm->lfm_type,
746                        lfm->lfm_flags, lfm->lfm_length, lfm->lfm_value);
747                 return;
748         }
749
750         for (i = 0; i < lsm->lsm_entry_count; i++) {
751                 struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
752
753                 if (lsme_is_foreign(lse)) {
754                         CDEBUG_LIMIT(level,
755                                    "HSM layout "DEXT ": id %u, flags: %08x, magic 0x%08X, length %u, type %x, flags %08x, archive_id %llu, archive_ver %llu, archive_uuid '%.*s'\n",
756                                    PEXT(&lse->lsme_extent), lse->lsme_id,
757                                    lse->lsme_flags, lse->lsme_magic,
758                                    lse->lsme_length, lse->lsme_type,
759                                    lse->lsme_foreign_flags,
760                                    lse->lsme_archive_id, lse->lsme_archive_ver,
761                                    (int)sizeof(lse->lsme_uuid), lse->lsme_uuid);
762                 } else {
763                         CDEBUG_LIMIT(level,
764                                    DEXT ": id: %u, flags: %x, magic 0x%08X, layout_gen %u, stripe count %u, sstripe size %u, pool: ["LOV_POOLNAMEF"]\n",
765                                    PEXT(&lse->lsme_extent), lse->lsme_id,
766                                    lse->lsme_flags, lse->lsme_magic,
767                                    lse->lsme_layout_gen, lse->lsme_stripe_count,
768                                    lse->lsme_stripe_size, lse->lsme_pool_name);
769                         if (!lsme_inited(lse) ||
770                             lse->lsme_pattern & LOV_PATTERN_F_RELEASED ||
771                             !lov_supported_comp_magic(lse->lsme_magic) ||
772                             !lov_pattern_supported(
773                                         lov_pattern(lse->lsme_pattern)))
774                                 continue;
775                         for (j = 0; j < lse->lsme_stripe_count; j++) {
776                                 CDEBUG_LIMIT(level,
777                                            "   oinfo:%p: ostid: "DOSTID" ost idx: %d gen: %d\n",
778                                            lse->lsme_oinfo[j],
779                                            POSTID(&lse->lsme_oinfo[j]->loi_oi),
780                                            lse->lsme_oinfo[j]->loi_ost_idx,
781                                            lse->lsme_oinfo[j]->loi_ost_gen);
782                         }
783                 }
784         }
785 }
786
787 /**
788  * lmm_layout_gen overlaps stripe_offset field, it needs to be reset back when
789  * sending to MDT for passing striping checks
790  */
791 void lov_fix_ea_for_replay(void *lovea)
792 {
793         struct lov_user_md *lmm = lovea;
794         struct lov_comp_md_v1 *c1;
795         int i;
796
797         switch (le32_to_cpu(lmm->lmm_magic)) {
798         case LOV_USER_MAGIC_V1:
799         case LOV_USER_MAGIC_V3:
800                 lmm->lmm_stripe_offset = LOV_OFFSET_DEFAULT;
801                 break;
802
803         case LOV_USER_MAGIC_COMP_V1:
804                 c1 = (void *)lmm;
805                 for (i = 0; i < le16_to_cpu(c1->lcm_entry_count); i++) {
806                         struct lov_comp_md_entry_v1 *ent = &c1->lcm_entries[i];
807
808                         if (le32_to_cpu(ent->lcme_flags) & LCME_FL_INIT) {
809                                 lmm = (void *)((char *)c1 +
810                                       le32_to_cpu(ent->lcme_offset));
811                                 lmm->lmm_stripe_offset = LOV_OFFSET_DEFAULT;
812                         }
813                 }
814         }
815 }
816 EXPORT_SYMBOL(lov_fix_ea_for_replay);