Whamcloud - gitweb
LU-3963 libcfs: convert lod, mdt, and gss to linux list api
[fs/lustre-release.git] / lustre / lod / lod_qos.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_qos.c
33  *
34  */
35
36 #define DEBUG_SUBSYSTEM S_LOV
37
38 #include <asm/div64.h>
39 #include <libcfs/libcfs.h>
40 #include <obd_class.h>
41 #include <lustre/lustre_idl.h>
42 #include "lod_internal.h"
43
/*
 * Define FORCE_QOS to force the QoS policy (not RR) to be used for testing
 * purposes.  The macro defined below carries a trailing underscore, so the
 * "#ifndef FORCE_QOS" test later in this file still sees FORCE_QOS as
 * undefined, i.e. forcing is switched off.
 */
#define FORCE_QOS_

/* debug mask used by the QoS CDEBUG() messages in this file */
#define D_QOS   D_OTHER

/*
 * Verbose QoS tracing.  With "#if 0" both macros expand to nothing, so their
 * arguments are never evaluated.
 */
#if 0
#define QOS_DEBUG(fmt, ...)     CDEBUG(D_OTHER, fmt, ## __VA_ARGS__)
#define QOS_CONSOLE(fmt, ...)   LCONSOLE(D_OTHER, fmt, ## __VA_ARGS__)
#else
#define QOS_DEBUG(fmt, ...)
#define QOS_CONSOLE(fmt, ...)
#endif

/*
 * os_bavail * os_bsize of OST target @i (presumably the available space in
 * bytes -- confirm against the obd_statfs definition).
 *
 * NOTE: the expansion refers to a variable named "lod", which must be in
 * scope wherever the macro is used.  The argument is parenthesized so that
 * an expression can be passed safely.
 */
#define TGT_BAVAIL(i) (OST_TGT(lod, (i))->ltd_statfs.os_bavail * \
                       OST_TGT(lod, (i))->ltd_statfs.os_bsize)
