Whamcloud - gitweb
LU-17872 ldlm: switch to read_positive in reclaim_full
[fs/lustre-release.git] / lustre / lod / lod_qos.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/lod/lod_qos.c
32  *
 * Implementation of the different allocation algorithms used
 * to distribute objects and data among OSTs.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #include <asm/div64.h>
40 #include <linux/random.h>
41
42 #include <libcfs/libcfs.h>
43 #include <uapi/linux/lustre/lustre_idl.h>
44 #include <lustre_swab.h>
45 #include <obd_class.h>
46
47 #include "lod_internal.h"
48
49 /* check whether a target is available for new object allocation */
50 static inline int lod_statfs_check(struct lu_tgt_descs *ltd,
51                                    struct lu_tgt_desc *tgt)
52 {
53         struct obd_statfs *sfs = &tgt->ltd_statfs;
54
55         if (sfs->os_state & OS_STATFS_ENOSPC ||
56             (sfs->os_state & OS_STATFS_ENOINO &&
57              /* OST allocation allowed while precreated objects available */
58              (ltd->ltd_is_mdt || sfs->os_fprecreated == 0)))
59                 return -ENOSPC;
60
61         /* If the OST is readonly then we can't allocate objects there */
62         if (sfs->os_state & OS_STATFS_READONLY)
63                 return -EROFS;
64
65         /* object creation is skipped on the OST with max_create_count=0 */
66         if (!ltd->ltd_is_mdt && sfs->os_state & OS_STATFS_NOCREATE)
67                 return -ENOBUFS;
68
69         return 0;
70 }
71
72 /**
73  * Check whether the target is available for new objects.
74  *
75  * Request statfs data from the given target and verify it's active and not
76  * read-only. If so, then it can be used to place new objects. This
77  * function also maintains the number of active/inactive targets and sets
78  * dirty flags if those numbers change so others can run re-balance procedures.
79  * No external locking is required.
80  *
81  * \param[in] env       execution environment for this thread
82  * \param[in] d         LOD device
83  * \param[in] ltd       target table
84  * \param[in] tgt       target
85  *
86  * \retval 0            if the target is good
87  * \retval negative     negated errno on error
88  */
89 static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
90                                 struct lu_tgt_descs *ltd,
91                                 struct lu_tgt_desc *tgt, __u64 reserve)
92 {
93         struct obd_statfs_info info = { 0 };
94         struct lov_desc *desc = &ltd->ltd_lov_desc;
95         int rc;
96         ENTRY;
97
98         LASSERT(d);
99         LASSERT(tgt);
100
101         info.os_enable_pre = 1;
102         rc = dt_statfs_info(env, tgt->ltd_tgt, &tgt->ltd_statfs, &info);
103         if (rc && rc != -ENOTCONN)
104                 CERROR("%s: statfs error: rc = %d\n", lod2obd(d)->obd_name, rc);
105
106         if (!rc)
107                 rc = lod_statfs_check(ltd, tgt);
108
109         /* reserving space shouldn't be enough to mark an OST inactive */
110         if (reserve &&
111             (reserve + (info.os_reserved_mb_low << 20) >
112              tgt->ltd_statfs.os_bavail * tgt->ltd_statfs.os_bsize))
113                 return -ENOSPC;
114
115         /* check whether device has changed state (active, inactive) */
116         if (rc && tgt->ltd_active) {
117                 /* turned inactive? */
118                 spin_lock(&d->lod_lock);
119                 if (tgt->ltd_active) {
120                         tgt->ltd_active = 0;
121                         if (rc == -ENOTCONN)
122                                 tgt->ltd_discon = 1;
123
124                         LASSERT(desc->ld_active_tgt_count > 0);
125                         desc->ld_active_tgt_count--;
126                         set_bit(LQ_DIRTY, &ltd->ltd_qos.lq_flags);
127                         CDEBUG(D_CONFIG, "%s: turns inactive\n",
128                                tgt->ltd_exp->exp_obd->obd_name);
129                 }
130                 spin_unlock(&d->lod_lock);
131         } else if (rc == 0 && !tgt->ltd_active) {
132                 /* turned active? */
133                 spin_lock(&d->lod_lock);
134                 if (!tgt->ltd_active) {
135                         LASSERTF(desc->ld_active_tgt_count < desc->ld_tgt_count,
136                                  "active tgt count %d, tgt nr %d\n",
137                                  desc->ld_active_tgt_count, desc->ld_tgt_count);
138                         tgt->ltd_active = 1;
139                         tgt->ltd_discon = 0;
140                         desc->ld_active_tgt_count++;
141                         set_bit(LQ_DIRTY, &ltd->ltd_qos.lq_flags);
142                         CDEBUG(D_CONFIG, "%s: turns active\n",
143                                tgt->ltd_exp->exp_obd->obd_name);
144                 }
145                 spin_unlock(&d->lod_lock);
146         }
147         if (rc == -ENOTCONN) {
148                 /* In case that the ENOTCONN for inactive OST state is
149                  * mistreated as MDT disconnection state by the client,
150                  * this error should be changed to someone else.
151                  */
152                 rc = -EREMOTEIO;
153         }
154
155         RETURN(rc);
156 }
157
158 /**
159  * Maintain per-target statfs data.
160  *
161  * The function refreshes statfs data for all the targets every N seconds.
162  * The actual N is controlled via procfs and set to LOV_DESC_QOS_MAXAGE_DEFAULT
163  * initially.
164  *
165  * \param[in] env       execution environment for this thread
166  * \param[in] lod       LOD device
167  * \param[in] ltd       tgt table
168  */
169 void lod_qos_statfs_update(const struct lu_env *env, struct lod_device *lod,
170                            struct lu_tgt_descs *ltd)
171 {
172         struct obd_device *obd = lod2obd(lod);
173         struct lu_tgt_desc *tgt;
174         time64_t max_age;
175         u64 avail;
176         ENTRY;
177
178         max_age = ktime_get_seconds() - 2 * ltd->ltd_lov_desc.ld_qos_maxage;
179
180         if (obd->obd_osfs_age > max_age)
181                 /* statfs data are quite recent, don't need to refresh it */
182                 RETURN_EXIT;
183
184         if (test_and_set_bit(LQ_SF_PROGRESS, &ltd->ltd_qos.lq_flags))
185                 RETURN_EXIT;
186
187         if (obd->obd_osfs_age > max_age) {
188                 /* statfs data are quite recent, don't need to refresh it */
189                 clear_bit(LQ_SF_PROGRESS, &ltd->ltd_qos.lq_flags);
190                 RETURN_EXIT;
191         }
192         lod_getref(ltd);
193         ltd_foreach_tgt(ltd, tgt) {
194                 avail = tgt->ltd_statfs.os_bavail;
195                 if (lod_statfs_and_check(env, lod, ltd, tgt, 0))
196                         continue;
197
198                 if (tgt->ltd_statfs.os_bavail != avail)
199                         /* recalculate weigths */
200                         set_bit(LQ_DIRTY, &ltd->ltd_qos.lq_flags);
201         }
202         lod_putref(lod, ltd);
203         obd->obd_osfs_age = ktime_get_seconds();
204
205         clear_bit(LQ_SF_PROGRESS, &ltd->ltd_qos.lq_flags);
206         EXIT;
207 }
208
/*
 * Value stored in a round-robin array slot that holds no target index:
 * lod_qos_calc_rr() fills the array with it before placing targets, and
 * lod_ost_alloc_rr() skips slots that still contain it.
 */
#define LOV_QOS_EMPTY ((__u32)-1)
210
211 /**
212  * Calculate optimal round-robin order with regard to OSSes.
213  *
214  * Place all the OSTs from pool \a src_pool in a special array to be used for
215  * round-robin (RR) stripe allocation.  The placement algorithm interleaves
216  * OSTs from the different OSSs so that RR allocation can balance OSSs evenly.
217  * Resorts the targets when the number of active targets changes (because of
218  * a new target or activation/deactivation).
219  *
220  * \param[in] lod       LOD device
221  * \param[in] ltd       tgt table
222  * \param[in] src_pool  tgt pool
223  * \param[in] lqr       round-robin list
224  *
225  * \retval 0            on success
226  * \retval -ENOMEM      fails to allocate the array
227  */
228 static int lod_qos_calc_rr(struct lod_device *lod, struct lu_tgt_descs *ltd,
229                            const struct lu_tgt_pool *src_pool,
230                            struct lu_qos_rr *lqr)
231 {
232         struct lu_svr_qos  *svr;
233         struct lu_tgt_desc *tgt;
234         unsigned placed, real_count;
235         unsigned int i;
236         int rc;
237         ENTRY;
238
239         if (!test_bit(LQ_DIRTY, &lqr->lqr_flags)) {
240                 LASSERT(lqr->lqr_pool.op_size);
241                 RETURN(0);
242         }
243
244         /* Do actual allocation. */
245         down_write(&ltd->ltd_qos.lq_rw_sem);
246
247         /*
248          * Check again. While we were sleeping on @lq_rw_sem something could
249          * change.
250          */
251         if (!test_bit(LQ_DIRTY, &lqr->lqr_flags)) {
252                 LASSERT(lqr->lqr_pool.op_size);
253                 up_write(&ltd->ltd_qos.lq_rw_sem);
254                 RETURN(0);
255         }
256
257         real_count = src_pool->op_count;
258
259         /* Zero the pool array */
260         /* alloc_rr is holding a read lock on the pool, so nobody is adding/
261            deleting from the pool. The lq_rw_sem insures that nobody else
262            is reading. */
263         lqr->lqr_pool.op_count = real_count;
264         rc = lu_tgt_pool_extend(&lqr->lqr_pool, real_count);
265         if (rc) {
266                 up_write(&ltd->ltd_qos.lq_rw_sem);
267                 RETURN(rc);
268         }
269         for (i = 0; i < lqr->lqr_pool.op_count; i++)
270                 lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY;
271
272         /* Place all the tgts from 1 svr at the same time. */
273         placed = 0;
274         list_for_each_entry(svr, &ltd->ltd_qos.lq_svr_list, lsq_svr_list) {
275                 int j = 0;
276
277                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
278                         int next;
279
280                         if (!test_bit(src_pool->op_array[i],
281                                       ltd->ltd_tgt_bitmap))
282                                 continue;
283
284                         tgt = LTD_TGT(ltd, src_pool->op_array[i]);
285                         LASSERT(tgt && tgt->ltd_tgt);
286                         if (tgt->ltd_qos.ltq_svr != svr)
287                                 continue;
288
289                         /* Evenly space these tgts across arrayspace */
290                         next = j * lqr->lqr_pool.op_count / svr->lsq_tgt_count;
291                         while (lqr->lqr_pool.op_array[next] != LOV_QOS_EMPTY)
292                                 next = (next + 1) % lqr->lqr_pool.op_count;
293
294                         lqr->lqr_pool.op_array[next] = src_pool->op_array[i];
295                         j++;
296                         placed++;
297                 }
298         }
299
300         clear_bit(LQ_DIRTY, &lqr->lqr_flags);
301         up_write(&ltd->ltd_qos.lq_rw_sem);
302
303         if (placed != real_count) {
304                 /* This should never happen */
305                 LCONSOLE_ERROR_MSG(0x14e, "Failed to place all tgts in the "
306                                    "round-robin list (%d of %d).\n",
307                                    placed, real_count);
308                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
309                         LCONSOLE(D_WARNING, "rr #%d tgt idx=%d\n", i,
310                                  lqr->lqr_pool.op_array[i]);
311                 }
312                 set_bit(LQ_DIRTY, &lqr->lqr_flags);
313                 RETURN(-EAGAIN);
314         }
315
316         RETURN(0);
317 }
318
319 /**
320  * Instantiate and declare creation of a new object.
321  *
322  * The function instantiates LU representation for a new object on the
323  * specified device. Also it declares an intention to create that
324  * object on the storage target.
325  *
326  * Note lu_object_anon() is used which is a trick with regard to LU/OSD
327  * infrastructure - in the existing precreation framework we can't assign FID
328  * at this moment, we do this later once a transaction is started. So the
329  * special method instantiates FID-less object in the cache and later it
330  * will get a FID and proper placement in LU cache.
331  *
332  * \param[in] env       execution environment for this thread
333  * \param[in] d         LOD device
334  * \param[in] ost_idx   OST target index where the object is being created
335  * \param[in] th        transaction handle
336  *
337  * \retval              object ptr on success, ERR_PTR() otherwise
338  */
339 static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
340                                                    struct lod_device *d,
341                                                    __u32 ost_idx,
342                                                    bool can_block,
343                                                    struct thandle *th)
344 {
345         struct dt_allocation_hint *ah = &lod_env_info(env)->lti_ah;
346         struct lod_tgt_desc *ost;
347         struct lu_object *o, *n;
348         struct lu_device *nd;
349         struct dt_object *dt;
350         int               rc;
351         ENTRY;
352
353         LASSERT(d);
354         LASSERT(ost_idx < d->lod_ost_descs.ltd_tgts_size);
355         ost = OST_TGT(d,ost_idx);
356         LASSERT(ost);
357         LASSERT(ost->ltd_tgt);
358
359         nd = &ost->ltd_tgt->dd_lu_dev;
360
361         /*
362          * allocate anonymous object with zero fid, real fid
363          * will be assigned by OSP within transaction
364          * XXX: to be fixed with fully-functional OST fids
365          */
366         o = lu_object_anon(env, nd, NULL);
367         if (IS_ERR(o))
368                 GOTO(out, dt = ERR_CAST(o));
369
370         n = lu_object_locate(o->lo_header, nd->ld_type);
371         if (unlikely(n == NULL)) {
372                 CERROR("can't find slice\n");
373                 lu_object_put(env, o);
374                 GOTO(out, dt = ERR_PTR(-EINVAL));
375         }
376
377         dt = container_of(n, struct dt_object, do_lu);
378
379         ah->dah_can_block = can_block;
380         rc = lod_sub_declare_create(env, dt, NULL, ah, NULL, th);
381         if (rc < 0) {
382                 CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
383                        ost_idx, rc);
384                 lu_object_put(env, o);
385                 dt = ERR_PTR(rc);
386         }
387
388 out:
389         RETURN(dt);
390 }
391
392 /**
393  * Calculate a minimum acceptable stripe count.
394  *
395  * Return an acceptable stripe count depending on flag LOD_USES_DEFAULT_STRIPE:
396  * all stripes or 3/4 of stripes.  The code is written this way to avoid
397  * returning 0 for stripe_count < 4, like "stripe_count * 3 / 4" would do.
398  *
399  * \param[in] stripe_count      number of stripes requested
400  * \param[in] flags             0 or LOD_USES_DEFAULT_STRIPE
401  *
402  * \retval                      acceptable stripecount
403  */
404 static int lod_stripe_count_min(__u32 stripe_count, enum lod_uses_hint flags)
405 {
406         return (flags & LOD_USES_DEFAULT_STRIPE ?
407                 stripe_count - (stripe_count / 4) : stripe_count);
408 }
409
/*
 * Tuning of the round-robin start index reseed in lod_ost_alloc_rr(): when
 * the countdown lqr_start_count expires, a new random start index is chosen
 * and, for a pool of n targets (n taken as at least 1), the countdown is
 * reset to (LOV_CREATE_RESEED_MIN / n + LOV_CREATE_RESEED_MULT) * n.
 */
