Whamcloud - gitweb
LU-3230 osp: unstick precreates on unmount
[fs/lustre-release.git] / lustre / lod / lod_qos.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_qos.c
33  *
34  */
35
36 #define DEBUG_SUBSYSTEM S_LOV
37
38 #include <libcfs/libcfs.h>
39 #include <obd_class.h>
40 #include <obd_lov.h>
41 #include <lustre/lustre_idl.h>
42 #include "lod_internal.h"
43
/*
 * force QoS policy (not RR) to be used for testing purposes
 *
 * Note: the macro defined here carries a trailing underscore, so the
 * "#ifndef FORCE_QOS" test in lod_qos_calc_ppo() still sees FORCE_QOS as
 * undefined.  Drop the underscore to actually force the QoS policy.
 */
#define FORCE_QOS_

/* debug mask used by the CDEBUG() calls of the QoS code in this file */
#define D_QOS   D_OTHER

/*
 * Verbose QoS tracing.  The "#if 0" branch is compiled out, so both macros
 * expand to nothing and their arguments are never evaluated.
 */
#if 0
#define QOS_DEBUG(fmt, ...)     CDEBUG(D_OTHER, fmt, ## __VA_ARGS__)
#define QOS_CONSOLE(fmt, ...)   LCONSOLE(D_OTHER, fmt, ## __VA_ARGS__)
#else
#define QOS_DEBUG(fmt, ...)
#define QOS_CONSOLE(fmt, ...)
#endif

/*
 * Available space of target #i: os_bavail multiplied by os_bsize of the
 * cached statfs data.  The macro is not self-contained: it expects a
 * variable named "lod" to be in scope at the point of use.
 */
#define TGT_BAVAIL(i) (OST_TGT(lod,i)->ltd_statfs.os_bavail * \
                       OST_TGT(lod,i)->ltd_statfs.os_bsize)