61
62 int qos_add_tgt(struct lod_device *lod, struct lod_tgt_desc *ost_desc)
63 {
64         struct lod_qos_oss *oss = NULL, *temposs;
65         struct obd_export  *exp = ost_desc->ltd_exp;
66         int                 rc = 0, found = 0;
67         struct list_head   *list;
68         ENTRY;
69
70         down_write(&lod->lod_qos.lq_rw_sem);
71         /*
72          * a bit hacky approach to learn NID of corresponding connection
73          * but there is no official API to access information like this
74          * with OSD API.
75          */
76         list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
77                 if (obd_uuid_equals(&oss->lqo_uuid,
78                                     &exp->exp_connection->c_remote_uuid)) {
79                         found++;
80                         break;
81                 }
82         }
83
84         if (!found) {
85                 OBD_ALLOC_PTR(oss);
86                 if (!oss)
87                         GOTO(out, rc = -ENOMEM);
88                 memcpy(&oss->lqo_uuid, &exp->exp_connection->c_remote_uuid,
89                        sizeof(oss->lqo_uuid));
90         } else {
91                 /* Assume we have to move this one */
92                 list_del(&oss->lqo_oss_list);
93         }
94
95         oss->lqo_ost_count++;
96         ost_desc->ltd_qos.ltq_oss = oss;
97
98         CDEBUG(D_QOS, "add tgt %s to OSS %s (%d OSTs)\n",
99                obd_uuid2str(&ost_desc->ltd_uuid), obd_uuid2str(&oss->lqo_uuid),
100                oss->lqo_ost_count);
101
102         /* Add sorted by # of OSTs.  Find the first entry that we're
103            bigger than... */
104         list = &lod->lod_qos.lq_oss_list;
105         list_for_each_entry(temposs, list, lqo_oss_list) {
106                 if (oss->lqo_ost_count > temposs->lqo_ost_count)
107                         break;
108         }
109         /* ...and add before it.  If we're the first or smallest, temposs
110            points to the list head, and we add to the end. */
111         list_add_tail(&oss->lqo_oss_list, &temposs->lqo_oss_list);
112
113         lod->lod_qos.lq_dirty = 1;
114         lod->lod_qos.lq_rr.lqr_dirty = 1;
115
116 out:
117         up_write(&lod->lod_qos.lq_rw_sem);
118         RETURN(rc);
119 }
120
121 int qos_del_tgt(struct lod_device *lod, struct lod_tgt_desc *ost_desc)
122 {
123         struct lod_qos_oss *oss;
124         int                 rc = 0;
125         ENTRY;
126
127         down_write(&lod->lod_qos.lq_rw_sem);
128         oss = ost_desc->ltd_qos.ltq_oss;
129         if (!oss)
130                 GOTO(out, rc = -ENOENT);
131
132         oss->lqo_ost_count--;
133         if (oss->lqo_ost_count == 0) {
134                 CDEBUG(D_QOS, "removing OSS %s\n",
135                        obd_uuid2str(&oss->lqo_uuid));
136                 list_del(&oss->lqo_oss_list);
137                 ost_desc->ltd_qos.ltq_oss = NULL;
138                 OBD_FREE_PTR(oss);
139         }
140
141         lod->lod_qos.lq_dirty = 1;
142         lod->lod_qos.lq_rr.lqr_dirty = 1;
143 out:
144         up_write(&lod->lod_qos.lq_rw_sem);
145         RETURN(rc);
146 }
147
148 static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
149                                 int index, struct obd_statfs *sfs)
150 {
151         struct lod_tgt_desc *ost;
152         int                  rc;
153
154         LASSERT(d);
155         ost = OST_TGT(d,index);
156         LASSERT(ost);
157
158         rc = dt_statfs(env, ost->ltd_ost, sfs);
159         if (rc && rc != -ENOTCONN)
160                 CERROR("%s: statfs: rc = %d\n", lod2obd(d)->obd_name, rc);
161
162         /* If the OST is readonly then we can't allocate objects there */
163         if (sfs->os_state & OS_STATE_READONLY)
164                 rc = -EROFS;
165
166         /* check whether device has changed state (active, inactive) */
167         if (rc != 0 && ost->ltd_active) {
168                 /* turned inactive? */
169                 spin_lock(&d->lod_desc_lock);
170                 if (ost->ltd_active) {
171                         ost->ltd_active = 0;
172                         LASSERT(d->lod_desc.ld_active_tgt_count > 0);
173                         d->lod_desc.ld_active_tgt_count--;
174                         d->lod_qos.lq_dirty = 1;
175                         d->lod_qos.lq_rr.lqr_dirty = 1;
176                         CDEBUG(D_CONFIG, "%s: turns inactive\n",
177                                ost->ltd_exp->exp_obd->obd_name);
178                 }
179                 spin_unlock(&d->lod_desc_lock);
180         } else if (rc == 0 && ost->ltd_active == 0) {
181                 /* turned active? */
182                 LASSERTF(d->lod_desc.ld_active_tgt_count < d->lod_ostnr,
183                          "active tgt count %d, ost nr %d\n",
184                          d->lod_desc.ld_active_tgt_count, d->lod_ostnr);
185                 spin_lock(&d->lod_desc_lock);
186                 if (ost->ltd_active == 0) {
187                         ost->ltd_active = 1;
188                         d->lod_desc.ld_active_tgt_count++;
189                         d->lod_qos.lq_dirty = 1;
190                         d->lod_qos.lq_rr.lqr_dirty = 1;
191                         CDEBUG(D_CONFIG, "%s: turns active\n",
192                                ost->ltd_exp->exp_obd->obd_name);
193                 }
194                 spin_unlock(&d->lod_desc_lock);
195         }
196
197         RETURN(rc);
198 }
199
200 static void lod_qos_statfs_update(const struct lu_env *env,
201                                   struct lod_device *lod)
202 {
203         struct obd_device *obd = lod2obd(lod);
204         struct ost_pool   *osts = &(lod->lod_pool_info);
205         unsigned int       i;
206         int                idx, rc = 0;
207         __u64              max_age, avail;
208         ENTRY;
209
210         max_age = cfs_time_shift_64(-2 * lod->lod_desc.ld_qos_maxage);
211
212         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
213                 /* statfs data are quite recent, don't need to refresh it */
214                 RETURN_EXIT;
215
216         down_write(&lod->lod_qos.lq_rw_sem);
217         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
218                 GOTO(out, rc = 0);
219
220         for (i = 0; i < osts->op_count; i++) {
221                 idx = osts->op_array[i];
222                 avail = OST_TGT(lod,idx)->ltd_statfs.os_bavail;
223                 rc = lod_statfs_and_check(env, lod, idx,
224                                           &OST_TGT(lod,idx)->ltd_statfs);
225                 if (rc)
226                         break;
227                 if (OST_TGT(lod,idx)->ltd_statfs.os_bavail != avail)
228                         /* recalculate weigths */
229                         lod->lod_qos.lq_dirty = 1;
230         }
231         obd->obd_osfs_age = cfs_time_current_64();
232
233 out:
234         up_write(&lod->lod_qos.lq_rw_sem);
235         EXIT;
236 }
237
238 /* Recalculate per-object penalties for OSSs and OSTs,
239    depends on size of each ost in an oss */
240 static int lod_qos_calc_ppo(struct lod_device *lod)
241 {
242         struct lod_qos_oss *oss;
243         __u64               ba_max, ba_min, temp;
244         __u32               num_active;
245         unsigned int        i;
246         int                 rc, prio_wide;
247         time_t              now, age;
248         ENTRY;
249
250         if (!lod->lod_qos.lq_dirty)
251                 GOTO(out, rc = 0);
252
253         num_active = lod->lod_desc.ld_active_tgt_count - 1;
254         if (num_active < 1)
255                 GOTO(out, rc = -EAGAIN);
256
257         /* find bavail on each OSS */
258         list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list)
259                                 oss->lqo_bavail = 0;
260         lod->lod_qos.lq_active_oss_count = 0;
261
262         /*
263          * How badly user wants to select OSTs "widely" (not recently chosen
264          * and not on recent OSS's).  As opposed to "freely" (free space
265          * avail.) 0-256
266          */
267         prio_wide = 256 - lod->lod_qos.lq_prio_free;
268
269         ba_min = (__u64)(-1);
270         ba_max = 0;
271         now = cfs_time_current_sec();
272         /* Calculate OST penalty per object
273          * (lod ref taken in lod_qos_prep_create()) */
274         cfs_foreach_bit(lod->lod_ost_bitmap, i) {
275                 LASSERT(OST_TGT(lod,i));
276                 temp = TGT_BAVAIL(i);
277                 if (!temp)
278                         continue;
279                 ba_min = min(temp, ba_min);
280                 ba_max = max(temp, ba_max);
281
282                 /* Count the number of usable OSS's */
283                 if (OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail == 0)
284                         lod->lod_qos.lq_active_oss_count++;
285                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail += temp;
286
287                 /* per-OST penalty is prio * TGT_bavail / (num_ost - 1) / 2 */
288                 temp >>= 1;
289                 do_div(temp, num_active);
290                 OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj =
291                         (temp * prio_wide) >> 8;
292
293                 age = (now - OST_TGT(lod,i)->ltd_qos.ltq_used) >> 3;
294                 if (lod->lod_qos.lq_reset ||
295                     age > 32 * lod->lod_desc.ld_qos_maxage)
296                         OST_TGT(lod,i)->ltd_qos.ltq_penalty = 0;
297                 else if (age > lod->lod_desc.ld_qos_maxage)
298                         /* Decay the penalty by half for every 8x the update
299                          * interval that the device has been idle.  That gives
300                          * lots of time for the statfs information to be
301                          * updated (which the penalty is only a proxy for),
302                          * and avoids penalizing OSS/OSTs under light load. */
303                         OST_TGT(lod,i)->ltd_qos.ltq_penalty >>=
304                                 (age / lod->lod_desc.ld_qos_maxage);
305         }
306
307         num_active = lod->lod_qos.lq_active_oss_count - 1;
308         if (num_active < 1) {
309                 /* If there's only 1 OSS, we can't penalize it, so instead
310                    we have to double the OST penalty */
311                 num_active = 1;
312                 cfs_foreach_bit(lod->lod_ost_bitmap, i)
313                         OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj <<= 1;
314         }
315
316         /* Per-OSS penalty is prio * oss_avail / oss_osts / (num_oss - 1) / 2 */
317         list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
318                 temp = oss->lqo_bavail >> 1;
319                 do_div(temp, oss->lqo_ost_count * num_active);
320                 oss->lqo_penalty_per_obj = (temp * prio_wide) >> 8;
321
322                 age = (now - oss->lqo_used) >> 3;
323                 if (lod->lod_qos.lq_reset ||
324                     age > 32 * lod->lod_desc.ld_qos_maxage)
325                         oss->lqo_penalty = 0;
326                 else if (age > lod->lod_desc.ld_qos_maxage)
327                         /* Decay the penalty by half for every 8x the update
328                          * interval that the device has been idle.  That gives
329                          * lots of time for the statfs information to be
330                          * updated (which the penalty is only a proxy for),
331                          * and avoids penalizing OSS/OSTs under light load. */
332                         oss->lqo_penalty >>= age / lod->lod_desc.ld_qos_maxage;
333         }
334
335         lod->lod_qos.lq_dirty = 0;
336         lod->lod_qos.lq_reset = 0;
337
338         /* If each ost has almost same free space,
339          * do rr allocation for better creation performance */
340         lod->lod_qos.lq_same_space = 0;
341         if ((ba_max * (256 - lod->lod_qos.lq_threshold_rr)) >> 8 < ba_min) {
342                 lod->lod_qos.lq_same_space = 1;
343                 /* Reset weights for the next time we enter qos mode */
344                 lod->lod_qos.lq_reset = 1;
345         }
346         rc = 0;
347
348 out:
349 #ifndef FORCE_QOS
350         if (!rc && lod->lod_qos.lq_same_space)
351                 RETURN(-EAGAIN);
352 #endif
353         RETURN(rc);
354 }
355
356 static int lod_qos_calc_weight(struct lod_device *lod, int i)
357 {
358         __u64 temp, temp2;
359
360         /* Final ost weight = TGT_BAVAIL - ost_penalty - oss_penalty */
361         temp = TGT_BAVAIL(i);
362         temp2 = OST_TGT(lod,i)->ltd_qos.ltq_penalty +
363                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_penalty;
364         if (temp < temp2)
365                 OST_TGT(lod,i)->ltd_qos.ltq_weight = 0;
366         else
367                 OST_TGT(lod,i)->ltd_qos.ltq_weight = temp - temp2;
368         return 0;
369 }
370
371 /* We just used this index for a stripe; adjust everyone's weights */
372 static int lod_qos_used(struct lod_device *lod, struct ost_pool *osts,
373                         __u32 index, __u64 *total_wt)
374 {
375         struct lod_tgt_desc *ost;
376         struct lod_qos_oss  *oss;
377         unsigned int j;
378         ENTRY;
379
380         ost = OST_TGT(lod,index);
381         LASSERT(ost);
382
383         /* Don't allocate on this devuce anymore, until the next alloc_qos */
384         ost->ltd_qos.ltq_usable = 0;
385
386         oss = ost->ltd_qos.ltq_oss;
387
388         /* Decay old penalty by half (we're adding max penalty, and don't
389            want it to run away.) */
390         ost->ltd_qos.ltq_penalty >>= 1;
391         oss->lqo_penalty >>= 1;
392
393         /* mark the OSS and OST as recently used */
394         ost->ltd_qos.ltq_used = oss->lqo_used = cfs_time_current_sec();
395
396         /* Set max penalties for this OST and OSS */
397         ost->ltd_qos.ltq_penalty +=
398                 ost->ltd_qos.ltq_penalty_per_obj * lod->lod_ostnr;
399         oss->lqo_penalty += oss->lqo_penalty_per_obj *
400                 lod->lod_qos.lq_active_oss_count;
401
402         /* Decrease all OSS penalties */
403         list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
404                 if (oss->lqo_penalty < oss->lqo_penalty_per_obj)
405                         oss->lqo_penalty = 0;
406                 else
407                         oss->lqo_penalty -= oss->lqo_penalty_per_obj;
408         }
409
410         *total_wt = 0;
411         /* Decrease all OST penalties */
412         for (j = 0; j < osts->op_count; j++) {
413                 int i;
414
415                 i = osts->op_array[j];
416                 if (!cfs_bitmap_check(lod->lod_ost_bitmap, i))
417                         continue;
418
419                 ost = OST_TGT(lod,i);
420                 LASSERT(ost);
421
422                 if (ost->ltd_qos.ltq_penalty <
423                                 ost->ltd_qos.ltq_penalty_per_obj)
424                         ost->ltd_qos.ltq_penalty = 0;
425                 else
426                         ost->ltd_qos.ltq_penalty -=
427                                 ost->ltd_qos.ltq_penalty_per_obj;
428
429                 lod_qos_calc_weight(lod, i);
430
431                 /* Recalc the total weight of usable osts */
432                 if (ost->ltd_qos.ltq_usable)
433                         *total_wt += ost->ltd_qos.ltq_weight;
434
435                 QOS_DEBUG("recalc tgt %d usable=%d avail="LPU64
436                           " ostppo="LPU64" ostp="LPU64" ossppo="LPU64
437                           " ossp="LPU64" wt="LPU64"\n",
438                           i, ost->ltd_qos.ltq_usable, TGT_BAVAIL(i) >> 10,
439                           ost->ltd_qos.ltq_penalty_per_obj >> 10,
440                           ost->ltd_qos.ltq_penalty >> 10,
441                           ost->ltd_qos.ltq_oss->lqo_penalty_per_obj >> 10,
442                           ost->ltd_qos.ltq_oss->lqo_penalty >> 10,
443                           ost->ltd_qos.ltq_weight >> 10);
444         }
445
446         RETURN(0);
447 }
448
449 #define LOV_QOS_EMPTY ((__u32)-1)
450 /* compute optimal round-robin order, based on OSTs per OSS */
451 static int lod_qos_calc_rr(struct lod_device *lod, struct ost_pool *src_pool,
452                            struct lod_qos_rr *lqr)
453 {
454         struct lod_qos_oss  *oss;
455         struct lod_tgt_desc *ost;
456         unsigned placed, real_count;
457         unsigned int i;
458         int rc;
459         ENTRY;
460
461         if (!lqr->lqr_dirty) {
462                 LASSERT(lqr->lqr_pool.op_size);
463                 RETURN(0);
464         }
465
466         /* Do actual allocation. */
467         down_write(&lod->lod_qos.lq_rw_sem);
468
469         /*
470          * Check again. While we were sleeping on @lq_rw_sem something could
471          * change.
472          */
473         if (!lqr->lqr_dirty) {
474                 LASSERT(lqr->lqr_pool.op_size);
475                 up_write(&lod->lod_qos.lq_rw_sem);
476                 RETURN(0);
477         }
478
479         real_count = src_pool->op_count;
480
481         /* Zero the pool array */
482         /* alloc_rr is holding a read lock on the pool, so nobody is adding/
483            deleting from the pool. The lq_rw_sem insures that nobody else
484            is reading. */
485         lqr->lqr_pool.op_count = real_count;
486         rc = lod_ost_pool_extend(&lqr->lqr_pool, real_count);
487         if (rc) {
488                 up_write(&lod->lod_qos.lq_rw_sem);
489                 RETURN(rc);
490         }
491         for (i = 0; i < lqr->lqr_pool.op_count; i++)
492                 lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY;
493
494         /* Place all the OSTs from 1 OSS at the same time. */
495         placed = 0;
496         list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
497                 int j = 0;
498
499                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
500                         int next;
501
502                         if (!cfs_bitmap_check(lod->lod_ost_bitmap,
503                                                 src_pool->op_array[i]))
504                                 continue;
505
506                         ost = OST_TGT(lod,src_pool->op_array[i]);
507                         LASSERT(ost && ost->ltd_ost);
508                         if (ost->ltd_qos.ltq_oss != oss)
509                                 continue;
510
511                         /* Evenly space these OSTs across arrayspace */
512                         next = j * lqr->lqr_pool.op_count / oss->lqo_ost_count;
513                         while (lqr->lqr_pool.op_array[next] != LOV_QOS_EMPTY)
514                                 next = (next + 1) % lqr->lqr_pool.op_count;
515
516                         lqr->lqr_pool.op_array[next] = src_pool->op_array[i];
517                         j++;
518                         placed++;
519                 }
520         }
521
522         lqr->lqr_dirty = 0;
523         up_write(&lod->lod_qos.lq_rw_sem);
524
525         if (placed != real_count) {
526                 /* This should never happen */
527                 LCONSOLE_ERROR_MSG(0x14e, "Failed to place all OSTs in the "
528                                    "round-robin list (%d of %d).\n",
529                                    placed, real_count);
530                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
531                         LCONSOLE(D_WARNING, "rr #%d ost idx=%d\n", i,
532                                  lqr->lqr_pool.op_array[i]);
533                 }
534                 lqr->lqr_dirty = 1;
535                 RETURN(-EAGAIN);
536         }
537
538 #if 0
539         for (i = 0; i < lqr->lqr_pool.op_count; i++)
540                 QOS_CONSOLE("rr #%d ost idx=%d\n", i, lqr->lqr_pool.op_array[i]);
541 #endif
542
543         RETURN(0);
544 }
545
546 /**
547  * A helper function to:
548  *   create in-core lu object on the specified OSP
549  *   declare creation of the object
550  * IMPORTANT: at this stage object is anonymouos - it has no fid assigned
551  *            this is a workaround till we have natural FIDs on OST
552  *
553  *            at this point we want to declare (reserve) object for us as
554  *            we can't block at execution (when create method is called).
555  *            otherwise we'd block whole transaction batch
556  */
557 static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
558                                                    struct lod_device *d,
559                                                    __u32 ost_idx,
560                                                    struct thandle *th)
561 {
562         struct lod_tgt_desc *ost;
563         struct lu_object *o, *n;
564         struct lu_device *nd;
565         struct dt_object *dt;
566         int               rc;
567         ENTRY;
568
569         LASSERT(d);
570         LASSERT(ost_idx < d->lod_osts_size);
571         ost = OST_TGT(d,ost_idx);
572         LASSERT(ost);
573         LASSERT(ost->ltd_ost);
574
575         nd = &ost->ltd_ost->dd_lu_dev;
576
577         /*
578          * allocate anonymous object with zero fid, real fid
579          * will be assigned by OSP within transaction
580          * XXX: to be fixed with fully-functional OST fids
581          */
582         o = lu_object_anon(env, nd, NULL);
583         if (IS_ERR(o))
584                 GOTO(out, dt = ERR_PTR(PTR_ERR(o)));
585
586         n = lu_object_locate(o->lo_header, nd->ld_type);
587         if (unlikely(n == NULL)) {
588                 CERROR("can't find slice\n");
589                 lu_object_put(env, o);
590                 GOTO(out, dt = ERR_PTR(-EINVAL));
591         }
592
593         dt = container_of(n, struct dt_object, do_lu);
594
595         rc = dt_declare_create(env, dt, NULL, NULL, NULL, th);
596         if (rc) {
597                 CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
598                        ost_idx, rc);
599                 lu_object_put(env, o);
600                 dt = ERR_PTR(rc);
601         }
602
603 out:
604         RETURN(dt);
605 }
606
607 static int min_stripe_count(__u32 stripe_cnt, int flags)
608 {
609         return (flags & LOV_USES_DEFAULT_STRIPE ?
610                         stripe_cnt - (stripe_cnt / 4) : stripe_cnt);
611 }
612
613 #define LOV_CREATE_RESEED_MULT 30
614 #define LOV_CREATE_RESEED_MIN  2000
615
616 static int inline lod_qos_dev_is_full(struct obd_statfs *msfs)
617 {
618         __u64 used;
619         int   bs = msfs->os_bsize;
620
621         LASSERT(((bs - 1) & bs) == 0);
622
623         /* the minimum of 0.1% used blocks and 1GB bytes. */
624         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
625                         1 << (31 - ffs(bs)));
626         return (msfs->os_bavail < used);
627 }
628
629 static inline int lod_qos_ost_in_use_clear(const struct lu_env *env,
630                                            __u32 stripes)
631 {
632         struct lod_thread_info *info = lod_env_info(env);
633
634         if (info->lti_ea_store_size < sizeof(int) * stripes)
635                 lod_ea_store_resize(info, stripes * sizeof(int));
636         if (info->lti_ea_store_size < sizeof(int) * stripes) {
637                 CERROR("can't allocate memory for ost-in-use array\n");
638                 return -ENOMEM;
639         }
640         memset(info->lti_ea_store, -1, sizeof(int) * stripes);
641         return 0;
642 }
643
644 static inline void lod_qos_ost_in_use(const struct lu_env *env, int idx, int ost)
645 {
646         struct lod_thread_info *info = lod_env_info(env);
647         int *osts = info->lti_ea_store;
648
649         LASSERT(info->lti_ea_store_size >= idx * sizeof(int));
650         osts[idx] = ost;
651 }
652
653 static int lod_qos_is_ost_used(const struct lu_env *env, int ost, __u32 stripes)
654 {
655         struct lod_thread_info *info = lod_env_info(env);
656         int *osts = info->lti_ea_store;
657         __u32 j;
658
659         for (j = 0; j < stripes; j++) {
660                 if (osts[j] == ost)
661                         return 1;
662         }
663         return 0;
664 }
665
666 /* Allocate objects on OSTs with round-robin algorithm */
667 static int lod_alloc_rr(const struct lu_env *env, struct lod_object *lo,
668                         struct dt_object **stripe, int flags,
669                         struct thandle *th)
670 {
671         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
672         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
673         struct pool_desc  *pool = NULL;
674         struct ost_pool   *osts;
675         struct lod_qos_rr *lqr;
676         struct dt_object  *o;
677         unsigned int       i, array_idx;
678         int                rc;
679         __u32              ost_start_idx_temp;
680         int                speed = 0;
681         __u32              stripe_idx = 0;
682         __u32              stripe_cnt = lo->ldo_stripenr;
683         __u32              stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
684         __u32              ost_idx;
685         ENTRY;
686
687         if (lo->ldo_pool)
688                 pool = lod_find_pool(m, lo->ldo_pool);
689
690         if (pool != NULL) {
691                 down_read(&pool_tgt_rw_sem(pool));
692                 osts = &(pool->pool_obds);
693                 lqr = &(pool->pool_rr);
694         } else {
695                 osts = &(m->lod_pool_info);
696                 lqr = &(m->lod_qos.lq_rr);
697         }
698
699         rc = lod_qos_calc_rr(m, osts, lqr);
700         if (rc)
701                 GOTO(out, rc);
702
703         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
704         if (rc)
705                 GOTO(out, rc);
706
707         if (--lqr->lqr_start_count <= 0) {
708                 lqr->lqr_start_idx = cfs_rand() % osts->op_count;
709                 lqr->lqr_start_count =
710                         (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
711                          LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
712         } else if (stripe_cnt_min >= osts->op_count ||
713                         lqr->lqr_start_idx > osts->op_count) {
714                 /* If we have allocated from all of the OSTs, slowly
715                  * precess the next start if the OST/stripe count isn't
716                  * already doing this for us. */
717                 lqr->lqr_start_idx %= osts->op_count;
718                 if (stripe_cnt > 1 && (osts->op_count % stripe_cnt) != 1)
719                         ++lqr->lqr_offset_idx;
720         }
721         down_read(&m->lod_qos.lq_rw_sem);
722         ost_start_idx_temp = lqr->lqr_start_idx;
723
724 repeat_find:
725         array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
726                         osts->op_count;
727
728         QOS_DEBUG("pool '%s' want %d startidx %d startcnt %d offset %d "
729                   "active %d count %d arrayidx %d\n",
730                   lo->ldo_pool ? lo->ldo_pool : "",
731                   stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count,
732                   lqr->lqr_offset_idx, osts->op_count, osts->op_count,
733                   array_idx);
734
735         for (i = 0; i < osts->op_count && stripe_idx < lo->ldo_stripenr;
736              i++, array_idx = (array_idx + 1) % osts->op_count) {
737                 ++lqr->lqr_start_idx;
738                 ost_idx = lqr->lqr_pool.op_array[array_idx];
739
740                 QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
741                           i, lqr->lqr_start_idx, /* XXX: active*/ 0,
742                           stripe_idx, array_idx, ost_idx);
743
744                 if ((ost_idx == LOV_QOS_EMPTY) ||
745                     !cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
746                         continue;
747
748                 /* Fail Check before osc_precreate() is called
749                    so we can only 'fail' single OSC. */
750                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
751                         continue;
752
753                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
754                 if (rc) {
755                         /* this OSP doesn't feel well */
756                         continue;
757                 }
758
759                 /*
760                  * skip full devices
761                  */
762                 if (lod_qos_dev_is_full(sfs)) {
763                         QOS_DEBUG("#%d is full\n", ost_idx);
764                         continue;
765                 }
766
767                 /*
768                  * We expect number of precreated objects in f_ffree at
769                  * the first iteration, skip OSPs with no objects ready
770                  */
771                 if (sfs->os_fprecreated == 0 && speed == 0) {
772                         QOS_DEBUG("#%d: precreation is empty\n", ost_idx);
773                         continue;
774                 }
775
776                 /*
777                  * try to use another OSP if this one is degraded
778                  */
779                 if (sfs->os_state & OS_STATE_DEGRADED && speed < 2) {
780                         QOS_DEBUG("#%d: degraded\n", ost_idx);
781                         continue;
782                 }
783
784                 /*
785                  * do not put >1 objects on a single OST
786                  */
787                 if (speed && lod_qos_is_ost_used(env, ost_idx, stripe_idx))
788                         continue;
789
790                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
791                 if (IS_ERR(o)) {
792                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
793                                ost_idx, (int) PTR_ERR(o));
794                         rc = PTR_ERR(o);
795                         continue;
796                 }
797
798                 /*
799                  * We've successfuly declared (reserved) an object
800                  */
801                 lod_qos_ost_in_use(env, stripe_idx, ost_idx);
802                 stripe[stripe_idx] = o;
803                 stripe_idx++;
804
805         }
806         if ((speed < 2) && (stripe_idx < stripe_cnt_min)) {
807                 /* Try again, allowing slower OSCs */
808                 speed++;
809                 lqr->lqr_start_idx = ost_start_idx_temp;
810                 goto repeat_find;
811         }
812
813         up_read(&m->lod_qos.lq_rw_sem);
814
815         if (stripe_idx) {
816                 lo->ldo_stripenr = stripe_idx;
817                 /* at least one stripe is allocated */
818                 rc = 0;
819         } else {
820                 /* nobody provided us with a single object */
821                 rc = -ENOSPC;
822         }
823
824 out:
825         if (pool != NULL) {
826                 up_read(&pool_tgt_rw_sem(pool));
827                 /* put back ref got by lod_find_pool() */
828                 lod_pool_putref(pool);
829         }
830
831         RETURN(rc);
832 }
833
834 /* alloc objects on osts with specific stripe offset */
835 static int lod_alloc_specific(const struct lu_env *env, struct lod_object *lo,
836                               struct dt_object **stripe, int flags,
837                               struct thandle *th)
838 {
839         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
840         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
841         struct dt_object  *o;
842         __u32              ost_idx;
843         unsigned int       i, array_idx, ost_count;
844         int                rc, stripe_num = 0;
845         int                speed = 0;
846         struct pool_desc  *pool = NULL;
847         struct ost_pool   *osts;
848         ENTRY;
849
850         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
851         if (rc)
852                 GOTO(out, rc);
853
854         if (lo->ldo_pool)
855                 pool = lod_find_pool(m, lo->ldo_pool);
856
857         if (pool != NULL) {
858                 down_read(&pool_tgt_rw_sem(pool));
859                 osts = &(pool->pool_obds);
860         } else {
861                 osts = &(m->lod_pool_info);
862         }
863
864         ost_count = osts->op_count;
865
866 repeat_find:
867         /* search loi_ost_idx in ost array */
868         array_idx = 0;
869         for (i = 0; i < ost_count; i++) {
870                 if (osts->op_array[i] == lo->ldo_def_stripe_offset) {
871                         array_idx = i;
872                         break;
873                 }
874         }
875         if (i == ost_count) {
876                 CERROR("Start index %d not found in pool '%s'\n",
877                        lo->ldo_def_stripe_offset,
878                        lo->ldo_pool ? lo->ldo_pool : "");
879                 GOTO(out, rc = -EINVAL);
880         }
881
882         for (i = 0; i < ost_count;
883                         i++, array_idx = (array_idx + 1) % ost_count) {
884                 ost_idx = osts->op_array[array_idx];
885
886                 if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
887                         continue;
888
889                 /* Fail Check before osc_precreate() is called
890                    so we can only 'fail' single OSC. */
891                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
892                         continue;
893
894                 /*
895                  * do not put >1 objects on a single OST
896                  */
897                 if (lod_qos_is_ost_used(env, ost_idx, stripe_num))
898                         continue;
899
900                 /* Drop slow OSCs if we can, but not for requested start idx.
901                  *
902                  * This means "if OSC is slow and it is not the requested
903                  * start OST, then it can be skipped, otherwise skip it only
904                  * if it is inactive/recovering/out-of-space." */
905
906                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
907                 if (rc) {
908                         /* this OSP doesn't feel well */
909                         continue;
910                 }
911
912                 /*
913                  * We expect number of precreated objects in f_ffree at
914                  * the first iteration, skip OSPs with no objects ready
915                  * don't apply this logic to OST specified with stripe_offset
916                  */
917                 if (i != 0 && sfs->os_fprecreated == 0 && speed == 0)
918                         continue;
919
920                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
921                 if (IS_ERR(o)) {
922                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
923                                ost_idx, (int) PTR_ERR(o));
924                         continue;
925                 }
926
927                 /*
928                  * We've successfuly declared (reserved) an object
929                  */
930                 lod_qos_ost_in_use(env, stripe_num, ost_idx);
931                 stripe[stripe_num] = o;
932                 stripe_num++;
933
934                 /* We have enough stripes */
935                 if (stripe_num == lo->ldo_stripenr)
936                         GOTO(out, rc = 0);
937         }
938         if (speed < 2) {
939                 /* Try again, allowing slower OSCs */
940                 speed++;
941                 goto repeat_find;
942         }
943
944         /* If we were passed specific striping params, then a failure to
945          * meet those requirements is an error, since we can't reallocate
946          * that memory (it might be part of a larger array or something).
947          *
948          * We can only get here if lsm_stripe_count was originally > 1.
949          */
950         CERROR("can't lstripe objid "DFID": have %d want %u\n",
951                PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num,
952                lo->ldo_stripenr);
953         rc = -EFBIG;
954 out:
955         if (pool != NULL) {
956                 up_read(&pool_tgt_rw_sem(pool));
957                 /* put back ref got by lod_find_pool() */
958                 lod_pool_putref(pool);
959         }
960
961         RETURN(rc);
962 }
963
964 static inline int lod_qos_is_usable(struct lod_device *lod)
965 {
966 #ifdef FORCE_QOS
967         /* to be able to debug QoS code */
968         return 1;
969 #endif
970
971         /* Detect -EAGAIN early, before expensive lock is taken. */
972         if (!lod->lod_qos.lq_dirty && lod->lod_qos.lq_same_space)
973                 return 0;
974
975         if (lod->lod_desc.ld_active_tgt_count < 2)
976                 return 0;
977
978         return 1;
979 }
980
981 /* Alloc objects on OSTs with optimization based on:
982    - free space
983    - network resources (shared OSS's)
984  */
985 static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo,
986                          struct dt_object **stripe, int flags,
987                          struct thandle *th)
988 {
989         struct lod_device   *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
990         struct obd_statfs   *sfs = &lod_env_info(env)->lti_osfs;
991         struct lod_tgt_desc *ost;
992         struct dt_object    *o;
993         __u64                total_weight = 0;
994         unsigned int         i;
995         int                  rc = 0;
996         __u32                nfound, good_osts;
997         __u32                stripe_cnt = lo->ldo_stripenr;
998         __u32                stripe_cnt_min;
999         struct pool_desc    *pool = NULL;
1000         struct ost_pool    *osts;
1001         ENTRY;
1002
1003         stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
1004         if (stripe_cnt_min < 1)
1005                 RETURN(-EINVAL);
1006
1007         if (lo->ldo_pool)
1008                 pool = lod_find_pool(m, lo->ldo_pool);
1009
1010         if (pool != NULL) {
1011                 down_read(&pool_tgt_rw_sem(pool));
1012                 osts = &(pool->pool_obds);
1013         } else {
1014                 osts = &(m->lod_pool_info);
1015         }
1016
1017         /* Detect -EAGAIN early, before expensive lock is taken. */
1018         if (!lod_qos_is_usable(m))
1019                 GOTO(out_nolock, rc = -EAGAIN);
1020
1021         /* Do actual allocation, use write lock here. */
1022         down_write(&m->lod_qos.lq_rw_sem);
1023
1024         /*
1025          * Check again, while we were sleeping on @lq_rw_sem things could
1026          * change.
1027          */
1028         if (!lod_qos_is_usable(m))
1029                 GOTO(out, rc = -EAGAIN);
1030
1031         rc = lod_qos_calc_ppo(m);
1032         if (rc)
1033                 GOTO(out, rc);
1034
1035         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
1036         if (rc)
1037                 GOTO(out, rc);
1038
1039         good_osts = 0;
1040         /* Find all the OSTs that are valid stripe candidates */
1041         for (i = 0; i < osts->op_count; i++) {
1042                 if (!cfs_bitmap_check(m->lod_ost_bitmap, osts->op_array[i]))
1043                         continue;
1044
1045                 rc = lod_statfs_and_check(env, m, osts->op_array[i], sfs);
1046                 if (rc) {
1047                         /* this OSP doesn't feel well */
1048                         continue;
1049                 }
1050
1051                 /*
1052                  * skip full devices
1053                  */
1054                 if (lod_qos_dev_is_full(sfs))
1055                         continue;
1056
1057                 /* Fail Check before osc_precreate() is called
1058                    so we can only 'fail' single OSC. */
1059                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
1060                                    osts->op_array[i] == 0)
1061                         continue;
1062
1063                 ost = OST_TGT(m,osts->op_array[i]);
1064                 ost->ltd_qos.ltq_usable = 1;
1065                 lod_qos_calc_weight(m, osts->op_array[i]);
1066                 total_weight += ost->ltd_qos.ltq_weight;
1067
1068                 good_osts++;
1069         }
1070
1071         QOS_DEBUG("found %d good osts\n", good_osts);
1072
1073         if (good_osts < stripe_cnt_min)
1074                 GOTO(out, rc = -EAGAIN);
1075
1076         /* We have enough osts */
1077         if (good_osts < stripe_cnt)
1078                 stripe_cnt = good_osts;
1079
1080         /* Find enough OSTs with weighted random allocation. */
1081         nfound = 0;
1082         while (nfound < stripe_cnt) {
1083                 __u64 rand, cur_weight;
1084
1085                 cur_weight = 0;
1086                 rc = -ENOSPC;
1087
1088                 if (total_weight) {
1089 #if BITS_PER_LONG == 32
1090                         rand = cfs_rand() % (unsigned)total_weight;
1091                         /* If total_weight > 32-bit, first generate the high
1092                          * 32 bits of the random number, then add in the low
1093                          * 32 bits (truncated to the upper limit, if needed) */
1094                         if (total_weight > 0xffffffffULL)
1095                                 rand = (__u64)(cfs_rand() %
1096                                         (unsigned)(total_weight >> 32)) << 32;
1097                         else
1098                                 rand = 0;
1099
1100                         if (rand == (total_weight & 0xffffffff00000000ULL))
1101                                 rand |= cfs_rand() % (unsigned)total_weight;
1102                         else
1103                                 rand |= cfs_rand();
1104
1105 #else
1106                         rand = ((__u64)cfs_rand() << 32 | cfs_rand()) %
1107                                 total_weight;
1108 #endif
1109                 } else {
1110                         rand = 0;
1111                 }
1112
1113                 /* On average, this will hit larger-weighted osts more often.
1114                    0-weight osts will always get used last (only when rand=0) */
1115                 for (i = 0; i < osts->op_count; i++) {
1116                         __u32 idx = osts->op_array[i];
1117
1118                         if (!cfs_bitmap_check(m->lod_ost_bitmap, idx))
1119                                 continue;
1120
1121                         ost = OST_TGT(m,idx);
1122
1123                         if (!ost->ltd_qos.ltq_usable)
1124                                 continue;
1125
1126                         cur_weight += ost->ltd_qos.ltq_weight;
1127                         QOS_DEBUG("stripe_cnt=%d nfound=%d cur_weight="LPU64
1128                                   " rand="LPU64" total_weight="LPU64"\n",
1129                                   stripe_cnt, nfound, cur_weight, rand,
1130                                   total_weight);
1131
1132                         if (cur_weight < rand)
1133                                 continue;
1134
1135                         QOS_DEBUG("stripe=%d to idx=%d\n", nfound, idx);
1136
1137                         /*
1138                          * do not put >1 objects on a single OST
1139                          */
1140                         if (lod_qos_is_ost_used(env, idx, nfound))
1141                                 continue;
1142                         lod_qos_ost_in_use(env, nfound, idx);
1143
1144                         o = lod_qos_declare_object_on(env, m, idx, th);
1145                         if (IS_ERR(o)) {
1146                                 QOS_DEBUG("can't declare object on #%u: %d\n",
1147                                           idx, (int) PTR_ERR(o));
1148                                 continue;
1149                         }
1150                         stripe[nfound++] = o;
1151                         lod_qos_used(m, osts, idx, &total_weight);
1152                         rc = 0;
1153                         break;
1154                 }
1155
1156                 if (rc) {
1157                         /* no OST found on this iteration, give up */
1158                         break;
1159                 }
1160         }
1161
1162         if (unlikely(nfound != stripe_cnt)) {
1163                 /*
1164                  * when the decision to use weighted algorithm was made
1165                  * we had enough appropriate OSPs, but this state can
1166                  * change anytime (no space on OST, broken connection, etc)
1167                  * so it's possible OSP won't be able to provide us with
1168                  * an object due to just changed state
1169                  */
1170                 LCONSOLE_INFO("wanted %d, found %d\n", stripe_cnt, nfound);
1171                 for (i = 0; i < nfound; i++) {
1172                         LASSERT(stripe[i] != NULL);
1173                         lu_object_put(env, &stripe[i]->do_lu);
1174                         stripe[i] = NULL;
1175                 }
1176
1177                 /* makes sense to rebalance next time */
1178                 m->lod_qos.lq_dirty = 1;
1179                 m->lod_qos.lq_same_space = 0;
1180
1181                 rc = -EAGAIN;
1182         }
1183
1184 out:
1185         up_write(&m->lod_qos.lq_rw_sem);
1186
1187 out_nolock:
1188         if (pool != NULL) {
1189                 up_read(&pool_tgt_rw_sem(pool));
1190                 /* put back ref got by lod_find_pool() */
1191                 lod_pool_putref(pool);
1192         }
1193
1194         RETURN(rc);
1195 }
1196
1197 /* Find the max stripecount we should use */
1198 static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic,
1199                                __u16 stripe_count)
1200 {
1201         __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
1202
1203         if (!stripe_count)
1204                 stripe_count = lod->lod_desc.ld_default_stripe_count;
1205         if (stripe_count > lod->lod_desc.ld_active_tgt_count)
1206                 stripe_count = lod->lod_desc.ld_active_tgt_count;
1207         if (!stripe_count)
1208                 stripe_count = 1;
1209
1210         /* stripe count is based on whether OSD can handle larger EA sizes */
1211         if (lod->lod_osd_max_easize > 0)
1212                 max_stripes = lov_mds_md_max_stripe_count(
1213                         lod->lod_osd_max_easize, magic);
1214
1215         return (stripe_count < max_stripes) ? stripe_count : max_stripes;
1216 }
1217
1218 static int lod_use_defined_striping(const struct lu_env *env,
1219                                     struct lod_object *mo,
1220                                     const struct lu_buf *buf)
1221 {
1222         struct lov_mds_md_v1   *v1 = buf->lb_buf;
1223         struct lov_mds_md_v3   *v3 = buf->lb_buf;
1224         struct lov_ost_data_v1 *objs;
1225         __u32                   magic;
1226         int                     rc = 0;
1227         ENTRY;
1228
1229         magic = le32_to_cpu(v1->lmm_magic);
1230         if (magic == LOV_MAGIC_V1_DEF) {
1231                 magic = LOV_MAGIC_V1;
1232                 objs = &v1->lmm_objects[0];
1233         } else if (magic == LOV_MAGIC_V3_DEF) {
1234                 magic = LOV_MAGIC_V3;
1235                 objs = &v3->lmm_objects[0];
1236                 lod_object_set_pool(mo, v3->lmm_pool_name);
1237         } else {
1238                 GOTO(out, rc = -EINVAL);
1239         }
1240
1241         mo->ldo_pattern = le32_to_cpu(v1->lmm_pattern);
1242         mo->ldo_stripe_size = le32_to_cpu(v1->lmm_stripe_size);
1243         mo->ldo_stripenr = le16_to_cpu(v1->lmm_stripe_count);
1244         mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
1245
1246         /* fixup for released file before object initialization */
1247         if (mo->ldo_pattern & LOV_PATTERN_F_RELEASED) {
1248                 mo->ldo_released_stripenr = mo->ldo_stripenr;
1249                 mo->ldo_stripenr = 0;
1250         }
1251
1252         LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic));
1253
1254         if (mo->ldo_stripenr > 0)
1255                 rc = lod_initialize_objects(env, mo, objs);
1256
1257 out:
1258         RETURN(rc);
1259 }
1260
1261 static int lod_qos_parse_config(const struct lu_env *env,
1262                                 struct lod_object *lo,
1263                                 const struct lu_buf *buf)
1264 {
1265         struct lod_device     *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1266         struct lov_user_md_v1 *v1 = NULL;
1267         struct lov_user_md_v3 *v3 = NULL;
1268         struct pool_desc      *pool;
1269         __u32                  magic;
1270         int                    rc;
1271         ENTRY;
1272
1273         if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0)
1274                 RETURN(0);
1275
1276         v1 = buf->lb_buf;
1277         magic = v1->lmm_magic;
1278
1279         if (magic == __swab32(LOV_USER_MAGIC_V1)) {
1280                 lustre_swab_lov_user_md_v1(v1);
1281                 magic = v1->lmm_magic;
1282         } else if (magic == __swab32(LOV_USER_MAGIC_V3)) {
1283                 v3 = buf->lb_buf;
1284                 lustre_swab_lov_user_md_v3(v3);
1285                 magic = v3->lmm_magic;
1286         }
1287
1288         if (unlikely(magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)) {
1289                 /* try to use as fully defined striping */
1290                 rc = lod_use_defined_striping(env, lo, buf);
1291                 RETURN(rc);
1292         }
1293
1294         if (unlikely(buf->lb_len < sizeof(*v1))) {
1295                 CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1296                 RETURN(-EINVAL);
1297         }
1298
1299         v1->lmm_magic = magic;
1300         if (v1->lmm_pattern == 0)
1301                 v1->lmm_pattern = LOV_PATTERN_RAID0;
1302         if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0) {
1303                 CERROR("invalid pattern: %x\n", v1->lmm_pattern);
1304                 RETURN(-EINVAL);
1305         }
1306         lo->ldo_pattern = v1->lmm_pattern;
1307
1308         if (v1->lmm_stripe_size)
1309                 lo->ldo_stripe_size = v1->lmm_stripe_size;
1310         if (lo->ldo_stripe_size & (LOV_MIN_STRIPE_SIZE - 1))
1311                 lo->ldo_stripe_size = LOV_MIN_STRIPE_SIZE;
1312
1313         if (v1->lmm_stripe_count)
1314                 lo->ldo_stripenr = v1->lmm_stripe_count;
1315
1316         if ((v1->lmm_stripe_offset >= d->lod_desc.ld_tgt_count) &&
1317             (v1->lmm_stripe_offset != (typeof(v1->lmm_stripe_offset))(-1))) {
1318                 CERROR("invalid offset: %x\n", v1->lmm_stripe_offset);
1319                 RETURN(-EINVAL);
1320         }
1321         lo->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1322
1323         CDEBUG(D_OTHER, "lsm: %u size, %u stripes, %u offset\n",
1324                v1->lmm_stripe_size, v1->lmm_stripe_count,
1325                v1->lmm_stripe_offset);
1326
1327         if (v1->lmm_magic == LOV_MAGIC_V3) {
1328                 if (buf->lb_len < sizeof(*v3)) {
1329                         CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1330                         RETURN(-EINVAL);
1331                 }
1332
1333                 v3 = buf->lb_buf;
1334                 lod_object_set_pool(lo, v3->lmm_pool_name);
1335
1336                 /* In the function below, .hs_keycmp resolves to
1337                  * pool_hashkey_keycmp() */
1338                 /* coverity[overrun-buffer-val] */
1339                 pool = lod_find_pool(d, v3->lmm_pool_name);
1340                 if (pool != NULL) {
1341                         if (lo->ldo_def_stripe_offset !=
1342                             (typeof(v1->lmm_stripe_offset))(-1)) {
1343                                 rc = lo->ldo_def_stripe_offset;
1344                                 rc = lod_check_index_in_pool(rc, pool);
1345                                 if (rc < 0) {
1346                                         lod_pool_putref(pool);
1347                                         CERROR("invalid offset\n");
1348                                         RETURN(-EINVAL);
1349                                 }
1350                         }
1351
1352                         if (lo->ldo_stripenr > pool_tgt_count(pool))
1353                                 lo->ldo_stripenr= pool_tgt_count(pool);
1354
1355                         lod_pool_putref(pool);
1356                 }
1357         } else
1358                 lod_object_set_pool(lo, NULL);
1359
1360         /* fixup for released file */
1361         if (lo->ldo_pattern & LOV_PATTERN_F_RELEASED) {
1362                 lo->ldo_released_stripenr = lo->ldo_stripenr;
1363                 lo->ldo_stripenr = 0;
1364         }
1365
1366         RETURN(0);
1367 }
1368
1369 /*
1370  * buf should be NULL or contain striping settings
1371  */
1372 int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
1373                         struct lu_attr *attr, const struct lu_buf *buf,
1374                         struct thandle *th)
1375 {
1376         struct lod_device      *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1377         struct dt_object      **stripe;
1378         int                     stripe_len;
1379         int                     flag = LOV_USES_ASSIGNED_STRIPE;
1380         int                     i, rc;
1381         ENTRY;
1382
1383         LASSERT(lo);
1384
1385         /* no OST available */
1386         /* XXX: should we be waiting a bit to prevent failures during
1387          * cluster initialization? */
1388         if (d->lod_ostnr == 0)
1389                 GOTO(out, rc = -EIO);
1390
1391         /*
1392          * by this time, the object's ldo_stripenr and ldo_stripe_size
1393          * contain default value for striping: taken from the parent
1394          * or from filesystem defaults
1395          *
1396          * in case the caller is passing lovea with new striping config,
1397          * we may need to parse lovea and apply new configuration
1398          */
1399         rc = lod_qos_parse_config(env, lo, buf);
1400         if (rc)
1401                 GOTO(out, rc);
1402
1403         /* A released file is being created */
1404         if (lo->ldo_stripenr == 0)
1405                 GOTO(out, rc = 0);
1406
1407         if (likely(lo->ldo_stripe == NULL)) {
1408                 /*
1409                  * no striping has been created so far
1410                  */
1411                 LASSERT(lo->ldo_stripenr > 0);
1412                 /*
1413                  * statfs and check OST targets now, since ld_active_tgt_count
1414                  * could be changed if some OSTs are [de]activated manually.
1415                  */
1416                 lod_qos_statfs_update(env, d);
1417                 lo->ldo_stripenr = lod_get_stripecnt(d, LOV_MAGIC,
1418                                 lo->ldo_stripenr);
1419
1420                 stripe_len = lo->ldo_stripenr;
1421                 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len);
1422                 if (stripe == NULL)
1423                         GOTO(out, rc = -ENOMEM);
1424
1425                 lod_getref(&d->lod_ost_descs);
1426                 /* XXX: support for non-0 files w/o objects */
1427                 CDEBUG(D_OTHER, "tgt_count %d stripenr %d\n",
1428                                 d->lod_desc.ld_tgt_count, stripe_len);
1429                 if (lo->ldo_def_stripe_offset >= d->lod_desc.ld_tgt_count) {
1430                         rc = lod_alloc_qos(env, lo, stripe, flag, th);
1431                         if (rc == -EAGAIN)
1432                                 rc = lod_alloc_rr(env, lo, stripe, flag, th);
1433                 } else {
1434                         rc = lod_alloc_specific(env, lo, stripe, flag, th);
1435                 }
1436                 lod_putref(d, &d->lod_ost_descs);
1437
1438                 if (rc < 0) {
1439                         for (i = 0; i < stripe_len; i++)
1440                                 if (stripe[i] != NULL)
1441                                         lu_object_put(env, &stripe[i]->do_lu);
1442
1443                         OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len);
1444                         lo->ldo_stripenr = 0;
1445                 } else {
1446                         lo->ldo_stripe = stripe;
1447                         lo->ldo_stripes_allocated = stripe_len;
1448                 }
1449         } else {
1450                 /*
1451                  * lod_qos_parse_config() found supplied buf as a predefined
1452                  * striping (not a hint), so it allocated all the object
1453                  * now we need to create them
1454                  */
1455                 for (i = 0; i < lo->ldo_stripenr; i++) {
1456                         struct dt_object  *o;
1457
1458                         o = lo->ldo_stripe[i];
1459                         LASSERT(o);
1460
1461                         rc = dt_declare_create(env, o, attr, NULL, NULL, th);
1462                         if (rc) {
1463                                 CERROR("can't declare create: %d\n", rc);
1464                                 break;
1465                         }
1466                 }
1467         }
1468
1469 out:
1470         RETURN(rc);
1471 }
1472