Whamcloud - gitweb
LU-12624 lod: alloc dir stripes by QoS
[fs/lustre-release.git] / lustre / lod / lod_qos.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_qos.c
33  *
34  * Implementation of different allocation algorithm used
35  * to distribute objects and data among OSTs.
36  */
37
38 #define DEBUG_SUBSYSTEM S_LOV
39
40 #include <asm/div64.h>
41 #include <linux/random.h>
42
43 #include <libcfs/libcfs.h>
44 #include <uapi/linux/lustre/lustre_idl.h>
45 #include <lustre_swab.h>
46 #include <obd_class.h>
47
48 #include "lod_internal.h"
49
50 /*
51  * force QoS policy (not RR) to be used for testing purposes
52  */
53 #define FORCE_QOS_
54
55 #define D_QOS   D_OTHER
56
57 #define QOS_DEBUG(fmt, ...)     CDEBUG(D_QOS, fmt, ## __VA_ARGS__)
58 #define QOS_CONSOLE(fmt, ...)   LCONSOLE(D_QOS, fmt, ## __VA_ARGS__)
59
60 #define TGT_BAVAIL(i) (OST_TGT(lod,i)->ltd_statfs.os_bavail * \
61                        OST_TGT(lod,i)->ltd_statfs.os_bsize)
62
63 static inline int lod_statfs_check(struct lu_tgt_descs *ltd,
64                                    struct lu_tgt_desc *tgt)
65 {
66         struct obd_statfs *sfs = &tgt->ltd_statfs;
67
68         if (((sfs->os_state & OS_STATE_ENOSPC) ||
69             (!ltd->ltd_is_mdt && sfs->os_state & OS_STATE_ENOINO &&
70              sfs->os_fprecreated == 0)))
71                 return -ENOSPC;
72
73         /* If the OST is readonly then we can't allocate objects there */
74         if (sfs->os_state & OS_STATE_READONLY)
75                 return -EROFS;
76
77         /* object precreation is skipped on the OST with max_create_count=0 */
78         if (!ltd->ltd_is_mdt && sfs->os_state & OS_STATE_NOPRECREATE)
79                 return -ENOBUFS;
80
81         return 0;
82 }
83
84 /**
85  * Check whether the target is available for new objects.
86  *
87  * Request statfs data from the given target and verify it's active and not
88  * read-only. If so, then it can be used to place new objects. This
89  * function also maintains the number of active/inactive targets and sets
90  * dirty flags if those numbers change so others can run re-balance procedures.
91  * No external locking is required.
92  *
93  * \param[in] env       execution environment for this thread
94  * \param[in] d         LOD device
95  * \param[in] ltd       target table
96  * \param[in] tgt       target
97  *
98  * \retval 0            if the target is good
99  * \retval negative     negated errno on error
100  */
101 static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
102                                 struct lu_tgt_descs *ltd,
103                                 struct lu_tgt_desc *tgt)
104 {
105         struct lov_desc *desc = &ltd->ltd_lov_desc;
106         int rc;
107
108         LASSERT(d);
109         LASSERT(tgt);
110
111         rc = dt_statfs(env, tgt->ltd_tgt, &tgt->ltd_statfs);
112         if (rc && rc != -ENOTCONN)
113                 CERROR("%s: statfs: rc = %d\n", lod2obd(d)->obd_name, rc);
114
115         if (!rc) {
116                 rc = lod_statfs_check(ltd, tgt);
117                 if (rc == -ENOSPC)
118                         return rc;
119         }
120
121         /* check whether device has changed state (active, inactive) */
122         if (rc != 0 && tgt->ltd_active) {
123                 /* turned inactive? */
124                 spin_lock(&d->lod_lock);
125                 if (tgt->ltd_active) {
126                         tgt->ltd_active = 0;
127                         if (rc == -ENOTCONN)
128                                 tgt->ltd_connecting = 1;
129
130                         LASSERT(desc->ld_active_tgt_count > 0);
131                         desc->ld_active_tgt_count--;
132                         ltd->ltd_qos.lq_dirty = 1;
133                         ltd->ltd_qos.lq_rr.lqr_dirty = 1;
134                         CDEBUG(D_CONFIG, "%s: turns inactive\n",
135                                tgt->ltd_exp->exp_obd->obd_name);
136                 }
137                 spin_unlock(&d->lod_lock);
138         } else if (rc == 0 && tgt->ltd_active == 0) {
139                 /* turned active? */
140                 LASSERTF(desc->ld_active_tgt_count < desc->ld_tgt_count,
141                          "active tgt count %d, tgt nr %d\n",
142                          desc->ld_active_tgt_count, desc->ld_tgt_count);
143                 spin_lock(&d->lod_lock);
144                 if (tgt->ltd_active == 0) {
145                         tgt->ltd_active = 1;
146                         tgt->ltd_connecting = 0;
147                         desc->ld_active_tgt_count++;
148                         ltd->ltd_qos.lq_dirty = 1;
149                         ltd->ltd_qos.lq_rr.lqr_dirty = 1;
150                         CDEBUG(D_CONFIG, "%s: turns active\n",
151                                tgt->ltd_exp->exp_obd->obd_name);
152                 }
153                 spin_unlock(&d->lod_lock);
154         }
155
156         return rc;
157 }
158
159 static int lod_is_tgt_usable(struct lu_tgt_descs *ltd, struct lu_tgt_desc *tgt)
160 {
161         int rc;
162
163         rc = lod_statfs_check(ltd, tgt);
164         if (rc)
165                 return rc;
166
167         if (!tgt->ltd_active)
168                 return -ENOTCONN;
169
170         return 0;
171 }
172
173 /**
174  * Maintain per-target statfs data.
175  *
176  * The function refreshes statfs data for all the targets every N seconds.
177  * The actual N is controlled via procfs and set to LOV_DESC_QOS_MAXAGE_DEFAULT
178  * initially.
179  *
180  * \param[in] env       execution environment for this thread
181  * \param[in] lod       LOD device
182  * \param[in] ltd       tgt table
183  */
184 void lod_qos_statfs_update(const struct lu_env *env, struct lod_device *lod,
185                            struct lu_tgt_descs *ltd)
186 {
187         struct obd_device *obd = lod2obd(lod);
188         struct lu_tgt_desc *tgt;
189         time64_t max_age;
190         u64 avail;
191         ENTRY;
192
193         max_age = ktime_get_seconds() - 2 * ltd->ltd_lov_desc.ld_qos_maxage;
194
195         if (obd->obd_osfs_age > max_age)
196                 /* statfs data are quite recent, don't need to refresh it */
197                 RETURN_EXIT;
198
199         down_write(&ltd->ltd_qos.lq_rw_sem);
200
201         if (obd->obd_osfs_age > max_age)
202                 goto out;
203
204         ltd_foreach_tgt(ltd, tgt) {
205                 avail = tgt->ltd_statfs.os_bavail;
206                 if (lod_statfs_and_check(env, lod, ltd, tgt))
207                         continue;
208
209                 if (tgt->ltd_statfs.os_bavail != avail)
210                         /* recalculate weigths */
211                         ltd->ltd_qos.lq_dirty = 1;
212         }
213         obd->obd_osfs_age = ktime_get_seconds();
214
215 out:
216         up_write(&ltd->ltd_qos.lq_rw_sem);
217         EXIT;
218 }
219
220 #define LOV_QOS_EMPTY ((__u32)-1)
221
222 /**
223  * Calculate optimal round-robin order with regard to OSSes.
224  *
225  * Place all the OSTs from pool \a src_pool in a special array to be used for
226  * round-robin (RR) stripe allocation.  The placement algorithm interleaves
227  * OSTs from the different OSSs so that RR allocation can balance OSSs evenly.
228  * Resorts the targets when the number of active targets changes (because of
229  * a new target or activation/deactivation).
230  *
231  * \param[in] lod       LOD device
232  * \param[in] ltd       tgt table
233  * \param[in] src_pool  tgt pool
234  * \param[in] lqr       round-robin list
235  *
236  * \retval 0            on success
237  * \retval -ENOMEM      fails to allocate the array
238  */
239 static int lod_qos_calc_rr(struct lod_device *lod, struct lu_tgt_descs *ltd,
240                            const struct lu_tgt_pool *src_pool,
241                            struct lu_qos_rr *lqr)
242 {
243         struct lu_svr_qos  *svr;
244         struct lu_tgt_desc *tgt;
245         unsigned placed, real_count;
246         unsigned int i;
247         int rc;
248         ENTRY;
249
250         if (!lqr->lqr_dirty) {
251                 LASSERT(lqr->lqr_pool.op_size);
252                 RETURN(0);
253         }
254
255         /* Do actual allocation. */
256         down_write(&ltd->ltd_qos.lq_rw_sem);
257
258         /*
259          * Check again. While we were sleeping on @lq_rw_sem something could
260          * change.
261          */
262         if (!lqr->lqr_dirty) {
263                 LASSERT(lqr->lqr_pool.op_size);
264                 up_write(&ltd->ltd_qos.lq_rw_sem);
265                 RETURN(0);
266         }
267
268         real_count = src_pool->op_count;
269
270         /* Zero the pool array */
271         /* alloc_rr is holding a read lock on the pool, so nobody is adding/
272            deleting from the pool. The lq_rw_sem insures that nobody else
273            is reading. */
274         lqr->lqr_pool.op_count = real_count;
275         rc = lod_tgt_pool_extend(&lqr->lqr_pool, real_count);
276         if (rc) {
277                 up_write(&ltd->ltd_qos.lq_rw_sem);
278                 RETURN(rc);
279         }
280         for (i = 0; i < lqr->lqr_pool.op_count; i++)
281                 lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY;
282
283         /* Place all the tgts from 1 svr at the same time. */
284         placed = 0;
285         list_for_each_entry(svr, &ltd->ltd_qos.lq_svr_list, lsq_svr_list) {
286                 int j = 0;
287
288                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
289                         int next;
290
291                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap,
292                                               src_pool->op_array[i]))
293                                 continue;
294
295                         tgt = LTD_TGT(ltd, src_pool->op_array[i]);
296                         LASSERT(tgt && tgt->ltd_tgt);
297                         if (tgt->ltd_qos.ltq_svr != svr)
298                                 continue;
299
300                         /* Evenly space these tgts across arrayspace */
301                         next = j * lqr->lqr_pool.op_count / svr->lsq_tgt_count;
302                         while (lqr->lqr_pool.op_array[next] != LOV_QOS_EMPTY)
303                                 next = (next + 1) % lqr->lqr_pool.op_count;
304
305                         lqr->lqr_pool.op_array[next] = src_pool->op_array[i];
306                         j++;
307                         placed++;
308                 }
309         }
310
311         lqr->lqr_dirty = 0;
312         up_write(&ltd->ltd_qos.lq_rw_sem);
313
314         if (placed != real_count) {
315                 /* This should never happen */
316                 LCONSOLE_ERROR_MSG(0x14e, "Failed to place all tgts in the "
317                                    "round-robin list (%d of %d).\n",
318                                    placed, real_count);
319                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
320                         LCONSOLE(D_WARNING, "rr #%d tgt idx=%d\n", i,
321                                  lqr->lqr_pool.op_array[i]);
322                 }
323                 lqr->lqr_dirty = 1;
324                 RETURN(-EAGAIN);
325         }
326
327 #if 0
328         for (i = 0; i < lqr->lqr_pool.op_count; i++)
329                 QOS_CONSOLE("rr #%d ost idx=%d\n", i, lqr->lqr_pool.op_array[i]);
330 #endif
331
332         RETURN(0);
333 }
334
335 /**
336  * Instantiate and declare creation of a new object.
337  *
338  * The function instantiates LU representation for a new object on the
339  * specified device. Also it declares an intention to create that
340  * object on the storage target.
341  *
342  * Note lu_object_anon() is used which is a trick with regard to LU/OSD
343  * infrastructure - in the existing precreation framework we can't assign FID
344  * at this moment, we do this later once a transaction is started. So the
345  * special method instantiates FID-less object in the cache and later it
346  * will get a FID and proper placement in LU cache.
347  *
348  * \param[in] env       execution environment for this thread
349  * \param[in] d         LOD device
350  * \param[in] ost_idx   OST target index where the object is being created
351  * \param[in] th        transaction handle
352  *
353  * \retval              object ptr on success, ERR_PTR() otherwise
354  */
355 static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
356                                                    struct lod_device *d,
357                                                    __u32 ost_idx,
358                                                    struct thandle *th)
359 {
360         struct lod_tgt_desc *ost;
361         struct lu_object *o, *n;
362         struct lu_device *nd;
363         struct dt_object *dt;
364         int               rc;
365         ENTRY;
366
367         LASSERT(d);
368         LASSERT(ost_idx < d->lod_ost_descs.ltd_tgts_size);
369         ost = OST_TGT(d,ost_idx);
370         LASSERT(ost);
371         LASSERT(ost->ltd_tgt);
372
373         nd = &ost->ltd_tgt->dd_lu_dev;
374
375         /*
376          * allocate anonymous object with zero fid, real fid
377          * will be assigned by OSP within transaction
378          * XXX: to be fixed with fully-functional OST fids
379          */
380         o = lu_object_anon(env, nd, NULL);
381         if (IS_ERR(o))
382                 GOTO(out, dt = ERR_PTR(PTR_ERR(o)));
383
384         n = lu_object_locate(o->lo_header, nd->ld_type);
385         if (unlikely(n == NULL)) {
386                 CERROR("can't find slice\n");
387                 lu_object_put(env, o);
388                 GOTO(out, dt = ERR_PTR(-EINVAL));
389         }
390
391         dt = container_of(n, struct dt_object, do_lu);
392
393         rc = lod_sub_declare_create(env, dt, NULL, NULL, NULL, th);
394         if (rc < 0) {
395                 CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
396                        ost_idx, rc);
397                 lu_object_put(env, o);
398                 dt = ERR_PTR(rc);
399         }
400
401 out:
402         RETURN(dt);
403 }
404
405 /**
406  * Calculate a minimum acceptable stripe count.
407  *
408  * Return an acceptable stripe count depending on flag LOV_USES_DEFAULT_STRIPE:
409  * all stripes or 3/4 of stripes.
410  *
411  * \param[in] stripe_count      number of stripes requested
412  * \param[in] flags             0 or LOV_USES_DEFAULT_STRIPE
413  *
414  * \retval                      acceptable stripecount
415  */
416 static int min_stripe_count(__u32 stripe_count, int flags)
417 {
418         return (flags & LOV_USES_DEFAULT_STRIPE ?
419                 stripe_count - (stripe_count / 4) : stripe_count);
420 }
421
422 #define LOV_CREATE_RESEED_MULT 30
423 #define LOV_CREATE_RESEED_MIN  2000
424
425 /**
426  * Initialize temporary tgt-in-use array.
427  *
428  * Allocate or extend the array used to mark targets already assigned to a new
429  * striping so they are not used more than once.
430  *
431  * \param[in] env       execution environment for this thread
432  * \param[in] stripes   number of items needed in the array
433  *
434  * \retval 0            on success
435  * \retval -ENOMEM      on error
436  */
437 static inline int lod_qos_tgt_in_use_clear(const struct lu_env *env,
438                                            __u32 stripes)
439 {
440         struct lod_thread_info *info = lod_env_info(env);
441
442         if (info->lti_ea_store_size < sizeof(int) * stripes)
443                 lod_ea_store_resize(info, stripes * sizeof(int));
444         if (info->lti_ea_store_size < sizeof(int) * stripes) {
445                 CERROR("can't allocate memory for ost-in-use array\n");
446                 return -ENOMEM;
447         }
448         memset(info->lti_ea_store, -1, sizeof(int) * stripes);
449         return 0;
450 }
451
452 /**
453  * Remember a target in the array of used targets.
454  *
455  * Mark the given target as used for a new striping being created. The status
456  * of an tgt in a striping can be checked with lod_qos_is_tgt_used().
457  *
458  * \param[in] env       execution environment for this thread
459  * \param[in] idx       index in the array
460  * \param[in] tgt_idx   target index to mark as used
461  */
462 static inline void lod_qos_tgt_in_use(const struct lu_env *env,
463                                       int idx, int tgt_idx)
464 {
465         struct lod_thread_info *info = lod_env_info(env);
466         int *tgts = info->lti_ea_store;
467
468         LASSERT(info->lti_ea_store_size >= idx * sizeof(int));
469         tgts[idx] = tgt_idx;
470 }
471
472 /**
473  * Check is tgt used in a striping.
474  *
475  * Checks whether tgt with the given index is marked as used in the temporary
476  * array (see lod_qos_tgt_in_use()).
477  *
478  * \param[in] env       execution environment for this thread
479  * \param[in] tgt_idx   target index to check
480  * \param[in] stripes   the number of items used in the array already
481  *
482  * \retval 0            not used
483  * \retval 1            used
484  */
485 static int lod_qos_is_tgt_used(const struct lu_env *env, int tgt_idx,
486                                __u32 stripes)
487 {
488         struct lod_thread_info *info = lod_env_info(env);
489         int *tgts = info->lti_ea_store;
490         __u32 j;
491
492         for (j = 0; j < stripes; j++) {
493                 if (tgts[j] == tgt_idx)
494                         return 1;
495         }
496         return 0;
497 }
498
499 static inline bool
500 lod_obj_is_ost_use_skip_cb(const struct lu_env *env, struct lod_object *lo,
501                            int comp_idx, struct lod_obj_stripe_cb_data *data)
502 {
503         struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx];
504
505         return comp->llc_ost_indices == NULL;
506 }
507
508 static inline int
509 lod_obj_is_ost_use_cb(const struct lu_env *env, struct lod_object *lo,
510                       int comp_idx, struct lod_obj_stripe_cb_data *data)
511 {
512         struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx];
513         int i;
514
515         for (i = 0; i < comp->llc_stripe_count; i++) {
516                 if (comp->llc_ost_indices[i] == data->locd_ost_index) {
517                         data->locd_ost_index = -1;
518                         return -EEXIST;
519                 }
520         }
521
522         return 0;
523 }
524
525 /**
526  * Check is OST used in a composite layout
527  *
528  * \param[in] lo        lod object
529  * \param[in] ost       OST target index to check
530  *
531  * \retval false        not used
532  * \retval true         used
533  */
534 static inline bool lod_comp_is_ost_used(const struct lu_env *env,
535                                        struct lod_object *lo, int ost)
536 {
537         struct lod_obj_stripe_cb_data data = { { 0 } };
538
539         data.locd_ost_index = ost;
540         data.locd_comp_skip_cb = lod_obj_is_ost_use_skip_cb;
541         data.locd_comp_cb = lod_obj_is_ost_use_cb;
542
543         (void)lod_obj_for_each_stripe(env, lo, NULL, &data);
544
545         return data.locd_ost_index == -1;
546 }
547
548 static inline void lod_avoid_update(struct lod_object *lo,
549                                     struct lod_avoid_guide *lag)
550 {
551         if (!lod_is_flr(lo))
552                 return;
553
554         lag->lag_ost_avail--;
555 }
556
557 static inline bool lod_should_avoid_ost(struct lod_object *lo,
558                                         struct lod_avoid_guide *lag,
559                                         __u32 index)
560 {
561         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
562         struct lod_tgt_desc *ost = OST_TGT(lod, index);
563         struct lu_svr_qos *lsq = ost->ltd_qos.ltq_svr;
564         bool used = false;
565         int i;
566
567         if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) {
568                 QOS_DEBUG("OST%d: been used in conflicting mirror component\n",
569                           index);
570                 return true;
571         }
572
573         /**
574          * we've tried our best, all available OSTs have been used in
575          * overlapped components in the other mirror
576          */
577         if (lag->lag_ost_avail == 0)
578                 return false;
579
580         /* check OSS use */
581         for (i = 0; i < lag->lag_oaa_count; i++) {
582                 if (lag->lag_oss_avoid_array[i] == lsq->lsq_id) {
583                         used = true;
584                         break;
585                 }
586         }
587         /**
588          * if the OSS which OST[index] resides has not been used, we'd like to
589          * use it
590          */
591         if (!used)
592                 return false;
593
594         /* if the OSS has been used, check whether the OST has been used */
595         if (!cfs_bitmap_check(lag->lag_ost_avoid_bitmap, index))
596                 used = false;
597         else
598                 QOS_DEBUG("OST%d: been used in conflicting mirror component\n",
599                           index);
600         return used;
601 }
602
603 static int lod_check_and_reserve_ost(const struct lu_env *env,
604                                      struct lod_object *lo,
605                                      struct lod_layout_component *lod_comp,
606                                      __u32 ost_idx, __u32 speed, __u32 *s_idx,
607                                      struct dt_object **stripe,
608                                      __u32 *ost_indices,
609                                      struct thandle *th,
610                                      bool *overstriped)
611 {
612         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
613         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
614         struct lu_tgt_desc *ost = OST_TGT(lod, ost_idx);
615         struct dt_object   *o;
616         __u32 stripe_idx = *s_idx;
617         int rc;
618
619         ENTRY;
620
621         rc = lod_statfs_and_check(env, lod, &lod->lod_ost_descs, ost);
622         if (rc)
623                 RETURN(rc);
624
625         /*
626          * We expect number of precreated objects in f_ffree at
627          * the first iteration, skip OSPs with no objects ready
628          */
629         if (ost->ltd_statfs.os_fprecreated == 0 && speed == 0) {
630                 QOS_DEBUG("#%d: precreation is empty\n", ost_idx);
631                 RETURN(rc);
632         }
633
634         /*
635          * try to use another OSP if this one is degraded
636          */
637         if (ost->ltd_statfs.os_state & OS_STATE_DEGRADED && speed < 2) {
638                 QOS_DEBUG("#%d: degraded\n", ost_idx);
639                 RETURN(rc);
640         }
641
642         /*
643          * try not allocate on OST which has been used by other
644          * component
645          */
646         if (speed == 0 && lod_comp_is_ost_used(env, lo, ost_idx)) {
647                 QOS_DEBUG("iter %d: OST%d used by other component\n",
648                           speed, ost_idx);
649                 RETURN(rc);
650         }
651
652         /**
653          * try not allocate OSTs used by conflicting component of other mirrors
654          * for the first and second time.
655          */
656         if (speed < 2 && lod_should_avoid_ost(lo, lag, ost_idx)) {
657                 QOS_DEBUG("iter %d: OST%d used by conflicting mirror component\n",
658                           speed, ost_idx);
659                 RETURN(rc);
660         }
661
662         /* do not put >1 objects on a single OST, except for overstriping */
663         if (lod_qos_is_tgt_used(env, ost_idx, stripe_idx)) {
664                 if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
665                         *overstriped = true;
666                 else
667                         RETURN(rc);
668         }
669
670         o = lod_qos_declare_object_on(env, lod, ost_idx, th);
671         if (IS_ERR(o)) {
672                 CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
673                        ost_idx, (int) PTR_ERR(o));
674                 rc = PTR_ERR(o);
675                 RETURN(rc);
676         }
677
678         /*
679          * We've successfully declared (reserved) an object
680          */
681         lod_avoid_update(lo, lag);
682         lod_qos_tgt_in_use(env, stripe_idx, ost_idx);
683         stripe[stripe_idx] = o;
684         ost_indices[stripe_idx] = ost_idx;
685         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LOV_CREATE_RACE, 2);
686         stripe_idx++;
687         *s_idx = stripe_idx;
688
689         RETURN(rc);
690 }
691
692 /**
693  * Allocate a striping using round-robin algorithm.
694  *
695  * Allocates a new striping using round-robin algorithm. The function refreshes
696  * all the internal structures (statfs cache, array of available OSTs sorted
697  * with regard to OSS, etc). The number of stripes required is taken from the
698  * object (must be prepared by the caller), but can change if the flag
699  * LOV_USES_DEFAULT_STRIPE is supplied. The caller should ensure nobody else
700  * is trying to create a striping on the object in parallel. All the internal
701  * structures (like pools, etc) are protected and no additional locking is
702  * required. The function succeeds even if a single stripe is allocated. To save
703  * time we give priority to targets which already have objects precreated.
704  * Full OSTs are skipped (see lod_qos_dev_is_full() for the details).
705  *
706  * \param[in] env               execution environment for this thread
707  * \param[in] lo                LOD object
708  * \param[out] stripe           striping created
709  * \param[out] ost_indices      ost indices of striping created
710  * \param[in] flags             allocation flags (0 or LOV_USES_DEFAULT_STRIPE)
711  * \param[in] th                transaction handle
712  * \param[in] comp_idx          index of ldo_comp_entries
713  *
714  * \retval 0            on success
715  * \retval -ENOSPC      if not enough OSTs are found
716  * \retval negative     negated errno for other failures
717  */
718 static int lod_ost_alloc_rr(const struct lu_env *env, struct lod_object *lo,
719                             struct dt_object **stripe, __u32 *ost_indices,
720                             int flags, struct thandle *th, int comp_idx)
721 {
722         struct lod_layout_component *lod_comp;
723         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
724         struct pool_desc  *pool = NULL;
725         struct lu_tgt_pool *osts;
726         struct lu_qos_rr *lqr;
727         unsigned int i, array_idx;
728         __u32 ost_start_idx_temp;
729         __u32 stripe_idx = 0;
730         __u32 stripe_count, stripe_count_min, ost_idx;
731         int rc, speed = 0, ost_connecting = 0;
732         int stripes_per_ost = 1;
733         bool overstriped = false;
734         ENTRY;
735
736         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
737         lod_comp = &lo->ldo_comp_entries[comp_idx];
738         stripe_count = lod_comp->llc_stripe_count;
739         stripe_count_min = min_stripe_count(stripe_count, flags);
740
741         if (lod_comp->llc_pool != NULL)
742                 pool = lod_find_pool(m, lod_comp->llc_pool);
743
744         if (pool != NULL) {
745                 down_read(&pool_tgt_rw_sem(pool));
746                 osts = &(pool->pool_obds);
747                 lqr = &(pool->pool_rr);
748         } else {
749                 osts = &m->lod_ost_descs.ltd_tgt_pool;
750                 lqr = &(m->lod_ost_descs.ltd_qos.lq_rr);
751         }
752
753         rc = lod_qos_calc_rr(m, &m->lod_ost_descs, osts, lqr);
754         if (rc)
755                 GOTO(out, rc);
756
757         rc = lod_qos_tgt_in_use_clear(env, stripe_count);
758         if (rc)
759                 GOTO(out, rc);
760
761         down_read(&m->lod_ost_descs.ltd_qos.lq_rw_sem);
762         spin_lock(&lqr->lqr_alloc);
763         if (--lqr->lqr_start_count <= 0) {
764                 lqr->lqr_start_idx = prandom_u32_max(osts->op_count);
765                 lqr->lqr_start_count =
766                         (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
767                          LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
768         } else if (stripe_count_min >= osts->op_count ||
769                         lqr->lqr_start_idx > osts->op_count) {
770                 /* If we have allocated from all of the OSTs, slowly
771                  * precess the next start if the OST/stripe count isn't
772                  * already doing this for us. */
773                 lqr->lqr_start_idx %= osts->op_count;
774                 if (stripe_count > 1 && (osts->op_count % stripe_count) != 1)
775                         ++lqr->lqr_offset_idx;
776         }
777         ost_start_idx_temp = lqr->lqr_start_idx;
778
779 repeat_find:
780
781         QOS_DEBUG("pool '%s' want %d start_idx %d start_count %d offset %d "
782                   "active %d count %d\n",
783                   lod_comp->llc_pool ? lod_comp->llc_pool : "",
784                   stripe_count, lqr->lqr_start_idx, lqr->lqr_start_count,
785                   lqr->lqr_offset_idx, osts->op_count, osts->op_count);
786
787         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
788                 stripes_per_ost =
789                         (lod_comp->llc_stripe_count - 1)/osts->op_count + 1;
790
791         for (i = 0; i < osts->op_count * stripes_per_ost
792              && stripe_idx < stripe_count; i++) {
793                 array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
794                                 osts->op_count;
795                 ++lqr->lqr_start_idx;
796                 ost_idx = lqr->lqr_pool.op_array[array_idx];
797
798                 QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
799                           i, lqr->lqr_start_idx, /* XXX: active*/ 0,
800                           stripe_idx, array_idx, ost_idx);
801
802                 if ((ost_idx == LOV_QOS_EMPTY) ||
803                     !cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
804                         continue;
805
806                 /* Fail Check before osc_precreate() is called
807                    so we can only 'fail' single OSC. */
808                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
809                         continue;
810
811                 spin_unlock(&lqr->lqr_alloc);
812                 rc = lod_check_and_reserve_ost(env, lo, lod_comp, ost_idx,
813                                                speed, &stripe_idx, stripe,
814                                                ost_indices, th, &overstriped);
815                 spin_lock(&lqr->lqr_alloc);
816
817                 if (rc != 0 && OST_TGT(m, ost_idx)->ltd_connecting)
818                         ost_connecting = 1;
819         }
820         if ((speed < 2) && (stripe_idx < stripe_count_min)) {
821                 /* Try again, allowing slower OSCs */
822                 speed++;
823                 lqr->lqr_start_idx = ost_start_idx_temp;
824
825                 ost_connecting = 0;
826                 goto repeat_find;
827         }
828
829         spin_unlock(&lqr->lqr_alloc);
830         up_read(&m->lod_ost_descs.ltd_qos.lq_rw_sem);
831
832         /* If there are enough OSTs, a component with overstriping requested
833          * will not actually end up overstriped.  The comp should reflect this.
834          */
835         if (!overstriped)
836                 lod_comp->llc_pattern &= ~LOV_PATTERN_OVERSTRIPING;
837
838         if (stripe_idx) {
839                 lod_comp->llc_stripe_count = stripe_idx;
840                 /* at least one stripe is allocated */
841                 rc = 0;
842         } else {
843                 /* nobody provided us with a single object */
844                 if (ost_connecting)
845                         rc = -EINPROGRESS;
846                 else
847                         rc = -ENOSPC;
848         }
849
850 out:
851         if (pool != NULL) {
852                 up_read(&pool_tgt_rw_sem(pool));
853                 /* put back ref got by lod_find_pool() */
854                 lod_pool_putref(pool);
855         }
856
857         RETURN(rc);
858 }
859
860 /**
861  * Allocate a striping using round-robin algorithm.
862  *
863  * Allocates a new striping using round-robin algorithm. The function refreshes
864  * all the internal structures (statfs cache, array of available remote MDTs
865  * sorted with regard to MDS, etc). The number of stripes required is taken from
866  * the object (must be prepared by the caller). The caller should ensure nobody
867  * else is trying to create a striping on the object in parallel. All the
868  * internal structures (like pools, etc) are protected and no additional locking
869  * is required. The function succeeds even if a single stripe is allocated.
870  *
871  * \param[in] env               execution environment for this thread
872  * \param[in] lo                LOD object
873  * \param[out] stripe           striping created
874  *
875  * \retval positive     stripe objects allocated, including the first stripe
876  *                      allocated outside
877  * \retval -ENOSPC      if not enough MDTs are found
878  * \retval negative     negated errno for other failures
879  */
880 int lod_mdt_alloc_rr(const struct lu_env *env, struct lod_object *lo,
881                      struct dt_object **stripe)
882 {
883         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
884         struct lu_tgt_descs *ltd = &lod->lod_mdt_descs;
885         struct lu_tgt_pool *pool;
886         struct lu_qos_rr *lqr;
887         struct lu_tgt_desc *mdt;
888         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
889         struct lu_fid fid = { 0 };
890         struct dt_object *dto;
891         unsigned int pool_idx;
892         unsigned int i;
893         u32 start_idx_temp;
894         u32 stripe_count = lo->ldo_dir_stripe_count;
895         u32 stripe_idx = 1;
896         u32 mdt_idx;
897         bool use_degraded = false;
898         int tgt_connecting = 0;
899         int rc;
900
901         ENTRY;
902
903         pool = &ltd->ltd_tgt_pool;
904         lqr = &ltd->ltd_qos.lq_rr;
905         rc = lod_qos_calc_rr(lod, ltd, pool, lqr);
906         if (rc)
907                 RETURN(rc);
908
909         rc = lod_qos_tgt_in_use_clear(env, stripe_count);
910         if (rc)
911                 RETURN(rc);
912
913         down_read(&ltd->ltd_qos.lq_rw_sem);
914         spin_lock(&lqr->lqr_alloc);
915         if (--lqr->lqr_start_count <= 0) {
916                 lqr->lqr_start_idx = prandom_u32_max(pool->op_count);
917                 lqr->lqr_start_count =
918                         (LOV_CREATE_RESEED_MIN / max(pool->op_count, 1U) +
919                          LOV_CREATE_RESEED_MULT) * max(pool->op_count, 1U);
920         } else if (stripe_count - 1 >= pool->op_count ||
921                    lqr->lqr_start_idx > pool->op_count) {
922                 /* If we have allocated from all of the tgts, slowly
923                  * precess the next start if the tgt/stripe count isn't
924                  * already doing this for us. */
925                 lqr->lqr_start_idx %= pool->op_count;
926                 if (stripe_count - 1 > 1 &&
927                     (pool->op_count % (stripe_count - 1)) != 1)
928                         ++lqr->lqr_offset_idx;
929         }
930         start_idx_temp = lqr->lqr_start_idx;
931
932 repeat_find:
933         QOS_DEBUG("want %d start_idx %d start_count %d offset %d active %d count %d\n",
934                   stripe_count - 1, lqr->lqr_start_idx, lqr->lqr_start_count,
935                   lqr->lqr_offset_idx, pool->op_count, pool->op_count);
936
937         for (i = 0; i < pool->op_count && stripe_idx < stripe_count; i++) {
938                 pool_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
939                             pool->op_count;
940                 ++lqr->lqr_start_idx;
941                 mdt_idx = lqr->lqr_pool.op_array[pool_idx];
942                 mdt = LTD_TGT(ltd, mdt_idx);
943
944                 QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
945                           i, lqr->lqr_start_idx, /* XXX: active*/ 0,
946                           stripe_idx, pool_idx, mdt_idx);
947
948                 if (mdt_idx == LOV_QOS_EMPTY ||
949                     !cfs_bitmap_check(ltd->ltd_tgt_bitmap, mdt_idx))
950                         continue;
951
952                 /* do not put >1 objects on one MDT */
953                 if (lod_qos_is_tgt_used(env, mdt_idx, stripe_idx))
954                         continue;
955
956                 rc = lod_is_tgt_usable(ltd, mdt);
957                 if (rc) {
958                         if (mdt->ltd_connecting)
959                                 tgt_connecting = 1;
960                         continue;
961                 }
962
963                 /* try to use another OSP if this one is degraded */
964                 if (mdt->ltd_statfs.os_state & OS_STATE_DEGRADED &&
965                     !use_degraded) {
966                         QOS_DEBUG("#%d: degraded\n", mdt_idx);
967                         continue;
968                 }
969                 spin_unlock(&lqr->lqr_alloc);
970
971                 rc = obd_fid_alloc(env, mdt->ltd_exp, &fid, NULL);
972                 if (rc) {
973                         QOS_DEBUG("#%d: alloc FID failed: %dl\n", mdt_idx, rc);
974                         spin_lock(&lqr->lqr_alloc);
975                         continue;
976                 }
977
978                 dto = dt_locate_at(env, mdt->ltd_tgt, &fid,
979                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
980                                 &conf);
981
982                 spin_lock(&lqr->lqr_alloc);
983                 if (IS_ERR(dto)) {
984                         QOS_DEBUG("can't alloc stripe on #%u: %d\n",
985                                   mdt->ltd_index, (int) PTR_ERR(dto));
986
987                         if (mdt->ltd_connecting)
988                                 tgt_connecting = 1;
989                         continue;
990                 }
991
992                 lod_qos_tgt_in_use(env, stripe_idx, mdt_idx);
993                 stripe[stripe_idx] = dto;
994                 stripe_idx++;
995         }
996
997         if (!use_degraded && stripe_idx < stripe_count) {
998                 /* Try again, allowing slower OSCs */
999                 use_degraded = true;
1000                 lqr->lqr_start_idx = start_idx_temp;
1001
1002                 tgt_connecting = 0;
1003                 goto repeat_find;
1004         }
1005         spin_unlock(&lqr->lqr_alloc);
1006         up_read(&ltd->ltd_qos.lq_rw_sem);
1007
1008         if (stripe_idx > 1)
1009                 /* at least one stripe is allocated */
1010                 RETURN(stripe_idx);
1011
1012         /* nobody provided us with a single object */
1013         if (tgt_connecting)
1014                 RETURN(-EINPROGRESS);
1015
1016         RETURN(-ENOSPC);
1017 }
1018
1019 /**
1020  * Allocate a specific striping layout on a user defined set of OSTs.
1021  *
1022  * Allocates new striping using the OST index range provided by the data from
1023  * the lmm_obejcts contained in the lov_user_md passed to this method. Full
1024  * OSTs are not considered. The exact order of OSTs requested by the user
1025  * is respected as much as possible depending on OST status. The number of
1026  * stripes needed and stripe offset are taken from the object. If that number
1027  * can not be met, then the function returns a failure and then it's the
1028  * caller's responsibility to release the stripes allocated. All the internal
1029  * structures are protected, but no concurrent allocation is allowed on the
1030  * same objects.
1031  *
1032  * \param[in] env               execution environment for this thread
1033  * \param[in] lo                LOD object
1034  * \param[out] stripe           striping created
1035  * \param[out] ost_indices      ost indices of striping created
1036  * \param[in] th                transaction handle
1037  * \param[in] comp_idx          index of ldo_comp_entries
1038  *
1039  * \retval 0            on success
1040  * \retval -ENODEV      OST index does not exist on file system
1041  * \retval -EINVAL      requested OST index is invalid
1042  * \retval negative     negated errno on error
1043  */
1044 static int lod_alloc_ost_list(const struct lu_env *env, struct lod_object *lo,
1045                               struct dt_object **stripe, __u32 *ost_indices,
1046                               struct thandle *th, int comp_idx)
1047 {
1048         struct lod_layout_component *lod_comp;
1049         struct lod_device       *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1050         struct dt_object        *o;
1051         unsigned int            array_idx = 0;
1052         int                     stripe_count = 0;
1053         int                     i;
1054         int                     rc = -EINVAL;
1055         ENTRY;
1056
1057         /* for specific OSTs layout */
1058         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
1059         lod_comp = &lo->ldo_comp_entries[comp_idx];
1060         LASSERT(lod_comp->llc_ostlist.op_array);
1061         LASSERT(lod_comp->llc_ostlist.op_count);
1062
1063         rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
1064         if (rc < 0)
1065                 RETURN(rc);
1066
1067         if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)
1068                 lod_comp->llc_stripe_offset =
1069                                 lod_comp->llc_ostlist.op_array[0];
1070
1071         for (i = 0; i < lod_comp->llc_stripe_count; i++) {
1072                 if (lod_comp->llc_ostlist.op_array[i] ==
1073                     lod_comp->llc_stripe_offset) {
1074                         array_idx = i;
1075                         break;
1076                 }
1077         }
1078         if (i == lod_comp->llc_stripe_count) {
1079                 CDEBUG(D_OTHER,
1080                        "%s: start index %d not in the specified list of OSTs\n",
1081                        lod2obd(m)->obd_name, lod_comp->llc_stripe_offset);
1082                 RETURN(-EINVAL);
1083         }
1084
1085         for (i = 0; i < lod_comp->llc_stripe_count;
1086              i++, array_idx = (array_idx + 1) % lod_comp->llc_stripe_count) {
1087                 __u32 ost_idx = lod_comp->llc_ostlist.op_array[array_idx];
1088
1089                 if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx)) {
1090                         rc = -ENODEV;
1091                         break;
1092                 }
1093
1094                 /* do not put >1 objects on a single OST, except for
1095                  * overstriping
1096                  */
1097                 if (lod_qos_is_tgt_used(env, ost_idx, stripe_count) &&
1098                     !(lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)) {
1099                         rc = -EINVAL;
1100                         break;
1101                 }
1102
1103                 rc = lod_statfs_and_check(env, m, &m->lod_ost_descs,
1104                                           LTD_TGT(&m->lod_ost_descs, ost_idx));
1105                 if (rc < 0) /* this OSP doesn't feel well */
1106                         break;
1107
1108                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
1109                 if (IS_ERR(o)) {
1110                         rc = PTR_ERR(o);
1111                         CDEBUG(D_OTHER,
1112                                "%s: can't declare new object on #%u: %d\n",
1113                                lod2obd(m)->obd_name, ost_idx, rc);
1114                         break;
1115                 }
1116
1117                 /*
1118                  * We've successfully declared (reserved) an object
1119                  */
1120                 lod_qos_tgt_in_use(env, stripe_count, ost_idx);
1121                 stripe[stripe_count] = o;
1122                 ost_indices[stripe_count] = ost_idx;
1123                 stripe_count++;
1124         }
1125
1126         RETURN(rc);
1127 }
1128
1129 /**
1130  * Allocate a striping on a predefined set of OSTs.
1131  *
1132  * Allocates new layout starting from OST index in lo->ldo_stripe_offset.
1133  * Full OSTs are not considered. The exact order of OSTs is not important and
1134  * varies depending on OST status. The allocation procedure prefers the targets
1135  * with precreated objects ready. The number of stripes needed and stripe
1136  * offset are taken from the object. If that number cannot be met, then the
1137  * function returns an error and then it's the caller's responsibility to
1138  * release the stripes allocated. All the internal structures are protected,
1139  * but no concurrent allocation is allowed on the same objects.
1140  *
1141  * \param[in] env               execution environment for this thread
1142  * \param[in] lo                LOD object
1143  * \param[out] stripe           striping created
1144  * \param[out] ost_indices      ost indices of striping created
1145  * \param[in] flags             not used
1146  * \param[in] th                transaction handle
1147  * \param[in] comp_idx          index of ldo_comp_entries
1148  *
1149  * \retval 0            on success
1150  * \retval -ENOSPC      if no OST objects are available at all
1151  * \retval -EFBIG       if not enough OST objects are found
1152  * \retval -EINVAL      requested offset is invalid
1153  * \retval negative     errno on failure
1154  */
1155 static int lod_ost_alloc_specific(const struct lu_env *env,
1156                                   struct lod_object *lo,
1157                                   struct dt_object **stripe, __u32 *ost_indices,
1158                                   int flags, struct thandle *th, int comp_idx)
1159 {
1160         struct lod_layout_component *lod_comp;
1161         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1162         struct dt_object *o;
1163         struct lu_tgt_desc *tgt;
1164         __u32 ost_idx;
1165         unsigned int i, array_idx, ost_count;
1166         int rc, stripe_num = 0;
1167         int speed = 0;
1168         struct pool_desc *pool = NULL;
1169         struct lu_tgt_pool *osts;
1170         int stripes_per_ost = 1;
1171         bool overstriped = false;
1172         ENTRY;
1173
1174         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
1175         lod_comp = &lo->ldo_comp_entries[comp_idx];
1176
1177         rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
1178         if (rc)
1179                 GOTO(out, rc);
1180
1181         if (lod_comp->llc_pool != NULL)
1182                 pool = lod_find_pool(m, lod_comp->llc_pool);
1183
1184         if (pool != NULL) {
1185                 down_read(&pool_tgt_rw_sem(pool));
1186                 osts = &(pool->pool_obds);
1187         } else {
1188                 osts = &m->lod_ost_descs.ltd_tgt_pool;
1189         }
1190
1191         ost_count = osts->op_count;
1192
1193 repeat_find:
1194         /* search loi_ost_idx in ost array */
1195         array_idx = 0;
1196         for (i = 0; i < ost_count; i++) {
1197                 if (osts->op_array[i] == lod_comp->llc_stripe_offset) {
1198                         array_idx = i;
1199                         break;
1200                 }
1201         }
1202         if (i == ost_count) {
1203                 CERROR("Start index %d not found in pool '%s'\n",
1204                        lod_comp->llc_stripe_offset,
1205                        lod_comp->llc_pool ? lod_comp->llc_pool : "");
1206                 GOTO(out, rc = -EINVAL);
1207         }
1208
1209         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1210                 stripes_per_ost =
1211                         (lod_comp->llc_stripe_count - 1)/ost_count + 1;
1212
1213         for (i = 0; i < ost_count * stripes_per_ost;
1214                         i++, array_idx = (array_idx + 1) % ost_count) {
1215                 ost_idx = osts->op_array[array_idx];
1216
1217                 if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
1218                         continue;
1219
1220                 /* Fail Check before osc_precreate() is called
1221                    so we can only 'fail' single OSC. */
1222                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
1223                         continue;
1224
1225                 /*
1226                  * do not put >1 objects on a single OST, except for
1227                  * overstriping, where it is intended
1228                  */
1229                 if (lod_qos_is_tgt_used(env, ost_idx, stripe_num)) {
1230                         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1231                                 overstriped = true;
1232                         else
1233                                 continue;
1234                 }
1235
1236                 /*
1237                  * try not allocate on the OST used by other component
1238                  */
1239                 if (speed == 0 && i != 0 &&
1240                     lod_comp_is_ost_used(env, lo, ost_idx))
1241                         continue;
1242
1243                 tgt = LTD_TGT(&m->lod_ost_descs, ost_idx);
1244
1245                 /* Drop slow OSCs if we can, but not for requested start idx.
1246                  *
1247                  * This means "if OSC is slow and it is not the requested
1248                  * start OST, then it can be skipped, otherwise skip it only
1249                  * if it is inactive/recovering/out-of-space." */
1250
1251                 rc = lod_statfs_and_check(env, m, &m->lod_ost_descs, tgt);
1252                 if (rc) {
1253                         /* this OSP doesn't feel well */
1254                         continue;
1255                 }
1256
1257                 /*
1258                  * We expect number of precreated objects at the first
1259                  * iteration.  Skip OSPs with no objects ready.  Don't apply
1260                  * this logic to OST specified with stripe_offset.
1261                  */
1262                 if (i && !tgt->ltd_statfs.os_fprecreated && !speed)
1263                         continue;
1264
1265                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
1266                 if (IS_ERR(o)) {
1267                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
1268                                ost_idx, (int) PTR_ERR(o));
1269                         continue;
1270                 }
1271
1272                 /*
1273                  * We've successfully declared (reserved) an object
1274                  */
1275                 lod_qos_tgt_in_use(env, stripe_num, ost_idx);
1276                 stripe[stripe_num] = o;
1277                 ost_indices[stripe_num] = ost_idx;
1278                 stripe_num++;
1279
1280                 /* We have enough stripes */
1281                 if (stripe_num == lod_comp->llc_stripe_count)
1282                         GOTO(out, rc = 0);
1283         }
1284         if (speed < 2) {
1285                 /* Try again, allowing slower OSCs */
1286                 speed++;
1287                 goto repeat_find;
1288         }
1289
1290         /* If we were passed specific striping params, then a failure to
1291          * meet those requirements is an error, since we can't reallocate
1292          * that memory (it might be part of a larger array or something).
1293          */
1294         CERROR("can't lstripe objid "DFID": have %d want %u\n",
1295                PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num,
1296                lod_comp->llc_stripe_count);
1297         rc = stripe_num == 0 ? -ENOSPC : -EFBIG;
1298
1299         /* If there are enough OSTs, a component with overstriping requessted
1300          * will not actually end up overstriped.  The comp should reflect this.
1301          */
1302         if (rc == 0 && !overstriped)
1303                 lod_comp->llc_pattern &= ~LOV_PATTERN_OVERSTRIPING;
1304
1305 out:
1306         if (pool != NULL) {
1307                 up_read(&pool_tgt_rw_sem(pool));
1308                 /* put back ref got by lod_find_pool() */
1309                 lod_pool_putref(pool);
1310         }
1311
1312         RETURN(rc);
1313 }
1314
1315 /**
1316  * Allocate a striping using an algorithm with weights.
1317  *
1318  * The function allocates OST objects to create a striping. The algorithm
1319  * used is based on weights (currently only using the free space), and it's
1320  * trying to ensure the space is used evenly by OSTs and OSSs. The striping
1321  * configuration (# of stripes, offset, pool) is taken from the object and
1322  * is prepared by the caller.
1323  *
1324  * If LOV_USES_DEFAULT_STRIPE is not passed and prepared configuration can't
1325  * be met due to too few OSTs, then allocation fails. If the flag is passed
1326  * fewer than 3/4 of the requested number of stripes can be allocated, then
1327  * allocation fails.
1328  *
1329  * No concurrent allocation is allowed on the object and this must be ensured
1330  * by the caller. All the internal structures are protected by the function.
1331  *
1332  * The algorithm has two steps: find available OSTs and calculate their
1333  * weights, then select the OSTs with their weights used as the probability.
1334  * An OST with a higher weight is proportionately more likely to be selected
1335  * than one with a lower weight.
1336  *
1337  * \param[in] env               execution environment for this thread
1338  * \param[in] lo                LOD object
1339  * \param[out] stripe           striping created
1340  * \param[out] ost_indices      ost indices of striping created
1341  * \param[in] flags             0 or LOV_USES_DEFAULT_STRIPE
1342  * \param[in] th                transaction handle
1343  * \param[in] comp_idx          index of ldo_comp_entries
1344  *
1345  * \retval 0            on success
1346  * \retval -EAGAIN      not enough OSTs are found for specified stripe count
1347  * \retval -EINVAL      requested OST index is invalid
1348  * \retval negative     errno on failure
1349  */
1350 static int lod_ost_alloc_qos(const struct lu_env *env, struct lod_object *lo,
1351                              struct dt_object **stripe, __u32 *ost_indices,
1352                              int flags, struct thandle *th, int comp_idx)
1353 {
1354         struct lod_layout_component *lod_comp;
1355         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1356         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
1357         struct lod_tgt_desc *ost;
1358         struct dt_object *o;
1359         __u64 total_weight = 0;
1360         struct pool_desc *pool = NULL;
1361         struct lu_tgt_pool *osts;
1362         unsigned int i;
1363         __u32 nfound, good_osts, stripe_count, stripe_count_min;
1364         bool overstriped = false;
1365         int stripes_per_ost = 1;
1366         int rc = 0;
1367         ENTRY;
1368
1369         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
1370         lod_comp = &lo->ldo_comp_entries[comp_idx];
1371         stripe_count = lod_comp->llc_stripe_count;
1372         stripe_count_min = min_stripe_count(stripe_count, flags);
1373         if (stripe_count_min < 1)
1374                 RETURN(-EINVAL);
1375
1376         if (lod_comp->llc_pool != NULL)
1377                 pool = lod_find_pool(lod, lod_comp->llc_pool);
1378
1379         if (pool != NULL) {
1380                 down_read(&pool_tgt_rw_sem(pool));
1381                 osts = &(pool->pool_obds);
1382         } else {
1383                 osts = &lod->lod_ost_descs.ltd_tgt_pool;
1384         }
1385
1386         /* Detect -EAGAIN early, before expensive lock is taken. */
1387         if (!ltd_qos_is_usable(&lod->lod_ost_descs))
1388                 GOTO(out_nolock, rc = -EAGAIN);
1389
1390         if (lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING)
1391                 stripes_per_ost =
1392                         (lod_comp->llc_stripe_count - 1)/osts->op_count + 1;
1393
1394         /* Do actual allocation, use write lock here. */
1395         down_write(&lod->lod_ost_descs.ltd_qos.lq_rw_sem);
1396
1397         /*
1398          * Check again, while we were sleeping on @lq_rw_sem things could
1399          * change.
1400          */
1401         if (!ltd_qos_is_usable(&lod->lod_ost_descs))
1402                 GOTO(out, rc = -EAGAIN);
1403
1404         rc = ltd_qos_penalties_calc(&lod->lod_ost_descs);
1405         if (rc)
1406                 GOTO(out, rc);
1407
1408         rc = lod_qos_tgt_in_use_clear(env, lod_comp->llc_stripe_count);
1409         if (rc)
1410                 GOTO(out, rc);
1411
1412         good_osts = 0;
1413         /* Find all the OSTs that are valid stripe candidates */
1414         for (i = 0; i < osts->op_count; i++) {
1415                 if (!cfs_bitmap_check(lod->lod_ost_bitmap, osts->op_array[i]))
1416                         continue;
1417
1418                 ost = OST_TGT(lod, osts->op_array[i]);
1419                 ost->ltd_qos.ltq_usable = 0;
1420
1421                 rc = lod_statfs_and_check(env, lod, &lod->lod_ost_descs, ost);
1422                 if (rc) {
1423                         /* this OSP doesn't feel well */
1424                         continue;
1425                 }
1426
1427                 if (ost->ltd_statfs.os_state & OS_STATE_DEGRADED)
1428                         continue;
1429
1430                 /* Fail Check before osc_precreate() is called
1431                  * so we can only 'fail' single OSC.
1432                  */
1433                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
1434                                    osts->op_array[i] == 0)
1435                         continue;
1436
1437                 ost->ltd_qos.ltq_usable = 1;
1438                 lu_tgt_qos_weight_calc(ost);
1439                 total_weight += ost->ltd_qos.ltq_weight;
1440
1441                 good_osts++;
1442         }
1443
1444         QOS_DEBUG("found %d good osts\n", good_osts);
1445
1446         if (good_osts < stripe_count_min)
1447                 GOTO(out, rc = -EAGAIN);
1448
1449         /* If we do not have enough OSTs for the requested stripe count, do not
1450          * put more stripes per OST than requested.
1451          */
1452         if (stripe_count / stripes_per_ost > good_osts)
1453                 stripe_count = good_osts * stripes_per_ost;
1454
1455         /* Find enough OSTs with weighted random allocation. */
1456         nfound = 0;
1457         while (nfound < stripe_count) {
1458                 u64 rand, cur_weight;
1459
1460                 cur_weight = 0;
1461                 rc = -ENOSPC;
1462
1463                 rand = lu_prandom_u64_max(total_weight);
1464
1465                 /* On average, this will hit larger-weighted OSTs more often.
1466                  * 0-weight OSTs will always get used last (only when rand=0)
1467                  */
1468                 for (i = 0; i < osts->op_count; i++) {
1469                         __u32 idx = osts->op_array[i];
1470
1471                         if (lod_should_avoid_ost(lo, lag, idx))
1472                                 continue;
1473
1474                         ost = OST_TGT(lod, idx);
1475
1476                         if (!ost->ltd_qos.ltq_usable)
1477                                 continue;
1478
1479                         cur_weight += ost->ltd_qos.ltq_weight;
1480                         QOS_DEBUG("stripe_count=%d nfound=%d cur_weight=%llu "
1481                                   "rand=%llu total_weight=%llu\n",
1482                                   stripe_count, nfound, cur_weight, rand,
1483                                   total_weight);
1484
1485                         if (cur_weight < rand)
1486                                 continue;
1487
1488                         QOS_DEBUG("stripe=%d to idx=%d\n", nfound, idx);
1489                         /*
1490                          * do not put >1 objects on a single OST, except for
1491                          * overstriping
1492                          */
1493                         if ((lod_comp_is_ost_used(env, lo, idx)) &&
1494                             !(lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING))
1495                                 continue;
1496
1497                         if (lod_qos_is_tgt_used(env, idx, nfound)) {
1498                                 if (lod_comp->llc_pattern &
1499                                     LOV_PATTERN_OVERSTRIPING)
1500                                         overstriped = true;
1501                                 else
1502                                         continue;
1503                         }
1504
1505                         o = lod_qos_declare_object_on(env, lod, idx, th);
1506                         if (IS_ERR(o)) {
1507                                 QOS_DEBUG("can't declare object on #%u: %d\n",
1508                                           idx, (int) PTR_ERR(o));
1509                                 continue;
1510                         }
1511
1512                         lod_avoid_update(lo, lag);
1513                         lod_qos_tgt_in_use(env, nfound, idx);
1514                         stripe[nfound] = o;
1515                         ost_indices[nfound] = idx;
1516                         ltd_qos_update(&lod->lod_ost_descs, ost, &total_weight);
1517                         nfound++;
1518                         rc = 0;
1519                         break;
1520                 }
1521
1522                 if (rc) {
1523                         /* no OST found on this iteration, give up */
1524                         break;
1525                 }
1526         }
1527
1528         if (unlikely(nfound != stripe_count)) {
1529                 /*
1530                  * when the decision to use weighted algorithm was made
1531                  * we had enough appropriate OSPs, but this state can
1532                  * change anytime (no space on OST, broken connection, etc)
1533                  * so it's possible OSP won't be able to provide us with
1534                  * an object due to just changed state
1535                  */
1536                 QOS_DEBUG("%s: wanted %d objects, found only %d\n",
1537                           lod2obd(lod)->obd_name, stripe_count, nfound);
1538                 for (i = 0; i < nfound; i++) {
1539                         LASSERT(stripe[i] != NULL);
1540                         dt_object_put(env, stripe[i]);
1541                         stripe[i] = NULL;
1542                 }
1543
1544                 /* makes sense to rebalance next time */
1545                 lod->lod_ost_descs.ltd_qos.lq_dirty = 1;
1546                 lod->lod_ost_descs.ltd_qos.lq_same_space = 0;
1547
1548                 rc = -EAGAIN;
1549         }
1550
1551         /* If there are enough OSTs, a component with overstriping requessted
1552          * will not actually end up overstriped.  The comp should reflect this.
1553          */
1554         if (rc == 0 && !overstriped)
1555                 lod_comp->llc_pattern &= ~LOV_PATTERN_OVERSTRIPING;
1556
1557 out:
1558         up_write(&lod->lod_ost_descs.ltd_qos.lq_rw_sem);
1559
1560 out_nolock:
1561         if (pool != NULL) {
1562                 up_read(&pool_tgt_rw_sem(pool));
1563                 /* put back ref got by lod_find_pool() */
1564                 lod_pool_putref(pool);
1565         }
1566
1567         RETURN(rc);
1568 }
1569
1570 /**
1571  * Allocate a striping using an algorithm with weights.
1572  *
1573  * The function allocates remote MDT objects to create a striping, the first
1574  * object was already allocated on current MDT to ensure master object and
1575  * the first object are on the same MDT. The algorithm used is based on weights
1576  * (both free space and inodes), and it's trying to ensure the space/inodes are
1577  * used evenly by MDTs and MDSs. The striping configuration (# of stripes,
1578  * offset, pool) is taken from the object and is prepared by the caller.
1579  *
1580  * If prepared configuration can't be met due to too few MDTs, then allocation
1581  * fails.
1582  *
1583  * No concurrent allocation is allowed on the object and this must be ensured
1584  * by the caller. All the internal structures are protected by the function.
1585  *
1586  * The algorithm has two steps: find available MDTs and calculate their
1587  * weights, then select the MDTs with their weights used as the probability.
1588  * An MDT with a higher weight is proportionately more likely to be selected
1589  * than one with a lower weight.
1590  *
1591  * \param[in] env               execution environment for this thread
1592  * \param[in] lo                LOD object
1593  * \param[out] stripes          striping created
1594  *
1595  * \retval positive     stripes allocated, and it should be equal to
1596  *                      lo->ldo_dir_stripe_count
1597  * \retval -EAGAIN      not enough tgts are found for specified stripe count
1598  * \retval -EINVAL      requested MDT index is invalid
1599  * \retval negative     errno on failure
1600  */
1601 int lod_mdt_alloc_qos(const struct lu_env *env, struct lod_object *lo,
1602                       struct dt_object **stripes)
1603 {
1604         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1605         struct lu_tgt_descs *ltd = &lod->lod_mdt_descs;
1606         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
1607         struct lu_fid fid = { 0 };
1608         const struct lu_tgt_pool *pool;
1609         struct lu_tgt_desc *mdt;
1610         struct dt_object *dto;
1611         u64 total_weight = 0;
1612         u32 stripe_count = lo->ldo_dir_stripe_count;
1613         unsigned int nfound;
1614         unsigned int good_mdts;
1615         unsigned int i;
1616         int rc = 0;
1617
1618         ENTRY;
1619
1620         if (stripe_count == 1)
1621                 RETURN(1);
1622
1623         pool = &ltd->ltd_tgt_pool;
1624
1625         /* Detect -EAGAIN early, before expensive lock is taken. */
1626         if (!ltd_qos_is_usable(ltd))
1627                 RETURN(-EAGAIN);
1628
1629         /* Do actual allocation, use write lock here. */
1630         down_write(&ltd->ltd_qos.lq_rw_sem);
1631
1632         /*
1633          * Check again, while we were sleeping on @lq_rw_sem things could
1634          * change.
1635          */
1636         if (!ltd_qos_is_usable(ltd))
1637                 GOTO(unlock, rc = -EAGAIN);
1638
1639         rc = ltd_qos_penalties_calc(ltd);
1640         if (rc)
1641                 GOTO(unlock, rc);
1642
1643         rc = lod_qos_tgt_in_use_clear(env, stripe_count);
1644         if (rc)
1645                 GOTO(unlock, rc);
1646
1647         good_mdts = 0;
1648         /* Find all the tgts that are valid stripe candidates */
1649         for (i = 0; i < pool->op_count; i++) {
1650                 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, pool->op_array[i]))
1651                         continue;
1652
1653                 mdt = LTD_TGT(ltd, pool->op_array[i]);
1654                 mdt->ltd_qos.ltq_usable = 0;
1655
1656                 rc = lod_is_tgt_usable(ltd, mdt);
1657                 if (rc)
1658                         continue;
1659
1660                 if (mdt->ltd_statfs.os_state & OS_STATE_DEGRADED)
1661                         continue;
1662
1663                 mdt->ltd_qos.ltq_usable = 1;
1664                 lu_tgt_qos_weight_calc(mdt);
1665                 total_weight += mdt->ltd_qos.ltq_weight;
1666
1667                 good_mdts++;
1668         }
1669
1670         QOS_DEBUG("found %d good tgts\n", good_mdts);
1671
1672         if (good_mdts < stripe_count - 1)
1673                 GOTO(unlock, rc = -EAGAIN);
1674
1675         /* Find enough tgts with weighted random allocation. */
1676         nfound = 1;
1677         while (nfound < stripe_count) {
1678                 u64 rand, cur_weight;
1679
1680                 cur_weight = 0;
1681                 rc = -ENOSPC;
1682
1683                 rand = lu_prandom_u64_max(total_weight);
1684
1685                 /* On average, this will hit larger-weighted tgts more often.
1686                  * 0-weight tgts will always get used last (only when rand=0) */
1687                 for (i = 0; i < pool->op_count; i++) {
1688                         __u32 idx = pool->op_array[i];
1689                         int rc2;
1690
1691                         mdt = LTD_TGT(ltd, idx);
1692
1693                         if (!mdt->ltd_qos.ltq_usable)
1694                                 continue;
1695
1696                         cur_weight += mdt->ltd_qos.ltq_weight;
1697
1698                         QOS_DEBUG("idx=%d nfound=%d cur_weight=%llu rand=%llu total_weight=%llu\n",
1699                                   idx, nfound, cur_weight, rand,
1700                                   total_weight);
1701
1702                         if (cur_weight < rand)
1703                                 continue;
1704
1705                         QOS_DEBUG("stripe=%d to idx=%d\n", nfound, idx);
1706
1707                         if (lod_qos_is_tgt_used(env, idx, nfound))
1708                                 continue;
1709
1710                         rc2 = obd_fid_alloc(env, mdt->ltd_exp, &fid, NULL);
1711                         if (rc2) {
1712                                 QOS_DEBUG("can't alloc FID on #%u: %d\n",
1713                                           idx, rc2);
1714                                 continue;
1715                         }
1716
1717                         conf.loc_flags = LOC_F_NEW;
1718                         dto = dt_locate_at(env, mdt->ltd_tgt, &fid,
1719                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1720                                 &conf);
1721                         if (IS_ERR(dto)) {
1722                                 QOS_DEBUG("can't alloc stripe on #%u: %d\n",
1723                                           idx, (int) PTR_ERR(dto));
1724                                 continue;
1725                         }
1726
1727                         lod_qos_tgt_in_use(env, nfound, idx);
1728                         stripes[nfound] = dto;
1729                         ltd_qos_update(ltd, mdt, &total_weight);
1730                         nfound++;
1731                         rc = 0;
1732                         break;
1733                 }
1734
1735                 /* no MDT found on this iteration, give up */
1736                 if (rc)
1737                         break;
1738         }
1739
1740         if (unlikely(nfound != stripe_count)) {
1741                 /*
1742                  * when the decision to use weighted algorithm was made
1743                  * we had enough appropriate OSPs, but this state can
1744                  * change anytime (no space on MDT, broken connection, etc)
1745                  * so it's possible OSP won't be able to provide us with
1746                  * an object due to just changed state
1747                  */
1748                 QOS_DEBUG("%s: wanted %d objects, found only %d\n",
1749                           lod2obd(lod)->obd_name, stripe_count, nfound);
1750                 for (i = 1; i < nfound; i++) {
1751                         LASSERT(stripes[i] != NULL);
1752                         dt_object_put(env, stripes[i]);
1753                         stripes[i] = NULL;
1754                 }
1755
1756                 /* makes sense to rebalance next time */
1757                 ltd->ltd_qos.lq_dirty = 1;
1758                 ltd->ltd_qos.lq_same_space = 0;
1759
1760                 rc = -EAGAIN;
1761         } else {
1762                 rc = nfound;
1763         }
1764
1765 unlock:
1766         up_write(&ltd->ltd_qos.lq_rw_sem);
1767
1768         RETURN(rc);
1769 }
1770
1771 /**
1772  * Check stripe count the caller can use.
1773  *
1774  * For new layouts (no initialized components), check the total size of the
1775  * layout against the maximum EA size from the backing file system.  This
1776  * stops us from creating a layout which will be too large once initialized.
1777  *
1778  * For existing layouts (with initialized components):
1779  * Find the maximal possible stripe count not greater than \a stripe_count.
1780  * If the provided stripe count is 0, then the filesystem's default is used.
1781  *
1782  * \param[in] lod       LOD device
1783  * \param[in] lo        The lod_object
1784  * \param[in] stripe_count      count the caller would like to use
1785  *
1786  * \retval              the maximum usable stripe count
1787  */
1788 __u16 lod_get_stripe_count(struct lod_device *lod, struct lod_object *lo,
1789                            __u16 stripe_count, bool overstriping)
1790 {
1791         __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
1792         /* max stripe count is based on OSD ea size */
1793         unsigned int easize = lod->lod_osd_max_easize;
1794         int i;
1795
1796
1797         if (!stripe_count)
1798                 stripe_count =
1799                         lod->lod_ost_descs.ltd_lov_desc.ld_default_stripe_count;
1800         if (!stripe_count)
1801                 stripe_count = 1;
1802         /* Overstriping allows more stripes than targets */
1803         if (stripe_count >
1804                 lod->lod_ost_descs.ltd_lov_desc.ld_active_tgt_count &&
1805             !overstriping)
1806                 stripe_count =
1807                         lod->lod_ost_descs.ltd_lov_desc.ld_active_tgt_count;
1808
1809         if (lo->ldo_is_composite) {
1810                 struct lod_layout_component *lod_comp;
1811                 unsigned int header_sz = sizeof(struct lov_comp_md_v1);
1812                 unsigned int init_comp_sz = 0;
1813                 unsigned int total_comp_sz = 0;
1814                 unsigned int comp_sz;
1815
1816                 header_sz += sizeof(struct lov_comp_md_entry_v1) *
1817                                 lo->ldo_comp_cnt;
1818
1819                 for (i = 0; i < lo->ldo_comp_cnt; i++) {
1820                         lod_comp = &lo->ldo_comp_entries[i];
1821                         comp_sz = lov_mds_md_size(lod_comp->llc_stripe_count,
1822                                                   LOV_MAGIC_V3);
1823                         total_comp_sz += comp_sz;
1824                         if (lod_comp->llc_flags & LCME_FL_INIT)
1825                                 init_comp_sz += comp_sz;
1826                 }
1827
1828                 if (init_comp_sz > 0)
1829                         total_comp_sz = init_comp_sz;
1830
1831                 header_sz += total_comp_sz;
1832
1833                 if (easize > header_sz)
1834                         easize -= header_sz;
1835                 else
1836                         easize = 0;
1837         }
1838
1839         max_stripes = lov_mds_md_max_stripe_count(easize, LOV_MAGIC_V3);
1840
1841         return (stripe_count < max_stripes) ? stripe_count : max_stripes;
1842 }
1843
1844 /**
1845  * Create in-core respresentation for a fully-defined striping
1846  *
1847  * When the caller passes a fully-defined striping (i.e. everything including
1848  * OST object FIDs are defined), then we still need to instantiate LU-cache
1849  * with the objects representing the stripes defined. This function completes
1850  * that task.
1851  *
1852  * \param[in] env       execution environment for this thread
1853  * \param[in] mo        LOD object
1854  * \param[in] buf       buffer containing the striping
1855  *
1856  * \retval 0            on success
1857  * \retval negative     negated errno on error
1858  */
1859 int lod_use_defined_striping(const struct lu_env *env,
1860                              struct lod_object *mo,
1861                              const struct lu_buf *buf)
1862 {
1863         struct lod_layout_component *lod_comp;
1864         struct lov_mds_md_v1   *v1 = buf->lb_buf;
1865         struct lov_mds_md_v3   *v3 = buf->lb_buf;
1866         struct lov_comp_md_v1  *comp_v1 = NULL;
1867         struct lov_ost_data_v1 *objs;
1868         __u32   magic;
1869         __u16   comp_cnt;
1870         __u16   mirror_cnt;
1871         int     rc = 0, i;
1872         ENTRY;
1873
1874         mutex_lock(&mo->ldo_layout_mutex);
1875         lod_striping_free_nolock(env, mo);
1876
1877         magic = le32_to_cpu(v1->lmm_magic) & ~LOV_MAGIC_DEFINED;
1878
1879         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3 &&
1880             magic != LOV_MAGIC_COMP_V1 && magic != LOV_MAGIC_FOREIGN)
1881                 GOTO(unlock, rc = -EINVAL);
1882
1883         if (magic == LOV_MAGIC_COMP_V1) {
1884                 comp_v1 = buf->lb_buf;
1885                 comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
1886                 if (comp_cnt == 0)
1887                         GOTO(unlock, rc = -EINVAL);
1888                 mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
1889                 mo->ldo_flr_state = le16_to_cpu(comp_v1->lcm_flags) &
1890                                         LCM_FL_FLR_MASK;
1891                 mo->ldo_is_composite = 1;
1892         } else if (magic == LOV_MAGIC_FOREIGN) {
1893                 struct lov_foreign_md *foreign;
1894                 size_t length;
1895
1896                 if (buf->lb_len < offsetof(typeof(*foreign), lfm_value)) {
1897                         CDEBUG(D_LAYOUT,
1898                                "buf len %zu < min lov_foreign_md size (%zu)\n",
1899                                buf->lb_len,
1900                                offsetof(typeof(*foreign), lfm_value));
1901                         GOTO(out, rc = -EINVAL);
1902                 }
1903                 foreign = (struct lov_foreign_md *)buf->lb_buf;
1904                 length = foreign_size_le(foreign);
1905                 if (buf->lb_len < length) {
1906                         CDEBUG(D_LAYOUT,
1907                                "buf len %zu < this lov_foreign_md size (%zu)\n",
1908                                buf->lb_len, length);
1909                         GOTO(out, rc = -EINVAL);
1910                 }
1911
1912                 /* just cache foreign LOV EA raw */
1913                 rc = lod_alloc_foreign_lov(mo, length);
1914                 if (rc)
1915                         GOTO(out, rc);
1916                 memcpy(mo->ldo_foreign_lov, buf->lb_buf, length);
1917                 GOTO(out, rc);
1918         } else {
1919                 mo->ldo_is_composite = 0;
1920                 comp_cnt = 1;
1921                 mirror_cnt = 0;
1922         }
1923         mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
1924
1925         rc = lod_alloc_comp_entries(mo, mirror_cnt, comp_cnt);
1926         if (rc)
1927                 GOTO(unlock, rc);
1928
1929         for (i = 0; i < comp_cnt; i++) {
1930                 struct lu_extent *ext;
1931                 char    *pool_name;
1932                 __u32   offs;
1933
1934                 lod_comp = &mo->ldo_comp_entries[i];
1935
1936                 if (mo->ldo_is_composite) {
1937                         offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset);
1938                         v1 = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs);
1939                         v3 = (struct lov_mds_md_v3 *)v1;
1940                         magic = le32_to_cpu(v1->lmm_magic);
1941
1942                         ext = &comp_v1->lcm_entries[i].lcme_extent;
1943                         lod_comp->llc_extent.e_start =
1944                                 le64_to_cpu(ext->e_start);
1945                         lod_comp->llc_extent.e_end = le64_to_cpu(ext->e_end);
1946                         lod_comp->llc_flags =
1947                                 le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags);
1948                         if (lod_comp->llc_flags & LCME_FL_NOSYNC)
1949                                 lod_comp->llc_timestamp = le64_to_cpu(
1950                                         comp_v1->lcm_entries[i].lcme_timestamp);
1951                         lod_comp->llc_id =
1952                                 le32_to_cpu(comp_v1->lcm_entries[i].lcme_id);
1953                         if (lod_comp->llc_id == LCME_ID_INVAL)
1954                                 GOTO(out, rc = -EINVAL);
1955                 }
1956
1957                 pool_name = NULL;
1958                 if (magic == LOV_MAGIC_V1) {
1959                         objs = &v1->lmm_objects[0];
1960                 } else if (magic == LOV_MAGIC_V3) {
1961                         objs = &v3->lmm_objects[0];
1962                         if (v3->lmm_pool_name[0] != '\0')
1963                                 pool_name = v3->lmm_pool_name;
1964                 } else {
1965                         CDEBUG(D_LAYOUT, "Invalid magic %x\n", magic);
1966                         GOTO(out, rc = -EINVAL);
1967                 }
1968
1969                 lod_comp->llc_pattern = le32_to_cpu(v1->lmm_pattern);
1970                 lod_comp->llc_stripe_size = le32_to_cpu(v1->lmm_stripe_size);
1971                 lod_comp->llc_stripe_count = le16_to_cpu(v1->lmm_stripe_count);
1972                 lod_comp->llc_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
1973                 /**
1974                  * The stripe_offset of an uninit-ed component is stored in
1975                  * the lmm_layout_gen
1976                  */
1977                 if (mo->ldo_is_composite && !lod_comp_inited(lod_comp))
1978                         lod_comp->llc_stripe_offset = lod_comp->llc_layout_gen;
1979                 lod_obj_set_pool(mo, i, pool_name);
1980
1981                 if ((!mo->ldo_is_composite || lod_comp_inited(lod_comp)) &&
1982                     !(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) &&
1983                     !(lod_comp->llc_pattern & LOV_PATTERN_MDT)) {
1984                         rc = lod_initialize_objects(env, mo, objs, i);
1985                         if (rc)
1986                                 GOTO(out, rc);
1987                 }
1988         }
1989
1990         rc = lod_fill_mirrors(mo);
1991         GOTO(out, rc);
1992 out:
1993         if (rc)
1994                 lod_striping_free_nolock(env, mo);
1995 unlock:
1996         mutex_unlock(&mo->ldo_layout_mutex);
1997
1998         RETURN(rc);
1999 }
2000
2001 /**
2002  * Parse suggested striping configuration.
2003  *
2004  * The caller gets a suggested striping configuration from a number of sources
2005  * including per-directory default and applications. Then it needs to verify
2006  * the suggested striping is valid, apply missing bits and store the resulting
2007  * configuration in the object to be used by the allocator later. Must not be
2008  * called concurrently against the same object. It's OK to provide a
2009  * fully-defined striping.
2010  *
2011  * \param[in] env       execution environment for this thread
2012  * \param[in] lo        LOD object
2013  * \param[in] buf       buffer containing the striping
2014  *
2015  * \retval 0            on success
2016  * \retval negative     negated errno on error
2017  */
2018 int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
2019                          const struct lu_buf *buf)
2020 {
2021         struct lod_layout_component *lod_comp;
2022         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2023         struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc;
2024         struct lov_user_md_v1 *v1 = NULL;
2025         struct lov_user_md_v3 *v3 = NULL;
2026         struct lov_comp_md_v1 *comp_v1 = NULL;
2027         struct lov_foreign_md *lfm = NULL;
2028         char def_pool[LOV_MAXPOOLNAME + 1];
2029         __u32 magic;
2030         __u16 comp_cnt;
2031         __u16 mirror_cnt;
2032         int i, rc;
2033         ENTRY;
2034
2035         if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0)
2036                 RETURN(0);
2037
2038         memset(def_pool, 0, sizeof(def_pool));
2039         if (lo->ldo_comp_entries != NULL)
2040                 lod_layout_get_pool(lo->ldo_comp_entries, lo->ldo_comp_cnt,
2041                                     def_pool, sizeof(def_pool));
2042
2043         /* free default striping info */
2044         if (lo->ldo_is_foreign)
2045                 lod_free_foreign_lov(lo);
2046         else
2047                 lod_free_comp_entries(lo);
2048
2049         rc = lod_verify_striping(d, lo, buf, false);
2050         if (rc)
2051                 RETURN(-EINVAL);
2052
2053         v3 = buf->lb_buf;
2054         v1 = buf->lb_buf;
2055         comp_v1 = buf->lb_buf;
2056         /* {lmm,lfm}_magic position/length work for all LOV formats */
2057         magic = v1->lmm_magic;
2058
2059         if (unlikely(le32_to_cpu(magic) & LOV_MAGIC_DEFINED)) {
2060                 /* try to use as fully defined striping */
2061                 rc = lod_use_defined_striping(env, lo, buf);
2062                 RETURN(rc);
2063         }
2064
2065         switch (magic) {
2066         case __swab32(LOV_USER_MAGIC_V1):
2067                 lustre_swab_lov_user_md_v1(v1);
2068                 magic = v1->lmm_magic;
2069                 /* fall through */
2070         case LOV_USER_MAGIC_V1:
2071                 break;
2072         case __swab32(LOV_USER_MAGIC_V3):
2073                 lustre_swab_lov_user_md_v3(v3);
2074                 magic = v3->lmm_magic;
2075                 /* fall through */
2076         case LOV_USER_MAGIC_V3:
2077                 break;
2078         case __swab32(LOV_USER_MAGIC_SPECIFIC):
2079                 lustre_swab_lov_user_md_v3(v3);
2080                 lustre_swab_lov_user_md_objects(v3->lmm_objects,
2081                                                 v3->lmm_stripe_count);
2082                 magic = v3->lmm_magic;
2083                 /* fall through */
2084         case LOV_USER_MAGIC_SPECIFIC:
2085                 break;
2086         case __swab32(LOV_USER_MAGIC_COMP_V1):
2087                 lustre_swab_lov_comp_md_v1(comp_v1);
2088                 magic = comp_v1->lcm_magic;
2089                 /* fall trhough */
2090         case LOV_USER_MAGIC_COMP_V1:
2091                 break;
2092         case __swab32(LOV_USER_MAGIC_FOREIGN):
2093                 lfm = buf->lb_buf;
2094                 __swab32s(&lfm->lfm_magic);
2095                 __swab32s(&lfm->lfm_length);
2096                 __swab32s(&lfm->lfm_type);
2097                 __swab32s(&lfm->lfm_flags);
2098                 magic = lfm->lfm_magic;
2099                 /* fall through */
2100         case LOV_USER_MAGIC_FOREIGN:
2101                 if (!lfm)
2102                         lfm = buf->lb_buf;
2103                 rc = lod_alloc_foreign_lov(lo, foreign_size(lfm));
2104                 if (rc)
2105                         RETURN(rc);
2106                 memcpy(lo->ldo_foreign_lov, buf->lb_buf, foreign_size(lfm));
2107                 RETURN(0);
2108         default:
2109                 CERROR("%s: unrecognized magic %X\n",
2110                        lod2obd(d)->obd_name, magic);
2111                 RETURN(-EINVAL);
2112         }
2113
2114         lustre_print_user_md(D_OTHER, v1, "parse config");
2115
2116         if (magic == LOV_USER_MAGIC_COMP_V1) {
2117                 comp_cnt = comp_v1->lcm_entry_count;
2118                 if (comp_cnt == 0)
2119                         RETURN(-EINVAL);
2120                 mirror_cnt =  comp_v1->lcm_mirror_count + 1;
2121                 if (mirror_cnt > 1)
2122                         lo->ldo_flr_state = LCM_FL_RDONLY;
2123                 lo->ldo_is_composite = 1;
2124         } else {
2125                 comp_cnt = 1;
2126                 mirror_cnt = 0;
2127                 lo->ldo_is_composite = 0;
2128         }
2129
2130         rc = lod_alloc_comp_entries(lo, mirror_cnt, comp_cnt);
2131         if (rc)
2132                 RETURN(rc);
2133
2134         LASSERT(lo->ldo_comp_entries);
2135
2136         for (i = 0; i < comp_cnt; i++) {
2137                 struct pool_desc        *pool;
2138                 struct lu_extent        *ext;
2139                 char    *pool_name;
2140
2141                 lod_comp = &lo->ldo_comp_entries[i];
2142
2143                 if (lo->ldo_is_composite) {
2144                         v1 = (struct lov_user_md *)((char *)comp_v1 +
2145                                         comp_v1->lcm_entries[i].lcme_offset);
2146                         ext = &comp_v1->lcm_entries[i].lcme_extent;
2147                         lod_comp->llc_extent = *ext;
2148                         lod_comp->llc_flags =
2149                                 comp_v1->lcm_entries[i].lcme_flags &
2150                                         LCME_CL_COMP_FLAGS;
2151                 }
2152
2153                 pool_name = NULL;
2154                 if (v1->lmm_magic == LOV_USER_MAGIC_V3 ||
2155                     v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
2156                         v3 = (struct lov_user_md_v3 *)v1;
2157                         if (v3->lmm_pool_name[0] != '\0')
2158                                 pool_name = v3->lmm_pool_name;
2159
2160                         if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
2161                                 rc = lod_comp_copy_ost_lists(lod_comp, v3);
2162                                 if (rc)
2163                                         GOTO(free_comp, rc);
2164                         }
2165                 }
2166
2167                 if (pool_name == NULL && def_pool[0] != '\0')
2168                         pool_name = def_pool;
2169
2170                 if (v1->lmm_pattern == 0)
2171                         v1->lmm_pattern = LOV_PATTERN_RAID0;
2172                 if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0 &&
2173                     lov_pattern(v1->lmm_pattern) != LOV_PATTERN_MDT &&
2174                     lov_pattern(v1->lmm_pattern) !=
2175                         (LOV_PATTERN_RAID0 | LOV_PATTERN_OVERSTRIPING)) {
2176                         CDEBUG(D_LAYOUT, "%s: invalid pattern: %x\n",
2177                                lod2obd(d)->obd_name, v1->lmm_pattern);
2178                         GOTO(free_comp, rc = -EINVAL);
2179                 }
2180
2181                 lod_comp->llc_pattern = v1->lmm_pattern;
2182                 lod_comp->llc_stripe_size = desc->ld_default_stripe_size;
2183                 if (v1->lmm_stripe_size)
2184                         lod_comp->llc_stripe_size = v1->lmm_stripe_size;
2185
2186                 lod_comp->llc_stripe_count = desc->ld_default_stripe_count;
2187                 if (v1->lmm_stripe_count ||
2188                     lov_pattern(v1->lmm_pattern) == LOV_PATTERN_MDT)
2189                         lod_comp->llc_stripe_count = v1->lmm_stripe_count;
2190
2191                 lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
2192                 lod_obj_set_pool(lo, i, pool_name);
2193
2194                 LASSERT(ergo(lov_pattern(lod_comp->llc_pattern) ==
2195                              LOV_PATTERN_MDT, lod_comp->llc_stripe_count == 0));
2196
2197                 if (pool_name == NULL)
2198                         continue;
2199
2200                 /* In the function below, .hs_keycmp resolves to
2201                  * pool_hashkey_keycmp() */
2202                 /* coverity[overrun-buffer-val] */
2203                 pool = lod_find_pool(d, pool_name);
2204                 if (pool == NULL)
2205                         continue;
2206
2207                 if (lod_comp->llc_stripe_offset != LOV_OFFSET_DEFAULT) {
2208                         rc = lod_check_index_in_pool(
2209                                         lod_comp->llc_stripe_offset, pool);
2210                         if (rc < 0) {
2211                                 lod_pool_putref(pool);
2212                                 CDEBUG(D_LAYOUT, "%s: invalid offset, %u\n",
2213                                        lod2obd(d)->obd_name,
2214                                        lod_comp->llc_stripe_offset);
2215                                 GOTO(free_comp, rc = -EINVAL);
2216                         }
2217                 }
2218
2219                 if (lod_comp->llc_stripe_count > pool_tgt_count(pool) &&
2220                     !(lod_comp->llc_pattern & LOV_PATTERN_OVERSTRIPING))
2221                         lod_comp->llc_stripe_count = pool_tgt_count(pool);
2222
2223                 lod_pool_putref(pool);
2224         }
2225
2226         RETURN(0);
2227
2228 free_comp:
2229         lod_free_comp_entries(lo);
2230         RETURN(rc);
2231 }
2232
2233 /**
2234  * prepare enough OST avoidance bitmap space
2235  */
2236 int lod_prepare_avoidance(const struct lu_env *env, struct lod_object *lo)
2237 {
2238         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
2239         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
2240         struct cfs_bitmap *bitmap = NULL;
2241         __u32 *new_oss = NULL;
2242
2243         lag->lag_ost_avail = lod->lod_ost_count;
2244
2245         /* reset OSS avoid guide array */
2246         lag->lag_oaa_count = 0;
2247         if (lag->lag_oss_avoid_array &&
2248             lag->lag_oaa_size < lod->lod_ost_count) {
2249                 OBD_FREE(lag->lag_oss_avoid_array,
2250                          sizeof(__u32) * lag->lag_oaa_size);
2251                 lag->lag_oss_avoid_array = NULL;
2252                 lag->lag_oaa_size = 0;
2253         }
2254
2255         /* init OST avoid guide bitmap */
2256         if (lag->lag_ost_avoid_bitmap) {
2257                 if (lod->lod_ost_count <= lag->lag_ost_avoid_bitmap->size) {
2258                         CFS_RESET_BITMAP(lag->lag_ost_avoid_bitmap);
2259                 } else {
2260                         CFS_FREE_BITMAP(lag->lag_ost_avoid_bitmap);
2261                         lag->lag_ost_avoid_bitmap = NULL;
2262                 }
2263         }
2264
2265         if (!lag->lag_ost_avoid_bitmap) {
2266                 bitmap = CFS_ALLOCATE_BITMAP(lod->lod_ost_count);
2267                 if (!bitmap)
2268                         return -ENOMEM;
2269         }
2270
2271         if (!lag->lag_oss_avoid_array) {
2272                 /**
2273                  * usually there are multiple OSTs in one OSS, but we don't
2274                  * know the exact OSS number, so we choose a safe option,
2275                  * using OST count to allocate the array to store the OSS
2276                  * id.
2277                  */
2278                 OBD_ALLOC(new_oss, sizeof(*new_oss) * lod->lod_ost_count);
2279                 if (!new_oss) {
2280                         CFS_FREE_BITMAP(bitmap);
2281                         return -ENOMEM;
2282                 }
2283         }
2284
2285         if (new_oss) {
2286                 lag->lag_oss_avoid_array = new_oss;
2287                 lag->lag_oaa_size = lod->lod_ost_count;
2288         }
2289         if (bitmap)
2290                 lag->lag_ost_avoid_bitmap = bitmap;
2291
2292         return 0;
2293 }
2294
2295 /**
2296  * Collect information of used OSTs and OSSs in the overlapped components
2297  * of other mirrors
2298  */
2299 void lod_collect_avoidance(struct lod_object *lo, struct lod_avoid_guide *lag,
2300                            int comp_idx)
2301 {
2302         struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
2303         struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx];
2304         struct cfs_bitmap *bitmap = lag->lag_ost_avoid_bitmap;
2305         int i, j;
2306
2307         /* iterate mirrors */
2308         for (i = 0; i < lo->ldo_mirror_count; i++) {
2309                 struct lod_layout_component *comp;
2310
2311                 /**
2312                  * skip mirror containing component[comp_idx], we only
2313                  * collect OSTs info of conflicting component in other mirrors,
2314                  * so that during read, if OSTs of a mirror's component are
2315                  * not available, we still have other mirror with different
2316                  * OSTs to read the data.
2317                  */
2318                 comp = &lo->ldo_comp_entries[lo->ldo_mirrors[i].lme_start];
2319                 if (comp->llc_id != LCME_ID_INVAL &&
2320                     mirror_id_of(comp->llc_id) ==
2321                                                 mirror_id_of(lod_comp->llc_id))
2322                         continue;
2323
2324                 /* iterate components of a mirror */
2325                 lod_foreach_mirror_comp(comp, lo, i) {
2326                         /**
2327                          * skip non-overlapped or un-instantiated components,
2328                          * NOTE: don't use lod_comp_inited(comp) to judge
2329                          * whether @comp has been inited, since during
2330                          * declare phase, comp->llc_stripe has been allocated
2331                          * while it's init flag not been set until the exec
2332                          * phase.
2333                          */
2334                         if (!lu_extent_is_overlapped(&comp->llc_extent,
2335                                                      &lod_comp->llc_extent) ||
2336                             !comp->llc_stripe)
2337                                 continue;
2338
2339                         /**
2340                          * collect used OSTs index and OSS info from a
2341                          * component
2342                          */
2343                         for (j = 0; j < comp->llc_stripe_count; j++) {
2344                                 struct lod_tgt_desc *ost;
2345                                 struct lu_svr_qos *lsq;
2346                                 int k;
2347
2348                                 ost = OST_TGT(lod, comp->llc_ost_indices[j]);
2349                                 lsq = ost->ltd_qos.ltq_svr;
2350
2351                                 if (cfs_bitmap_check(bitmap, ost->ltd_index))
2352                                         continue;
2353
2354                                 QOS_DEBUG("OST%d used in conflicting mirror "
2355                                           "component\n", ost->ltd_index);
2356                                 cfs_bitmap_set(bitmap, ost->ltd_index);
2357                                 lag->lag_ost_avail--;
2358
2359                                 for (k = 0; k < lag->lag_oaa_count; k++) {
2360                                         if (lag->lag_oss_avoid_array[k] ==
2361                                             lsq->lsq_id)
2362                                                 break;
2363                                 }
2364                                 if (k == lag->lag_oaa_count) {
2365                                         lag->lag_oss_avoid_array[k] =
2366                                                                 lsq->lsq_id;
2367                                         lag->lag_oaa_count++;
2368                                 }
2369                         }
2370                 }
2371         }
2372 }
2373
2374 /**
2375  * Create a striping for an obejct.
2376  *
2377  * The function creates a new striping for the object. The function tries QoS
2378  * algorithm first unless free space is distributed evenly among OSTs, but
2379  * by default RR algorithm is preferred due to internal concurrency (QoS is
2380  * serialized). The caller must ensure no concurrent calls to the function
2381  * are made against the same object.
2382  *
2383  * \param[in] env       execution environment for this thread
2384  * \param[in] lo        LOD object
2385  * \param[in] attr      attributes OST objects will be declared with
2386  * \param[in] th        transaction handle
2387  * \param[in] comp_idx  index of ldo_comp_entries
2388  *
2389  * \retval 0            on success
2390  * \retval negative     negated errno on error
2391  */
2392 int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
2393                         struct lu_attr *attr, struct thandle *th,
2394                         int comp_idx)
2395 {
2396         struct lod_layout_component *lod_comp;
2397         struct lod_device      *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2398         int                     stripe_len;
2399         int                     flag = LOV_USES_ASSIGNED_STRIPE;
2400         int                     i, rc = 0;
2401         struct lod_avoid_guide *lag = &lod_env_info(env)->lti_avoid;
2402         struct dt_object **stripe = NULL;
2403         __u32 *ost_indices = NULL;
2404         ENTRY;
2405
2406         LASSERT(lo);
2407         LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
2408         lod_comp = &lo->ldo_comp_entries[comp_idx];
2409         LASSERT(!(lod_comp->llc_flags & LCME_FL_EXTENSION));
2410
2411         /* A released component is being created */
2412         if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
2413                 RETURN(0);
2414
2415         /* A Data-on-MDT component is being created */
2416         if (lov_pattern(lod_comp->llc_pattern) == LOV_PATTERN_MDT)
2417                 RETURN(0);
2418
2419         if (likely(lod_comp->llc_stripe == NULL)) {
2420                 /*
2421                  * no striping has been created so far
2422                  */
2423                 LASSERT(lod_comp->llc_stripe_count);
2424                 /*
2425                  * statfs and check OST targets now, since ld_active_tgt_count
2426                  * could be changed if some OSTs are [de]activated manually.
2427                  */
2428                 lod_qos_statfs_update(env, d, &d->lod_ost_descs);
2429                 stripe_len = lod_get_stripe_count(d, lo,
2430                                                   lod_comp->llc_stripe_count,
2431                                                   lod_comp->llc_pattern &
2432                                                   LOV_PATTERN_OVERSTRIPING);
2433
2434                 if (stripe_len == 0)
2435                         GOTO(out, rc = -ERANGE);
2436                 lod_comp->llc_stripe_count = stripe_len;
2437                 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len);
2438                 if (stripe == NULL)
2439                         GOTO(out, rc = -ENOMEM);
2440                 OBD_ALLOC(ost_indices, sizeof(*ost_indices) * stripe_len);
2441                 if (!ost_indices)
2442                         GOTO(out, rc = -ENOMEM);
2443
2444                 lod_getref(&d->lod_ost_descs);
2445                 /* XXX: support for non-0 files w/o objects */
2446                 CDEBUG(D_OTHER, "tgt_count %d stripe_count %d\n",
2447                        d->lod_ost_count, stripe_len);
2448
2449                 if (lod_comp->llc_ostlist.op_array &&
2450                     lod_comp->llc_ostlist.op_count) {
2451                         rc = lod_alloc_ost_list(env, lo, stripe, ost_indices,
2452                                                 th, comp_idx);
2453                 } else if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) {
2454                         /**
2455                          * collect OSTs and OSSs used in other mirrors whose
2456                          * components cross the ldo_comp_entries[comp_idx]
2457                          */
2458                         rc = lod_prepare_avoidance(env, lo);
2459                         if (rc)
2460                                 GOTO(put_ldts, rc);
2461
2462                         QOS_DEBUG("collecting conflict osts for comp[%d]\n",
2463                                   comp_idx);
2464                         lod_collect_avoidance(lo, lag, comp_idx);
2465
2466                         rc = lod_ost_alloc_qos(env, lo, stripe, ost_indices,
2467                                                flag, th, comp_idx);
2468                         if (rc == -EAGAIN)
2469                                 rc = lod_ost_alloc_rr(env, lo, stripe,
2470                                                       ost_indices, flag, th,
2471                                                       comp_idx);
2472                 } else {
2473                         rc = lod_ost_alloc_specific(env, lo, stripe,
2474                                                     ost_indices, flag, th,
2475                                                     comp_idx);
2476                 }
2477 put_ldts:
2478                 lod_putref(d, &d->lod_ost_descs);
2479                 if (rc < 0) {
2480                         for (i = 0; i < stripe_len; i++)
2481                                 if (stripe[i] != NULL)
2482                                         dt_object_put(env, stripe[i]);
2483                         lod_comp->llc_stripe_count = 0;
2484                 } else {
2485                         lod_comp->llc_stripe = stripe;
2486                         lod_comp->llc_ost_indices = ost_indices;
2487                         lod_comp->llc_stripes_allocated = stripe_len;
2488                 }
2489         } else {
2490                 /*
2491                  * lod_qos_parse_config() found supplied buf as a predefined
2492                  * striping (not a hint), so it allocated all the object
2493                  * now we need to create them
2494                  */
2495                 for (i = 0; i < lod_comp->llc_stripe_count; i++) {
2496                         struct dt_object  *o;
2497
2498                         o = lod_comp->llc_stripe[i];
2499                         LASSERT(o);
2500
2501                         rc = lod_sub_declare_create(env, o, attr, NULL,
2502                                                     NULL, th);
2503                         if (rc < 0) {
2504                                 CERROR("can't declare create: %d\n", rc);
2505                                 break;
2506                         }
2507                 }
2508                 /**
2509                  * Clear LCME_FL_INIT for the component so that
2510                  * lod_striping_create() can create the striping objects
2511                  * in replay.
2512                  */
2513                 lod_comp_unset_init(lod_comp);
2514         }
2515
2516 out:
2517         if (rc < 0) {
2518                 if (stripe)
2519                         OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len);
2520                 if (ost_indices)
2521                         OBD_FREE(ost_indices,
2522                                  sizeof(*ost_indices) * stripe_len);
2523         }
2524         RETURN(rc);
2525 }
2526
2527 int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
2528                        struct lu_attr *attr, const struct lu_buf *buf,
2529                        struct thandle *th)
2530
2531 {
2532         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
2533         uint64_t size = 0;
2534         int i;
2535         int rc;
2536         ENTRY;
2537
2538         LASSERT(lo);
2539
2540         /* no OST available */
2541         /* XXX: should we be waiting a bit to prevent failures during
2542          * cluster initialization? */
2543         if (!d->lod_ost_count)
2544                 RETURN(-EIO);
2545
2546         /*
2547          * by this time, the object's ldo_stripe_count and ldo_stripe_size
2548          * contain default value for striping: taken from the parent
2549          * or from filesystem defaults
2550          *
2551          * in case the caller is passing lovea with new striping config,
2552          * we may need to parse lovea and apply new configuration
2553          */
2554         rc = lod_qos_parse_config(env, lo, buf);
2555         if (rc)
2556                 RETURN(rc);
2557
2558         if (attr->la_valid & LA_SIZE)
2559                 size = attr->la_size;
2560
2561         /**
2562          * prepare OST object creation for the component covering file's
2563          * size, the 1st component (including plain layout file) is always
2564          * instantiated.
2565          */
2566         for (i = 0; i < lo->ldo_comp_cnt; i++) {
2567                 struct lod_layout_component *lod_comp;
2568                 struct lu_extent *extent;
2569
2570                 lod_comp = &lo->ldo_comp_entries[i];
2571                 extent = &lod_comp->llc_extent;
2572                 QOS_DEBUG("comp[%d] %lld "DEXT"\n", i, size, PEXT(extent));
2573                 if (!lo->ldo_is_composite || size >= extent->e_start) {
2574                         rc = lod_qos_prep_create(env, lo, attr, th, i);
2575                         if (rc)
2576                                 break;
2577                 }
2578         }
2579
2580         RETURN(rc);
2581 }