#define LOV_CREATE_RESEED_MULT 30
#define LOV_CREATE_RESEED_MIN  2000
412
413 /**
414  * Initialize temporary tgt-in-use array.
415  *
416  * Allocate or extend the array used to mark targets already assigned to a new
417  * striping so they are not used more than once.
418  *
419  * \param[in] env       execution environment for this thread
420  * \param[in] stripes   number of items needed in the array
421  *
422  * \retval 0            on success
423  * \retval -ENOMEM      on error
424  */
425 static inline int lod_qos_tgt_in_use_clear(const struct lu_env *env,
426                                            __u32 stripes)
427 {
428         struct lod_thread_info *info = lod_env_info(env);
429
430         if (info->lti_ea_store_size < sizeof(int) * stripes)
431                 lod_ea_store_resize(info, stripes * sizeof(int));
432         if (info->lti_ea_store_size < sizeof(int) * stripes) {
433                 CERROR("can't allocate memory for tgt-in-use array\n");
434                 return -ENOMEM;
435         }
436         memset(info->lti_ea_store, -1, sizeof(int) * stripes);
437         return 0;
438 }
439
440 /**
441  * Remember a target in the array of used targets.
442  *
443  * Mark the given target as used for a new striping being created. The status
444  * of an tgt in a striping can be checked with lod_qos_is_tgt_used().
445  *
446  * \param[in] env       execution environment for this thread
447  * \param[in] idx       index in the array
448  * \param[in] tgt_idx   target index to mark as used
449  */
450 static inline void lod_qos_tgt_in_use(const struct lu_env *env,
451                                       int idx, int tgt_idx)
452 {
453         struct lod_thread_info *info = lod_env_info(env);
454         int *tgts = info->lti_ea_store;
455
456         LASSERT(info->lti_ea_store_size >= idx * sizeof(int));
457         tgts[idx] = tgt_idx;
458 }
459
460 /**
461  * Check is tgt used in a striping.
462  *
463  * Checks whether tgt with the given index is marked as used in the temporary
464  * array (see lod_qos_tgt_in_use()).
465  *
466  * \param[in] env       execution environment for this thread
467  * \param[in] tgt_idx   target index to check
468  * \param[in] stripes   the number of items used in the array already
469  *
470  * \retval 0            not used
471  * \retval 1            used
472  */
473 static int lod_qos_is_tgt_used(const struct lu_env *env, int tgt_idx,
474                                __u32 stripes)
475 {
476         struct lod_thread_info *info = lod_env_info(env);
477         int *tgts = info->lti_ea_store;
478         __u32 j;
479
480         for (j = 0; j < stripes; j++) {
481                 if (tgts[j] == tgt_idx)
482                         return 1;
483         }
484         return 0;
485 }
486
487 static inline bool
488 lod_obj_is_ost_use_skip_cb(const struct lu_env *env, struct lod_object *lo,
489                            int comp_idx, struct lod_obj_stripe_cb_data *data)
490 {
491         struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx];
492
493         return comp->llc_ost_indices == NULL;
494 }
495
496 static inline int
497 lod_obj_is_ost_use_cb(const struct lu_env *env, struct lod_object *lo,
498                       int comp_idx, struct lod_obj_stripe_cb_data *data)
499 {
500         struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx];
501         int i;
502
503         for (i = 0; i < comp->llc_stripe_count; i++) {
504                 if (comp->llc_ost_indices[i] == data->locd_ost_index) {
505                         data->locd_ost_index = -1;
506                         return -EEXIST;
507                 }
508         }
509
510         return 0;
511 }
512
513 /**
514  * Check is OST used in a composite layout
515  *
516  * \param[in] lo        lod object
517  * \param[in] ost       OST target index to check
518  *
519  * \retval false        not used
520  * \retval true         used
521  */
522 static inline bool lod_comp_is_ost_used(const struct lu_env *env,
523                                        struct lod_object *lo, int ost)
524 {
525         struct lod_obj_stripe_cb_data data = { { 0 } };
526
527         data.locd_ost_index = ost;
528         data.locd_comp_skip_cb = lod_obj_is_ost_use_skip_cb;
529         data.locd_comp_cb = lod_obj_is_ost_use_cb;
530
531         (void)lod_obj_for_each_stripe(env, lo, NULL, &data);
532
533         return data.locd_ost_index == -1;
534 }
535
536 static inline void lod_avoid_update(struct lod_object *lo,
537                                     struct lod_avoid_guide *lag)
538 {
539         if (!lod_is_flr(lo))
540                 return;
541
542         lag->lag_ost_avail--;
543 }
544
545 static inline bool lod_should_avoid_ost(struct lod_object *lo,
546                                         struct lod_avoid_guide *lag,
547                                         __u32 index)
548 {
549         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
550         struct lod_tgt_desc *ost = OST_TGT(lod, index);
551         struct lu_svr_qos *lsq = ost->ltd_qos.ltq_svr;
552         bool used = false;
553         int i;
554
555         if (!test_bit(index, lod->lod_ost_bitmap)) {
556                 CDEBUG(D_OTHER, "OST%d: been used in conflicting mirror component\n",
557                        index);
558                 return true;
559         }
560
561         /**
562          * we've tried our best, all available OSTs have been used in
563          * overlapped components in the other mirror
564          */
565         if (lag->lag_ost_avail == 0)
566                 return false;
567
568         /* check OSS use */
569         for (i = 0; i < lag->lag_oaa_count; i++) {
570                 if (lag->lag_oss_avoid_array[i] == lsq->lsq_id) {
571                         used = true;
572                         break;
573                 }
574         }
575         /**
576          * if the OSS which OST[index] resides has not been used, we'd like to
577          * use it
578          */
579         if (!used)
580                 return false;
581
582         /* if the OSS has been used, check whether the OST has been used */
583         if (!test_bit(index, lag->lag_ost_avoid_bitmap))
584                 used = false;
585         else
586                 CDEBUG(D_OTHER, "OST%d: been used in conflicting mirror component\n",
587                        index);
588         return used;
589 }
590
591 static int lod_check_and_reserve_ost(const struct lu_env *env,
592                                      struct lod_object *lo,
593                                      struct lod_layout_component *lod_comp,
594                                      __u32 ost_idx, __u32 speed, __u32 *s_idx,
595                                      struct dt_object **stripe,
596                                      __u32 *ost_indices,
597                                      struct thandle *th,
598                                      bool *overstriped,
599                                      __u64 reserve)
600 {
601         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
602         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
603         struct lu_tgt_desc *ost = OST_TGT(lod, ost_idx);
604         struct dt_object   *o;
605         __u32 stripe_idx = *s_idx;
606         int rc;
607
608         ENTRY;
609
610         rc = lod_statfs_and_check(env, lod, &lod->lod_ost_descs, ost, reserve);
611         if (rc)
612                 RETURN(rc);
613
614         /*
615          * We expect number of precreated objects in f_ffree at
616          * the first iteration, skip OSPs with no objects ready
617          */
618         if (ost->ltd_statfs.os_fprecreated == 0 && speed == 0) {
619                 CDEBUG(D_OTHER, "#%d: precreation is empty\n", ost_idx);
620                 RETURN(rc);
621         }
622
623         /*
624          * try to use another OSP if this one is degraded
625          */
626         if (ost->ltd_statfs.os_state & OS_STATFS_DEGRADED && speed < 2) {
627                 CDEBUG(D_OTHER, "#%d: degraded\n", ost_idx);
628                 RETURN(rc);
629         }
630
631         /*
632          * try not allocate on OST which has been used by other
633          * component
634          */
635         if (speed == 0 && lod_comp_is_ost_used(env, lo, ost_idx)) {
636                 CDEBUG(D_OTHER, "iter %d: OST%d used by other component\n",
637                        speed, ost_idx);
638                 RETURN(rc);
639         }
640
641         /**
642          * try not allocate OSTs used by conflicting component of other mirrors
643          * for the first and second time.
644          */
645         if (speed < 2 && lod_should_avoid_ost(lo, lag, ost_idx)) {
646                 CDEBUG(D_OTHER, "iter %d: OST%d used by conflicting mirror component\n",
647                           speed, ost_idx);
648                 RETURN(rc);
649         }
650
651         /* do not put >1 objects on a single OST, except for overstriping */
652         if (lod_qos_is_tgt_used(env, ost_idx, stripe_idx)) {
653                 if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
654                         *overstriped = true;
655                 else
656                         RETURN(rc);
657         }
658
659         o = lod_qos_declare_object_on(env, lod, ost_idx, (speed > 1), th);
660         if (IS_ERR(o)) {
661                 CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
662                        ost_idx, (int) PTR_ERR(o));
663                 rc = PTR_ERR(o);
664                 RETURN(rc);
665         }
666
667         /*
668          * We've successfully declared (reserved) an object
669          */
670         lod_avoid_update(lo, lag);
671         lod_qos_tgt_in_use(env, stripe_idx, ost_idx);
672         stripe[stripe_idx] = o;
673         ost_indices[stripe_idx] = ost_idx;
674         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_LOV_CREATE_RACE, 2);
675         stripe_idx++;
676         *s_idx = stripe_idx;
677
678         RETURN(rc);
679 }
680
681 /**
682  * Allocate a striping using round-robin algorithm.
683  *
684  * Allocates a new striping using round-robin algorithm. The function refreshes
685  * all the internal structures (statfs cache, array of available OSTs sorted
686  * with regard to OSS, etc). The number of stripes required is taken from the
687  * object (must be prepared by the caller), but can change if the flag
688  * LOD_USES_DEFAULT_STRIPE is supplied. The caller should ensure nobody else
689  * is trying to create a striping on the object in parallel. All the internal
690  * structures (like pools, etc) are protected and no additional locking is
691  * required. The function succeeds even if a single stripe is allocated. To save
692  * time we give priority to targets which already have objects precreated.
693  * Full OSTs are skipped (see lod_qos_dev_is_full() for the details).
694  *
695  * \param[in] env               execution environment for this thread
696  * \param[in] lo                LOD object
697  * \param[out] stripe           striping created
698  * \param[out] ost_indices      ost indices of striping created
699  * \param[in] flags             allocation flags (0 or LOD_USES_DEFAULT_STRIPE)
700  * \param[in] th                transaction handle
701  * \param[in] comp_idx          index of ldo_comp_entries
702  *
703  * \retval 0            on success
704  * \retval -ENOSPC      if not enough OSTs are found
705  * \retval negative     negated errno for other failures
706  */
707 static int lod_ost_alloc_rr(const struct lu_env *env, struct lod_object *lo,
708                             struct dt_object **stripe, __u32 *ost_indices,
709                             enum lod_uses_hint flags, struct thandle *th,
710                             int comp_idx, __u64 reserve)
711 {
712         struct lod_layout_component *lod_comp;
713         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
714         struct lod_pool_desc  *pool = NULL;
715         struct lu_tgt_pool *osts;
716         struct lu_qos_rr *lqr;
717         unsigned int i, array_idx;
718         __u32 stripe_idx = 0;
719         __u32 stripe_count, stripe_count_min, ost_idx;
720         int rc, speed = 0, ost_connecting = 0;
721         int idx, stripes_per_ost = 1;
722         bool overstriped = false;
723         ENTRY;
724
725         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
726         lod_comp = &lo->ldo_comp_entries[comp_idx];
727         stripe_count = lod_comp->llc_stripe_count;
728         stripe_count_min = lod_stripe_count_min(stripe_count, flags);
729
730         if (lod_comp->llc_pool != NULL)
731                 pool = lod_find_pool(m, lod_comp->llc_pool);
732
733         if (pool != NULL) {
734                 down_read(&pool_tgt_rw_sem(pool));
735                 osts = &(pool->pool_obds);
736                 lqr = &(pool->pool_rr);
737         } else {
738                 osts = &m->lod_ost_descs.ltd_tgt_pool;
739                 lqr = &(m->lod_ost_descs.ltd_qos.lq_rr);
740         }
741
742         rc = lod_qos_calc_rr(m, &m->lod_ost_descs, osts, lqr);
743         if (rc)
744                 GOTO(out, rc);
745
746         rc = lod_qos_tgt_in_use_clear(env, stripe_count);
747         if (rc)
748                 GOTO(out, rc);
749
750         down_read(&m->lod_ost_descs.ltd_qos.lq_rw_sem);
751         spin_lock(&lqr->lqr_alloc);
752         if (--lqr->lqr_start_count <= 0) {
753                 atomic_set(&lqr->lqr_start_idx,
754                             get_random_u32_below(osts->op_count));
755                 lqr->lqr_start_count =
756                         (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
757                          LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
758         } else if (atomic_read(&lqr->lqr_start_idx) >= osts->op_count) {
759                 /* If we have allocated from all of the tgts, slowly
760                  * precess the next start OST if the tgt/stripe count
761                  * difference isn't already doing this for us.
762                  */
763                 atomic_sub(osts->op_count, &lqr->lqr_start_idx);
764                 if (stripe_count > 1 && (osts->op_count % stripe_count) != 1)
765                         ++lqr->lqr_offset_idx;
766         }
767         spin_unlock(&lqr->lqr_alloc);
768         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
769                 stripes_per_ost =
770                         (lod_comp->llc_stripe_count - 1) / osts->op_count + 1;
771
772 repeat_find:
773         CDEBUG(D_OTHER, "pool '%s' want %d start_idx %d start_count %d offset %d active %d count %d\n",
774                lod_comp->llc_pool ? lod_comp->llc_pool : "",
775                stripe_count, atomic_read(&lqr->lqr_start_idx),
776                lqr->lqr_start_count, lqr->lqr_offset_idx, osts->op_count,
777                osts->op_count);
778
779         for (i = 0, idx = 0; i < osts->op_count * stripes_per_ost &&
780                     stripe_idx < stripe_count; i++) {
781                 if (likely(speed < 2) || i == 0) {
782                         idx = atomic_inc_return(&lqr->lqr_start_idx) +
783                               lqr->lqr_offset_idx;
784                 } else {
785                         /*
786                          * For last speed, use OSTs one by one
787                          */
788                         idx++;
789                 }
790                 array_idx = idx % osts->op_count;
791                 ost_idx = lqr->lqr_pool.op_array[array_idx];
792
793                 CDEBUG(D_OTHER, "#%d strt %d act %d strp %d ary %d idx %d\n",
794                        i, idx, /* XXX: active*/ 0,
795                        stripe_idx, array_idx, ost_idx);
796
797                 if ((ost_idx == LOV_QOS_EMPTY) ||
798                     !test_bit(ost_idx, m->lod_ost_bitmap))
799                         continue;
800
801                 /* Fail Check before osc_precreate() is called
802                    so we can only 'fail' single OSC. */
803                 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
804                         continue;
805
806                 if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_LOD_CREATE_PAUSE)) {
807                         clear_bit(LQ_SAME_SPACE,
808                                   &m->lod_ost_descs.ltd_qos.lq_flags);
809                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_LOD_CREATE_PAUSE,
810                                          cfs_fail_val);
811                 }
812                 rc = lod_check_and_reserve_ost(env, lo, lod_comp, ost_idx,
813                                                speed, &stripe_idx, stripe,
814                                                ost_indices, th, &overstriped,
815                                                reserve);
816
817                 if (rc != 0 && OST_TGT(m, ost_idx)->ltd_discon)
818                         ost_connecting = 1;
819         }
820         if ((speed < 2) && (stripe_idx < stripe_count_min)) {
821                 /* Try again, allowing slower OSCs */
822                 speed++;
823
824                 ost_connecting = 0;
825                 goto repeat_find;
826         }
827         up_read(&m->lod_ost_descs.ltd_qos.lq_rw_sem);
828
829         /* If there are enough OSTs, a component with overstriping requested
830          * will not actually end up overstriped.  The comp should reflect this.
831          */
832         if (!overstriped)
833                 lod_comp->llc_pattern &= ~LOV_PATTERN_OVERSTRIPING;
834
835         if (stripe_idx) {
836                 lod_comp->llc_stripe_count = stripe_idx;
837                 /* at least one stripe is allocated */
838                 rc = 0;
839         } else {
840                 /* nobody provided us with a single object */
841                 if (ost_connecting)
842                         rc = -EINPROGRESS;
843                 else
844                         rc = -ENOSPC;
845         }
846
847 out:
848         if (pool != NULL) {
849                 up_read(&pool_tgt_rw_sem(pool));
850                 /* put back ref got by lod_find_pool() */
851                 lod_pool_putref(pool);
852         }
853
854         RETURN(rc);
855 }
856
857 static int
858 lod_qos_mdt_in_use_init(const struct lu_env *env,
859                         const struct lu_tgt_descs *ltd,
860                         u32 stripe_idx, u32 stripe_count,
861                         const struct lu_tgt_pool *pool,
862                         struct dt_object **stripes)
863 {
864         u32 mdt_idx;
865         struct lu_tgt_desc *mdt;
866         int i, j;
867         int rc;
868
869         rc = lod_qos_tgt_in_use_clear(env, stripe_count);
870         if (rc)
871                 return rc;
872
873         /* if stripe_idx > 1, we are splitting directory, mark existing stripes
874          * in_use. Because for either split or creation, stripe 0 is local,
875          * don't mark it in use.
876          */
877         for (i = 1; i < stripe_idx; i++) {
878                 LASSERT(stripes[i]);
879                 for (j = 0; j < pool->op_count; j++) {
880                         mdt_idx = pool->op_array[j];
881
882                         if (!test_bit(mdt_idx, ltd->ltd_tgt_bitmap))
883                                 continue;
884
885                         mdt = LTD_TGT(ltd, mdt_idx);
886                         if (&mdt->ltd_tgt->dd_lu_dev ==
887                             stripes[i]->do_lu.lo_dev)
888                                 lod_qos_tgt_in_use(env, i, mdt_idx);
889                 }
890         }
891
892         return 0;
893 }
894
895 /**
896  * Allocate a striping using round-robin algorithm.
897  *
898  * Allocates a new striping using round-robin algorithm. The function refreshes
899  * all the internal structures (statfs cache, array of available remote MDTs
900  * sorted with regard to MDS, etc). The number of stripes required is taken from
901  * the object (must be prepared by the caller). The caller should ensure nobody
902  * else is trying to create a striping on the object in parallel. All the
903  * internal structures (like pools, etc) are protected and no additional locking
904  * is required. The function succeeds even if a single stripe is allocated.
905  *
906  * \param[in] env               execution environment for this thread
907  * \param[in] lo                LOD object
908  * \param[out] stripes          striping created
909  *
910  * \retval positive     stripe objects allocated, including the first stripe
911  *                      allocated outside
912  * \retval -ENOSPC      if not enough MDTs are found
913  * \retval negative     negated errno for other failures
914  */
915 int lod_mdt_alloc_rr(const struct lu_env *env, struct lod_object *lo,
916                      struct dt_object **stripes, u32 stripe_idx,
917                      u32 stripe_count)
918 {
919         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
920         struct lu_tgt_descs *ltd = &lod->lod_mdt_descs;
921         struct lu_tgt_pool *pool;
922         struct lu_qos_rr *lqr;
923         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
924         struct lu_fid fid = { 0 };
925         struct dt_object *dto;
926         unsigned int pool_idx;
927         unsigned int i;
928         u32 saved_idx = stripe_idx;
929         int stripes_per_mdt = 1;
930         u32 mdt_idx;
931         bool use_degraded = false;
932         bool overstriped = false;
933         int tgt_connecting = 0;
934         int rc;
935
936         ENTRY;
937
938         pool = &ltd->ltd_tgt_pool;
939         lqr = &ltd->ltd_qos.lq_rr;
940         rc = lod_qos_calc_rr(lod, ltd, pool, lqr);
941         if (rc)
942                 RETURN(rc);
943
944         overstriped = lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED;
945
946         if (stripe_count > lod->lod_remote_mdt_count + 1 && !overstriped)
947                 RETURN(-E2BIG);
948
949         if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED)
950                 stripes_per_mdt = stripe_count / (pool->op_count + 1);
951
952         rc = lod_qos_mdt_in_use_init(env, ltd, stripe_idx, stripe_count, pool,
953                                      stripes);
954         if (rc)
955                 RETURN(rc);
956
957         down_read(&ltd->ltd_qos.lq_rw_sem);
958         spin_lock(&lqr->lqr_alloc);
959         if (--lqr->lqr_start_count <= 0) {
960                 atomic_set(&lqr->lqr_start_idx,
961                             get_random_u32_below(pool->op_count));
962                 lqr->lqr_start_count =
963                         (LOV_CREATE_RESEED_MIN / max(pool->op_count, 1U) +
964                          LOV_CREATE_RESEED_MULT) * max(pool->op_count, 1U);
965         } else if (atomic_read(&lqr->lqr_start_idx) >= pool->op_count) {
966                 /* If we have allocated from all of the tgts, slowly
967                  * precess the next start if the tgt/stripe count isn't
968                  * already doing this for us.
969                  */
970                 atomic_sub(pool->op_count, &lqr->lqr_start_idx);
971                 if (stripe_count - 1 > 1 &&
972                     (pool->op_count % (stripe_count - 1)) != 1)
973                         ++lqr->lqr_offset_idx;
974         }
975         spin_unlock(&lqr->lqr_alloc);
976
977 repeat_find:
978         CDEBUG(D_OTHER,
979                "want=%d start_idx=%d start_count=%d offset=%d active=%d count=%d\n",
980                stripe_count - 1, atomic_read(&lqr->lqr_start_idx),
981                lqr->lqr_start_count, lqr->lqr_offset_idx,
982                /* if we're overstriped, the local MDT is available and is
983                 * included in the count
984                 */
985                pool->op_count + overstriped,
986                lqr->lqr_pool.op_count + overstriped);
987
988         for (i = 0; i < (pool->op_count + overstriped) * stripes_per_mdt &&
989              stripe_idx < stripe_count; i++) {
990                 struct lu_tgt_desc *mdt = NULL;
991                 struct dt_device *mdt_tgt;
992                 bool local_alloc = false;
993                 int idx;
994
995                 idx = atomic_inc_return(&lqr->lqr_start_idx);
996                 pool_idx = (idx + lqr->lqr_offset_idx) %
997                             (pool->op_count + overstriped);
998                 /* in the overstriped case, we must be able to allocate a stripe
999                  * to the local MDT, ie, the one doing the allocation
1000                  */
1001                 if (pool_idx == pool->op_count) {
1002                         LASSERT(overstriped);
1003                         /* because there is already a stripe on the local MDT,
1004                          * do not allocate from the local MDT until we've
1005                          * allocated at least as many stripes as we have MDTs
1006                          */
1007                         if (stripe_idx < (pool->op_count + 1)) {
1008                                 CDEBUG(D_OTHER,
1009                                        "Skipping local alloc, not enough stripes yet\n");
1010                                 continue;
1011                         }
1012                         CDEBUG(D_OTHER, "Attempting to allocate locally\n");
1013                         local_alloc = true;
1014                         mdt_tgt = lod->lod_child;
1015                         rc = lodname2mdt_index(lod2obd(lod)->obd_name,
1016                                                &mdt_idx);
1017                         /* this parsing can't fail here because we're working
1018                          * with a known-good MDT
1019                          */
1020                         LASSERT(!rc);
1021                 } else {
1022                         mdt_idx = lqr->lqr_pool.op_array[pool_idx];
1023                         mdt = LTD_TGT(ltd, mdt_idx);
1024                         mdt_tgt = mdt->ltd_tgt;
1025                 }
1026
1027                 CDEBUG(D_OTHER, "#%d strt %d act %d strp %d ary %d idx %d\n",
1028                        i, idx, /* XXX: active*/ 0,
1029                        stripe_idx, pool_idx, mdt_idx);
1030
1031                 if (!local_alloc &&  (mdt_idx == LOV_QOS_EMPTY ||
1032                     !test_bit(mdt_idx, ltd->ltd_tgt_bitmap))) {
1033                         CDEBUG(D_OTHER, "mdt_idx not found %d\n", mdt_idx);
1034                         continue;
1035                 }
1036
1037                 /* do not put >1 objects on one MDT, except for overstriping */
1038                 if (!local_alloc) {
1039                         if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) {
1040                                 CDEBUG(D_OTHER, "overstriped\n");
1041                         } else if (lod_qos_is_tgt_used(env, mdt_idx,
1042                                                        stripe_idx)) {
1043                                 CDEBUG(D_OTHER, "#%d: already used\n", mdt_idx);
1044                                 continue;
1045                         }
1046                 }
1047
1048                 /* we know the local MDT is usable */
1049                 if (!local_alloc) {
1050                         if (mdt->ltd_discon) {
1051                                 tgt_connecting = 1;
1052                                 CDEBUG(D_OTHER, "#%d: unusable\n", mdt_idx);
1053                                 continue;
1054                         }
1055                         if (lod_statfs_check(ltd, mdt))
1056                                 continue;
1057                         if (mdt->ltd_statfs.os_state & OS_STATFS_NOCREATE)
1058                                 continue;
1059                 }
1060
1061                 /* try to use another OSP if this one is degraded */
1062                 if (!local_alloc && !use_degraded &&
1063                     mdt->ltd_statfs.os_state & OS_STATFS_DEGRADED) {
1064                         CDEBUG(D_OTHER, "#%d: degraded\n", mdt_idx);
1065                         continue;
1066                 }
1067
1068                 rc = dt_fid_alloc(env, mdt_tgt, &fid, NULL, NULL);
1069                 if (rc < 0) {
1070                         CDEBUG(D_OTHER, "#%d: alloc FID failed: %dl\n", mdt_idx, rc);
1071                         continue;
1072                 }
1073
1074                 dto = dt_locate_at(env, mdt_tgt, &fid,
1075                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1076                                 &conf);
1077
1078                 if (IS_ERR(dto)) {
1079                         CDEBUG(D_OTHER, "can't alloc stripe on #%u: %d\n",
1080                                mdt_idx, (int) PTR_ERR(dto));
1081
1082                         if (!local_alloc && mdt->ltd_discon)
1083                                 tgt_connecting = 1;
1084                         continue;
1085                 }
1086
1087                 lod_qos_tgt_in_use(env, stripe_idx, mdt_idx);
1088                 stripes[stripe_idx++] = dto;
1089         }
1090
1091         if (!use_degraded && stripe_idx < stripe_count) {
1092                 /* Try again, allowing slower MDTs */
1093                 use_degraded = true;
1094
1095                 tgt_connecting = 0;
1096                 goto repeat_find;
1097         }
1098         up_read(&ltd->ltd_qos.lq_rw_sem);
1099
1100         if (stripe_idx > saved_idx) {
1101                 /* If there are enough MDTs, we will not actually do
1102                  * overstriping, and the hash flags should reflect this.
1103                  */
1104                 if (!overstriped)
1105                         lo->ldo_dir_hash_type &= ~LMV_HASH_FLAG_OVERSTRIPED;
1106                 /* at least one stripe is allocated */
1107                 RETURN(stripe_idx);
1108         }
1109
1110         /* nobody provided us with a single object */
1111         if (tgt_connecting)
1112                 RETURN(-EINPROGRESS);
1113
1114         RETURN(-ENOSPC);
1115 }
1116
1117 /**
1118  * Allocate a specific striping layout on a user defined set of OSTs.
1119  *
1120  * Allocates new striping using the OST index range provided by the data from
1121  * the lmm_obejcts contained in the lov_user_md passed to this method. Full
1122  * OSTs are not considered. The exact order of OSTs requested by the user
1123  * is respected as much as possible depending on OST status. The number of
1124  * stripes needed and stripe offset are taken from the object. If that number
1125  * can not be met, then the function returns a failure and then it's the
1126  * caller's responsibility to release the stripes allocated. All the internal
1127  * structures are protected, but no concurrent allocation is allowed on the
1128  * same objects.
1129  *
1130  * \param[in] env               execution environment for this thread
1131  * \param[in] lo                LOD object
1132  * \param[out] stripe           striping created
1133  * \param[out] ost_indices      ost indices of striping created
1134  * \param[in] th                transaction handle
1135  * \param[in] comp_idx          index of ldo_comp_entries
1136  *
1137  * \retval 0            on success
1138  * \retval -ENODEV      OST index does not exist on file system
1139  * \retval -EINVAL      requested OST index is invalid
1140  * \retval negative     negated errno on error
1141  */
1142 static int lod_alloc_ost_list(const struct lu_env *env, struct lod_object *lo,
1143                               struct dt_object **stripe, __u32 *ost_indices,
1144                               struct thandle *th, int comp_idx, __u64 reserve)
1145 {
1146         struct lod_layout_component *lod_comp;
1147         struct lod_device       *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1148         struct dt_object        *o;
1149         unsigned int            array_idx = 0;
1150         int                     stripe_count = 0;
1151         int                     i;
1152         int                     rc = -EINVAL;
1153         ENTRY;
1154
1155         /* for specific OSTs layout */
1156         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
1157         lod_comp = &lo->ldo_comp_entries[comp_idx];
1158         LASSERT(lod_comp->llc_ostlist.op_array);
1159         LASSERT(lod_comp->llc_ostlist.op_count);
1160
1161         rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
1162         if (rc < 0)
1163                 RETURN(rc);
1164
1165         if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)
1166                 lod_comp->llc_stripe_offset =
1167                                 lod_comp->llc_ostlist.op_array[0];
1168
1169         for (i = 0; i < lod_comp->llc_stripe_count; i++) {
1170                 if (lod_comp->llc_ostlist.op_array[i] ==
1171                     lod_comp->llc_stripe_offset) {
1172                         array_idx = i;
1173                         break;
1174                 }
1175         }
1176         if (i == lod_comp->llc_stripe_count) {
1177                 CDEBUG(D_OTHER,
1178                        "%s: start index %d not in the specified list of OSTs\n",
1179                        lod2obd(m)->obd_name, lod_comp->llc_stripe_offset);
1180                 RETURN(-EINVAL);
1181         }
1182
1183         for (i = 0; i < lod_comp->llc_stripe_count;
1184              i++, array_idx = (array_idx + 1) % lod_comp->llc_stripe_count) {
1185                 __u32 ost_idx = lod_comp->llc_ostlist.op_array[array_idx];
1186
1187                 if (!test_bit(ost_idx, m->lod_ost_bitmap)) {
1188                         rc = -EINVAL;
1189                         break;
1190                 }
1191
1192                 /* do not put >1 objects on a single OST, except for
1193                  * overstriping
1194                  */
1195                 if (lod_qos_is_tgt_used(env, ost_idx, stripe_count) &&
1196                     !(lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)) {
1197                         rc = -EINVAL;
1198                         break;
1199                 }
1200
1201                 rc = lod_statfs_and_check(env, m, &m->lod_ost_descs,
1202                                           LTD_TGT(&m->lod_ost_descs, ost_idx),
1203                                           reserve);
1204                 if (rc < 0) /* this OSP doesn't feel well */
1205                         break;
1206
1207                 o = lod_qos_declare_object_on(env, m, ost_idx, true, th);
1208                 if (IS_ERR(o)) {
1209                         rc = PTR_ERR(o);
1210                         CDEBUG(D_OTHER,
1211                                "%s: can't declare new object on #%u: %d\n",
1212                                lod2obd(m)->obd_name, ost_idx, rc);
1213                         break;
1214                 }
1215
1216                 /*
1217                  * We've successfully declared (reserved) an object
1218                  */
1219                 lod_qos_tgt_in_use(env, stripe_count, ost_idx);
1220                 stripe[stripe_count] = o;
1221                 ost_indices[stripe_count] = ost_idx;
1222                 stripe_count++;
1223         }
1224
1225         RETURN(rc);
1226 }
1227
1228 /**
1229  * Allocate a striping on a predefined set of OSTs.
1230  *
1231  * Allocates new layout starting from OST index in lo->ldo_stripe_offset.
1232  * Full OSTs are not considered. The exact order of OSTs is not important and
1233  * varies depending on OST status. The allocation procedure prefers the targets
1234  * with precreated objects ready. The number of stripes needed and stripe
1235  * offset are taken from the object. If that number cannot be met, then the
1236  * function returns an error and then it's the caller's responsibility to
1237  * release the stripes allocated. All the internal structures are protected,
1238  * but no concurrent allocation is allowed on the same objects.
1239  *
1240  * \param[in] env               execution environment for this thread
1241  * \param[in] lo                LOD object
1242  * \param[out] stripe           striping created
1243  * \param[out] ost_indices      ost indices of striping created
1244  * \param[in] flags             not used
1245  * \param[in] th                transaction handle
1246  * \param[in] comp_idx          index of ldo_comp_entries
1247  *
1248  * \retval 0            on success
1249  * \retval -ENOSPC      if no OST objects are available at all
1250  * \retval -EFBIG       if not enough OST objects are found
1251  * \retval -EINVAL      requested offset is invalid
1252  * \retval negative     errno on failure
1253  */
1254 static int lod_ost_alloc_specific(const struct lu_env *env,
1255                                   struct lod_object *lo,
1256                                   struct dt_object **stripe, __u32 *ost_indices,
1257                                   enum lod_uses_hint flags, struct thandle *th,
1258                                   int comp_idx, __u64 reserve)
1259 {
1260         struct lod_layout_component *lod_comp;
1261         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1262         struct dt_object *o;
1263         struct lu_tgt_desc *tgt;
1264         __u32 ost_idx;
1265         unsigned int i, array_idx, ost_count;
1266         int rc, stripe_num = 0;
1267         int speed = 0;
1268         struct lod_pool_desc *pool = NULL;
1269         struct lu_tgt_pool *osts;
1270         int stripes_per_ost = 1;
1271         bool overstriped = false;
1272         ENTRY;
1273
1274         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
1275         lod_comp = &lo->ldo_comp_entries[comp_idx];
1276
1277         rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
1278         if (rc)
1279                 GOTO(out, rc);
1280
1281         if (lod_comp->llc_pool != NULL)
1282                 pool = lod_find_pool(m, lod_comp->llc_pool);
1283
1284         if (pool != NULL) {
1285                 down_read(&pool_tgt_rw_sem(pool));
1286                 osts = &(pool->pool_obds);
1287         } else {
1288                 osts = &m->lod_ost_descs.ltd_tgt_pool;
1289         }
1290
1291         ost_count = osts->op_count;
1292
1293 repeat_find:
1294         /* search loi_ost_idx in ost array */
1295         array_idx = 0;
1296         for (i = 0; i < ost_count; i++) {
1297                 if (osts->op_array[i] == lod_comp->llc_stripe_offset) {
1298                         array_idx = i;
1299                         break;
1300                 }
1301         }
1302         if (i == ost_count) {
1303                 CERROR("Start index %d not found in pool '%s'\n",
1304                        lod_comp->llc_stripe_offset,
1305                        lod_comp->llc_pool ? lod_comp->llc_pool : "");
1306                 GOTO(out, rc = -EINVAL);
1307         }
1308
1309         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1310                 stripes_per_ost =
1311                         (lod_comp->llc_stripe_count - 1)/ost_count + 1;
1312
1313         /* user specifies bigger stripe count than available ost count */
1314         if (lod_comp->llc_stripe_count > ost_count * stripes_per_ost)
1315                 lod_comp->llc_stripe_count = ost_count * stripes_per_ost;
1316
1317         for (i = 0; i < ost_count * stripes_per_ost;
1318                         i++, array_idx = (array_idx + 1) % ost_count) {
1319                 ost_idx = osts->op_array[array_idx];
1320
1321                 if (!test_bit(ost_idx, m->lod_ost_bitmap))
1322                         continue;
1323
1324                 /* Fail Check before osc_precreate() is called
1325                    so we can only 'fail' single OSC. */
1326                 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
1327                         continue;
1328
1329                 /*
1330                  * do not put >1 objects on a single OST, except for
1331                  * overstriping, where it is intended
1332                  */
1333                 if (lod_qos_is_tgt_used(env, ost_idx, stripe_num)) {
1334                         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1335                                 overstriped = true;
1336                         else
1337                                 continue;
1338                 }
1339
1340                 /*
1341                  * try not allocate on the OST used by other component
1342                  */
1343                 if (speed == 0 && i != 0 &&
1344                     lod_comp_is_ost_used(env, lo, ost_idx))
1345                         continue;
1346
1347                 tgt = LTD_TGT(&m->lod_ost_descs, ost_idx);
1348
1349                 /* Drop slow OSCs if we can, but not for requested start idx.
1350                  *
1351                  * This means "if OSC is slow and it is not the requested
1352                  * start OST, then it can be skipped, otherwise skip it only
1353                  * if it is inactive/recovering/out-of-space." */
1354
1355                 rc = lod_statfs_and_check(env, m, &m->lod_ost_descs,
1356                                           tgt, reserve);
1357                 if (rc) {
1358                         /* this OSP doesn't feel well */
1359                         continue;
1360                 }
1361
1362                 /*
1363                  * We expect number of precreated objects at the first
1364                  * iteration.  Skip OSPs with no objects ready.  Don't apply
1365                  * this logic to OST specified with stripe_offset.
1366                  */
1367                 if (i && !tgt->ltd_statfs.os_fprecreated && !speed)
1368                         continue;
1369
1370                 o = lod_qos_declare_object_on(env, m, ost_idx, true, th);
1371                 if (IS_ERR(o)) {
1372                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
1373                                ost_idx, (int) PTR_ERR(o));
1374                         continue;
1375                 }
1376
1377                 /*
1378                  * We've successfully declared (reserved) an object
1379                  */
1380                 lod_qos_tgt_in_use(env, stripe_num, ost_idx);
1381                 stripe[stripe_num] = o;
1382                 ost_indices[stripe_num] = ost_idx;
1383                 stripe_num++;
1384
1385                 /* We have enough stripes */
1386                 if (stripe_num == lod_comp->llc_stripe_count)
1387                         GOTO(out, rc = 0);
1388         }
1389         if (speed < 2) {
1390                 /* Try again, allowing slower OSCs */
1391                 speed++;
1392                 goto repeat_find;
1393         }
1394
1395         /* If we were passed specific striping params, then a failure to
1396          * meet those requirements is an error, since we can't reallocate
1397          * that memory (it might be part of a larger array or something).
1398          */
1399         CERROR("can't lstripe objid "DFID": have %d want %u\n",
1400                PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num,
1401                lod_comp->llc_stripe_count);
1402         rc = stripe_num == 0 ? -ENOSPC : -EFBIG;
1403
1404         /* If there are enough OSTs, a component with overstriping requessted
1405          * will not actually end up overstriped.  The comp should reflect this.
1406          */
1407         if (rc == 0 && !overstriped)
1408                 lod_comp->llc_pattern &= ~LOV_PATTERN_OVERSTRIPING;
1409
1410 out:
1411         if (pool != NULL) {
1412                 up_read(&pool_tgt_rw_sem(pool));
1413                 /* put back ref got by lod_find_pool() */
1414                 lod_pool_putref(pool);
1415         }
1416
1417         RETURN(rc);
1418 }
1419
#ifdef HAVE_DOWN_WRITE_KILLABLE
/*
 * A timer bundled with the task that must be signalled when it expires.
 * NOTE(review): presumably used to bound a killable semaphore wait - confirm
 * against the code that arms the timer.
 */