61
62 int qos_add_tgt(struct lod_device *lod, struct lod_tgt_desc *ost_desc)
63 {
64         struct lov_qos_oss *oss = NULL, *temposs;
65         struct obd_export  *exp = ost_desc->ltd_exp;
66         int                 rc = 0, found = 0;
67         cfs_list_t         *list;
68         ENTRY;
69
70         down_write(&lod->lod_qos.lq_rw_sem);
71         /*
72          * a bit hacky approach to learn NID of corresponding connection
73          * but there is no official API to access information like this
74          * with OSD API.
75          */
76         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
77                 if (obd_uuid_equals(&oss->lqo_uuid,
78                                     &exp->exp_connection->c_remote_uuid)) {
79                         found++;
80                         break;
81                 }
82         }
83
84         if (!found) {
85                 OBD_ALLOC_PTR(oss);
86                 if (!oss)
87                         GOTO(out, rc = -ENOMEM);
88                 memcpy(&oss->lqo_uuid, &exp->exp_connection->c_remote_uuid,
89                        sizeof(oss->lqo_uuid));
90         } else {
91                 /* Assume we have to move this one */
92                 cfs_list_del(&oss->lqo_oss_list);
93         }
94
95         oss->lqo_ost_count++;
96         ost_desc->ltd_qos.ltq_oss = oss;
97
98         CDEBUG(D_QOS, "add tgt %s to OSS %s (%d OSTs)\n",
99                obd_uuid2str(&ost_desc->ltd_uuid), obd_uuid2str(&oss->lqo_uuid),
100                oss->lqo_ost_count);
101
102         /* Add sorted by # of OSTs.  Find the first entry that we're
103            bigger than... */
104         list = &lod->lod_qos.lq_oss_list;
105         cfs_list_for_each_entry(temposs, list, lqo_oss_list) {
106                 if (oss->lqo_ost_count > temposs->lqo_ost_count)
107                         break;
108         }
109         /* ...and add before it.  If we're the first or smallest, temposs
110            points to the list head, and we add to the end. */
111         cfs_list_add_tail(&oss->lqo_oss_list, &temposs->lqo_oss_list);
112
113         lod->lod_qos.lq_dirty = 1;
114         lod->lod_qos.lq_rr.lqr_dirty = 1;
115
116 out:
117         up_write(&lod->lod_qos.lq_rw_sem);
118         RETURN(rc);
119 }
120
121 int qos_del_tgt(struct lod_device *lod, struct lod_tgt_desc *ost_desc)
122 {
123         struct lov_qos_oss *oss;
124         int                 rc = 0;
125         ENTRY;
126
127         down_write(&lod->lod_qos.lq_rw_sem);
128         oss = ost_desc->ltd_qos.ltq_oss;
129         if (!oss)
130                 GOTO(out, rc = -ENOENT);
131
132         oss->lqo_ost_count--;
133         if (oss->lqo_ost_count == 0) {
134                 CDEBUG(D_QOS, "removing OSS %s\n",
135                        obd_uuid2str(&oss->lqo_uuid));
136                 cfs_list_del(&oss->lqo_oss_list);
137                 ost_desc->ltd_qos.ltq_oss = NULL;
138                 OBD_FREE_PTR(oss);
139         }
140
141         lod->lod_qos.lq_dirty = 1;
142         lod->lod_qos.lq_rr.lqr_dirty = 1;
143 out:
144         up_write(&lod->lod_qos.lq_rw_sem);
145         RETURN(rc);
146 }
147
148 static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
149                                 int index, struct obd_statfs *sfs)
150 {
151         struct lod_tgt_desc *ost;
152         int                  rc;
153
154         LASSERT(d);
155         ost = OST_TGT(d,index);
156         LASSERT(ost);
157
158         rc = dt_statfs(env, ost->ltd_ost, sfs);
159         if (rc && rc != -ENOTCONN)
160                 CERROR("%s: statfs: rc = %d\n", lod2obd(d)->obd_name, rc);
161
162         /* check whether device has changed state (active, inactive) */
163         if (rc != 0 && ost->ltd_active) {
164                 /* turned inactive? */
165                 spin_lock(&d->lod_desc_lock);
166                 if (ost->ltd_active) {
167                         ost->ltd_active = 0;
168                         LASSERT(d->lod_desc.ld_active_tgt_count > 0);
169                         d->lod_desc.ld_active_tgt_count--;
170                         d->lod_qos.lq_dirty = 1;
171                         d->lod_qos.lq_rr.lqr_dirty = 1;
172                         CDEBUG(D_CONFIG, "%s: turns inactive\n",
173                                ost->ltd_exp->exp_obd->obd_name);
174                 }
175                 spin_unlock(&d->lod_desc_lock);
176         } else if (rc == 0 && ost->ltd_active == 0) {
177                 /* turned active? */
178                 LASSERTF(d->lod_desc.ld_active_tgt_count < d->lod_ostnr,
179                          "active tgt count %d, ost nr %d\n",
180                          d->lod_desc.ld_active_tgt_count, d->lod_ostnr);
181                 spin_lock(&d->lod_desc_lock);
182                 if (ost->ltd_active == 0) {
183                         ost->ltd_active = 1;
184                         d->lod_desc.ld_active_tgt_count++;
185                         d->lod_qos.lq_dirty = 1;
186                         d->lod_qos.lq_rr.lqr_dirty = 1;
187                         CDEBUG(D_CONFIG, "%s: turns active\n",
188                                ost->ltd_exp->exp_obd->obd_name);
189                 }
190                 spin_unlock(&d->lod_desc_lock);
191         }
192
193         RETURN(rc);
194 }
195
196 static void lod_qos_statfs_update(const struct lu_env *env,
197                                   struct lod_device *lod)
198 {
199         struct obd_device *obd = lod2obd(lod);
200         struct ost_pool   *osts = &(lod->lod_pool_info);
201         int                i, idx, rc = 0;
202         __u64              max_age, avail;
203         ENTRY;
204
205         max_age = cfs_time_shift_64(-2 * lod->lod_desc.ld_qos_maxage);
206
207         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
208                 /* statfs data are quite recent, don't need to refresh it */
209                 RETURN_EXIT;
210
211         down_write(&lod->lod_qos.lq_rw_sem);
212         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
213                 GOTO(out, rc = 0);
214
215         for (i = 0; i < osts->op_count; i++) {
216                 idx = osts->op_array[i];
217                 avail = OST_TGT(lod,idx)->ltd_statfs.os_bavail;
218                 rc = lod_statfs_and_check(env, lod, idx,
219                                           &OST_TGT(lod,idx)->ltd_statfs);
220                 if (rc)
221                         break;
222                 if (OST_TGT(lod,idx)->ltd_statfs.os_bavail != avail)
223                         /* recalculate weigths */
224                         lod->lod_qos.lq_dirty = 1;
225         }
226         obd->obd_osfs_age = cfs_time_current_64();
227
228 out:
229         up_write(&lod->lod_qos.lq_rw_sem);
230         EXIT;
231 }
232
233 /* Recalculate per-object penalties for OSSs and OSTs,
234    depends on size of each ost in an oss */
235 static int lod_qos_calc_ppo(struct lod_device *lod)
236 {
237         struct lov_qos_oss *oss;
238         __u64               ba_max, ba_min, temp;
239         __u32               num_active;
240         int                 rc, i, prio_wide;
241         time_t              now, age;
242         ENTRY;
243
244         if (!lod->lod_qos.lq_dirty)
245                 GOTO(out, rc = 0);
246
247         num_active = lod->lod_desc.ld_active_tgt_count - 1;
248         if (num_active < 1)
249                 GOTO(out, rc = -EAGAIN);
250
251         /* find bavail on each OSS */
252         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list)
253                                 oss->lqo_bavail = 0;
254         lod->lod_qos.lq_active_oss_count = 0;
255
256         /*
257          * How badly user wants to select OSTs "widely" (not recently chosen
258          * and not on recent OSS's).  As opposed to "freely" (free space
259          * avail.) 0-256
260          */
261         prio_wide = 256 - lod->lod_qos.lq_prio_free;
262
263         ba_min = (__u64)(-1);
264         ba_max = 0;
265         now = cfs_time_current_sec();
266         /* Calculate OST penalty per object
267          * (lod ref taken in lod_qos_prep_create()) */
268         cfs_foreach_bit(lod->lod_ost_bitmap, i) {
269                 LASSERT(OST_TGT(lod,i));
270                 temp = TGT_BAVAIL(i);
271                 if (!temp)
272                         continue;
273                 ba_min = min(temp, ba_min);
274                 ba_max = max(temp, ba_max);
275
276                 /* Count the number of usable OSS's */
277                 if (OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail == 0)
278                         lod->lod_qos.lq_active_oss_count++;
279                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail += temp;
280
281                 /* per-OST penalty is prio * TGT_bavail / (num_ost - 1) / 2 */
282                 temp >>= 1;
283                 lov_do_div64(temp, num_active);
284                 OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj =
285                         (temp * prio_wide) >> 8;
286
287                 age = (now - OST_TGT(lod,i)->ltd_qos.ltq_used) >> 3;
288                 if (lod->lod_qos.lq_reset ||
289                     age > 32 * lod->lod_desc.ld_qos_maxage)
290                         OST_TGT(lod,i)->ltd_qos.ltq_penalty = 0;
291                 else if (age > lod->lod_desc.ld_qos_maxage)
292                         /* Decay the penalty by half for every 8x the update
293                          * interval that the device has been idle.  That gives
294                          * lots of time for the statfs information to be
295                          * updated (which the penalty is only a proxy for),
296                          * and avoids penalizing OSS/OSTs under light load. */
297                         OST_TGT(lod,i)->ltd_qos.ltq_penalty >>=
298                                 (age / lod->lod_desc.ld_qos_maxage);
299         }
300
301         num_active = lod->lod_qos.lq_active_oss_count - 1;
302         if (num_active < 1) {
303                 /* If there's only 1 OSS, we can't penalize it, so instead
304                    we have to double the OST penalty */
305                 num_active = 1;
306                 cfs_foreach_bit(lod->lod_ost_bitmap, i)
307                         OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj <<= 1;
308         }
309
310         /* Per-OSS penalty is prio * oss_avail / oss_osts / (num_oss - 1) / 2 */
311         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
312                 temp = oss->lqo_bavail >> 1;
313                 lov_do_div64(temp, oss->lqo_ost_count * num_active);
314                 oss->lqo_penalty_per_obj = (temp * prio_wide) >> 8;
315
316                 age = (now - oss->lqo_used) >> 3;
317                 if (lod->lod_qos.lq_reset ||
318                     age > 32 * lod->lod_desc.ld_qos_maxage)
319                         oss->lqo_penalty = 0;
320                 else if (age > lod->lod_desc.ld_qos_maxage)
321                         /* Decay the penalty by half for every 8x the update
322                          * interval that the device has been idle.  That gives
323                          * lots of time for the statfs information to be
324                          * updated (which the penalty is only a proxy for),
325                          * and avoids penalizing OSS/OSTs under light load. */
326                         oss->lqo_penalty >>= age / lod->lod_desc.ld_qos_maxage;
327         }
328
329         lod->lod_qos.lq_dirty = 0;
330         lod->lod_qos.lq_reset = 0;
331
332         /* If each ost has almost same free space,
333          * do rr allocation for better creation performance */
334         lod->lod_qos.lq_same_space = 0;
335         if ((ba_max * (256 - lod->lod_qos.lq_threshold_rr)) >> 8 < ba_min) {
336                 lod->lod_qos.lq_same_space = 1;
337                 /* Reset weights for the next time we enter qos mode */
338                 lod->lod_qos.lq_reset = 1;
339         }
340         rc = 0;
341
342 out:
343 #ifndef FORCE_QOS
344         if (!rc && lod->lod_qos.lq_same_space)
345                 RETURN(-EAGAIN);
346 #endif
347         RETURN(rc);
348 }
349
350 static int lod_qos_calc_weight(struct lod_device *lod, int i)
351 {
352         __u64 temp, temp2;
353
354         /* Final ost weight = TGT_BAVAIL - ost_penalty - oss_penalty */
355         temp = TGT_BAVAIL(i);
356         temp2 = OST_TGT(lod,i)->ltd_qos.ltq_penalty +
357                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_penalty;
358         if (temp < temp2)
359                 OST_TGT(lod,i)->ltd_qos.ltq_weight = 0;
360         else
361                 OST_TGT(lod,i)->ltd_qos.ltq_weight = temp - temp2;
362         return 0;
363 }
364
365 /* We just used this index for a stripe; adjust everyone's weights */
366 static int lod_qos_used(struct lod_device *lod, struct ost_pool *osts,
367                         __u32 index, __u64 *total_wt)
368 {
369         struct lod_tgt_desc *ost;
370         struct lov_qos_oss  *oss;
371         int j;
372         ENTRY;
373
374         ost = OST_TGT(lod,index);
375         LASSERT(ost);
376
377         /* Don't allocate on this devuce anymore, until the next alloc_qos */
378         ost->ltd_qos.ltq_usable = 0;
379
380         oss = ost->ltd_qos.ltq_oss;
381
382         /* Decay old penalty by half (we're adding max penalty, and don't
383            want it to run away.) */
384         ost->ltd_qos.ltq_penalty >>= 1;
385         oss->lqo_penalty >>= 1;
386
387         /* mark the OSS and OST as recently used */
388         ost->ltd_qos.ltq_used = oss->lqo_used = cfs_time_current_sec();
389
390         /* Set max penalties for this OST and OSS */
391         ost->ltd_qos.ltq_penalty +=
392                 ost->ltd_qos.ltq_penalty_per_obj * lod->lod_ostnr;
393         oss->lqo_penalty += oss->lqo_penalty_per_obj *
394                 lod->lod_qos.lq_active_oss_count;
395
396         /* Decrease all OSS penalties */
397         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
398                 if (oss->lqo_penalty < oss->lqo_penalty_per_obj)
399                         oss->lqo_penalty = 0;
400                 else
401                         oss->lqo_penalty -= oss->lqo_penalty_per_obj;
402         }
403
404         *total_wt = 0;
405         /* Decrease all OST penalties */
406         for (j = 0; j < osts->op_count; j++) {
407                 int i;
408
409                 i = osts->op_array[j];
410                 if (!cfs_bitmap_check(lod->lod_ost_bitmap, i))
411                         continue;
412
413                 ost = OST_TGT(lod,i);
414                 LASSERT(ost);
415
416                 if (ost->ltd_qos.ltq_penalty <
417                                 ost->ltd_qos.ltq_penalty_per_obj)
418                         ost->ltd_qos.ltq_penalty = 0;
419                 else
420                         ost->ltd_qos.ltq_penalty -=
421                                 ost->ltd_qos.ltq_penalty_per_obj;
422
423                 lod_qos_calc_weight(lod, i);
424
425                 /* Recalc the total weight of usable osts */
426                 if (ost->ltd_qos.ltq_usable)
427                         *total_wt += ost->ltd_qos.ltq_weight;
428
429                 QOS_DEBUG("recalc tgt %d usable=%d avail="LPU64
430                           " ostppo="LPU64" ostp="LPU64" ossppo="LPU64
431                           " ossp="LPU64" wt="LPU64"\n",
432                           i, ost->ltd_qos.ltq_usable, TGT_BAVAIL(i) >> 10,
433                           ost->ltd_qos.ltq_penalty_per_obj >> 10,
434                           ost->ltd_qos.ltq_penalty >> 10,
435                           ost->ltd_qos.ltq_oss->lqo_penalty_per_obj >> 10,
436                           ost->ltd_qos.ltq_oss->lqo_penalty >> 10,
437                           ost->ltd_qos.ltq_weight >> 10);
438         }
439
440         RETURN(0);
441 }
442
443 #define LOV_QOS_EMPTY ((__u32)-1)
/*
 * compute optimal round-robin order, based on OSTs per OSS
 *
 * Rebuilds lqr->lqr_pool from src_pool when lqr->lqr_dirty is set: the
 * OSTs of each OSS are spread evenly over the array so that consecutive
 * slots tend to land on different OSSs.
 *
 * Returns 0 on success (or when the order is still valid), the error of
 * lod_ost_pool_extend() if the array could not be sized, and -EAGAIN when
 * not every entry of src_pool could be placed (lqr_dirty is set again in
 * that case).
 */
