1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012, Whamcloud, Inc.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_qos.c
33  *
34  */
35
36 #define DEBUG_SUBSYSTEM S_LOV
37
38 #include <libcfs/libcfs.h>
39 #include <obd_class.h>
40 #include <obd_lov.h>
41 #include <lustre/lustre_idl.h>
42 #include "lod_internal.h"
43
44 /*
45  * force QoS policy (not RR) to be used for testing purposes
46  */
47 #define FORCE_QOS_
48
49 #define D_QOS   D_OTHER
50
51 #if 0
52 #define QOS_DEBUG(fmt, ...)     CDEBUG(D_OTHER, fmt, ## __VA_ARGS__)
53 #define QOS_CONSOLE(fmt, ...)   LCONSOLE(D_OTHER, fmt, ## __VA_ARGS__)
54 #else
55 #define QOS_DEBUG(fmt, ...)
56 #define QOS_CONSOLE(fmt, ...)
57 #endif
58
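/*
 * Bytes available on OST index i, from its last cached statfs:
 * blocks available to unprivileged clients times the block size.
 */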
59 #define TGT_BAVAIL(i) (OST_TGT(lod,i)->ltd_statfs.os_bavail * \
60                        OST_TGT(lod,i)->ltd_statfs.os_bsize)
61
62 int qos_add_tgt(struct lod_device *lod, struct lod_ost_desc *ost_desc)
63 {
64         struct lov_qos_oss *oss = NULL, *temposs;
65         struct obd_export  *exp = ost_desc->ltd_exp;
66         int                 rc = 0, found = 0;
67         cfs_list_t         *list;
68         ENTRY;
69
70         cfs_down_write(&lod->lod_qos.lq_rw_sem);
71         /*
72          * a bit hacky approach to learn the identity of the corresponding
73          * OSS connection (via its remote UUID), as there is no official
74          * API to access this information through the OSD API.
75          */
76         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
77                 if (obd_uuid_equals(&oss->lqo_uuid,
78                                     &exp->exp_connection->c_remote_uuid)) {
79                         found++;
80                         break;
81                 }
82         }
83
84         if (!found) {
85                 OBD_ALLOC_PTR(oss);
86                 if (!oss)
87                         GOTO(out, rc = -ENOMEM);
88                 memcpy(&oss->lqo_uuid, &exp->exp_connection->c_remote_uuid,
89                        sizeof(oss->lqo_uuid));
90         } else {
91                 /* Assume we have to move this one */
92                 cfs_list_del(&oss->lqo_oss_list);
93         }
94
95         oss->lqo_ost_count++;
96         ost_desc->ltd_qos.ltq_oss = oss;
97
98         CDEBUG(D_QOS, "add tgt %s to OSS %s (%d OSTs)\n",
99                obd_uuid2str(&ost_desc->ltd_uuid), obd_uuid2str(&oss->lqo_uuid),
100                oss->lqo_ost_count);
101
102         /* Add sorted by # of OSTs.  Find the first entry that we're
103            bigger than... */
104         list = &lod->lod_qos.lq_oss_list;
105         cfs_list_for_each_entry(temposs, list, lqo_oss_list) {
106                 if (oss->lqo_ost_count > temposs->lqo_ost_count)
107                         break;
108         }
109         /* ...and add before it.  If we're the first or smallest, temposs
110            points to the list head, and we add to the end. */
111         cfs_list_add_tail(&oss->lqo_oss_list, &temposs->lqo_oss_list);
112
113         lod->lod_qos.lq_dirty = 1;
114         lod->lod_qos.lq_rr.lqr_dirty = 1;
115
116 out:
117         cfs_up_write(&lod->lod_qos.lq_rw_sem);
118         RETURN(rc);
119 }
120
121 int qos_del_tgt(struct lod_device *lod, struct lod_ost_desc *ost_desc)
122 {
123         struct lov_qos_oss *oss;
124         int                 rc = 0;
125         ENTRY;
126
127         cfs_down_write(&lod->lod_qos.lq_rw_sem);
128         oss = ost_desc->ltd_qos.ltq_oss;
129         if (!oss)
130                 GOTO(out, rc = -ENOENT);
131
132         oss->lqo_ost_count--;
133         if (oss->lqo_ost_count == 0) {
134                 CDEBUG(D_QOS, "removing OSS %s\n",
135                        obd_uuid2str(&oss->lqo_uuid));
136                 cfs_list_del(&oss->lqo_oss_list);
137                 ost_desc->ltd_qos.ltq_oss = NULL;
138                 OBD_FREE_PTR(oss);
139         }
140
141         lod->lod_qos.lq_dirty = 1;
142         lod->lod_qos.lq_rr.lqr_dirty = 1;
143 out:
144         cfs_up_write(&lod->lod_qos.lq_rw_sem);
145         RETURN(rc);
146 }
147
148 static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
149                                 int index, struct obd_statfs *sfs)
150 {
151         struct lod_ost_desc *ost;
152         int                  rc;
153
154         LASSERT(d);
155         ost = OST_TGT(d,index);
156         LASSERT(ost);
157
158         rc = dt_statfs(env, ost->ltd_ost, sfs);
159         if (rc)
160                 return rc;
161
162         /* check whether device has changed state (active, inactive) */
163         if (unlikely(sfs->os_blocks == 0 && ost->ltd_active)) {
164                 /* turned inactive? */
165                 cfs_spin_lock(&d->lod_desc_lock);
166                 if (sfs->os_blocks == 0 && ost->ltd_active) {
167                         ost->ltd_active = 0;
168                         LASSERT(d->lod_desc.ld_active_tgt_count > 0);
169                         d->lod_desc.ld_active_tgt_count--;
170                         d->lod_qos.lq_dirty = 1;
171                         d->lod_qos.lq_rr.lqr_dirty = 1;
172                         CDEBUG(D_CONFIG, "%s: turns inactive\n",
173                                ost->ltd_exp->exp_obd->obd_name);
174                 }
175                 cfs_spin_unlock(&d->lod_desc_lock);
176         } else if (unlikely(sfs->os_blocks && ost->ltd_active == 0)) {
177                 /* turned active? */
178                 LASSERT(d->lod_desc.ld_active_tgt_count < d->lod_ostnr);
179                 cfs_spin_lock(&d->lod_desc_lock);
180                 if (sfs->os_blocks && ost->ltd_active == 0) {
181                         ost->ltd_active = 1;
182                         d->lod_desc.ld_active_tgt_count++;
183                         d->lod_qos.lq_dirty = 1;
184                         d->lod_qos.lq_rr.lqr_dirty = 1;
185                         CDEBUG(D_CONFIG, "%s: turns active\n",
186                                ost->ltd_exp->exp_obd->obd_name);
187                 }
188                 cfs_spin_unlock(&d->lod_desc_lock);
189         }
190
191         return rc;
192 }
193
194 /*
195  * Update statfs data if the current data is older than 2 * ld_qos_maxage.
196  * The write lock on lq_rw_sem serializes concurrent updaters; each OST in
197  * the pool is queried synchronously through its OSP, and lq_dirty is set
198  * if the available space on any OST has changed, so that the QoS weights
199  * are recalculated on the next allocation.
200  */
201 static void lod_qos_statfs_update(const struct lu_env *env,
202                                   struct lod_device *lod)
203 {
204         struct obd_device *obd = lod2obd(lod);
205         struct ost_pool   *osts = &(lod->lod_pool_info);
206         int                i, idx, rc = 0;
207         __u64              max_age, avail;
208         ENTRY;
209
210         max_age = cfs_time_shift_64(-2*lod->lod_desc.ld_qos_maxage);
211
212         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
213                 /* statfs data is quite recent, no need to refresh it */
214                 RETURN_EXIT;
215
216         cfs_down_write(&lod->lod_qos.lq_rw_sem);
217         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
218                 GOTO(out, rc = 0);
219
220         for (i = 0; i < osts->op_count; i++) {
221                 idx = osts->op_array[i];
222                 avail = OST_TGT(lod,idx)->ltd_statfs.os_bavail;
223                 rc = lod_statfs_and_check(env, lod, idx,
224                                           &OST_TGT(lod,idx)->ltd_statfs);
225                 if (rc) {
226                         /* XXX: disable this OST till next refresh? */
227                         CERROR("can't refresh statfs: %d\n", rc);
228                         break;
229                 }
230                 if (OST_TGT(lod,idx)->ltd_statfs.os_bavail != avail)
231                         /* recalculate weights */
232                         lod->lod_qos.lq_dirty = 1;
233         }
234         obd->obd_osfs_age = cfs_time_current_64();
235
236 out:
237         cfs_up_write(&lod->lod_qos.lq_rw_sem);
238 }
239
240 /* Recalculate per-object penalties for OSSs and OSTs,
241    based on the space available on each OST of each OSS */
242 static int lod_qos_calc_ppo(struct lod_device *lod)
243 {
244         struct lov_qos_oss *oss;
245         __u64               ba_max, ba_min, temp;
246         __u32               num_active;
247         int                 rc, i, prio_wide;
248         time_t              now, age;
249         ENTRY;
250
251         if (!lod->lod_qos.lq_dirty)
252                 GOTO(out, rc = 0);
253
254         num_active = lod->lod_desc.ld_active_tgt_count - 1;
255         if (num_active < 1)
256                 GOTO(out, rc = -EAGAIN);
257
258         /* find bavail on each OSS */
259         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list)
260                                 oss->lqo_bavail = 0;
261         lod->lod_qos.lq_active_oss_count = 0;
262
263         /* How strongly the user wants OSTs to be selected "widely" (not
264            recently chosen and not on recently used OSSes), as opposed to
265            "freely" (by available free space).  Range 0-256. */
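        /* For illustration (hypothetical numbers): with lq_prio_free = 192 the
         * loop below uses prio_wide = 64, and an OST with 1TB available and
         * nine other active OSTs gets a per-object penalty of about
         * (1TB / 2 / 9) * 64 / 256 ~= 14GB; the penalty charged to an OST
         * after it is used is a multiple of this value and decays over time. */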
266         prio_wide = 256 - lod->lod_qos.lq_prio_free;
267
268         ba_min = (__u64)(-1);
269         ba_max = 0;
270         now = cfs_time_current_sec();
271         /* Calculate OST penalty per object
272          * (lod ref taken in lod_qos_prep_create()) */
273         cfs_foreach_bit(lod->lod_ost_bitmap, i) {
274                 LASSERT(OST_TGT(lod,i));
275                 temp = TGT_BAVAIL(i);
276                 if (!temp)
277                         continue;
278                 ba_min = min(temp, ba_min);
279                 ba_max = max(temp, ba_max);
280
281                 /* Count the number of usable OSS's */
282                 if (OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail == 0)
283                         lod->lod_qos.lq_active_oss_count++;
284                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail += temp;
285
286                 /* per-OST penalty is prio * TGT_bavail / (num_ost - 1) / 2 */
287                 temp >>= 1;
288                 lov_do_div64(temp, num_active);
289                 OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj =
290                         (temp * prio_wide) >> 8;
291
292                 age = (now - OST_TGT(lod,i)->ltd_qos.ltq_used) >> 3;
293                 if (lod->lod_qos.lq_reset ||
294                                 age > 32 * lod->lod_desc.ld_qos_maxage)
295                         OST_TGT(lod,i)->ltd_qos.ltq_penalty = 0;
296                 else if (age > lod->lod_desc.ld_qos_maxage)
297                         /* Decay the penalty by half for every 8x the update
298                          * interval that the device has been idle.  That gives
299                          * lots of time for the statfs information to be
300                          * updated (which the penalty is only a proxy for),
301                          * and avoids penalizing OSS/OSTs under light load. */
302                         OST_TGT(lod,i)->ltd_qos.ltq_penalty >>=
303                                 (age / lod->lod_desc.ld_qos_maxage);
304         }
305
306         num_active = lod->lod_qos.lq_active_oss_count - 1;
307         if (num_active < 1) {
308                 /* If there's only 1 OSS, we can't penalize it, so instead
309                    we have to double the OST penalty */
310                 num_active = 1;
311                 cfs_foreach_bit(lod->lod_ost_bitmap, i)
312                         OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj <<= 1;
313         }
314
315         /* Per-OSS penalty is prio * oss_avail / oss_osts / (num_oss - 1) / 2 */
316         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
317                 temp = oss->lqo_bavail >> 1;
318                 lov_do_div64(temp, oss->lqo_ost_count * num_active);
319                 oss->lqo_penalty_per_obj = (temp * prio_wide) >> 8;
320
321                 age = (now - oss->lqo_used) >> 3;
322                 if (lod->lod_qos.lq_reset ||
323                     age > 32 * lod->lod_desc.ld_qos_maxage)
324                         oss->lqo_penalty = 0;
325                 else if (age > lod->lod_desc.ld_qos_maxage)
326                         /* Decay the penalty by half for every 8x the update
327                          * interval that the device has been idle.  That gives
328                          * lots of time for the statfs information to be
329                          * updated (which the penalty is only a proxy for),
330                          * and avoids penalizing OSS/OSTs under light load. */
331                         oss->lqo_penalty >>= age / lod->lod_desc.ld_qos_maxage;
332         }
333
334         lod->lod_qos.lq_dirty = 0;
335         lod->lod_qos.lq_reset = 0;
336
337         /* If each OST has almost the same free space,
338          * use RR allocation for better creation performance */
339         lod->lod_qos.lq_same_space = 0;
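        /* E.g. (hypothetical): with lq_threshold_rr = 51 the check below sets
         * lq_same_space whenever ba_min > ba_max * 205/256, i.e. whenever the
         * emptiest and fullest OSTs differ by less than about 20%. */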
340         if ((ba_max * (256 - lod->lod_qos.lq_threshold_rr)) >> 8 < ba_min) {
341                 lod->lod_qos.lq_same_space = 1;
342                 /* Reset weights for the next time we enter qos mode */
343                 lod->lod_qos.lq_reset = 1;
344         }
345         rc = 0;
346
347 out:
348 #ifndef FORCE_QOS
349         if (!rc && lod->lod_qos.lq_same_space)
350                 RETURN(-EAGAIN);
351 #endif
352         RETURN(rc);
353 }
354
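/*
 * Final weight of an OST: its available space minus the current OST and
 * OSS penalties, clamped at zero.  E.g. (hypothetical) 400GB available
 * with a 60GB OST penalty and a 40GB OSS penalty gives a weight of 300GB;
 * if the penalties exceed the available space the weight is simply 0.
 */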
355 static int lod_qos_calc_weight(struct lod_device *lod, int i)
356 {
357         __u64 temp, temp2;
358
359         /* Final ost weight = TGT_BAVAIL - ost_penalty - oss_penalty */
360         temp = TGT_BAVAIL(i);
361         temp2 = OST_TGT(lod,i)->ltd_qos.ltq_penalty +
362                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_penalty;
363         if (temp < temp2)
364                 OST_TGT(lod,i)->ltd_qos.ltq_weight = 0;
365         else
366                 OST_TGT(lod,i)->ltd_qos.ltq_weight = temp - temp2;
367         return 0;
368 }
369
370 /* We just used this index for a stripe; adjust everyone's weights */
371 static int lod_qos_used(struct lod_device *lod, struct ost_pool *osts,
372                         __u32 index, __u64 *total_wt)
373 {
374         struct lod_ost_desc *ost;
375         struct lov_qos_oss  *oss;
376         int j;
377         ENTRY;
378
379         ost = OST_TGT(lod,index);
380         LASSERT(ost);
381
382         /* Don't allocate on this device anymore, until the next alloc_qos */
383         ost->ltd_qos.ltq_usable = 0;
384
385         oss = ost->ltd_qos.ltq_oss;
386
387         /* Decay old penalty by half (we're adding max penalty, and don't
388            want it to run away.) */
389         ost->ltd_qos.ltq_penalty >>= 1;
390         oss->lqo_penalty >>= 1;
391
392         /* mark the OSS and OST as recently used */
393         ost->ltd_qos.ltq_used = oss->lqo_used = cfs_time_current_sec();
394
395         /* Set max penalties for this OST and OSS */
396         ost->ltd_qos.ltq_penalty +=
397                 ost->ltd_qos.ltq_penalty_per_obj * lod->lod_ostnr;
398         oss->lqo_penalty += oss->lqo_penalty_per_obj *
399                 lod->lod_qos.lq_active_oss_count;
400
401         /* Decrease all OSS penalties */
402         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
403                 if (oss->lqo_penalty < oss->lqo_penalty_per_obj)
404                         oss->lqo_penalty = 0;
405                 else
406                         oss->lqo_penalty -= oss->lqo_penalty_per_obj;
407         }
408
409         *total_wt = 0;
410         /* Decrease all OST penalties */
411         for (j = 0; j < osts->op_count; j++) {
412                 int i;
413
414                 i = osts->op_array[j];
415                 if (!cfs_bitmap_check(lod->lod_ost_bitmap, i))
416                         continue;
417
418                 ost = OST_TGT(lod,i);
419                 LASSERT(ost);
420
421                 if (ost->ltd_qos.ltq_penalty <
422                                 ost->ltd_qos.ltq_penalty_per_obj)
423                         ost->ltd_qos.ltq_penalty = 0;
424                 else
425                         ost->ltd_qos.ltq_penalty -=
426                                 ost->ltd_qos.ltq_penalty_per_obj;
427
428                 lod_qos_calc_weight(lod, i);
429
430                 /* Recalc the total weight of usable osts */
431                 if (ost->ltd_qos.ltq_usable)
432                         *total_wt += ost->ltd_qos.ltq_weight;
433
434                 QOS_DEBUG("recalc tgt %d usable=%d avail="LPU64
435                           " ostppo="LPU64" ostp="LPU64" ossppo="LPU64
436                           " ossp="LPU64" wt="LPU64"\n",
437                           i, ost->ltd_qos.ltq_usable, TGT_BAVAIL(i) >> 10,
438                           ost->ltd_qos.ltq_penalty_per_obj >> 10,
439                           ost->ltd_qos.ltq_penalty >> 10,
440                           ost->ltd_qos.ltq_oss->lqo_penalty_per_obj >> 10,
441                           ost->ltd_qos.ltq_oss->lqo_penalty >> 10,
442                           ost->ltd_qos.ltq_weight >> 10);
443         }
444
445         RETURN(0);
446 }
447
448 #define LOV_QOS_EMPTY ((__u32)-1)
449 /* compute optimal round-robin order, based on OSTs per OSS */
450 static int lod_qos_calc_rr(struct lod_device *lod, struct ost_pool *src_pool,
451                            struct lov_qos_rr *lqr)
452 {
453         struct lov_qos_oss  *oss;
454         struct lod_ost_desc *ost;
455         unsigned placed, real_count;
456         int i, rc;
457         ENTRY;
458
459         if (!lqr->lqr_dirty) {
460                 LASSERT(lqr->lqr_pool.op_size);
461                 RETURN(0);
462         }
463
464         /* Do actual allocation. */
465         cfs_down_write(&lod->lod_qos.lq_rw_sem);
466
467         /*
468          * Check again. While we were sleeping on @lq_rw_sem something could
469          * change.
470          */
471         if (!lqr->lqr_dirty) {
472                 LASSERT(lqr->lqr_pool.op_size);
473                 cfs_up_write(&lod->lod_qos.lq_rw_sem);
474                 RETURN(0);
475         }
476
477         real_count = src_pool->op_count;
478
479         /* Zero the pool array */
480         /* alloc_rr is holding a read lock on the pool, so nobody is adding/
481            deleting from the pool. The lq_rw_sem ensures that nobody else
482            is reading. */
483         lqr->lqr_pool.op_count = real_count;
484         rc = lod_ost_pool_extend(&lqr->lqr_pool, real_count);
485         if (rc) {
486                 cfs_up_write(&lod->lod_qos.lq_rw_sem);
487                 RETURN(rc);
488         }
489         for (i = 0; i < lqr->lqr_pool.op_count; i++)
490                 lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY;
491
492         /* Place all the OSTs from 1 OSS at the same time. */
493         placed = 0;
494         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
495                 int j = 0;
496
497                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
498                         int next;
499
500                         if (!cfs_bitmap_check(lod->lod_ost_bitmap,
501                                                 src_pool->op_array[i]))
502                                 continue;
503
504                         ost = OST_TGT(lod,src_pool->op_array[i]);
505                         LASSERT(ost && ost->ltd_ost);
506                         if (ost->ltd_qos.ltq_oss != oss)
507                                 continue;
508
509                         /* Evenly space this OSS's OSTs across the array */
510                         next = j * lqr->lqr_pool.op_count / oss->lqo_ost_count;
511                         while (lqr->lqr_pool.op_array[next] != LOV_QOS_EMPTY)
512                                 next = (next + 1) % lqr->lqr_pool.op_count;
513
514                         lqr->lqr_pool.op_array[next] = src_pool->op_array[i];
515                         j++;
516                         placed++;
517                 }
518         }
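        /* For illustration (hypothetical layout): two OSSes with three OSTs
         * each give op_count = 6; the first OSS's OSTs land in slots 0, 2, 4
         * and the second OSS's in slots 1, 3, 5, so the resulting round-robin
         * order alternates between the two servers. */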
519
520         lqr->lqr_dirty = 0;
521         cfs_up_write(&lod->lod_qos.lq_rw_sem);
522
523         if (placed != real_count) {
524                 /* This should never happen */
525                 LCONSOLE_ERROR_MSG(0x14e, "Failed to place all OSTs in the "
526                                    "round-robin list (%d of %d).\n",
527                                    placed, real_count);
528                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
529                         LCONSOLE(D_WARNING, "rr #%d ost idx=%d\n", i,
530                                  lqr->lqr_pool.op_array[i]);
531                 }
532                 lqr->lqr_dirty = 1;
533                 RETURN(-EAGAIN);
534         }
535
536 #if 0
537         for (i = 0; i < lqr->lqr_pool.op_count; i++)
538                 QOS_CONSOLE("rr #%d ost idx=%d\n", i, lqr->lqr_pool.op_array[i]);
539 #endif
540
541         RETURN(0);
542 }
543
544 /**
545  * A helper function to:
546  *   create an in-core lu object on the specified OSP
547  *   declare creation of the object
548  * IMPORTANT: at this stage the object is anonymous - it has no FID assigned;
549  *            this is a workaround until we have natural FIDs on OST
550  *
551  *            at this point we want to declare (reserve) the object for us,
552  *            as we can't block at execution time (when the create method is
553  *            called), otherwise we'd block the whole transaction batch
554  */
555 static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
556                                                    struct lod_device *d,
557                                                    int ost_idx,
558                                                    struct thandle *th)
559 {
560         struct lod_ost_desc *ost;
561         struct lu_object *o, *n;
562         struct lu_device *nd;
563         struct dt_object *dt;
564         int               rc;
565         ENTRY;
566
567         LASSERT(d);
568         LASSERT(ost_idx >= 0);
569         LASSERT(ost_idx < d->lod_osts_size);
570         ost = OST_TGT(d,ost_idx);
571         LASSERT(ost);
572         LASSERT(ost->ltd_ost);
573
574         nd = &ost->ltd_ost->dd_lu_dev;
575
576         /*
577          * allocate anonymous object with zero fid, real fid
578          * will be assigned by OSP within transaction
579          * XXX: to be fixed with fully-functional OST fids
580          */
581         o = lu_object_anon(env, nd, NULL);
582         if (IS_ERR(o))
583                 GOTO(out, dt = ERR_PTR(PTR_ERR(o)));
584
585         n = lu_object_locate(o->lo_header, nd->ld_type);
586         if (unlikely(n == NULL)) {
587                 CERROR("can't find slice\n");
588                 lu_object_put(env, o);
589                 GOTO(out, dt = ERR_PTR(-EINVAL));
590         }
591
592         dt = container_of(n, struct dt_object, do_lu);
593
594         rc = dt_declare_create(env, dt, NULL, NULL, NULL, th);
595         if (rc) {
596                 CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
597                        ost_idx, rc);
598                 lu_object_put(env, o);
599                 dt = ERR_PTR(rc);
600         }
601
602 out:
603         RETURN(dt);
604 }
605
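/*
 * Minimum number of stripes the caller will accept.  With
 * LOV_USES_DEFAULT_STRIPE we settle for roughly 3/4 of the requested
 * count (e.g. a default stripe count of 8 gives a minimum of 6);
 * otherwise the full count is required.
 */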
606 static int min_stripe_count(int stripe_cnt, int flags)
607 {
608         return (flags & LOV_USES_DEFAULT_STRIPE ?
609                         stripe_cnt - (stripe_cnt / 4) : stripe_cnt);
610 }
611
612 #define LOV_CREATE_RESEED_MULT 30
613 #define LOV_CREATE_RESEED_MIN  2000
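/*
 * These control how often the round-robin start index is re-randomized.
 * E.g. (hypothetical) with 10 OSTs in the pool, lqr_start_count is reset
 * to (2000 / 10 + 30) * 10 = 2300, i.e. the start index is reseeded
 * roughly every 2300 object creations.
 */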
614
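/*
 * Consider a device full when less than the smaller of 0.1% of its used
 * space and 1GB remains available.  E.g. (hypothetical) with a 4096-byte
 * block size, cfs_ffs(bs) = 13 and 1 << (31 - 13) = 262144 blocks, which
 * is exactly 1GB.
 */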
615 static int inline lod_qos_dev_is_full(struct obd_statfs *msfs)
616 {
617         __u64 used;
618         int   bs = msfs->os_bsize;
619
620         LASSERT(((bs - 1) & bs) == 0);
621
622         /* the smaller of 0.1% of the used space and 1GB, in blocks */
623         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
624                         1 << (31 - cfs_ffs(bs)));
625         return (msfs->os_bavail < used);
626 }
627
628 int lod_ea_store_resize(struct lod_thread_info *info, int size);
629
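/*
 * The helpers below track which OST indices have already been picked for
 * the stripes of the object being created, reusing the per-thread
 * lti_ea_store buffer as a scratch array of ints (one slot per stripe),
 * so that the allocators can avoid putting two stripes on the same OST.
 */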
630 static inline int lod_qos_ost_in_use_clear(const struct lu_env *env, int stripes)
631 {
632         struct lod_thread_info *info = lod_env_info(env);
633
634         if (info->lti_ea_store_size < sizeof(int) * stripes)
635                 lod_ea_store_resize(info, stripes * sizeof(int));
636         if (info->lti_ea_store_size < sizeof(int) * stripes) {
637                 CERROR("can't allocate memory for ost-in-use array\n");
638                 return -ENOMEM;
639         }
640         memset(info->lti_ea_store, 0, sizeof(int) * stripes);
641         return 0;
642 }
643
644 static inline void lod_qos_ost_in_use(const struct lu_env *env, int idx, int ost)
645 {
646         struct lod_thread_info *info = lod_env_info(env);
647         int *osts = info->lti_ea_store;
648
649         LASSERT(info->lti_ea_store_size >= idx * sizeof(int));
650         osts[idx] = ost;
651 }
652
653 static int lod_qos_is_ost_used(const struct lu_env *env, int ost, int stripes)
654 {
655         struct lod_thread_info *info = lod_env_info(env);
656         int *osts = info->lti_ea_store;
657         int j;
658
659         for (j = 0; j < stripes; j++) {
660                 if (osts[j] == ost)
661                         return 1;
662         }
663         return 0;
664 }
665
666 /* Allocate objects on osts with round-robin algorithm */
667 static int lod_alloc_rr(const struct lu_env *env, struct lod_object *lo,
668                         int flags, struct thandle *th)
669 {
670         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
671         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
672         struct pool_desc  *pool = NULL;
673         struct ost_pool   *osts;
674         struct lov_qos_rr *lqr;
675         struct dt_object  *o;
676         unsigned           array_idx;
677         int                i, rc;
678         int                ost_start_idx_temp;
679         int                speed = 0;
680         int                stripe_idx = 0;
681         int                stripe_cnt = lo->ldo_stripenr;
682         int                stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
683         __u32              ost_idx;
684         ENTRY;
685
686         if (lo->ldo_pool)
687                 pool = lod_find_pool(m, lo->ldo_pool);
688
689         if (pool != NULL) {
690                 cfs_down_read(&pool_tgt_rw_sem(pool));
691                 osts = &(pool->pool_obds);
692                 lqr = &(pool->pool_rr);
693         } else {
694                 osts = &(m->lod_pool_info);
695                 lqr = &(m->lod_qos.lq_rr);
696         }
697
698         rc = lod_qos_calc_rr(m, osts, lqr);
699         if (rc)
700                 GOTO(out, rc);
701
702         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
703         if (rc)
704                 GOTO(out, rc);
705
706         if (--lqr->lqr_start_count <= 0) {
707                 lqr->lqr_start_idx = cfs_rand() % osts->op_count;
708                 lqr->lqr_start_count =
709                         (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
710                          LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
711         } else if (stripe_cnt_min >= osts->op_count ||
712                         lqr->lqr_start_idx > osts->op_count) {
713                 /* If we have allocated from all of the OSTs, slowly
714                  * precess the next start if the OST/stripe count isn't
715                  * already doing this for us. */
716                 lqr->lqr_start_idx %= osts->op_count;
717                 if (stripe_cnt > 1 && (osts->op_count % stripe_cnt) != 1)
718                         ++lqr->lqr_offset_idx;
719         }
720         cfs_down_read(&m->lod_qos.lq_rw_sem);
721         ost_start_idx_temp = lqr->lqr_start_idx;
722
723 repeat_find:
724         array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
725                         osts->op_count;
726
727         QOS_DEBUG("pool '%s' want %d startidx %d startcnt %d offset %d "
728                   "active %d count %d arrayidx %d\n",
729                   lo->ldo_pool ? lo->ldo_pool : "",
730                   stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count,
731                   lqr->lqr_offset_idx, osts->op_count, osts->op_count,
732                   array_idx);
733
734         for (i = 0; i < osts->op_count;
735                         i++, array_idx = (array_idx + 1) % osts->op_count) {
736                 ++lqr->lqr_start_idx;
737                 ost_idx = lqr->lqr_pool.op_array[array_idx];
738
739                 QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
740                           i, lqr->lqr_start_idx, /* XXX: active*/ 0,
741                           stripe_idx, array_idx, ost_idx);
742
743                 if ((ost_idx == LOV_QOS_EMPTY) ||
744                                 !cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
745                         continue;
746
747                 /* Fail Check before osc_precreate() is called
748                    so we can only 'fail' single OSC. */
749                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
750                         continue;
751
752                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
753                 if (rc) {
754                         /* this OSP doesn't feel well */
755                         CERROR("can't statfs #%u: %d\n", ost_idx, rc);
756                         continue;
757                 }
758
759                 /*
760                  * skip empty devices - usually it means inactive device
761                  */
762                 if (sfs->os_blocks == 0) {
763                         QOS_DEBUG("#%d: inactive\n", ost_idx);
764                         continue;
765                 }
766
767                 /*
768                  * skip full devices
769                  */
770                 if (lod_qos_dev_is_full(sfs)) {
771                         QOS_DEBUG("#%d is full\n", ost_idx);
772                         continue;
773                 }
774
775                 /*
776                  * os_ffree reports the number of precreated objects; on the
777                  * first pass, skip OSPs with no objects ready
778                  */
779                 if (sfs->os_ffree == 0 && speed == 0) {
780                         QOS_DEBUG("#%d: precreation is empty\n", ost_idx);
781                         continue;
782                 }
783
784                 /*
785                  * try to use another OSP if this one is degraded
786                  */
787                 if (sfs->os_state == OS_STATE_DEGRADED && speed == 0) {
788                         QOS_DEBUG("#%d: degraded\n", ost_idx);
789                         continue;
790                 }
791
792                 /*
793                  * do not put >1 objects on a single OST
794                  */
795                 if (speed && lod_qos_is_ost_used(env, ost_idx, stripe_idx))
796                         continue;
797
798                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
799                 if (IS_ERR(o)) {
800                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
801                                ost_idx, (int) PTR_ERR(o));
802                         rc = PTR_ERR(o);
803                         continue;
804                 }
805
806                 /*
807                  * We've successfully declared (reserved) an object
808                  */
809                 lod_qos_ost_in_use(env, stripe_idx, ost_idx);
810                 lo->ldo_stripe[stripe_idx] = o;
811                 stripe_idx++;
812
813                 /* We have enough stripes */
814                 if (stripe_idx == lo->ldo_stripenr)
815                         break;
816         }
817         if ((speed < 2) && (stripe_idx < stripe_cnt_min)) {
818                 /* Try again, allowing slower OSCs */
819                 speed++;
820                 lqr->lqr_start_idx = ost_start_idx_temp;
821                 goto repeat_find;
822         }
823
824         cfs_up_read(&m->lod_qos.lq_rw_sem);
825
826         if (stripe_idx) {
827                 lo->ldo_stripenr = stripe_idx;
828                 /* at least one stripe is allocated */
829                 rc = 0;
830         } else {
831                 /* nobody provided us with a single object */
832                 rc = -ENOSPC;
833         }
834
835 out:
836         if (pool != NULL) {
837                 cfs_up_read(&pool_tgt_rw_sem(pool));
838                 /* put back ref got by lod_find_pool() */
839                 lod_pool_putref(pool);
840         }
841
842         RETURN(rc);
843 }
844
845 /* Allocate objects on OSTs, starting at the requested stripe offset */
846 static int lod_alloc_specific(const struct lu_env *env, struct lod_object *lo,
847                               int flags, struct thandle *th)
848 {
849         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
850         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
851         struct dt_object  *o;
852         unsigned           ost_idx, array_idx, ost_count;
853         int                i, rc, stripe_num = 0;
854         int                speed = 0;
855         struct pool_desc  *pool = NULL;
856         struct ost_pool   *osts;
857         ENTRY;
858
859         if (lo->ldo_pool)
860                 pool = lod_find_pool(m, lo->ldo_pool);
861
862         if (pool != NULL) {
863                 cfs_down_read(&pool_tgt_rw_sem(pool));
864                 osts = &(pool->pool_obds);
865         } else {
866                 osts = &(m->lod_pool_info);
867         }
868
869         ost_count = osts->op_count;
870
871 repeat_find:
872         /* find the requested start index (ldo_def_stripe_offset) in the OST array */
873         array_idx = 0;
874         for (i = 0; i < ost_count; i++) {
875                 if (osts->op_array[i] == lo->ldo_def_stripe_offset) {
876                         array_idx = i;
877                         break;
878                 }
879         }
880         if (i == ost_count) {
881                 CERROR("Start index %d not found in pool '%s'\n",
882                        lo->ldo_def_stripe_offset,
883                        lo->ldo_pool ? lo->ldo_pool : "");
884                 GOTO(out, rc = -EINVAL);
885         }
886
887         for (i = 0; i < ost_count;
888                         i++, array_idx = (array_idx + 1) % ost_count) {
889                 ost_idx = osts->op_array[array_idx];
890
891                 if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
892                         continue;
893
894                 /* Fail Check before osc_precreate() is called
895                    so we can only 'fail' single OSC. */
896                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
897                         continue;
898
899                 /* Drop slow OSCs if we can, but not for requested start idx.
900                  *
901                  * This means "if OSC is slow and it is not the requested
902                  * start OST, then it can be skipped, otherwise skip it only
903                  * if it is inactive/recovering/out-of-space." */
904
905                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
906                 if (rc) {
907                         /* this OSP doesn't feel well */
908                         CERROR("can't statfs #%u: %d\n", ost_idx, rc);
909                         continue;
910                 }
911
912                 /*
913                  * skip empty devices - usually it means inactive device
914                  */
915                 if (sfs->os_blocks == 0)
916                         continue;
917
918                 /*
919                  * os_ffree reports the number of precreated objects; on the
920                  * first pass, skip OSPs with no objects ready.  Don't apply
921                  * this logic to the OST specified by the stripe offset
922                  */
923                 if (i != 0 && sfs->os_ffree == 0 && speed == 0)
924                         continue;
925
926                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
927                 if (IS_ERR(o)) {
928                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
929                                ost_idx, (int) PTR_ERR(o));
930                         continue;
931                 }
932
933                 /*
934                  * We've successfully declared (reserved) an object
935                  */
936                 lo->ldo_stripe[stripe_num] = o;
937                 stripe_num++;
938
939                 /* We have enough stripes */
940                 if (stripe_num == lo->ldo_stripenr)
941                         GOTO(out, rc = 0);
942         }
943         if (speed < 2) {
944                 /* Try again, allowing slower OSCs */
945                 speed++;
946                 goto repeat_find;
947         }
948
949         /* If we were passed specific striping params, then a failure to
950          * meet those requirements is an error, since we can't reallocate
951          * that memory (it might be part of a larger array or something).
952          *
953          * We can only get here if lsm_stripe_count was originally > 1.
954          */
955         CERROR("can't lstripe objid "DFID": have %d want %u\n",
956                PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num,
957                lo->ldo_stripenr);
958         rc = -EFBIG;
959 out:
960         if (pool != NULL) {
961                 cfs_up_read(&pool_tgt_rw_sem(pool));
962                 /* put back ref got by lod_find_pool() */
963                 lod_pool_putref(pool);
964         }
965
966         RETURN(rc);
967 }
968
969 static inline int lod_qos_is_usable(struct lod_device *lod)
970 {
971 #ifdef FORCE_QOS
972         /* to be able to debug QoS code */
973         return 1;
974 #endif
975
976         /* Detect -EAGAIN early, before expensive lock is taken. */
977         if (!lod->lod_qos.lq_dirty && lod->lod_qos.lq_same_space)
978                 return 0;
979
980         if (lod->lod_desc.ld_active_tgt_count < 2)
981                 return 0;
982
983         return 1;
984 }
985
986 /* Alloc objects on osts with optimization based on:
987    - free space
988    - network resources (shared OSS's)
989  */
990 static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo,
991                          int flags, struct thandle *th)
992 {
993         struct lod_device   *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
994         struct obd_statfs   *sfs = &lod_env_info(env)->lti_osfs;
995         struct lod_ost_desc *ost;
996         struct dt_object    *o;
997         __u64                total_weight = 0;
998         int                  nfound, good_osts, i, rc = 0;
999         int                  stripe_cnt = lo->ldo_stripenr;
1000         int                  stripe_cnt_min;
1001         struct pool_desc    *pool = NULL;
1002         struct ost_pool    *osts;
1003         ENTRY;
1004
1005         stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
1006         if (stripe_cnt_min < 1)
1007                 RETURN(-EINVAL);
1008
1009         if (lo->ldo_pool)
1010                 pool = lod_find_pool(m, lo->ldo_pool);
1011
1012         if (pool != NULL) {
1013                 cfs_down_read(&pool_tgt_rw_sem(pool));
1014                 osts = &(pool->pool_obds);
1015         } else {
1016                 osts = &(m->lod_pool_info);
1017         }
1018
1019         /* Detect -EAGAIN early, before expensive lock is taken. */
1020         if (!lod_qos_is_usable(m))
1021                 GOTO(out_nolock, rc = -EAGAIN);
1022
1023         /* Do actual allocation, use write lock here. */
1024         cfs_down_write(&m->lod_qos.lq_rw_sem);
1025
1026         /*
1027          * Check again, while we were sleeping on @lq_rw_sem things could
1028          * change.
1029          */
1030         if (!lod_qos_is_usable(m))
1031                 GOTO(out, rc = -EAGAIN);
1032
1033         rc = lod_qos_calc_ppo(m);
1034         if (rc)
1035                 GOTO(out, rc);
1036
1037         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
1038         if (rc)
1039                 GOTO(out, rc);
1040
1041         good_osts = 0;
1042         /* Find all the OSTs that are valid stripe candidates */
1043         for (i = 0; i < osts->op_count; i++) {
1044                 if (!cfs_bitmap_check(m->lod_ost_bitmap, osts->op_array[i]))
1045                         continue;
1046
1047                 rc = lod_statfs_and_check(env, m, osts->op_array[i], sfs);
1048                 if (rc) {
1049                         /* this OSP doesn't feel well */
1050                         CERROR("can't statfs #%u: %d\n", osts->op_array[i], rc);
1051                         continue;
1052                 }
1053
1054                 /*
1055                  * skip empty devices - usually it means inactive device
1056                  */
1057                 if (sfs->os_blocks == 0)
1058                         continue;
1059
1060                 /*
1061                  * skip full devices
1062                  */
1063                 if (lod_qos_dev_is_full(sfs))
1064                         continue;
1065
1066                 /* Fail Check before osc_precreate() is called
1067                    so we can only 'fail' single OSC. */
1068                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
1069                                    osts->op_array[i] == 0)
1070                         continue;
1071
1072                 ost = OST_TGT(m,osts->op_array[i]);
1073                 ost->ltd_qos.ltq_usable = 1;
1074                 lod_qos_calc_weight(m, osts->op_array[i]);
1075                 total_weight += ost->ltd_qos.ltq_weight;
1076
1077                 good_osts++;
1078         }
1079
1080         QOS_DEBUG("found %d good osts\n", good_osts);
1081
1082         if (good_osts < stripe_cnt_min)
1083                 GOTO(out, rc = -EAGAIN);
1084
1085         /* We have enough OSTs; cap the stripe count at the number of good OSTs */
1086         if (good_osts < stripe_cnt)
1087                 stripe_cnt = good_osts;
1088
1089         /* Find enough OSTs with weighted random allocation. */
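        /*
         * Roughly: pick a random value in [0, total_weight), walk the OSTs
         * accumulating their weights, and take the first OST whose cumulative
         * weight reaches that value; lod_qos_used() then marks the chosen OST
         * unusable for this allocation and bumps its penalty, so later stripes
         * land elsewhere.  E.g. (hypothetical) with three usable OSTs of
         * weights 50/30/20, the first stripe goes to the heaviest OST about
         * half of the time.
         */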
1090         nfound = 0;
1091         while (nfound < stripe_cnt) {
1092                 __u64 rand, cur_weight;
1093
1094                 cur_weight = 0;
1095                 rc = -ENOSPC;
1096
1097                 if (total_weight) {
1098 #if BITS_PER_LONG == 32
1099                         rand = cfs_rand() % (unsigned)total_weight;
1100                         /* If total_weight > 32-bit, first generate the high
1101                          * 32 bits of the random number, then add in the low
1102                          * 32 bits (truncated to the upper limit, if needed) */
1103                         if (total_weight > 0xffffffffULL)
1104                                 rand = (__u64)(cfs_rand() %
1105                                         (unsigned)(total_weight >> 32)) << 32;
1106                         else
1107                                 rand = 0;
1108
1109                         if (rand == (total_weight & 0xffffffff00000000ULL))
1110                                 rand |= cfs_rand() % (unsigned)total_weight;
1111                         else
1112                                 rand |= cfs_rand();
1113
1114 #else
1115                         rand = ((__u64)cfs_rand() << 32 | cfs_rand()) %
1116                                 total_weight;
1117 #endif
1118                 } else {
1119                         rand = 0;
1120                 }
1121
1122                 /* On average, this will hit larger-weighted osts more often.
1123                    0-weight osts will always get used last (only when rand=0) */
1124                 for (i = 0; i < osts->op_count; i++) {
1125                         int idx = osts->op_array[i];
1126
1127                         if (!cfs_bitmap_check(m->lod_ost_bitmap, idx))
1128                                 continue;
1129
1130                         ost = OST_TGT(m,idx);
1131
1132                         if (!ost->ltd_qos.ltq_usable)
1133                                 continue;
1134
1135                         cur_weight += ost->ltd_qos.ltq_weight;
1136                         QOS_DEBUG("stripe_cnt=%d nfound=%d cur_weight="LPU64
1137                                   " rand="LPU64" total_weight="LPU64"\n",
1138                                   stripe_cnt, nfound, cur_weight, rand,
1139                                   total_weight);
1140
1141                         if (cur_weight < rand)
1142                                 continue;
1143
1144                         QOS_DEBUG("stripe=%d to idx=%d\n", nfound, idx);
1145
1146                         /*
1147                          * do not put >1 objects on a single OST
1148                          */
1149                         if (lod_qos_is_ost_used(env, idx, nfound))
1150                                 continue;
1151                         lod_qos_ost_in_use(env, nfound, idx);
1152
1153                         o = lod_qos_declare_object_on(env, m, idx, th);
1154                         if (IS_ERR(o)) {
1155                                 QOS_DEBUG("can't declare object on #%u: %d\n",
1156                                           idx, (int) PTR_ERR(o));
1157                                 continue;
1158                         }
1159                         lo->ldo_stripe[nfound++] = o;
1160                         lod_qos_used(m, osts, idx, &total_weight);
1161                         rc = 0;
1162                         break;
1163                 }
1164         }
1165
1166         if (unlikely(nfound != stripe_cnt)) {
1167                 /*
1168                  * when the decision to use the weighted algorithm was made,
1169                  * we had enough suitable OSPs, but this state can change at
1170                  * any time (no space on an OST, broken connection, etc), so
1171                  * an OSP may fail to provide us with an object because its
1172                  * state has just changed
1173                  */
1174                 LCONSOLE_INFO("wanted %d, found %d\n", stripe_cnt, nfound);
1175                 for (i = 0; i < nfound; i++) {
1176                         LASSERT(lo->ldo_stripe[i]);
1177                         lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
1178                         lo->ldo_stripe[i] = NULL;
1179                 }
1180
1181                 /* makes sense to rebalance next time */
1182                 m->lod_qos.lq_dirty = 1;
1183                 m->lod_qos.lq_same_space = 0;
1184
1185                 rc = -EAGAIN;
1186         }
1187
1188 out:
1189         cfs_up_write(&m->lod_qos.lq_rw_sem);
1190
1191 out_nolock:
1192         if (pool != NULL) {
1193                 cfs_up_read(&pool_tgt_rw_sem(pool));
1194                 /* put back ref got by lod_find_pool() */
1195                 lod_pool_putref(pool);
1196         }
1197
1198         RETURN(rc);
1199 }
1200
1201 /* Find the max stripecount we should use */
1202 static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic,
1203                                __u16 stripe_count)
1204 {
1205         __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
1206
1207         if (!stripe_count)
1208                 stripe_count = lod->lod_desc.ld_default_stripe_count;
1209         if (stripe_count > lod->lod_desc.ld_active_tgt_count)
1210                 stripe_count = lod->lod_desc.ld_active_tgt_count;
1211         if (!stripe_count)
1212                 stripe_count = 1;
1213
1214         /* stripe count is based on whether OSD can handle larger EA sizes */
1215         if (lod->lod_osd_max_easize > 0)
1216                 max_stripes = lov_mds_md_stripecnt(lod->lod_osd_max_easize,
1217                                                    magic);
1218
1219         return (stripe_count < max_stripes) ? stripe_count : max_stripes;
1220 }
1221
1222 static int lod_use_defined_striping(const struct lu_env *env,
1223                                     struct lod_object *mo,
1224                                     const struct lu_buf *buf)
1225 {
1226         struct lod_device      *d = lu2lod_dev(lod2lu_obj(mo)->lo_dev);
1227         struct lov_mds_md_v1   *v1 = buf->lb_buf;
1228         struct lov_mds_md_v3   *v3 = buf->lb_buf;
1229         struct lov_ost_data_v1 *objs;
1230         __u32                   magic;
1231         int                     rc;
1232         ENTRY;
1233
1234         rc = lod_verify_striping(d, buf, 1);
1235         if (rc)
1236                 RETURN(rc);
1237
1238         magic = le32_to_cpu(v1->lmm_magic);
1239         if (magic == LOV_MAGIC_V1_DEF) {
1240                 objs = &v1->lmm_objects[0];
1241         } else if (magic == LOV_MAGIC_V3_DEF) {
1242                 objs = &v3->lmm_objects[0];
1243                 lod_object_set_pool(mo, v3->lmm_pool_name);
1244         } else {
1245                 GOTO(out, rc = -EINVAL);
1246         }
1247
1248         /*
1249          * LOD shouldn't be aware of recovery at all,
1250          * but it tracks the recovery status (to some extent)
1251          * to be able to do additional checks like this one
1252          */
1253         LASSERT(d->lod_recovery_completed == 0);
1254
1255         mo->ldo_stripe_size = le32_to_cpu(v1->lmm_stripe_size);
1256         mo->ldo_stripenr = le16_to_cpu(v1->lmm_stripe_count);
1257         mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
1258         LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic));
1259
1260         rc = lod_initialize_objects(env, mo, objs);
1261
1262 out:
1263         RETURN(rc);
1264 }
1265
1266 static int lod_qos_parse_config(const struct lu_env *env,
1267                                 struct lod_object *lo,
1268                                 const struct lu_buf *buf)
1269 {
1270         struct lod_device     *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1271         struct lov_user_md_v1 *v1 = NULL;
1272         struct lov_user_md_v3 *v3 = NULL;
1273         struct pool_desc      *pool;
1274         __u32                  magic;
1275         int                    rc;
1276         ENTRY;
1277
1278         if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0)
1279                 RETURN(0);
1280
1281         v1 = buf->lb_buf;
1282         magic = v1->lmm_magic;
1283
1284         if (magic == __swab32(LOV_USER_MAGIC_V1))
1285                 lustre_swab_lov_user_md_v1(v1);
1286         else if (magic == __swab32(LOV_USER_MAGIC_V3))
1287                 lustre_swab_lov_user_md_v3(buf->lb_buf);
1288
1289         if (unlikely(magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)) {
1290                 /* try to use as fully defined striping */
1291                 rc = lod_use_defined_striping(env, lo, buf);
1292                 RETURN(rc);
1293         }
1294
1295         if (unlikely(buf->lb_len < sizeof(*v1))) {
1296                 CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1297                 RETURN(-EINVAL);
1298         }
1299
1300         if (v1->lmm_pattern != 0 && v1->lmm_pattern != LOV_PATTERN_RAID0) {
1301                 CERROR("invalid pattern: %x\n", v1->lmm_pattern);
1302                 RETURN(-EINVAL);
1303         }
1304
1305         if (v1->lmm_stripe_size)
1306                 lo->ldo_stripe_size = v1->lmm_stripe_size;
1307         if (lo->ldo_stripe_size & (LOV_MIN_STRIPE_SIZE - 1))
1308                 lo->ldo_stripe_size = LOV_MIN_STRIPE_SIZE;
1309
1310         if (v1->lmm_stripe_count)
1311                 lo->ldo_stripenr = v1->lmm_stripe_count;
1312
1313         if ((v1->lmm_stripe_offset >= d->lod_desc.ld_tgt_count) &&
1314             (v1->lmm_stripe_offset != (typeof(v1->lmm_stripe_offset))(-1))) {
1315                 CERROR("invalid offset: %x\n", v1->lmm_stripe_offset);
1316                 RETURN(-EINVAL);
1317         }
1318         lo->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1319
1320         CDEBUG(D_OTHER, "lsm: %u size, %u stripes, %u offset\n",
1321                v1->lmm_stripe_size, v1->lmm_stripe_count,
1322                v1->lmm_stripe_offset);
1323
1324         if (v1->lmm_magic == LOV_MAGIC_V3) {
1325                 if (buf->lb_len < sizeof(*v3)) {
1326                         CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1327                         RETURN(-EINVAL);
1328                 }
1329
1330                 v3 = buf->lb_buf;
1331                 lod_object_set_pool(lo, v3->lmm_pool_name);
1332
1333                 pool = lod_find_pool(d, v3->lmm_pool_name);
1334                 if (pool != NULL) {
1335                         if (lo->ldo_def_stripe_offset !=
1336                             (typeof(v1->lmm_stripe_offset))(-1)) {
1337                                 rc = lo->ldo_def_stripe_offset;
1338                                 rc = lod_check_index_in_pool(rc, pool);
1339                                 if (rc < 0) {
1340                                         lod_pool_putref(pool);
1341                                         CERROR("invalid offset\n");
1342                                         RETURN(-EINVAL);
1343                                 }
1344                         }
1345
1346                         if (lo->ldo_stripenr > pool_tgt_count(pool))
1347                                 lo->ldo_stripenr = pool_tgt_count(pool);
1348
1349                         lod_pool_putref(pool);
1350                 }
1351         } else
1352                 lod_object_set_pool(lo, NULL);
1353
1354         RETURN(0);
1355 }
1356
1357 /*
1358  * buf should be NULL or contain striping settings
1359  */
1360 int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
1361                         struct lu_attr *attr, const struct lu_buf *buf,
1362                         struct thandle *th)
1363 {
1364         struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1365         int flag = LOV_USES_ASSIGNED_STRIPE;
1366         int i, rc = 0;
1367         ENTRY;
1368
1369         LASSERT(lo);
1370
1371         /* no OST available */
1372         /* XXX: should we be waiting a bit to prevent failures during
1373          * cluster initialization? */
1374         if (d->lod_ostnr == 0)
1375                 GOTO(out, rc = -EIO);
1376
1377         /*
1378          * by this time, the object's ldo_stripenr and ldo_stripe_size
1379          * contain the default values for striping, taken from the parent
1380          * or from the filesystem defaults
1381          *
1382          * in case the caller is passing a lovea with a new striping config,
1383          * we may need to parse the lovea and apply the new configuration
1384          */
1385         rc = lod_qos_parse_config(env, lo, buf);
1386         if (rc)
1387                 GOTO(out, rc);
1388
1389         if (likely(lo->ldo_stripe == NULL)) {
1390                 /*
1391                  * no striping has been created so far
1392                  */
1393                 LASSERT(lo->ldo_stripenr > 0);
1394                 lo->ldo_stripenr = lod_get_stripecnt(d, LOV_MAGIC,
1395                                 lo->ldo_stripenr);
1396                 i = sizeof(struct dt_object *) * lo->ldo_stripenr;
1397                 OBD_ALLOC(lo->ldo_stripe, i);
1398                 if (lo->ldo_stripe == NULL)
1399                         GOTO(out, rc = -ENOMEM);
1400                 lo->ldo_stripes_allocated = lo->ldo_stripenr;
1401
1402                 lod_getref(d);
1403                 /* XXX: support for non-0 files w/o objects */
1404                 if (lo->ldo_def_stripe_offset >= d->lod_desc.ld_tgt_count) {
1405                         lod_qos_statfs_update(env, d);
1406                         rc = lod_alloc_qos(env, lo, flag, th);
1407                         if (rc == -EAGAIN)
1408                                 rc = lod_alloc_rr(env, lo, flag, th);
1409                 } else
1410                         rc = lod_alloc_specific(env, lo, flag, th);
1411                 lod_putref(d);
1412         } else {
1413                 /*
1414                  * lod_qos_parse_config() found the supplied buf to be a
1415                  * predefined striping (not a hint), so it already allocated
1416                  * all the objects; now we need to declare their creation
1417                  */
1418                 for (i = 0; i < lo->ldo_stripenr; i++) {
1419                         struct dt_object  *o;
1420
1421                         o = lo->ldo_stripe[i];
1422                         LASSERT(o);
1423
1424                         rc = dt_declare_create(env, o, attr, NULL, NULL, th);
1425                         if (rc) {
1426                                 CERROR("can't declare create: %d\n", rc);
1427                                 break;
1428                         }
1429                 }
1430         }
1431
1432 out:
1433         RETURN(rc);
1434 }
1435