struct semaphore_timer {
        struct timer_list timer;
        struct task_struct *task;
};

/*
 * Timer callback: look up the semaphore_timer that owns \a t and send
 * SIGKILL to the task recorded in it.
 */
static void process_semaphore_timer(struct timer_list *t)
{
        struct semaphore_timer *expired = cfs_from_timer(expired, t, timer);
        struct task_struct *victim = expired->task;

        send_sig(SIGKILL, victim, 1);
}
#endif
1433
1434 /**
1435  * Calculate penalties per-ost in a pool
1436  *
1437  * The algorithm is similar to ltd_qos_penalties_calc(), but much simpler,
1438  * just considering the space of each OST in this pool.
1439  *
1440  * \param[in] lod       lod_device
1441  * \param[in] pool      pool_desc
1442  *
1443  * \retval 0            on success
1444  * \retval -EAGAIN      the number of OSTs isn't enough or all tgt spaces are
1445  *                      almost the same
1446  */
1447 static int lod_pool_qos_penalties_calc(struct lod_device *lod,
1448                                        struct lod_pool_desc *pool)
1449 {
1450         struct lu_tgt_descs *ltd = &lod->lod_ost_descs;
1451         struct lu_qos *qos = &ltd->ltd_qos;
1452         struct lov_desc *desc = &ltd->ltd_lov_desc;
1453         struct lu_tgt_pool *osts = &pool->pool_obds;
1454         struct lod_tgt_desc *ost;
1455         __u64 ba_max, ba_min, ba;
1456         __u32 num_active;
1457         int prio_wide;
1458         time64_t now, age;
1459         int i, rc;
1460
1461         ENTRY;
1462
1463         now = ktime_get_real_seconds();
1464
1465         if (pool->pool_same_space && now < pool->pool_same_space_expire)
1466                 GOTO(out, rc = 0);
1467
1468         num_active = osts->op_count - 1;
1469         if (num_active < 1)
1470                 GOTO(out, rc = -EAGAIN);
1471
1472         prio_wide = 256 - qos->lq_prio_free;
1473
1474         ba_min = (__u64)(-1);
1475         ba_max = 0;
1476
1477         /* Calculate penalty per OST */
1478         for (i = 0; i < osts->op_count; i++) {
1479                 if (!test_bit(osts->op_array[i], lod->lod_ost_bitmap))
1480                         continue;
1481
1482                 ost = OST_TGT(lod, osts->op_array[i]);
1483                 if (!ost->ltd_active)
1484                         continue;
1485
1486                 ba = tgt_statfs_bavail(ost) >> 8;
1487                 if (!ba)
1488                         continue;
1489
1490                 ba_min = min(ba, ba_min);
1491                 ba_max = max(ba, ba_max);
1492                 ost->ltd_qos.ltq_svr->lsq_bavail += ba;
1493
1494                 /*
1495                  * per-ost penalty is
1496                  * prio * bavail / (num_tgt - 1) / prio_max / 2
1497                  */
1498                 ost->ltd_qos.ltq_penalty_per_obj = prio_wide * ba >> 9;
1499                 do_div(ost->ltd_qos.ltq_penalty_per_obj, num_active);
1500
1501                 age = (now - ost->ltd_qos.ltq_used) >> 3;
1502                 if (age > 32 * desc->ld_qos_maxage)
1503                         ost->ltd_qos.ltq_penalty = 0;
1504                 else if (age > desc->ld_qos_maxage)
1505                         /* Decay ost penalty. */
1506                         ost->ltd_qos.ltq_penalty >>= age / desc->ld_qos_maxage;
1507         }
1508
1509         /*
1510          * If each ost has almost same free space, do rr allocation for better
1511          * creation performance
1512          */
1513         if ((ba_max * (256 - qos->lq_threshold_rr)) >> 8 < ba_min) {
1514                 pool->pool_same_space = true;
1515                 pool->pool_same_space_expire = now + desc->ld_qos_maxage;
1516         } else {
1517                 pool->pool_same_space = false;
1518         }
1519         rc = 0;
1520
1521 out:
1522         if (!rc && pool->pool_same_space)
1523                 rc = -EAGAIN;
1524
1525         RETURN(rc);
1526 }
1527
1528 /**
1529  * Allocate a striping using an algorithm with weights.
1530  *
1531  * The function allocates OST objects to create a striping. The algorithm
1532  * used is based on weights (currently only using the free space), and it's
1533  * trying to ensure the space is used evenly by OSTs and OSSs. The striping
1534  * configuration (# of stripes, offset, pool) is taken from the object and
1535  * is prepared by the caller.
1536  *
1537  * If LOD_USES_DEFAULT_STRIPE is not passed and prepared configuration can't
1538  * be met due to too few OSTs, then allocation fails. If the flag is passed
1539  * fewer than 3/4 of the requested number of stripes can be allocated, then
1540  * allocation fails.
1541  *
1542  * No concurrent allocation is allowed on the object and this must be ensured
1543  * by the caller. All the internal structures are protected by the function.
1544  *
1545  * The algorithm has two steps: find available OSTs and calculate their
1546  * weights, then select the OSTs with their weights used as the probability.
1547  * An OST with a higher weight is proportionately more likely to be selected
1548  * than one with a lower weight.
1549  *
1550  * \param[in] env               execution environment for this thread
1551  * \param[in] lo                LOD object
1552  * \param[out] stripe           striping created
1553  * \param[out] ost_indices      ost indices of striping created
1554  * \param[in] flags             0 or LOD_USES_DEFAULT_STRIPE
1555  * \param[in] th                transaction handle
1556  * \param[in] comp_idx          index of ldo_comp_entries
1557  *
1558  * \retval 0            on success
1559  * \retval -EAGAIN      not enough OSTs are found for specified stripe count
1560  * \retval -EINVAL      requested OST index is invalid
1561  * \retval negative     errno on failure
1562  */
1563 static int lod_ost_alloc_qos(const struct lu_env *env, struct lod_object *lo,
1564                              struct dt_object **stripe, __u32 *ost_indices,
1565                              enum lod_uses_hint flags, struct thandle *th,
1566                              int comp_idx, __u64 reserve)
1567 {
1568         struct lod_layout_component *lod_comp;
1569         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1570         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
1571         struct lod_tgt_desc *ost;
1572         struct dt_object *o;
1573         __u64 total_weight = 0;
1574         struct lod_pool_desc *pool = NULL;
1575         struct lu_tgt_pool *osts;
1576         unsigned int i;
1577         __u32 nfound, good_osts, stripe_count, stripe_count_min;
1578         bool overstriped = false;
1579         int stripes_per_ost = 1;
1580         bool slow = false;
1581         int rc = 0;
1582         ENTRY;
1583
1584         /* Totally skip qos part when qos_threshold_rr=100% */
1585         if (lod->lod_ost_descs.ltd_qos.lq_threshold_rr == QOS_THRESHOLD_MAX)
1586                 return -EAGAIN;
1587
1588         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
1589         lod_comp = &lo->ldo_comp_entries[comp_idx];
1590         stripe_count = lod_comp->llc_stripe_count;
1591         stripe_count_min = lod_stripe_count_min(stripe_count, flags);
1592         if (stripe_count_min < 1)
1593                 RETURN(-EINVAL);
1594
1595         if (lod_comp->llc_pool != NULL)
1596                 pool = lod_find_pool(lod, lod_comp->llc_pool);
1597
1598         if (pool != NULL) {
1599                 down_read(&pool_tgt_rw_sem(pool));
1600                 osts = &(pool->pool_obds);
1601         } else {
1602                 osts = &lod->lod_ost_descs.ltd_tgt_pool;
1603         }
1604
1605         /* Detect -EAGAIN early, before expensive lock is taken. */
1606         if (!ltd_qos_is_usable(&lod->lod_ost_descs))
1607                 GOTO(out_nolock, rc = -EAGAIN);
1608
1609         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1610                 stripes_per_ost =
1611                         (lod_comp->llc_stripe_count - 1)/osts->op_count + 1;
1612
1613 #ifdef HAVE_DOWN_WRITE_KILLABLE
1614         if (!down_write_trylock(&lod->lod_ost_descs.ltd_qos.lq_rw_sem)) {
1615                 struct semaphore_timer timer;
1616
1617                 kernel_sigaction(SIGKILL, SIG_DFL);
1618                 timer.task = current;
1619                 cfs_timer_setup(&timer.timer, process_semaphore_timer, 0, 0);
1620                 mod_timer(&timer.timer, jiffies + cfs_time_seconds(2));
1621                 /* Do actual allocation, use write lock here. */
1622                 rc = down_write_killable(&lod->lod_ost_descs.ltd_qos.lq_rw_sem);
1623
1624                 timer_delete_sync(&timer.timer);
1625                 kernel_sigaction(SIGKILL, SIG_IGN);
1626                 if (rc) {
1627                         flush_signals(current);
1628                         CDEBUG(D_OTHER, "%s: wakeup semaphore on timeout rc = %d\n",
1629                                lod2obd(lod)->obd_name, rc);
1630                         GOTO(out_nolock, rc = -EAGAIN);
1631                 }
1632         }
1633 #else
1634         /* Do actual allocation, use write lock here. */
1635         down_write(&lod->lod_ost_descs.ltd_qos.lq_rw_sem);
1636 #endif
1637         /*
1638          * Check again, while we were sleeping on @lq_rw_sem things could
1639          * change.
1640          */
1641         if (!ltd_qos_is_usable(&lod->lod_ost_descs))
1642                 GOTO(out, rc = -EAGAIN);
1643
1644         if (pool != NULL)
1645                 rc = lod_pool_qos_penalties_calc(lod, pool);
1646         else
1647                 rc = ltd_qos_penalties_calc(&lod->lod_ost_descs);
1648         if (rc)
1649                 GOTO(out, rc);
1650
1651         rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
1652         if (rc)
1653                 GOTO(out, rc);
1654
1655         good_osts = 0;
1656         /* Find all the OSTs that are valid stripe candidates */
1657         for (i = 0; i < osts->op_count; i++) {
1658                 if (!test_bit(osts->op_array[i], lod->lod_ost_bitmap))
1659                         continue;
1660
1661                 ost = OST_TGT(lod, osts->op_array[i]);
1662                 ost->ltd_qos.ltq_usable = 0;
1663
1664                 rc = lod_statfs_and_check(env, lod, &lod->lod_ost_descs,
1665                                           ost, reserve);
1666                 if (rc) {
1667                         /* this OSP doesn't feel well */
1668                         continue;
1669                 }
1670
1671                 if (ost->ltd_statfs.os_state & OS_STATFS_DEGRADED)
1672                         continue;
1673
1674                 /* Fail Check before osc_precreate() is called
1675                  * so we can only 'fail' single OSC.
1676                  */
1677                 if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
1678                                    osts->op_array[i] == 0)
1679                         continue;
1680
1681                 ost->ltd_qos.ltq_usable = 1;
1682                 lu_tgt_qos_weight_calc(ost, false);
1683                 total_weight += ost->ltd_qos.ltq_weight;
1684
1685                 good_osts++;
1686         }
1687
1688         CDEBUG(D_OTHER, "found %d good osts\n", good_osts);
1689
1690         if (good_osts < stripe_count_min)
1691                 GOTO(out, rc = -EAGAIN);
1692
1693         /* If we do not have enough OSTs for the requested stripe count, do not
1694          * put more stripes per OST than requested.
1695          */
1696         if (stripe_count / stripes_per_ost > good_osts)
1697                 stripe_count = good_osts * stripes_per_ost;
1698
1699         /* Find enough OSTs with weighted random allocation. */
1700         nfound = 0;
1701         while (nfound < stripe_count) {
1702                 u64 rand, cur_weight;
1703
1704                 cur_weight = 0;
1705                 rc = -ENOSPC;
1706
1707                 rand = lu_prandom_u64_max(total_weight);
1708
1709                 /* On average, this will hit larger-weighted OSTs more often.
1710                  * 0-weight OSTs will always get used last (only when rand=0)
1711                  */
1712                 for (i = 0; i < osts->op_count; i++) {
1713                         __u32 idx = osts->op_array[i];
1714                         struct lod_tgt_desc *ost;
1715
1716                         if (lod_should_avoid_ost(lo, lag, idx))
1717                                 continue;
1718
1719                         ost = OST_TGT(lod, idx);
1720
1721                         if (!ost->ltd_qos.ltq_usable)
1722                                 continue;
1723
1724                         cur_weight += ost->ltd_qos.ltq_weight;
1725                         CDEBUG(D_OTHER, "stripe_count=%d nfound=%d cur_weight=%llu rand=%llu total_weight=%llu\n",
1726                                stripe_count, nfound, cur_weight, rand,
1727                                total_weight);
1728
1729                         if (cur_weight < rand)
1730                                 continue;
1731
1732                         CDEBUG(D_OTHER, "stripe=%d to idx=%d\n", nfound, idx);
1733                         /*
1734                          * In case of QOS it makes sense to check components
1735                          * only for FLR and if current component doesn't support
1736                          * overstriping.
1737                          */
1738                         if (lo->ldo_mirror_count > 1 &&
1739                             !(lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1740                             && lod_comp_is_ost_used(env, lo, idx))
1741                                 continue;
1742
1743                         if (lod_qos_is_tgt_used(env, idx, nfound)) {
1744                                 if (lod_comp->llc_pattern &
1745                                     LOV_PATTERN_OVERSTRIPING)
1746                                         overstriped = true;
1747                                 else
1748                                         continue;
1749                         }
1750
1751                         o = lod_qos_declare_object_on(env, lod, idx, slow, th);
1752                         if (IS_ERR(o)) {
1753                                 CDEBUG(D_OTHER, "can't declare object on #%u: %d\n",
1754                                        idx, (int) PTR_ERR(o));
1755                                 continue;
1756                         }
1757
1758                         lod_avoid_update(lo, lag);
1759                         lod_qos_tgt_in_use(env, nfound, idx);
1760                         stripe[nfound] = o;
1761                         ost_indices[nfound] = idx;
1762                         ltd_qos_update(&lod->lod_ost_descs, ost, &total_weight);
1763                         nfound++;
1764                         rc = 0;
1765                         break;
1766                 }
1767
1768                 if (rc && !slow && nfound < stripe_count) {
1769                         /* couldn't allocate using precreated objects
1770                          * so try to wait for new precreations */
1771                         slow = true;
1772                         rc = 0;
1773                 }
1774
1775                 if (rc) {
1776                         /* no OST found on this iteration, give up */
1777                         break;
1778                 }
1779         }
1780
1781         if (unlikely(nfound < stripe_count_min)) {
1782                 /*
1783                  * when the decision to use weighted algorithm was made
1784                  * we had enough appropriate OSPs, but this state can
1785                  * change anytime (no space on OST, broken connection, etc)
1786                  * so it's possible OSP won't be able to provide us with
1787                  * an object due to just changed state
1788                  */
1789                 CDEBUG(D_OTHER, "%s: wanted %d objects, found only %d\n",
1790                        lod2obd(lod)->obd_name, stripe_count, nfound);
1791                 for (i = 0; i < nfound; i++) {
1792                         LASSERT(stripe[i] != NULL);
1793                         dt_object_put(env, stripe[i]);
1794                         stripe[i] = NULL;
1795                 }
1796
1797                 /* makes sense to rebalance next time */
1798                 set_bit(LQ_DIRTY, &lod->lod_ost_descs.ltd_qos.lq_flags);
1799                 clear_bit(LQ_SAME_SPACE, &lod->lod_ost_descs.ltd_qos.lq_flags);
1800                 rc = -EAGAIN;
1801         } else if (nfound < lod_comp->llc_stripe_count) {
1802                 lod_comp->llc_stripe_count = nfound;
1803         }
1804
1805         /* If there are enough OSTs, a component with overstriping requessted
1806          * will not actually end up overstriped.  The comp should reflect this.
1807          */
1808         if (rc == 0 && !overstriped)
1809                 lod_comp->llc_pattern &= ~LOV_PATTERN_OVERSTRIPING;
1810
1811 out:
1812         up_write(&lod->lod_ost_descs.ltd_qos.lq_rw_sem);
1813
1814 out_nolock:
1815         if (pool != NULL) {
1816                 up_read(&pool_tgt_rw_sem(pool));
1817                 /* put back ref got by lod_find_pool() */
1818                 lod_pool_putref(pool);
1819         }
1820
1821         RETURN(rc);
1822 }
1823
1824 /**
1825  * Allocate a striping using an algorithm with weights.
1826  *
1827  * The function allocates remote MDT objects to create a striping, the first
1828  * object was already allocated on current MDT to ensure master object and
1829  * the first object are on the same MDT. The algorithm used is based on weights
1830  * (both free space and inodes), and it's trying to ensure the space/inodes are
1831  * used evenly by MDTs and MDSs. The striping configuration (# of stripes,
1832  * offset, pool) is taken from the object and is prepared by the caller.
1833  *
1834  * If prepared configuration can't be met due to too few MDTs, then allocation
1835  * fails.
1836  *
1837  * No concurrent allocation is allowed on the object and this must be ensured
1838  * by the caller. All the internal structures are protected by the function.
1839  *
1840  * The algorithm has two steps: find available MDTs and calculate their
1841  * weights, then select the MDTs with their weights used as the probability.
1842  * An MDT with a higher weight is proportionately more likely to be selected
1843  * than one with a lower weight.
1844  *
1845  * \param[in] env               execution environment for this thread
1846  * \param[in] lo                LOD object
1847  * \param[in] stripe_idx        starting stripe index to allocate, if it's not
1848  *                              0, we are restriping directory
1849  * \param[in] stripe_count      total stripe count
1850  * \param[out] stripes          striping created
1851  *
1852  * \retval positive     stripes allocated, and it should be equal to
1853  *                      lo->ldo_dir_stripe_count
1854  * \retval -EAGAIN      not enough tgts are found for specified stripe count
1855  * \retval -EINVAL      requested MDT index is invalid
1856  * \retval negative     errno on failure
1857  */
1858 int lod_mdt_alloc_qos(const struct lu_env *env, struct lod_object *lo,
1859                       struct dt_object **stripes, u32 stripe_idx,
1860                       u32 stripe_count)
1861 {
1862         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1863         struct lu_tgt_descs *ltd = &lod->lod_mdt_descs;
1864         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
1865         struct lu_fid fid = { 0 };
1866         const struct lu_tgt_pool *pool;
1867         struct lu_tgt_desc *mdt;
1868         struct dt_object *dto;
1869         u64 total_weight = 0;
1870         u32 saved_idx = stripe_idx;
1871         u32 mdt_idx;
1872         unsigned int good_mdts;
1873         unsigned int i;
1874         int rc = 0;
1875
1876         ENTRY;
1877
1878         /* Totally skip qos part when qos_threshold_rr=100% */
1879         if (ltd->ltd_qos.lq_threshold_rr == QOS_THRESHOLD_MAX)
1880                 return -EAGAIN;
1881
1882         LASSERT(stripe_idx <= stripe_count);
1883         if (stripe_idx == stripe_count)
1884                 RETURN(stripe_count);
1885
1886         /* we do not use qos for overstriping, since it will always use all the
1887          * MDTs.  So we check if it's truly needed, falling back to rr if it is,
1888          * and otherwise we remove the flag and continue
1889          */
1890         if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) {
1891                 if (stripe_count > lod->lod_remote_mdt_count + 1)
1892                         RETURN(-EAGAIN);
1893                 lo->ldo_dir_hash_type &= ~LMV_HASH_FLAG_OVERSTRIPED;
1894         }
1895
1896         /* use MDT pool in @ltd, once MDT pool is supported in the future, it
1897          * can be passed in as argument like OST object allocation.
1898          */
1899         pool = &ltd->ltd_tgt_pool;
1900
1901         /* Detect -EAGAIN early, before expensive lock is taken. */
1902         if (!ltd_qos_is_usable(ltd))
1903                 RETURN(-EAGAIN);
1904
1905         rc = lod_qos_mdt_in_use_init(env, ltd, stripe_idx, stripe_count, pool,
1906                                      stripes);
1907         if (rc)
1908                 RETURN(rc);
1909
1910         /* Do actual allocation, use write lock here. */
1911         down_write(&ltd->ltd_qos.lq_rw_sem);
1912
1913         /*
1914          * Check again, while we were sleeping on @lq_rw_sem things could
1915          * change.
1916          */
1917         if (!ltd_qos_is_usable(ltd))
1918                 GOTO(unlock, rc = -EAGAIN);
1919
1920         rc = ltd_qos_penalties_calc(ltd);
1921         if (rc)
1922                 GOTO(unlock, rc);
1923
1924         good_mdts = 0;
1925         /* Find all the MDTs that are valid stripe candidates */
1926         for (i = 0; i < pool->op_count; i++) {
1927                 if (!test_bit(pool->op_array[i], ltd->ltd_tgt_bitmap))
1928                         continue;
1929
1930                 mdt = LTD_TGT(ltd, pool->op_array[i]);
1931                 mdt->ltd_qos.ltq_usable = 0;
1932
1933                 if (mdt->ltd_discon || lod_statfs_check(ltd, mdt))
1934                         continue;
1935
1936                 if (mdt->ltd_statfs.os_state &
1937                     (OS_STATFS_DEGRADED | OS_STATFS_NOCREATE))
1938                         continue;
1939
1940                 mdt->ltd_qos.ltq_usable = 1;
1941                 lu_tgt_qos_weight_calc(mdt, true);
1942                 total_weight += mdt->ltd_qos.ltq_weight;
1943
1944                 good_mdts++;
1945         }
1946
1947         CDEBUG(D_OTHER, "found %d good MDTs\n", good_mdts);
1948
1949         if (good_mdts < stripe_count - stripe_idx)
1950                 GOTO(unlock, rc = -EAGAIN);
1951
1952         /* Find enough MDTs with weighted random allocation. */
1953         while (stripe_idx < stripe_count) {
1954                 u64 rand, cur_weight;
1955
1956                 cur_weight = 0;
1957                 rc = -ENOSPC;
1958
1959                 rand = lu_prandom_u64_max(total_weight);
1960
1961                 /* On average, this will hit larger-weighted MDTs more often.
1962                  * 0-weight MDT will always get used last (only when rand=0) */
1963                 for (i = 0; i < pool->op_count; i++) {
1964                         int rc2;
1965
1966                         mdt_idx = pool->op_array[i];
1967                         mdt = LTD_TGT(ltd, mdt_idx);
1968
1969                         if (!mdt->ltd_qos.ltq_usable)
1970                                 continue;
1971
1972                         cur_weight += mdt->ltd_qos.ltq_weight;
1973
1974                         CDEBUG(D_OTHER, "stripe_count=%d stripe_index=%d cur_weight=%llu rand=%llu total_weight=%llu\n",
1975                                   stripe_count, stripe_idx, cur_weight, rand,
1976                                   total_weight);
1977
1978                         if (cur_weight < rand)
1979                                 continue;
1980
1981                         CDEBUG(D_OTHER, "stripe=%d to idx=%d\n",
1982                                stripe_idx, mdt_idx);
1983
1984                         if (lod_qos_is_tgt_used(env, mdt_idx, stripe_idx))
1985                                 continue;
1986
1987                         rc2 = dt_fid_alloc(env, mdt->ltd_tgt, &fid, NULL, NULL);
1988                         if (rc2 < 0) {
1989                                 CDEBUG(D_OTHER, "can't alloc FID on #%u: %d\n",
1990                                        mdt_idx, rc2);
1991                                 continue;
1992                         }
1993
1994                         conf.loc_flags = LOC_F_NEW;
1995                         dto = dt_locate_at(env, mdt->ltd_tgt, &fid,
1996                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1997                                 &conf);
1998                         if (IS_ERR(dto)) {
1999                                 CDEBUG(D_OTHER, "can't alloc stripe on #%u: %d\n",
2000                                        mdt_idx, (int) PTR_ERR(dto));
2001                                 continue;
2002                         }
2003
2004                         lod_qos_tgt_in_use(env, stripe_idx, mdt_idx);
2005                         stripes[stripe_idx] = dto;
2006                         ltd_qos_update(ltd, mdt, &total_weight);
2007                         stripe_idx++;
2008                         rc = 0;
2009                         break;
2010                 }
2011
2012                 /* no MDT found on this iteration, give up */
2013                 if (rc)
2014                         break;
2015         }
2016
2017         if (unlikely(stripe_idx != stripe_count)) {
2018                 /*
2019                  * when the decision to use weighted algorithm was made
2020                  * we had enough appropriate OSPs, but this state can
2021                  * change anytime (no space on MDT, broken connection, etc)
2022                  * so it's possible OSP won't be able to provide us with
2023                  * an object due to just changed state
2024                  */
2025                 CDEBUG(D_OTHER, "%s: wanted %d objects, found only %d\n",
2026                        lod2obd(lod)->obd_name, stripe_count, stripe_idx);
2027                 for (i = saved_idx; i < stripe_idx; i++) {
2028                         LASSERT(stripes[i] != NULL);
2029                         dt_object_put(env, stripes[i]);
2030                         stripes[i] = NULL;
2031                 }
2032
2033                 /* makes sense to rebalance next time */
2034                 set_bit(LQ_DIRTY, &ltd->ltd_qos.lq_flags);
2035                 clear_bit(LQ_SAME_SPACE, &ltd->ltd_qos.lq_flags);
2036
2037                 rc = -EAGAIN;
2038         } else {
2039                 rc = stripe_idx;
2040         }
2041
2042 unlock:
2043         up_write(&ltd->ltd_qos.lq_rw_sem);
2044
2045         RETURN(rc);
2046 }
2047
2048 /**
2049  * Check stripe count the caller can use.
2050  *
2051  * For new layouts (no initialized components), check the total size of the
2052  * layout against the maximum EA size from the backing file system.  This
2053  * stops us from creating a layout which will be too large once initialized.
2054  *
2055  * For existing layouts (with initialized components):
2056  * Find the maximal possible stripe count not greater than \a stripe_count.
2057  * If the provided stripe count is 0, then the filesystem's default is used.
2058  *
2059  * \param[in] lod       LOD device
2060  * \param[in] lo        The lod_object
2061  * \param[in] comp_idx  The component id, which the amount of stripes is
2062                         calculated for
2063  * \param[in] stripe_count      count the caller would like to use
2064  *
2065  * \retval              the maximum usable stripe count
2066  */
2067 __u16 lod_get_stripe_count_plain(struct lod_device *lod, struct lod_object *lo,
2068                                  __u16 stripe_count, bool overstriping,
2069                                  enum lod_uses_hint *flags)
2070 {
2071         struct lov_desc *lov_desc = &lod->lod_ost_descs.ltd_lov_desc;
2072
2073         /* Overstriping allows more stripes than targets */
2074         if (stripe_count > lov_desc->ld_active_tgt_count) {
2075                 if (overstriping) {
2076                         if (stripe_count >= LOV_ALL_STRIPES_MIN &&
2077                                 stripe_count <= LOV_ALL_STRIPES_MAX) {
2078                                 stripe_count =
2079                                 ((stripe_count - LOV_ALL_STRIPES_MIN) + 1) *
2080                                 lov_desc->ld_active_tgt_count;
2081                         }
2082                 } else {
2083                         *flags |= LOD_USES_DEFAULT_STRIPE;
2084                         if ((stripe_count >= LOV_ALL_STRIPES_MIN &&
2085                              stripe_count <= LOV_ALL_STRIPES_MAX) &&
2086                              lod->lod_max_stripecount)
2087                                 stripe_count = lod->lod_max_stripecount;
2088                         else
2089                                 stripe_count = lov_desc->ld_active_tgt_count;
2090                 }
2091         }
2092
2093         if (!stripe_count)
2094                 stripe_count = lov_desc->ld_default_stripe_count;
2095
2096         if (overstriping && stripe_count > LOV_MAX_STRIPE_COUNT)
2097                 stripe_count = LOV_MAX_STRIPE_COUNT;
2098
2099         return stripe_count;
2100 }
2101
2102 __u16 lod_get_stripe_count(struct lod_device *lod, struct lod_object *lo,
2103                            int comp_idx, __u16 stripe_count, bool overstriping,
2104                            enum lod_uses_hint *flags)
2105 {
2106         __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
2107         /* max stripe count is based on OSD ea size */
2108         unsigned int easize = lod->lod_osd_max_easize;
2109         int i;
2110
2111         ENTRY;
2112
2113         stripe_count = lod_get_stripe_count_plain(lod, lo, stripe_count,
2114                                                   overstriping, flags);
2115
2116         if (lo->ldo_is_composite) {
2117                 struct lod_layout_component *lod_comp;
2118                 unsigned int header_sz = sizeof(struct lov_comp_md_v1);
2119                 unsigned int init_comp_sz = 0;
2120                 unsigned int total_comp_sz = 0;
2121                 unsigned int comp_sz;
2122
2123                 header_sz += sizeof(struct lov_comp_md_entry_v1) *
2124                                 lo->ldo_comp_cnt;
2125
2126                 for (i = 0; i < lo->ldo_comp_cnt; i++) {
2127                         unsigned int stripes;
2128
2129                         if (i == comp_idx)
2130                                 continue;
2131
2132                         lod_comp = &lo->ldo_comp_entries[i];
2133                         /* Extension comp is never inited - 0 stripes on disk */
2134                         stripes = lod_comp->llc_flags & LCME_FL_EXTENSION ? 0 :
2135                                 lod_comp->llc_stripe_count;
2136
2137                         comp_sz = lov_mds_md_size(stripes, LOV_MAGIC_V3);
2138                         total_comp_sz += comp_sz;
2139                         if (lod_comp->llc_flags & LCME_FL_INIT)
2140                                 init_comp_sz += comp_sz;
2141                 }
2142
2143                 if (init_comp_sz > 0)
2144                         total_comp_sz = init_comp_sz;
2145
2146                 header_sz += total_comp_sz;
2147
2148                 if (easize > header_sz)
2149                         easize -= header_sz;
2150                 else
2151                         easize = 0;
2152         }
2153
2154         max_stripes = lov_mds_md_max_stripe_count(easize, LOV_MAGIC_V3);
2155         max_stripes = (max_stripes == 0) ? 0 : max_stripes - 1;
2156
2157         stripe_count = min_t(__u16, stripe_count, max_stripes);
2158         RETURN(stripe_count);
2159 }
2160
2161 /**
2162  * Create in-core representation for a fully-defined striping
2163  *
2164  * When the caller passes a fully-defined striping (i.e. everything including
2165  * OST object FIDs are defined), then we still need to instantiate LU-cache
2166  * with the objects representing the stripes defined. This function completes
2167  * that task.
2168  *
2169  * \param[in] env       execution environment for this thread
2170  * \param[in] mo        LOD object
2171  * \param[in] buf       buffer containing the striping
2172  *
2173  * \retval 0            on success
2174  * \retval negative     negated errno on error
2175  */
2176 int lod_use_defined_striping(const struct lu_env *env,
2177                              struct lod_object *mo,
2178                              const struct lu_buf *buf)
2179 {
2180         struct lod_layout_component *lod_comp;
2181         struct lov_mds_md_v1   *v1 = buf->lb_buf;
2182         struct lov_mds_md_v3   *v3 = buf->lb_buf;
2183         struct lov_comp_md_v1  *comp_v1 = NULL;
2184         struct lov_ost_data_v1 *objs;
2185         __u32   magic;
2186         __u16   comp_cnt;
2187         __u16   mirror_cnt;
2188         int     rc = 0, i;
2189         ENTRY;
2190
2191         mutex_lock(&mo->ldo_layout_mutex);
2192         lod_striping_free_nolock(env, mo);
2193
2194         magic = le32_to_cpu(v1->lmm_magic) & ~LOV_MAGIC_DEFINED;
2195
2196         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3 &&
2197             magic != LOV_MAGIC_COMP_V1 && magic != LOV_MAGIC_FOREIGN)
2198                 GOTO(unlock, rc = -EINVAL);
2199
2200         if (magic == LOV_MAGIC_COMP_V1) {
2201                 comp_v1 = buf->lb_buf;
2202                 comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
2203                 if (comp_cnt == 0)
2204                         GOTO(unlock, rc = -EINVAL);
2205                 mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
2206                 mo->ldo_flr_state = le16_to_cpu(comp_v1->lcm_flags) &
2207                                         LCM_FL_FLR_MASK;
2208                 mo->ldo_is_composite = 1;
2209         } else if (magic == LOV_MAGIC_FOREIGN) {
2210                 struct lov_foreign_md *foreign;
2211                 size_t length;
2212
2213                 if (buf->lb_len < offsetof(typeof(*foreign), lfm_value)) {
2214                         CDEBUG(D_LAYOUT,
2215                                "buf len %zu < min lov_foreign_md size (%zu)\n",
2216                                buf->lb_len,
2217                                offsetof(typeof(*foreign), lfm_value));
2218                         GOTO(out, rc = -EINVAL);
2219                 }
2220                 foreign = (struct lov_foreign_md *)buf->lb_buf;
2221                 length = lov_foreign_size_le(foreign);
2222                 if (buf->lb_len < length) {
2223                         CDEBUG(D_LAYOUT,
2224                                "buf len %zu < this lov_foreign_md size (%zu)\n",
2225                                buf->lb_len, length);
2226                         GOTO(out, rc = -EINVAL);
2227                 }
2228
2229                 /* just cache foreign LOV EA raw */
2230                 rc = lod_alloc_foreign_lov(mo, length);
2231                 if (rc)
2232                         GOTO(out, rc);
2233                 memcpy(mo->ldo_foreign_lov, buf->lb_buf, length);
2234                 GOTO(out, rc);
2235         } else {
2236                 mo->ldo_is_composite = 0;
2237                 comp_cnt = 1;
2238                 mirror_cnt = 0;
2239         }
2240         mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
2241
2242         rc = lod_alloc_comp_entries(mo, mirror_cnt, comp_cnt);
2243         if (rc)
2244                 GOTO(unlock, rc);
2245
2246         for (i = 0; i < comp_cnt; i++) {
2247                 struct lu_extent *ext;
2248                 char    *pool_name;
2249                 __u32   offs;
2250
2251                 lod_comp = &mo->ldo_comp_entries[i];
2252
2253                 if (mo->ldo_is_composite) {
2254                         offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
2255                         v1 = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs);
2256                         v3 = (struct lov_mds_md_v3 *)v1;
2257                         magic = le32_to_cpu(v1->lmm_magic);
2258
2259                         ext = &comp_v1->lcm_entries[i].lcme_extent;
2260                         lod_comp->llc_extent.e_start =
2261                                 le64_to_cpu(ext->e_start);
2262                         lod_comp->llc_extent.e_end = le64_to_cpu(ext->e_end);
2263                         lod_comp->llc_flags =
2264                                 le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags);
2265                         if (lod_comp->llc_flags & LCME_FL_NOSYNC)
2266                                 lod_comp->llc_timestamp = le64_to_cpu(
2267                                         comp_v1->lcm_entries[i].lcme_timestamp);
2268                         lod_comp->llc_id =
2269                                 le32_to_cpu(comp_v1->lcm_entries[i].lcme_id);
2270                         if (lod_comp->llc_id == LCME_ID_INVAL)
2271                                 GOTO(out, rc = -EINVAL);
2272
2273                         lod_comp->llc_magic = magic;
2274                         if (magic == LOV_MAGIC_FOREIGN) {
2275                                 rc = lod_init_comp_foreign(lod_comp, v1);
2276                                 if (rc)
2277                                         GOTO(out, rc);
2278                                 continue;
2279                         }
2280                 } else {
2281                         lod_comp->llc_magic = magic;
2282                 }
2283
2284                 pool_name = NULL;
2285                 if (magic == LOV_MAGIC_V1) {
2286                         objs = &v1->lmm_objects[0];
2287                 } else if (magic == LOV_MAGIC_V3) {
2288                         objs = &v3->lmm_objects[0];
2289                         if (v3->lmm_pool_name[0] != '\0')
2290                                 pool_name = v3->lmm_pool_name;
2291                 } else {
2292                         CDEBUG(D_LAYOUT, "Invalid magic %x\n", magic);
2293                         GOTO(out, rc = -EINVAL);
2294                 }
2295
2296                 lod_comp->llc_pattern = le32_to_cpu(v1->lmm_pattern);
2297                 lod_comp->llc_stripe_size = le32_to_cpu(v1->lmm_stripe_size);
2298                 lod_comp->llc_stripe_count = le16_to_cpu(v1->lmm_stripe_count);
2299                 /**
2300                  * limit stripe count so that it's less than/equal to
2301                  * extent_size / stripe_size.
2302                  *
2303                  * Note: extension size reused llc_stripe_size field and
2304                  * uninstantiated component could be defined with
2305                  * extent_start == extent_end as extension component will
2306                  * expand it later.
2307                  */
2308                 if (mo->ldo_is_composite &&
2309                     !(lod_comp->llc_flags & LCME_FL_EXTENSION) &&
2310                     (lod_comp_inited(lod_comp) ||
2311                      lod_comp->llc_extent.e_start <
2312                      lod_comp->llc_extent.e_end) &&
2313                     !(lod_comp->llc_stripe_count >= LOV_ALL_STRIPES_MIN &&
2314                       lod_comp->llc_stripe_count <= LOV_ALL_STRIPES_MAX) &&
2315                     lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
2316                     (__u64)lod_comp->llc_stripe_count *
2317                            lod_comp->llc_stripe_size >
2318                     (lod_comp->llc_extent.e_end - lod_comp->llc_extent.e_start))
2319                         lod_comp->llc_stripe_count =
2320                                 DIV_ROUND_UP(lod_comp->llc_extent.e_end -
2321                                              lod_comp->llc_extent.e_start,
2322                                              lod_comp->llc_stripe_size);
2323                 lod_comp->llc_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
2324                 /**
2325                  * The stripe_offset of an uninit-ed component is stored in
2326                  * the lmm_layout_gen
2327                  */
2328                 if (mo->ldo_is_composite && !lod_comp_inited(lod_comp))
2329                         lod_comp->llc_stripe_offset = lod_comp->llc_layout_gen;
2330                 lod_obj_set_pool(mo, i, pool_name);
2331
2332                 if ((!mo->ldo_is_composite || lod_comp_inited(lod_comp)) &&
2333                     !(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) &&
2334                     !(lod_comp->llc_pattern & LOV_PATTERN_MDT)) {
2335                         rc = lod_initialize_objects(env, mo, objs, i);
2336                         if (rc)
2337                                 GOTO(out, rc);
2338                 }
2339         }
2340
2341         rc = lod_fill_mirrors(mo);
2342         GOTO(out, rc);
2343 out:
2344         if (rc)
2345                 lod_striping_free_nolock(env, mo);
2346 unlock:
2347         mutex_unlock(&mo->ldo_layout_mutex);
2348
2349         RETURN(rc);
2350 }
2351
2352 void lod_qos_set_pool(struct lod_object *lo, int pos, const char *pool_name)
2353 {
2354         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2355         struct lod_layout_component *lod_comp;
2356         struct lod_pool_desc *pool = NULL;
2357         __u32 idx;
2358         int j, rc = 0;
2359
2360         /* In the function below, .hs_keycmp resolves to
2361          * pool_hashkey_keycmp() */
2362         if (pool_name)
2363                 pool = lod_find_pool(d, pool_name);
2364
2365         if (!pool) {
2366                 lod_obj_set_pool(lo, pos, pool_name);
2367                 return;
2368         }
2369
2370         lod_comp = &lo->ldo_comp_entries[pos];
2371         if (lod_comp->llc_stripe_offset != LOV_OFFSET_DEFAULT) {
2372                 if (lod_comp->llc_ostlist.op_count) {
2373                         for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) {
2374                                 idx = lod_comp->llc_ostlist.op_array[j];
2375                                 rc = lod_check_index_in_pool(idx, pool);
2376                                 if (rc)
2377                                         break;
2378                         }
2379                 } else {
2380                         idx = lod_comp->llc_stripe_offset;
2381                         rc = lod_check_index_in_pool(idx, pool);
2382                 }
2383
2384                 if (rc) {
2385                         CDEBUG(D_LAYOUT, "%s: index %u is not in the pool %s, "
2386                                "dropping the pool\n", lod2obd(d)->obd_name,
2387                                idx, pool_name);
2388                         pool_name = NULL;
2389                 }
2390         }
2391
2392         if (pool_name &&
2393             lod_comp->llc_stripe_count > pool_tgt_count(pool) &&
2394             !(lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING))
2395                 lod_comp->llc_stripe_count = pool_tgt_count(pool);
2396
2397         lod_pool_putref(pool);
2398         lod_obj_set_pool(lo, pos, pool_name);
2399 }
2400
2401 /**
2402  * Parse suggested striping configuration.
2403  *
2404  * The caller gets a suggested striping configuration from a number of sources
2405  * including per-directory default and applications. Then it needs to verify
2406  * the suggested striping is valid, apply missing bits and store the resulting
2407  * configuration in the object to be used by the allocator later. Must not be
2408  * called concurrently against the same object. It's OK to provide a
2409  * fully-defined striping.
2410  *
2411  * \param[in] env       execution environment for this thread
2412  * \param[in] lo        LOD object
2413  * \param[in] buf       buffer containing the striping
2414  *
2415  * \retval 0            on success
2416  * \retval negative     negated errno on error
2417  */
2418 int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
2419                          const struct lu_buf *buf)
2420 {
2421         struct lod_layout_component *lod_comp;
2422         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2423         struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc;
2424         struct lov_user_md_v1 *v1 = NULL;
2425         struct lov_user_md_v3 *v3 = NULL;
2426         struct lov_comp_md_v1 *comp_v1 = NULL;
2427         struct lov_foreign_md *lfm = NULL;
2428         char def_pool[LOV_MAXPOOLNAME + 1];
2429         __u32 magic;
2430         __u16 comp_cnt;
2431         __u16 mirror_cnt;
2432         int i, rc;
2433         ENTRY;
2434
2435         if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0)
2436                 RETURN(0);
2437
2438         memset(def_pool, 0, sizeof(def_pool));
2439         if (lo->ldo_comp_entries != NULL)
2440                 lod_layout_get_pool(lo->ldo_comp_entries, lo->ldo_comp_cnt,
2441                                     def_pool, sizeof(def_pool));
2442
2443         /* free default striping info */
2444         if (lo->ldo_is_foreign)
2445                 lod_free_foreign_lov(lo);
2446         else
2447                 lod_free_comp_entries(lo);
2448
2449         rc = lod_verify_striping(env, d, lo, buf, false);
2450         if (rc)
2451                 RETURN(-EINVAL);
2452
2453         v3 = buf->lb_buf;
2454         v1 = buf->lb_buf;
2455         comp_v1 = buf->lb_buf;
2456         /* {lmm,lfm}_magic position/length work for all LOV formats */
2457         magic = v1->lmm_magic;
2458
2459         if (unlikely(le32_to_cpu(magic) & LOV_MAGIC_DEFINED)) {
2460                 /* try to use as fully defined striping */
2461                 rc = lod_use_defined_striping(env, lo, buf);
2462                 RETURN(rc);
2463         }
2464
2465         switch (magic) {
2466         case __swab32(LOV_USER_MAGIC_V1):
2467                 lustre_swab_lov_user_md_v1(v1);
2468                 magic = v1->lmm_magic;
2469                 fallthrough;
2470         case LOV_USER_MAGIC_V1:
2471                 break;
2472         case __swab32(LOV_USER_MAGIC_V3):
2473                 lustre_swab_lov_user_md_v3(v3);
2474                 magic = v3->lmm_magic;
2475                 fallthrough;
2476         case LOV_USER_MAGIC_V3:
2477                 break;
2478         case __swab32(LOV_USER_MAGIC_SPECIFIC):
2479                 lustre_swab_lov_user_md_v3(v3);
2480                 lustre_swab_lov_user_md_objects(v3->lmm_objects,
2481                                                 v3->lmm_stripe_count);
2482                 magic = v3->lmm_magic;
2483                 fallthrough;
2484         case LOV_USER_MAGIC_SPECIFIC:
2485                 break;
2486         case __swab32(LOV_USER_MAGIC_COMP_V1):
2487                 lustre_swab_lov_comp_md_v1(comp_v1);
2488                 magic = comp_v1->lcm_magic;
2489                 /* fall trhough */
2490         case LOV_USER_MAGIC_COMP_V1:
2491                 break;
2492         case __swab32(LOV_USER_MAGIC_FOREIGN):
2493                 lfm = buf->lb_buf;
2494                 __swab32s(&lfm->lfm_magic);
2495                 __swab32s(&lfm->lfm_length);
2496                 __swab32s(&lfm->lfm_type);
2497                 __swab32s(&lfm->lfm_flags);
2498                 magic = lfm->lfm_magic;
2499                 fallthrough;
2500         case LOV_USER_MAGIC_FOREIGN:
2501                 if (!lfm)
2502                         lfm = buf->lb_buf;
2503                 rc = lod_alloc_foreign_lov(lo, lov_foreign_size(lfm));
2504                 if (rc)
2505                         RETURN(rc);
2506                 memcpy(lo->ldo_foreign_lov, buf->lb_buf,
2507                        lov_foreign_size(lfm));
2508                 RETURN(0);
2509         default:
2510                 CERROR("%s: unrecognized magic %X\n",
2511                        lod2obd(d)->obd_name, magic);
2512                 RETURN(-EINVAL);
2513         }
2514
2515         lustre_print_user_md(D_OTHER, v1, "parse config");
2516
2517         if (magic == LOV_USER_MAGIC_COMP_V1) {
2518                 comp_cnt = comp_v1->lcm_entry_count;
2519                 if (comp_cnt == 0)
2520                         RETURN(-EINVAL);
2521                 mirror_cnt =  comp_v1->lcm_mirror_count + 1;
2522                 if (mirror_cnt > 1)
2523                         lo->ldo_flr_state = LCM_FL_RDONLY;
2524                 lo->ldo_is_composite = 1;
2525         } else {
2526                 comp_cnt = 1;
2527                 mirror_cnt = 0;
2528                 lo->ldo_is_composite = 0;
2529         }
2530
2531         rc = lod_alloc_comp_entries(lo, mirror_cnt, comp_cnt);
2532         if (rc)
2533                 RETURN(rc);
2534
2535         LASSERT(lo->ldo_comp_entries);
2536
2537         for (i = 0; i < comp_cnt; i++) {
2538                 struct lu_extent        *ext;
2539                 char    *pool_name;
2540
2541                 lod_comp = &lo->ldo_comp_entries[i];
2542
2543                 if (lo->ldo_is_composite) {
2544                         v1 = (struct lov_user_md *)((char *)comp_v1 +
2545                                         comp_v1->lcm_entries[i].lcme_offset);
2546                         ext = &comp_v1->lcm_entries[i].lcme_extent;
2547                         lod_comp->llc_extent = *ext;
2548                         lod_comp->llc_flags =
2549                                 comp_v1->lcm_entries[i].lcme_flags &
2550                                         LCME_CL_COMP_FLAGS;
2551                 }
2552
2553                 pool_name = NULL;
2554                 if (def_pool[0] != '\0')
2555                         pool_name = def_pool;
2556
2557                 if (v1->lmm_magic == LOV_USER_MAGIC_V3 ||
2558                     v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
2559                         v3 = (struct lov_user_md_v3 *)v1;
2560
2561                         if (lov_pool_is_ignored(v3->lmm_pool_name))
2562                                 pool_name = NULL;
2563                         else if (v3->lmm_pool_name[0] != '\0' &&
2564                                  !lov_pool_is_inherited(v3->lmm_pool_name))
2565                                 pool_name = v3->lmm_pool_name;
2566
2567                         if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
2568                                 rc = lod_comp_copy_ost_lists(lod_comp, v3);
2569                                 if (rc)
2570                                         GOTO(free_comp, rc);
2571                         }
2572                 }
2573
2574                 if (v1->lmm_pattern == 0)
2575                         v1->lmm_pattern = LOV_PATTERN_RAID0;
2576                 if (!lov_pattern_supported(lov_pattern(v1->lmm_pattern))) {
2577                         CDEBUG(D_LAYOUT, "%s: invalid pattern: %x\n",
2578                                lod2obd(d)->obd_name, v1->lmm_pattern);
2579                         GOTO(free_comp, rc = -EINVAL);
2580                 }
2581
2582                 lod_comp->llc_pattern = v1->lmm_pattern;
2583                 lod_comp->llc_stripe_size = v1->lmm_stripe_size;
2584                 lod_adjust_stripe_size(lod_comp, desc->ld_default_stripe_size);
2585
2586                 lod_comp->llc_stripe_count = desc->ld_default_stripe_count;
2587                 if (v1->lmm_stripe_count ||
2588                     (lov_pattern(v1->lmm_pattern) & LOV_PATTERN_MDT))
2589                         lod_comp->llc_stripe_count = v1->lmm_stripe_count;
2590
2591                 if ((lov_pattern(lod_comp->llc_pattern) & LOV_PATTERN_MDT) &&
2592                     lod_comp->llc_stripe_count != 0) {
2593                         CDEBUG(D_LAYOUT, "%s: invalid stripe count: %u\n",
2594                                lod2obd(d)->obd_name,
2595                                lod_comp->llc_stripe_count);
2596                         GOTO(free_comp, rc = -EINVAL);
2597                 }
2598                 /**
2599                  * limit stripe count so that it's less than/equal to
2600                  * extent_size / stripe_size.
2601                  *
2602                  * Note: extension size reused llc_stripe_size field and
2603                  * uninstantiated component could be defined with
2604                  * extent_start == extent_end as extension component will
2605                  * expand it later.
2606                  */
2607                 if (lo->ldo_is_composite &&
2608                     !(lod_comp->llc_flags & LCME_FL_EXTENSION) &&
2609                     !(lod_comp->llc_stripe_count >= LOV_ALL_STRIPES_MIN &&
2610                       lod_comp->llc_stripe_count <= LOV_ALL_STRIPES_MAX) &&
2611                     (lod_comp_inited(lod_comp) ||
2612                      lod_comp->llc_extent.e_start <
2613                      lod_comp->llc_extent.e_end) &&
2614                     lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
2615                     lod_comp->llc_stripe_count * lod_comp->llc_stripe_size >
2616                     (lod_comp->llc_extent.e_end - lod_comp->llc_extent.e_start))
2617                         lod_comp->llc_stripe_count =
2618                                 DIV_ROUND_UP(lod_comp->llc_extent.e_end -
2619                                              lod_comp->llc_extent.e_start,
2620                                              lod_comp->llc_stripe_size);
2621
2622                 lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
2623                 lod_qos_set_pool(lo, i, pool_name);
2624         }
2625
2626         RETURN(0);
2627
2628 free_comp:
2629         lod_free_comp_entries(lo);
2630         RETURN(rc);
2631 }
2632
2633 /**
2634  * prepare enough OST avoidance bitmap space
2635  */
2636 static int lod_prepare_avoidance(const struct lu_env *env,
2637                                  struct lod_object *lo)
2638 {
2639         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
2640         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
2641         unsigned long *bitmap = NULL;
2642         __u32 *new_oss = NULL;
2643
2644         lag->lag_ost_avail = lod->lod_ost_count;
2645
2646         /* reset OSS avoid guide array */
2647         lag->lag_oaa_count = 0;
2648         if (lag->lag_oss_avoid_array &&
2649             lag->lag_oaa_size < lod->lod_ost_count) {
2650                 OBD_FREE_PTR_ARRAY(lag->lag_oss_avoid_array, lag->lag_oaa_size);
2651                 lag->lag_oss_avoid_array = NULL;
2652                 lag->lag_oaa_size = 0;
2653         }
2654
2655         /* init OST avoid guide bitmap */
2656         if (lag->lag_ost_avoid_bitmap) {
2657                 if (lod->lod_ost_count <= lag->lag_ost_avoid_size) {
2658                         bitmap_zero(lag->lag_ost_avoid_bitmap,
2659                                     lag->lag_ost_avoid_size);
2660                 } else {
2661                         bitmap_free(lag->lag_ost_avoid_bitmap);
2662                         lag->lag_ost_avoid_bitmap = NULL;
2663                 }
2664         }
2665
2666         if (!lag->lag_ost_avoid_bitmap) {
2667                 bitmap = bitmap_zalloc(lod->lod_ost_count, GFP_KERNEL);
2668                 if (!bitmap)
2669                         return -ENOMEM;
2670         }
2671
2672         if (!lag->lag_oss_avoid_array) {
2673                 /**
2674                  * usually there are multiple OSTs in one OSS, but we don't
2675                  * know the exact OSS number, so we choose a safe option,
2676                  * using OST count to allocate the array to store the OSS
2677                  * id.
2678                  */
2679                 OBD_ALLOC_PTR_ARRAY(new_oss, lod->lod_ost_count);
2680                 if (!new_oss) {
2681                         bitmap_free(bitmap);
2682                         return -ENOMEM;
2683                 }
2684         }
2685
2686         if (new_oss) {
2687                 lag->lag_oss_avoid_array = new_oss;
2688                 lag->lag_oaa_size = lod->lod_ost_count;
2689         }
2690         if (bitmap) {
2691                 lag->lag_ost_avoid_bitmap = bitmap;
2692                 lag->lag_ost_avoid_size = lod->lod_ost_count;
2693         }
2694
2695         return 0;
2696 }
2697
2698 /**
2699  * Collect information of used OSTs and OSSs in the overlapped components
2700  * of other mirrors
2701  */
2702 static void lod_collect_avoidance(struct lod_object *lo,
2703                                   struct lod_avoid_guide *lag,
2704                                   int comp_idx)
2705 {
2706         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
2707         struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx];
2708         unsigned long *bitmap = lag->lag_ost_avoid_bitmap;
2709         int i, j;
2710
2711         /* iterate components */
2712         for (i = 0; i < lo->ldo_comp_cnt; i++) {
2713                 struct lod_layout_component *comp;
2714
2715                 /**
2716                  * skip mirror containing component[comp_idx], we only
2717                  * collect OSTs info of conflicting component in other mirrors,
2718                  * so that during read, if OSTs of a mirror's component are
2719                  * not available, we still have other mirror with different
2720                  * OSTs to read the data.
2721                  */
2722                 comp = &lo->ldo_comp_entries[i];
2723                 if (comp->llc_id != LCME_ID_INVAL &&
2724                     mirror_id_of(comp->llc_id) ==
2725                                                 mirror_id_of(lod_comp->llc_id))
2726                         continue;
2727
2728                 /**
2729                  * skip non-overlapped or un-instantiated components,
2730                  * NOTE: don't use lod_comp_inited(comp) to judge
2731                  * whether @comp has been inited, since during
2732                  * declare phase, comp->llc_stripe has been allocated
2733                  * while it's init flag not been set until the exec
2734                  * phase.
2735                  */
2736                 if (!lu_extent_is_overlapped(&comp->llc_extent,
2737                                              &lod_comp->llc_extent) ||
2738                     !comp->llc_stripe)
2739                         continue;
2740
2741                 /**
2742                  * collect used OSTs index and OSS info from a
2743                  * component
2744                  */
2745                 for (j = 0; j < comp->llc_stripe_count; j++) {
2746                         struct lod_tgt_desc *ost;
2747                         struct lu_svr_qos *lsq;
2748                         int k;
2749
2750                         ost = OST_TGT(lod, comp->llc_ost_indices[j]);
2751                         lsq = ost->ltd_qos.ltq_svr;
2752
2753                         if (test_bit(ost->ltd_index, bitmap))
2754                                 continue;
2755
2756                         CDEBUG(D_OTHER, "OST%d used in conflicting mirror component\n", ost->ltd_index);
2757                         set_bit(ost->ltd_index, bitmap);
2758                         lag->lag_ost_avail--;
2759
2760                         for (k = 0; k < lag->lag_oaa_count; k++) {
2761                                 if (lag->lag_oss_avoid_array[k] ==
2762                                     lsq->lsq_id)
2763                                         break;
2764                         }
2765                         if (k == lag->lag_oaa_count) {
2766                                 lag->lag_oss_avoid_array[k] =
2767                                                         lsq->lsq_id;
2768                                 lag->lag_oaa_count++;
2769                         }
2770                 }
2771         }
2772 }
2773
2774 /**
2775  * Create a striping for an obejct.
2776  *
2777  * The function creates a new striping for the object. The function tries QoS
2778  * algorithm first unless free space is distributed evenly among OSTs, but
2779  * by default RR algorithm is preferred due to internal concurrency (QoS is
2780  * serialized). The caller must ensure no concurrent calls to the function
2781  * are made against the same object.
2782  *
2783  * \param[in] env       execution environment for this thread
2784  * \param[in] lo        LOD object
2785  * \param[in] attr      attributes OST objects will be declared with
2786  * \param[in] th        transaction handle
2787  * \param[in] comp_idx  index of ldo_comp_entries
2788  *
2789  * \retval 0            on success
2790  * \retval negative     negated errno on error
2791  */
2792 int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
2793                         struct lu_attr *attr, struct thandle *th,
2794                         int comp_idx, __u64 reserve)
2795 {
2796         struct lod_layout_component *lod_comp;
2797         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2798         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
2799         struct dt_object **stripe = NULL;
2800         __u32 *ost_indices = NULL;
2801         enum lod_uses_hint flags = LOD_USES_ASSIGNED_STRIPE;
2802         int stripe_len;
2803         int i, rc = 0;
2804         ENTRY;
2805
2806         LASSERT(lo);
2807         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
2808         lod_comp = &lo->ldo_comp_entries[comp_idx];
2809         LASSERT(!(lod_comp->llc_flags & LCME_FL_EXTENSION));
2810
2811         /* A foreign/HSM component is being created */
2812         if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN)
2813                 RETURN(0);
2814
2815         /* A released component is being created */
2816         if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
2817                 RETURN(0);
2818
2819         /* A Data-on-MDT component is being created */
2820         if (lov_pattern(lod_comp->llc_pattern) & LOV_PATTERN_MDT)
2821                 RETURN(0);
2822
2823         if (lod_comp->llc_pool)
2824                 lod_check_and_spill_pool(env, d, &lod_comp->llc_pool);
2825
2826         if (likely(lod_comp->llc_stripe == NULL)) {
2827                 /*
2828                  * no striping has been created so far
2829                  */
2830                 LASSERT(lod_comp->llc_stripe_count);
2831                 /*
2832                  * statfs and check OST targets now, since ld_active_tgt_count
2833                  * could be changed if some OSTs are [de]activated manually.
2834                  */
2835                 lod_qos_statfs_update(env, d, &d->lod_ost_descs);
2836                 stripe_len = lod_get_stripe_count(d, lo, comp_idx,
2837                                                   lod_comp->llc_stripe_count,
2838                                                   lod_comp->llc_pattern &
2839                                                   LOV_PATTERN_OVERSTRIPING,
2840                                                   &flags);
2841
2842                 if (stripe_len == 0)
2843                         GOTO(out, rc = -ERANGE);
2844                 lod_comp->llc_stripe_count = stripe_len;
2845                 OBD_ALLOC_PTR_ARRAY(stripe, stripe_len);
2846                 if (stripe == NULL)
2847                         GOTO(out, rc = -ENOMEM);
2848                 OBD_ALLOC_PTR_ARRAY(ost_indices, stripe_len);
2849                 if (!ost_indices)
2850                         GOTO(out, rc = -ENOMEM);
2851
2852 repeat:
2853                 lod_getref(&d->lod_ost_descs);
2854                 /* XXX: support for non-0 files w/o objects */
2855                 CDEBUG(D_OTHER, "tgt_count %d stripe_count %d\n",
2856                        d->lod_ost_count, stripe_len);
2857
2858                 if (lod_comp->llc_ostlist.op_array &&
2859                     lod_comp->llc_ostlist.op_count) {
2860                         rc = lod_alloc_ost_list(env, lo, stripe, ost_indices,
2861                                                 th, comp_idx, reserve);
2862                 } else if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) {
2863                         /**
2864                          * collect OSTs and OSSs used in other mirrors whose
2865                          * components cross the ldo_comp_entries[comp_idx]
2866                          */
2867                         rc = lod_prepare_avoidance(env, lo);
2868                         if (rc)
2869                                 GOTO(put_ldts, rc);
2870
2871                         CDEBUG(D_OTHER, "collecting conflict osts for comp[%d]\n",
2872                                comp_idx);
2873                         lod_collect_avoidance(lo, lag, comp_idx);
2874
2875                         rc = lod_ost_alloc_qos(env, lo, stripe, ost_indices,
2876                                                flags, th, comp_idx, reserve);
2877                         if (rc == -EAGAIN)
2878                                 rc = lod_ost_alloc_rr(env, lo, stripe,
2879                                                       ost_indices, flags, th,
2880                                                       comp_idx, reserve);
2881                 } else {
2882                         rc = lod_ost_alloc_specific(env, lo, stripe,
2883                                                     ost_indices, flags, th,
2884                                                     comp_idx, reserve);
2885                 }
2886 put_ldts:
2887                 lod_putref(d, &d->lod_ost_descs);
2888                 if (rc < 0) {
2889                         for (i = 0; i < stripe_len; i++)
2890                                 if (stripe[i] != NULL)
2891                                         dt_object_put(env, stripe[i]);
2892
2893                         /* In case there is no space on any OST, let's ignore
2894                          * the @reserve space to avoid an error at the init
2895                          * time, probably the actual IO will be less than the
2896                          * given @reserve space (aka extension_size). */
2897                         if (reserve) {
2898                                 reserve = 0;
2899                                 goto repeat;
2900                         }
2901                         lod_comp->llc_stripe_count = 0;
2902                 } else {
2903                         lod_comp->llc_layout_gen = 0;
2904                         lod_comp->llc_stripe = stripe;
2905                         lod_comp->llc_ost_indices = ost_indices;
2906                         lod_comp->llc_stripes_allocated = stripe_len;
2907                 }
2908         } else {
2909                 /*
2910                  * lod_qos_parse_config() found supplied buf as a predefined
2911                  * striping (not a hint), so it allocated all the object
2912                  * now we need to create them
2913                  */
2914                 for (i = 0; i < lod_comp->llc_stripe_count; i++) {
2915                         struct dt_object  *o;
2916
2917                         o = lod_comp->llc_stripe[i];
2918                         LASSERT(o);
2919
2920                         rc = lod_sub_declare_create(env, o, attr, NULL,
2921                                                     NULL, th);
2922                         if (rc < 0) {
2923                                 CERROR("can't declare create: %d\n", rc);
2924                                 break;
2925                         }
2926                 }
2927                 /**
2928                  * Clear LCME_FL_INIT for the component so that
2929                  * lod_striping_create() can create the striping objects
2930                  * in replay.
2931                  */
2932                 lod_comp_unset_init(lod_comp);
2933         }
2934
2935 out:
2936         if (rc < 0) {
2937                 if (stripe)
2938                         OBD_FREE_PTR_ARRAY(stripe, stripe_len);
2939                 if (ost_indices)
2940                         OBD_FREE_PTR_ARRAY(ost_indices, stripe_len);
2941         }
2942         RETURN(rc);
2943 }
2944
2945 int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
2946                        struct lu_attr *attr, const struct lu_buf *buf,
2947                        struct thandle *th)
2948
2949 {
2950         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2951         uint64_t size = 0;
2952         int i;
2953         int rc;
2954         ENTRY;
2955
2956         LASSERT(lo);
2957
2958         /* no OST available */
2959         /* XXX: should we be waiting a bit to prevent failures during
2960          * cluster initialization? */
2961         if (!d->lod_ost_count)
2962                 RETURN(-EIO);
2963
2964         /*
2965          * by this time, the object's ldo_stripe_count and ldo_stripe_size
2966          * contain default value for striping: taken from the parent
2967          * or from filesystem defaults
2968          *
2969          * in case the caller is passing lovea with new striping config,
2970          * we may need to parse lovea and apply new configuration
2971          */
2972         rc = lod_qos_parse_config(env, lo, buf);
2973         if (rc)
2974                 RETURN(rc);
2975
2976         if (attr->la_valid & LA_SIZE)
2977                 size = attr->la_size;
2978
2979         /**
2980          * prepare OST object creation for the component covering file's
2981          * size, the 1st component (including plain layout file) is always
2982          * instantiated.
2983          */
2984         for (i = 0; i < lo->ldo_comp_cnt; i++) {
2985                 struct lod_layout_component *lod_comp;
2986                 struct lu_extent *extent;
2987
2988                 lod_comp = &lo->ldo_comp_entries[i];
2989                 extent = &lod_comp->llc_extent;
2990                 CDEBUG(D_OTHER, "comp[%d] %lld "DEXT"\n", i, size, PEXT(extent));
2991                 if (!lo->ldo_is_composite || size >= extent->e_start) {
2992                         rc = lod_qos_prep_create(env, lo, attr, th, i, 0);
2993                         if (rc)
2994                                 break;
2995                 }
2996         }
2997
2998         RETURN(rc);
2999 }