static int lod_qos_calc_rr(struct lod_device *lod, struct ost_pool *src_pool,
                           struct lov_qos_rr *lqr)
{
        struct lov_qos_oss  *oss;
        struct lod_tgt_desc *ost;
        unsigned placed, real_count;
        int i, rc;
        ENTRY;

        /* unlocked fast path: the cached order is still valid */
        if (!lqr->lqr_dirty) {
                LASSERT(lqr->lqr_pool.op_size);
                RETURN(0);
        }

        /* Do actual allocation. */
        down_write(&lod->lod_qos.lq_rw_sem);

        /*
         * Check again. While we were sleeping on @lq_rw_sem something could
         * change.
         */
        if (!lqr->lqr_dirty) {
                LASSERT(lqr->lqr_pool.op_size);
                up_write(&lod->lod_qos.lq_rw_sem);
                RETURN(0);
        }

        real_count = src_pool->op_count;

        /* Zero the pool array */
        /* alloc_rr is holding a read lock on the pool, so nobody is adding/
           deleting from the pool. The lq_rw_sem ensures that nobody else
           is reading. */
        /* NOTE(review): op_count is updated before lod_ost_pool_extend() is
         * called; the helper is not visible here and may rely on that
         * order - confirm before rearranging these two statements. */
        lqr->lqr_pool.op_count = real_count;
        rc = lod_ost_pool_extend(&lqr->lqr_pool, real_count);
        if (rc) {
                up_write(&lod->lod_qos.lq_rw_sem);
                RETURN(rc);
        }
        for (i = 0; i < lqr->lqr_pool.op_count; i++)
                lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY;

        /* Place all the OSTs from 1 OSS at the same time. */
        placed = 0;
        cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
                /* number of OSTs of this OSS placed so far */
                int j = 0;

                for (i = 0; i < lqr->lqr_pool.op_count; i++) {
                        int next;

                        /* skip source entries not set in the OST bitmap */
                        if (!cfs_bitmap_check(lod->lod_ost_bitmap,
                                                src_pool->op_array[i]))
                                continue;

                        ost = OST_TGT(lod,src_pool->op_array[i]);
                        LASSERT(ost && ost->ltd_ost);
                        if (ost->ltd_qos.ltq_oss != oss)
                                continue;

                        /* Evenly space these OSTs across arrayspace */
                        next = j * lqr->lqr_pool.op_count / oss->lqo_ost_count;
                        /* the preferred slot may be taken already: probe
                         * forward, wrapping around, for a free one */
                        while (lqr->lqr_pool.op_array[next] != LOV_QOS_EMPTY)
                                next = (next + 1) % lqr->lqr_pool.op_count;

                        lqr->lqr_pool.op_array[next] = src_pool->op_array[i];
                        j++;
                        placed++;
                }
        }

        lqr->lqr_dirty = 0;
        up_write(&lod->lod_qos.lq_rw_sem);

        if (placed != real_count) {
                /* This should never happen */
                LCONSOLE_ERROR_MSG(0x14e, "Failed to place all OSTs in the "
                                   "round-robin list (%d of %d).\n",
                                   placed, real_count);
                for (i = 0; i < lqr->lqr_pool.op_count; i++) {
                        LCONSOLE(D_WARNING, "rr #%d ost idx=%d\n", i,
                                 lqr->lqr_pool.op_array[i]);
                }
                /* force a rebuild on the next call */
                lqr->lqr_dirty = 1;
                RETURN(-EAGAIN);
        }

#if 0
        for (i = 0; i < lqr->lqr_pool.op_count; i++)
                QOS_CONSOLE("rr #%d ost idx=%d\n", i, lqr->lqr_pool.op_array[i]);
#endif

        RETURN(0);
}
538
539 /**
540  * A helper function to:
541  *   create in-core lu object on the specified OSP
542  *   declare creation of the object
543  * IMPORTANT: at this stage object is anonymouos - it has no fid assigned
544  *            this is a workaround till we have natural FIDs on OST
545  *
546  *            at this point we want to declare (reserve) object for us as
547  *            we can't block at execution (when create method is called).
548  *            otherwise we'd block whole transaction batch
549  */
550 static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
551                                                    struct lod_device *d,
552                                                    int ost_idx,
553                                                    struct thandle *th)
554 {
555         struct lod_tgt_desc *ost;
556         struct lu_object *o, *n;
557         struct lu_device *nd;
558         struct dt_object *dt;
559         int               rc;
560         ENTRY;
561
562         LASSERT(d);
563         LASSERT(ost_idx >= 0);
564         LASSERT(ost_idx < d->lod_osts_size);
565         ost = OST_TGT(d,ost_idx);
566         LASSERT(ost);
567         LASSERT(ost->ltd_ost);
568
569         nd = &ost->ltd_ost->dd_lu_dev;
570
571         /*
572          * allocate anonymous object with zero fid, real fid
573          * will be assigned by OSP within transaction
574          * XXX: to be fixed with fully-functional OST fids
575          */
576         o = lu_object_anon(env, nd, NULL);
577         if (IS_ERR(o))
578                 GOTO(out, dt = ERR_PTR(PTR_ERR(o)));
579
580         n = lu_object_locate(o->lo_header, nd->ld_type);
581         if (unlikely(n == NULL)) {
582                 CERROR("can't find slice\n");
583                 lu_object_put(env, o);
584                 GOTO(out, dt = ERR_PTR(-EINVAL));
585         }
586
587         dt = container_of(n, struct dt_object, do_lu);
588
589         rc = dt_declare_create(env, dt, NULL, NULL, NULL, th);
590         if (rc) {
591                 CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
592                        ost_idx, rc);
593                 lu_object_put(env, o);
594                 dt = ERR_PTR(rc);
595         }
596
597 out:
598         RETURN(dt);
599 }
600
601 static int min_stripe_count(int stripe_cnt, int flags)
602 {
603         return (flags & LOV_USES_DEFAULT_STRIPE ?
604                         stripe_cnt - (stripe_cnt / 4) : stripe_cnt);
605 }
606
607 #define LOV_CREATE_RESEED_MULT 30
608 #define LOV_CREATE_RESEED_MIN  2000
609
610 static int inline lod_qos_dev_is_full(struct obd_statfs *msfs)
611 {
612         __u64 used;
613         int   bs = msfs->os_bsize;
614
615         LASSERT(((bs - 1) & bs) == 0);
616
617         /* the minimum of 0.1% used blocks and 1GB bytes. */
618         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
619                         1 << (31 - ffs(bs)));
620         return (msfs->os_bavail < used);
621 }
622
623 int lod_ea_store_resize(struct lod_thread_info *info, int size);
624
625 static inline int lod_qos_ost_in_use_clear(const struct lu_env *env, int stripes)
626 {
627         struct lod_thread_info *info = lod_env_info(env);
628
629         if (info->lti_ea_store_size < sizeof(int) * stripes)
630                 lod_ea_store_resize(info, stripes * sizeof(int));
631         if (info->lti_ea_store_size < sizeof(int) * stripes) {
632                 CERROR("can't allocate memory for ost-in-use array\n");
633                 return -ENOMEM;
634         }
635         memset(info->lti_ea_store, -1, sizeof(int) * stripes);
636         return 0;
637 }
638
639 static inline void lod_qos_ost_in_use(const struct lu_env *env, int idx, int ost)
640 {
641         struct lod_thread_info *info = lod_env_info(env);
642         int *osts = info->lti_ea_store;
643
644         LASSERT(info->lti_ea_store_size >= idx * sizeof(int));
645         osts[idx] = ost;
646 }
647
648 static int lod_qos_is_ost_used(const struct lu_env *env, int ost, int stripes)
649 {
650         struct lod_thread_info *info = lod_env_info(env);
651         int *osts = info->lti_ea_store;
652         int j;
653
654         for (j = 0; j < stripes; j++) {
655                 if (osts[j] == ost)
656                         return 1;
657         }
658         return 0;
659 }
660
661 /* Allocate objects on OSTs with round-robin algorithm */
662 static int lod_alloc_rr(const struct lu_env *env, struct lod_object *lo,
663                         struct dt_object **stripe, int flags,
664                         struct thandle *th)
665 {
666         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
667         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
668         struct pool_desc  *pool = NULL;
669         struct ost_pool   *osts;
670         struct lov_qos_rr *lqr;
671         struct dt_object  *o;
672         unsigned           array_idx;
673         int                i, rc;
674         int                ost_start_idx_temp;
675         int                speed = 0;
676         int                stripe_idx = 0;
677         int                stripe_cnt = lo->ldo_stripenr;
678         int                stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
679         __u32              ost_idx;
680         ENTRY;
681
682         if (lo->ldo_pool)
683                 pool = lod_find_pool(m, lo->ldo_pool);
684
685         if (pool != NULL) {
686                 down_read(&pool_tgt_rw_sem(pool));
687                 osts = &(pool->pool_obds);
688                 lqr = &(pool->pool_rr);
689         } else {
690                 osts = &(m->lod_pool_info);
691                 lqr = &(m->lod_qos.lq_rr);
692         }
693
694         rc = lod_qos_calc_rr(m, osts, lqr);
695         if (rc)
696                 GOTO(out, rc);
697
698         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
699         if (rc)
700                 GOTO(out, rc);
701
702         if (--lqr->lqr_start_count <= 0) {
703                 lqr->lqr_start_idx = cfs_rand() % osts->op_count;
704                 lqr->lqr_start_count =
705                         (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
706                          LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
707         } else if (stripe_cnt_min >= osts->op_count ||
708                         lqr->lqr_start_idx > osts->op_count) {
709                 /* If we have allocated from all of the OSTs, slowly
710                  * precess the next start if the OST/stripe count isn't
711                  * already doing this for us. */
712                 lqr->lqr_start_idx %= osts->op_count;
713                 if (stripe_cnt > 1 && (osts->op_count % stripe_cnt) != 1)
714                         ++lqr->lqr_offset_idx;
715         }
716         down_read(&m->lod_qos.lq_rw_sem);
717         ost_start_idx_temp = lqr->lqr_start_idx;
718
719 repeat_find:
720         array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
721                         osts->op_count;
722
723         QOS_DEBUG("pool '%s' want %d startidx %d startcnt %d offset %d "
724                   "active %d count %d arrayidx %d\n",
725                   lo->ldo_pool ? lo->ldo_pool : "",
726                   stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count,
727                   lqr->lqr_offset_idx, osts->op_count, osts->op_count,
728                   array_idx);
729
730         for (i = 0; i < osts->op_count && stripe_idx < lo->ldo_stripenr;
731              i++, array_idx = (array_idx + 1) % osts->op_count) {
732                 ++lqr->lqr_start_idx;
733                 ost_idx = lqr->lqr_pool.op_array[array_idx];
734
735                 QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
736                           i, lqr->lqr_start_idx, /* XXX: active*/ 0,
737                           stripe_idx, array_idx, ost_idx);
738
739                 if ((ost_idx == LOV_QOS_EMPTY) ||
740                     !cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
741                         continue;
742
743                 /* Fail Check before osc_precreate() is called
744                    so we can only 'fail' single OSC. */
745                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
746                         continue;
747
748                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
749                 if (rc) {
750                         /* this OSP doesn't feel well */
751                         continue;
752                 }
753
754                 /*
755                  * skip full devices
756                  */
757                 if (lod_qos_dev_is_full(sfs)) {
758                         QOS_DEBUG("#%d is full\n", ost_idx);
759                         continue;
760                 }
761
762                 /*
763                  * We expect number of precreated objects in f_ffree at
764                  * the first iteration, skip OSPs with no objects ready
765                  */
766                 if (sfs->os_fprecreated == 0 && speed == 0) {
767                         QOS_DEBUG("#%d: precreation is empty\n", ost_idx);
768                         continue;
769                 }
770
771                 /*
772                  * try to use another OSP if this one is degraded
773                  */
774                 if (sfs->os_state == OS_STATE_DEGRADED && speed < 2) {
775                         QOS_DEBUG("#%d: degraded\n", ost_idx);
776                         continue;
777                 }
778
779                 /*
780                  * do not put >1 objects on a single OST
781                  */
782                 if (speed && lod_qos_is_ost_used(env, ost_idx, stripe_idx))
783                         continue;
784
785                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
786                 if (IS_ERR(o)) {
787                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
788                                ost_idx, (int) PTR_ERR(o));
789                         rc = PTR_ERR(o);
790                         continue;
791                 }
792
793                 /*
794                  * We've successfuly declared (reserved) an object
795                  */
796                 lod_qos_ost_in_use(env, stripe_idx, ost_idx);
797                 stripe[stripe_idx] = o;
798                 stripe_idx++;
799
800         }
801         if ((speed < 2) && (stripe_idx < stripe_cnt_min)) {
802                 /* Try again, allowing slower OSCs */
803                 speed++;
804                 lqr->lqr_start_idx = ost_start_idx_temp;
805                 goto repeat_find;
806         }
807
808         up_read(&m->lod_qos.lq_rw_sem);
809
810         if (stripe_idx) {
811                 lo->ldo_stripenr = stripe_idx;
812                 /* at least one stripe is allocated */
813                 rc = 0;
814         } else {
815                 /* nobody provided us with a single object */
816                 rc = -ENOSPC;
817         }
818
819 out:
820         if (pool != NULL) {
821                 up_read(&pool_tgt_rw_sem(pool));
822                 /* put back ref got by lod_find_pool() */
823                 lod_pool_putref(pool);
824         }
825
826         RETURN(rc);
827 }
828
829 /* alloc objects on osts with specific stripe offset */
830 static int lod_alloc_specific(const struct lu_env *env, struct lod_object *lo,
831                               struct dt_object **stripe, int flags,
832                               struct thandle *th)
833 {
834         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
835         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
836         struct dt_object  *o;
837         unsigned           ost_idx, array_idx, ost_count;
838         int                i, rc, stripe_num = 0;
839         int                speed = 0;
840         struct pool_desc  *pool = NULL;
841         struct ost_pool   *osts;
842         ENTRY;
843
844         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
845         if (rc)
846                 GOTO(out, rc);
847
848         if (lo->ldo_pool)
849                 pool = lod_find_pool(m, lo->ldo_pool);
850
851         if (pool != NULL) {
852                 down_read(&pool_tgt_rw_sem(pool));
853                 osts = &(pool->pool_obds);
854         } else {
855                 osts = &(m->lod_pool_info);
856         }
857
858         ost_count = osts->op_count;
859
860 repeat_find:
861         /* search loi_ost_idx in ost array */
862         array_idx = 0;
863         for (i = 0; i < ost_count; i++) {
864                 if (osts->op_array[i] == lo->ldo_def_stripe_offset) {
865                         array_idx = i;
866                         break;
867                 }
868         }
869         if (i == ost_count) {
870                 CERROR("Start index %d not found in pool '%s'\n",
871                        lo->ldo_def_stripe_offset,
872                        lo->ldo_pool ? lo->ldo_pool : "");
873                 GOTO(out, rc = -EINVAL);
874         }
875
876         for (i = 0; i < ost_count;
877                         i++, array_idx = (array_idx + 1) % ost_count) {
878                 ost_idx = osts->op_array[array_idx];
879
880                 if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
881                         continue;
882
883                 /* Fail Check before osc_precreate() is called
884                    so we can only 'fail' single OSC. */
885                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
886                         continue;
887
888                 /*
889                  * do not put >1 objects on a single OST
890                  */
891                 if (lod_qos_is_ost_used(env, ost_idx, stripe_num))
892                         continue;
893
894                 /* Drop slow OSCs if we can, but not for requested start idx.
895                  *
896                  * This means "if OSC is slow and it is not the requested
897                  * start OST, then it can be skipped, otherwise skip it only
898                  * if it is inactive/recovering/out-of-space." */
899
900                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
901                 if (rc) {
902                         /* this OSP doesn't feel well */
903                         continue;
904                 }
905
906                 /*
907                  * We expect number of precreated objects in f_ffree at
908                  * the first iteration, skip OSPs with no objects ready
909                  * don't apply this logic to OST specified with stripe_offset
910                  */
911                 if (i != 0 && sfs->os_fprecreated == 0 && speed == 0)
912                         continue;
913
914                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
915                 if (IS_ERR(o)) {
916                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
917                                ost_idx, (int) PTR_ERR(o));
918                         continue;
919                 }
920
921                 /*
922                  * We've successfuly declared (reserved) an object
923                  */
924                 lod_qos_ost_in_use(env, stripe_num, ost_idx);
925                 stripe[stripe_num] = o;
926                 stripe_num++;
927
928                 /* We have enough stripes */
929                 if (stripe_num == lo->ldo_stripenr)
930                         GOTO(out, rc = 0);
931         }
932         if (speed < 2) {
933                 /* Try again, allowing slower OSCs */
934                 speed++;
935                 goto repeat_find;
936         }
937
938         /* If we were passed specific striping params, then a failure to
939          * meet those requirements is an error, since we can't reallocate
940          * that memory (it might be part of a larger array or something).
941          *
942          * We can only get here if lsm_stripe_count was originally > 1.
943          */
944         CERROR("can't lstripe objid "DFID": have %d want %u\n",
945                PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num,
946                lo->ldo_stripenr);
947         rc = -EFBIG;
948 out:
949         if (pool != NULL) {
950                 up_read(&pool_tgt_rw_sem(pool));
951                 /* put back ref got by lod_find_pool() */
952                 lod_pool_putref(pool);
953         }
954
955         RETURN(rc);
956 }
957
958 static inline int lod_qos_is_usable(struct lod_device *lod)
959 {
960 #ifdef FORCE_QOS
961         /* to be able to debug QoS code */
962         return 1;
963 #endif
964
965         /* Detect -EAGAIN early, before expensive lock is taken. */
966         if (!lod->lod_qos.lq_dirty && lod->lod_qos.lq_same_space)
967                 return 0;
968
969         if (lod->lod_desc.ld_active_tgt_count < 2)
970                 return 0;
971
972         return 1;
973 }
974
975 /* Alloc objects on OSTs with optimization based on:
976    - free space
977    - network resources (shared OSS's)
978  */
979 static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo,
980                          struct dt_object **stripe, int flags,
981                          struct thandle *th)
982 {
983         struct lod_device   *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
984         struct obd_statfs   *sfs = &lod_env_info(env)->lti_osfs;
985         struct lod_tgt_desc *ost;
986         struct dt_object    *o;
987         __u64                total_weight = 0;
988         int                  nfound, good_osts, i, rc = 0;
989         int                  stripe_cnt = lo->ldo_stripenr;
990         int                  stripe_cnt_min;
991         struct pool_desc    *pool = NULL;
992         struct ost_pool    *osts;
993         ENTRY;
994
995         stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
996         if (stripe_cnt_min < 1)
997                 RETURN(-EINVAL);
998
999         if (lo->ldo_pool)
1000                 pool = lod_find_pool(m, lo->ldo_pool);
1001
1002         if (pool != NULL) {
1003                 down_read(&pool_tgt_rw_sem(pool));
1004                 osts = &(pool->pool_obds);
1005         } else {
1006                 osts = &(m->lod_pool_info);
1007         }
1008
1009         /* Detect -EAGAIN early, before expensive lock is taken. */
1010         if (!lod_qos_is_usable(m))
1011                 GOTO(out_nolock, rc = -EAGAIN);
1012
1013         /* Do actual allocation, use write lock here. */
1014         down_write(&m->lod_qos.lq_rw_sem);
1015
1016         /*
1017          * Check again, while we were sleeping on @lq_rw_sem things could
1018          * change.
1019          */
1020         if (!lod_qos_is_usable(m))
1021                 GOTO(out, rc = -EAGAIN);
1022
1023         rc = lod_qos_calc_ppo(m);
1024         if (rc)
1025                 GOTO(out, rc);
1026
1027         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
1028         if (rc)
1029                 GOTO(out, rc);
1030
1031         good_osts = 0;
1032         /* Find all the OSTs that are valid stripe candidates */
1033         for (i = 0; i < osts->op_count; i++) {
1034                 if (!cfs_bitmap_check(m->lod_ost_bitmap, osts->op_array[i]))
1035                         continue;
1036
1037                 rc = lod_statfs_and_check(env, m, osts->op_array[i], sfs);
1038                 if (rc) {
1039                         /* this OSP doesn't feel well */
1040                         continue;
1041                 }
1042
1043                 /*
1044                  * skip full devices
1045                  */
1046                 if (lod_qos_dev_is_full(sfs))
1047                         continue;
1048
1049                 /* Fail Check before osc_precreate() is called
1050                    so we can only 'fail' single OSC. */
1051                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
1052                                    osts->op_array[i] == 0)
1053                         continue;
1054
1055                 ost = OST_TGT(m,osts->op_array[i]);
1056                 ost->ltd_qos.ltq_usable = 1;
1057                 lod_qos_calc_weight(m, osts->op_array[i]);
1058                 total_weight += ost->ltd_qos.ltq_weight;
1059
1060                 good_osts++;
1061         }
1062
1063         QOS_DEBUG("found %d good osts\n", good_osts);
1064
1065         if (good_osts < stripe_cnt_min)
1066                 GOTO(out, rc = -EAGAIN);
1067
1068         /* We have enough osts */
1069         if (good_osts < stripe_cnt)
1070                 stripe_cnt = good_osts;
1071
1072         /* Find enough OSTs with weighted random allocation. */
1073         nfound = 0;
1074         while (nfound < stripe_cnt) {
1075                 __u64 rand, cur_weight;
1076
1077                 cur_weight = 0;
1078                 rc = -ENOSPC;
1079
1080                 if (total_weight) {
1081 #if BITS_PER_LONG == 32
1082                         rand = cfs_rand() % (unsigned)total_weight;
1083                         /* If total_weight > 32-bit, first generate the high
1084                          * 32 bits of the random number, then add in the low
1085                          * 32 bits (truncated to the upper limit, if needed) */
1086                         if (total_weight > 0xffffffffULL)
1087                                 rand = (__u64)(cfs_rand() %
1088                                         (unsigned)(total_weight >> 32)) << 32;
1089                         else
1090                                 rand = 0;
1091
1092                         if (rand == (total_weight & 0xffffffff00000000ULL))
1093                                 rand |= cfs_rand() % (unsigned)total_weight;
1094                         else
1095                                 rand |= cfs_rand();
1096
1097 #else
1098                         rand = ((__u64)cfs_rand() << 32 | cfs_rand()) %
1099                                 total_weight;
1100 #endif
1101                 } else {
1102                         rand = 0;
1103                 }
1104
1105                 /* On average, this will hit larger-weighted osts more often.
1106                    0-weight osts will always get used last (only when rand=0) */
1107                 for (i = 0; i < osts->op_count; i++) {
1108                         int idx = osts->op_array[i];
1109
1110                         if (!cfs_bitmap_check(m->lod_ost_bitmap, idx))
1111                                 continue;
1112
1113                         ost = OST_TGT(m,idx);
1114
1115                         if (!ost->ltd_qos.ltq_usable)
1116                                 continue;
1117
1118                         cur_weight += ost->ltd_qos.ltq_weight;
1119                         QOS_DEBUG("stripe_cnt=%d nfound=%d cur_weight="LPU64
1120                                   " rand="LPU64" total_weight="LPU64"\n",
1121                                   stripe_cnt, nfound, cur_weight, rand,
1122                                   total_weight);
1123
1124                         if (cur_weight < rand)
1125                                 continue;
1126
1127                         QOS_DEBUG("stripe=%d to idx=%d\n", nfound, idx);
1128
1129                         /*
1130                          * do not put >1 objects on a single OST
1131                          */
1132                         if (lod_qos_is_ost_used(env, idx, nfound))
1133                                 continue;
1134                         lod_qos_ost_in_use(env, nfound, idx);
1135
1136                         o = lod_qos_declare_object_on(env, m, idx, th);
1137                         if (IS_ERR(o)) {
1138                                 QOS_DEBUG("can't declare object on #%u: %d\n",
1139                                           idx, (int) PTR_ERR(o));
1140                                 continue;
1141                         }
1142                         stripe[nfound++] = o;
1143                         lod_qos_used(m, osts, idx, &total_weight);
1144                         rc = 0;
1145                         break;
1146                 }
1147
1148                 if (rc) {
1149                         /* no OST found on this iteration, give up */
1150                         break;
1151                 }
1152         }
1153
1154         if (unlikely(nfound != stripe_cnt)) {
1155                 /*
1156                  * when the decision to use weighted algorithm was made
1157                  * we had enough appropriate OSPs, but this state can
1158                  * change anytime (no space on OST, broken connection, etc)
1159                  * so it's possible OSP won't be able to provide us with
1160                  * an object due to just changed state
1161                  */
1162                 LCONSOLE_INFO("wanted %d, found %d\n", stripe_cnt, nfound);
1163                 for (i = 0; i < nfound; i++) {
1164                         LASSERT(stripe[i] != NULL);
1165                         lu_object_put(env, &stripe[i]->do_lu);
1166                         stripe[i] = NULL;
1167                 }
1168
1169                 /* makes sense to rebalance next time */
1170                 m->lod_qos.lq_dirty = 1;
1171                 m->lod_qos.lq_same_space = 0;
1172
1173                 rc = -EAGAIN;
1174         }
1175
1176 out:
1177         up_write(&m->lod_qos.lq_rw_sem);
1178
1179 out_nolock:
1180         if (pool != NULL) {
1181                 up_read(&pool_tgt_rw_sem(pool));
1182                 /* put back ref got by lod_find_pool() */
1183                 lod_pool_putref(pool);
1184         }
1185
1186         RETURN(rc);
1187 }
1188
1189 /* Find the max stripecount we should use */
1190 static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic,
1191                                __u16 stripe_count)
1192 {
1193         __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
1194
1195         if (!stripe_count)
1196                 stripe_count = lod->lod_desc.ld_default_stripe_count;
1197         if (stripe_count > lod->lod_desc.ld_active_tgt_count)
1198                 stripe_count = lod->lod_desc.ld_active_tgt_count;
1199         if (!stripe_count)
1200                 stripe_count = 1;
1201
1202         /* stripe count is based on whether OSD can handle larger EA sizes */
1203         if (lod->lod_osd_max_easize > 0)
1204                 max_stripes = lov_mds_md_stripecnt(lod->lod_osd_max_easize,
1205                                                    magic);
1206
1207         return (stripe_count < max_stripes) ? stripe_count : max_stripes;
1208 }
1209
1210 static int lod_use_defined_striping(const struct lu_env *env,
1211                                     struct lod_object *mo,
1212                                     const struct lu_buf *buf)
1213 {
1214         struct lov_mds_md_v1   *v1 = buf->lb_buf;
1215         struct lov_mds_md_v3   *v3 = buf->lb_buf;
1216         struct lov_ost_data_v1 *objs;
1217         __u32                   magic;
1218         int                     rc = 0;
1219         ENTRY;
1220
1221         magic = le32_to_cpu(v1->lmm_magic);
1222         if (magic == LOV_MAGIC_V1_DEF) {
1223                 magic = LOV_MAGIC_V1;
1224                 objs = &v1->lmm_objects[0];
1225         } else if (magic == LOV_MAGIC_V3_DEF) {
1226                 magic = LOV_MAGIC_V3;
1227                 objs = &v3->lmm_objects[0];
1228                 lod_object_set_pool(mo, v3->lmm_pool_name);
1229         } else {
1230                 GOTO(out, rc = -EINVAL);
1231         }
1232
1233         mo->ldo_pattern = le32_to_cpu(v1->lmm_pattern);
1234         mo->ldo_stripe_size = le32_to_cpu(v1->lmm_stripe_size);
1235         mo->ldo_stripenr = le16_to_cpu(v1->lmm_stripe_count);
1236         mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
1237
1238         /* fixup for released file before object initialization */
1239         if (mo->ldo_pattern & LOV_PATTERN_F_RELEASED) {
1240                 mo->ldo_released_stripenr = mo->ldo_stripenr;
1241                 mo->ldo_stripenr = 0;
1242         }
1243
1244         LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic));
1245
1246         if (mo->ldo_stripenr > 0)
1247                 rc = lod_initialize_objects(env, mo, objs);
1248
1249 out:
1250         RETURN(rc);
1251 }
1252
1253 static int lod_qos_parse_config(const struct lu_env *env,
1254                                 struct lod_object *lo,
1255                                 const struct lu_buf *buf)
1256 {
1257         struct lod_device     *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1258         struct lov_user_md_v1 *v1 = NULL;
1259         struct lov_user_md_v3 *v3 = NULL;
1260         struct pool_desc      *pool;
1261         __u32                  magic;
1262         int                    rc;
1263         ENTRY;
1264
1265         if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0)
1266                 RETURN(0);
1267
1268         v1 = buf->lb_buf;
1269         magic = v1->lmm_magic;
1270
1271         if (magic == __swab32(LOV_USER_MAGIC_V1)) {
1272                 lustre_swab_lov_user_md_v1(v1);
1273                 magic = v1->lmm_magic;
1274         } else if (magic == __swab32(LOV_USER_MAGIC_V3)) {
1275                 v3 = buf->lb_buf;
1276                 lustre_swab_lov_user_md_v3(v3);
1277                 magic = v3->lmm_magic;
1278         }
1279
1280         if (unlikely(magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)) {
1281                 /* try to use as fully defined striping */
1282                 rc = lod_use_defined_striping(env, lo, buf);
1283                 RETURN(rc);
1284         }
1285
1286         if (unlikely(buf->lb_len < sizeof(*v1))) {
1287                 CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1288                 RETURN(-EINVAL);
1289         }
1290
1291         v1->lmm_magic = magic;
1292         if (v1->lmm_pattern == 0)
1293                 v1->lmm_pattern = LOV_PATTERN_RAID0;
1294         if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0) {
1295                 CERROR("invalid pattern: %x\n", v1->lmm_pattern);
1296                 RETURN(-EINVAL);
1297         }
1298         lo->ldo_pattern = v1->lmm_pattern;
1299
1300         if (v1->lmm_stripe_size)
1301                 lo->ldo_stripe_size = v1->lmm_stripe_size;
1302         if (lo->ldo_stripe_size & (LOV_MIN_STRIPE_SIZE - 1))
1303                 lo->ldo_stripe_size = LOV_MIN_STRIPE_SIZE;
1304
1305         if (v1->lmm_stripe_count)
1306                 lo->ldo_stripenr = v1->lmm_stripe_count;
1307
1308         if ((v1->lmm_stripe_offset >= d->lod_desc.ld_tgt_count) &&
1309             (v1->lmm_stripe_offset != (typeof(v1->lmm_stripe_offset))(-1))) {
1310                 CERROR("invalid offset: %x\n", v1->lmm_stripe_offset);
1311                 RETURN(-EINVAL);
1312         }
1313         lo->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1314
1315         CDEBUG(D_OTHER, "lsm: %u size, %u stripes, %u offset\n",
1316                v1->lmm_stripe_size, v1->lmm_stripe_count,
1317                v1->lmm_stripe_offset);
1318
1319         if (v1->lmm_magic == LOV_MAGIC_V3) {
1320                 if (buf->lb_len < sizeof(*v3)) {
1321                         CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1322                         RETURN(-EINVAL);
1323                 }
1324
1325                 v3 = buf->lb_buf;
1326                 lod_object_set_pool(lo, v3->lmm_pool_name);
1327
1328                 /* In the function below, .hs_keycmp resolves to
1329                  * pool_hashkey_keycmp() */
1330                 /* coverity[overrun-buffer-val] */
1331                 pool = lod_find_pool(d, v3->lmm_pool_name);
1332                 if (pool != NULL) {
1333                         if (lo->ldo_def_stripe_offset !=
1334                             (typeof(v1->lmm_stripe_offset))(-1)) {
1335                                 rc = lo->ldo_def_stripe_offset;
1336                                 rc = lod_check_index_in_pool(rc, pool);
1337                                 if (rc < 0) {
1338                                         lod_pool_putref(pool);
1339                                         CERROR("invalid offset\n");
1340                                         RETURN(-EINVAL);
1341                                 }
1342                         }
1343
1344                         if (lo->ldo_stripenr > pool_tgt_count(pool))
1345                                 lo->ldo_stripenr= pool_tgt_count(pool);
1346
1347                         lod_pool_putref(pool);
1348                 }
1349         } else
1350                 lod_object_set_pool(lo, NULL);
1351
1352         /* fixup for released file */
1353         if (lo->ldo_pattern & LOV_PATTERN_F_RELEASED) {
1354                 lo->ldo_released_stripenr = lo->ldo_stripenr;
1355                 lo->ldo_stripenr = 0;
1356         }
1357
1358         RETURN(0);
1359 }
1360
1361 /*
1362  * buf should be NULL or contain striping settings
1363  */
1364 int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
1365                         struct lu_attr *attr, const struct lu_buf *buf,
1366                         struct thandle *th)
1367 {
1368         struct lod_device      *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1369         struct dt_object      **stripe;
1370         int                     stripe_len;
1371         int                     flag = LOV_USES_ASSIGNED_STRIPE;
1372         int                     i, rc;
1373         ENTRY;
1374
1375         LASSERT(lo);
1376
1377         /* no OST available */
1378         /* XXX: should we be waiting a bit to prevent failures during
1379          * cluster initialization? */
1380         if (d->lod_ostnr == 0)
1381                 GOTO(out, rc = -EIO);
1382
1383         /*
1384          * by this time, the object's ldo_stripenr and ldo_stripe_size
1385          * contain default value for striping: taken from the parent
1386          * or from filesystem defaults
1387          *
1388          * in case the caller is passing lovea with new striping config,
1389          * we may need to parse lovea and apply new configuration
1390          */
1391         rc = lod_qos_parse_config(env, lo, buf);
1392         if (rc)
1393                 GOTO(out, rc);
1394
1395         /* A released file is being created */
1396         if (lo->ldo_stripenr == 0)
1397                 GOTO(out, rc = 0);
1398
1399         if (likely(lo->ldo_stripe == NULL)) {
1400                 /*
1401                  * no striping has been created so far
1402                  */
1403                 LASSERT(lo->ldo_stripenr > 0);
1404                 /*
1405                  * statfs and check OST targets now, since ld_active_tgt_count
1406                  * could be changed if some OSTs are [de]activated manually.
1407                  */
1408                 lod_qos_statfs_update(env, d);
1409                 lo->ldo_stripenr = lod_get_stripecnt(d, LOV_MAGIC,
1410                                 lo->ldo_stripenr);
1411
1412                 stripe_len = lo->ldo_stripenr;
1413                 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len);
1414                 if (stripe == NULL)
1415                         GOTO(out, rc = -ENOMEM);
1416
1417                 lod_getref(&d->lod_ost_descs);
1418                 /* XXX: support for non-0 files w/o objects */
1419                 CDEBUG(D_OTHER, "tgt_count %d stripenr %d\n",
1420                                 d->lod_desc.ld_tgt_count, stripe_len);
1421                 if (lo->ldo_def_stripe_offset >= d->lod_desc.ld_tgt_count) {
1422                         rc = lod_alloc_qos(env, lo, stripe, flag, th);
1423                         if (rc == -EAGAIN)
1424                                 rc = lod_alloc_rr(env, lo, stripe, flag, th);
1425                 } else {
1426                         rc = lod_alloc_specific(env, lo, stripe, flag, th);
1427                 }
1428                 lod_putref(d, &d->lod_ost_descs);
1429
1430                 if (rc < 0) {
1431                         for (i = 0; i < stripe_len; i++)
1432                                 if (stripe[i] != NULL)
1433                                         lu_object_put(env, &stripe[i]->do_lu);
1434
1435                         OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len);
1436                 } else {
1437                         lo->ldo_stripe = stripe;
1438                         lo->ldo_stripes_allocated = stripe_len;
1439                 }
1440         } else {
1441                 /*
1442                  * lod_qos_parse_config() found supplied buf as a predefined
1443                  * striping (not a hint), so it allocated all the object
1444                  * now we need to create them
1445                  */
1446                 for (i = 0; i < lo->ldo_stripenr; i++) {
1447                         struct dt_object  *o;
1448
1449                         o = lo->ldo_stripe[i];
1450                         LASSERT(o);
1451
1452                         rc = dt_declare_create(env, o, attr, NULL, NULL, th);
1453                         if (rc) {
1454                                 CERROR("can't declare create: %d\n", rc);
1455                                 break;
1456                         }
1457                 }
1458         }
1459
1460 out:
1461         RETURN(rc);
1462 }
1463