Whamcloud - gitweb
LU-2789 lod: initialize objects in a detached stripe array
[fs/lustre-release.git] / lustre / lod / lod_qos.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_qos.c
33  *
34  */
35
36 #define DEBUG_SUBSYSTEM S_LOV
37
38 #include <libcfs/libcfs.h>
39 #include <obd_class.h>
40 #include <obd_lov.h>
41 #include <lustre/lustre_idl.h>
42 #include "lod_internal.h"
43
44 /*
45  * force QoS policy (not RR) to be used for testing purposes
46  */
47 #define FORCE_QOS_
48
49 #define D_QOS   D_OTHER
50
51 #if 0
52 #define QOS_DEBUG(fmt, ...)     CDEBUG(D_OTHER, fmt, ## __VA_ARGS__)
53 #define QOS_CONSOLE(fmt, ...)   LCONSOLE(D_OTHER, fmt, ## __VA_ARGS__)
54 #else
55 #define QOS_DEBUG(fmt, ...)
56 #define QOS_CONSOLE(fmt, ...)
57 #endif
58
59 #define TGT_BAVAIL(i) (OST_TGT(lod,i)->ltd_statfs.os_bavail * \
60                        OST_TGT(lod,i)->ltd_statfs.os_bsize)
61
62 int qos_add_tgt(struct lod_device *lod, struct lod_tgt_desc *ost_desc)
63 {
64         struct lov_qos_oss *oss = NULL, *temposs;
65         struct obd_export  *exp = ost_desc->ltd_exp;
66         int                 rc = 0, found = 0;
67         cfs_list_t         *list;
68         ENTRY;
69
70         down_write(&lod->lod_qos.lq_rw_sem);
71         /*
72          * a bit hacky approach to learn NID of corresponding connection
73          * but there is no official API to access information like this
74          * with OSD API.
75          */
76         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
77                 if (obd_uuid_equals(&oss->lqo_uuid,
78                                     &exp->exp_connection->c_remote_uuid)) {
79                         found++;
80                         break;
81                 }
82         }
83
84         if (!found) {
85                 OBD_ALLOC_PTR(oss);
86                 if (!oss)
87                         GOTO(out, rc = -ENOMEM);
88                 memcpy(&oss->lqo_uuid, &exp->exp_connection->c_remote_uuid,
89                        sizeof(oss->lqo_uuid));
90         } else {
91                 /* Assume we have to move this one */
92                 cfs_list_del(&oss->lqo_oss_list);
93         }
94
95         oss->lqo_ost_count++;
96         ost_desc->ltd_qos.ltq_oss = oss;
97
98         CDEBUG(D_QOS, "add tgt %s to OSS %s (%d OSTs)\n",
99                obd_uuid2str(&ost_desc->ltd_uuid), obd_uuid2str(&oss->lqo_uuid),
100                oss->lqo_ost_count);
101
102         /* Add sorted by # of OSTs.  Find the first entry that we're
103            bigger than... */
104         list = &lod->lod_qos.lq_oss_list;
105         cfs_list_for_each_entry(temposs, list, lqo_oss_list) {
106                 if (oss->lqo_ost_count > temposs->lqo_ost_count)
107                         break;
108         }
109         /* ...and add before it.  If we're the first or smallest, temposs
110            points to the list head, and we add to the end. */
111         cfs_list_add_tail(&oss->lqo_oss_list, &temposs->lqo_oss_list);
112
113         lod->lod_qos.lq_dirty = 1;
114         lod->lod_qos.lq_rr.lqr_dirty = 1;
115
116 out:
117         up_write(&lod->lod_qos.lq_rw_sem);
118         RETURN(rc);
119 }
120
121 int qos_del_tgt(struct lod_device *lod, struct lod_tgt_desc *ost_desc)
122 {
123         struct lov_qos_oss *oss;
124         int                 rc = 0;
125         ENTRY;
126
127         down_write(&lod->lod_qos.lq_rw_sem);
128         oss = ost_desc->ltd_qos.ltq_oss;
129         if (!oss)
130                 GOTO(out, rc = -ENOENT);
131
132         oss->lqo_ost_count--;
133         if (oss->lqo_ost_count == 0) {
134                 CDEBUG(D_QOS, "removing OSS %s\n",
135                        obd_uuid2str(&oss->lqo_uuid));
136                 cfs_list_del(&oss->lqo_oss_list);
137                 ost_desc->ltd_qos.ltq_oss = NULL;
138                 OBD_FREE_PTR(oss);
139         }
140
141         lod->lod_qos.lq_dirty = 1;
142         lod->lod_qos.lq_rr.lqr_dirty = 1;
143 out:
144         up_write(&lod->lod_qos.lq_rw_sem);
145         RETURN(rc);
146 }
147
148 static int lod_statfs_and_check(const struct lu_env *env, struct lod_device *d,
149                                 int index, struct obd_statfs *sfs)
150 {
151         struct lod_tgt_desc *ost;
152         int                  rc;
153
154         LASSERT(d);
155         ost = OST_TGT(d,index);
156         LASSERT(ost);
157
158         rc = dt_statfs(env, ost->ltd_ost, sfs);
159         if (rc && rc != -ENOTCONN)
160                 CERROR("%s: statfs: rc = %d\n", lod2obd(d)->obd_name, rc);
161
162         /* check whether device has changed state (active, inactive) */
163         if (rc != 0 && ost->ltd_active) {
164                 /* turned inactive? */
165                 spin_lock(&d->lod_desc_lock);
166                 if (ost->ltd_active) {
167                         ost->ltd_active = 0;
168                         LASSERT(d->lod_desc.ld_active_tgt_count > 0);
169                         d->lod_desc.ld_active_tgt_count--;
170                         d->lod_qos.lq_dirty = 1;
171                         d->lod_qos.lq_rr.lqr_dirty = 1;
172                         CDEBUG(D_CONFIG, "%s: turns inactive\n",
173                                ost->ltd_exp->exp_obd->obd_name);
174                 }
175                 spin_unlock(&d->lod_desc_lock);
176         } else if (rc == 0 && ost->ltd_active == 0) {
177                 /* turned active? */
178                 LASSERT(d->lod_desc.ld_active_tgt_count < d->lod_ostnr);
179                 spin_lock(&d->lod_desc_lock);
180                 if (ost->ltd_active == 0) {
181                         ost->ltd_active = 1;
182                         d->lod_desc.ld_active_tgt_count++;
183                         d->lod_qos.lq_dirty = 1;
184                         d->lod_qos.lq_rr.lqr_dirty = 1;
185                         CDEBUG(D_CONFIG, "%s: turns active\n",
186                                ost->ltd_exp->exp_obd->obd_name);
187                 }
188                 spin_unlock(&d->lod_desc_lock);
189         }
190
191         return rc;
192 }
193
194 static void lod_qos_statfs_update(const struct lu_env *env,
195                                   struct lod_device *lod)
196 {
197         struct obd_device *obd = lod2obd(lod);
198         struct ost_pool   *osts = &(lod->lod_pool_info);
199         int                i, idx, rc = 0;
200         __u64              max_age, avail;
201         ENTRY;
202
203         max_age = cfs_time_shift_64(-2 * lod->lod_desc.ld_qos_maxage);
204
205         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
206                 /* statfs data are quite recent, don't need to refresh it */
207                 RETURN_EXIT;
208
209         down_write(&lod->lod_qos.lq_rw_sem);
210         if (cfs_time_beforeq_64(max_age, obd->obd_osfs_age))
211                 GOTO(out, rc = 0);
212
213         for (i = 0; i < osts->op_count; i++) {
214                 idx = osts->op_array[i];
215                 avail = OST_TGT(lod,idx)->ltd_statfs.os_bavail;
216                 rc = lod_statfs_and_check(env, lod, idx,
217                                           &OST_TGT(lod,idx)->ltd_statfs);
218                 if (rc)
219                         break;
220                 if (OST_TGT(lod,idx)->ltd_statfs.os_bavail != avail)
221                         /* recalculate weigths */
222                         lod->lod_qos.lq_dirty = 1;
223         }
224         obd->obd_osfs_age = cfs_time_current_64();
225
226 out:
227         up_write(&lod->lod_qos.lq_rw_sem);
228 }
229
230 /* Recalculate per-object penalties for OSSs and OSTs,
231    depends on size of each ost in an oss */
232 static int lod_qos_calc_ppo(struct lod_device *lod)
233 {
234         struct lov_qos_oss *oss;
235         __u64               ba_max, ba_min, temp;
236         __u32               num_active;
237         int                 rc, i, prio_wide;
238         time_t              now, age;
239         ENTRY;
240
241         if (!lod->lod_qos.lq_dirty)
242                 GOTO(out, rc = 0);
243
244         num_active = lod->lod_desc.ld_active_tgt_count - 1;
245         if (num_active < 1)
246                 GOTO(out, rc = -EAGAIN);
247
248         /* find bavail on each OSS */
249         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list)
250                                 oss->lqo_bavail = 0;
251         lod->lod_qos.lq_active_oss_count = 0;
252
253         /*
254          * How badly user wants to select OSTs "widely" (not recently chosen
255          * and not on recent OSS's).  As opposed to "freely" (free space
256          * avail.) 0-256
257          */
258         prio_wide = 256 - lod->lod_qos.lq_prio_free;
259
260         ba_min = (__u64)(-1);
261         ba_max = 0;
262         now = cfs_time_current_sec();
263         /* Calculate OST penalty per object
264          * (lod ref taken in lod_qos_prep_create()) */
265         cfs_foreach_bit(lod->lod_ost_bitmap, i) {
266                 LASSERT(OST_TGT(lod,i));
267                 temp = TGT_BAVAIL(i);
268                 if (!temp)
269                         continue;
270                 ba_min = min(temp, ba_min);
271                 ba_max = max(temp, ba_max);
272
273                 /* Count the number of usable OSS's */
274                 if (OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail == 0)
275                         lod->lod_qos.lq_active_oss_count++;
276                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_bavail += temp;
277
278                 /* per-OST penalty is prio * TGT_bavail / (num_ost - 1) / 2 */
279                 temp >>= 1;
280                 lov_do_div64(temp, num_active);
281                 OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj =
282                         (temp * prio_wide) >> 8;
283
284                 age = (now - OST_TGT(lod,i)->ltd_qos.ltq_used) >> 3;
285                 if (lod->lod_qos.lq_reset ||
286                     age > 32 * lod->lod_desc.ld_qos_maxage)
287                         OST_TGT(lod,i)->ltd_qos.ltq_penalty = 0;
288                 else if (age > lod->lod_desc.ld_qos_maxage)
289                         /* Decay the penalty by half for every 8x the update
290                          * interval that the device has been idle.  That gives
291                          * lots of time for the statfs information to be
292                          * updated (which the penalty is only a proxy for),
293                          * and avoids penalizing OSS/OSTs under light load. */
294                         OST_TGT(lod,i)->ltd_qos.ltq_penalty >>=
295                                 (age / lod->lod_desc.ld_qos_maxage);
296         }
297
298         num_active = lod->lod_qos.lq_active_oss_count - 1;
299         if (num_active < 1) {
300                 /* If there's only 1 OSS, we can't penalize it, so instead
301                    we have to double the OST penalty */
302                 num_active = 1;
303                 cfs_foreach_bit(lod->lod_ost_bitmap, i)
304                         OST_TGT(lod,i)->ltd_qos.ltq_penalty_per_obj <<= 1;
305         }
306
307         /* Per-OSS penalty is prio * oss_avail / oss_osts / (num_oss - 1) / 2 */
308         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
309                 temp = oss->lqo_bavail >> 1;
310                 lov_do_div64(temp, oss->lqo_ost_count * num_active);
311                 oss->lqo_penalty_per_obj = (temp * prio_wide) >> 8;
312
313                 age = (now - oss->lqo_used) >> 3;
314                 if (lod->lod_qos.lq_reset ||
315                     age > 32 * lod->lod_desc.ld_qos_maxage)
316                         oss->lqo_penalty = 0;
317                 else if (age > lod->lod_desc.ld_qos_maxage)
318                         /* Decay the penalty by half for every 8x the update
319                          * interval that the device has been idle.  That gives
320                          * lots of time for the statfs information to be
321                          * updated (which the penalty is only a proxy for),
322                          * and avoids penalizing OSS/OSTs under light load. */
323                         oss->lqo_penalty >>= age / lod->lod_desc.ld_qos_maxage;
324         }
325
326         lod->lod_qos.lq_dirty = 0;
327         lod->lod_qos.lq_reset = 0;
328
329         /* If each ost has almost same free space,
330          * do rr allocation for better creation performance */
331         lod->lod_qos.lq_same_space = 0;
332         if ((ba_max * (256 - lod->lod_qos.lq_threshold_rr)) >> 8 < ba_min) {
333                 lod->lod_qos.lq_same_space = 1;
334                 /* Reset weights for the next time we enter qos mode */
335                 lod->lod_qos.lq_reset = 1;
336         }
337         rc = 0;
338
339 out:
340 #ifndef FORCE_QOS
341         if (!rc && lod->lod_qos.lq_same_space)
342                 RETURN(-EAGAIN);
343 #endif
344         RETURN(rc);
345 }
346
347 static int lod_qos_calc_weight(struct lod_device *lod, int i)
348 {
349         __u64 temp, temp2;
350
351         /* Final ost weight = TGT_BAVAIL - ost_penalty - oss_penalty */
352         temp = TGT_BAVAIL(i);
353         temp2 = OST_TGT(lod,i)->ltd_qos.ltq_penalty +
354                 OST_TGT(lod,i)->ltd_qos.ltq_oss->lqo_penalty;
355         if (temp < temp2)
356                 OST_TGT(lod,i)->ltd_qos.ltq_weight = 0;
357         else
358                 OST_TGT(lod,i)->ltd_qos.ltq_weight = temp - temp2;
359         return 0;
360 }
361
362 /* We just used this index for a stripe; adjust everyone's weights */
363 static int lod_qos_used(struct lod_device *lod, struct ost_pool *osts,
364                         __u32 index, __u64 *total_wt)
365 {
366         struct lod_tgt_desc *ost;
367         struct lov_qos_oss  *oss;
368         int j;
369         ENTRY;
370
371         ost = OST_TGT(lod,index);
372         LASSERT(ost);
373
374         /* Don't allocate on this devuce anymore, until the next alloc_qos */
375         ost->ltd_qos.ltq_usable = 0;
376
377         oss = ost->ltd_qos.ltq_oss;
378
379         /* Decay old penalty by half (we're adding max penalty, and don't
380            want it to run away.) */
381         ost->ltd_qos.ltq_penalty >>= 1;
382         oss->lqo_penalty >>= 1;
383
384         /* mark the OSS and OST as recently used */
385         ost->ltd_qos.ltq_used = oss->lqo_used = cfs_time_current_sec();
386
387         /* Set max penalties for this OST and OSS */
388         ost->ltd_qos.ltq_penalty +=
389                 ost->ltd_qos.ltq_penalty_per_obj * lod->lod_ostnr;
390         oss->lqo_penalty += oss->lqo_penalty_per_obj *
391                 lod->lod_qos.lq_active_oss_count;
392
393         /* Decrease all OSS penalties */
394         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
395                 if (oss->lqo_penalty < oss->lqo_penalty_per_obj)
396                         oss->lqo_penalty = 0;
397                 else
398                         oss->lqo_penalty -= oss->lqo_penalty_per_obj;
399         }
400
401         *total_wt = 0;
402         /* Decrease all OST penalties */
403         for (j = 0; j < osts->op_count; j++) {
404                 int i;
405
406                 i = osts->op_array[j];
407                 if (!cfs_bitmap_check(lod->lod_ost_bitmap, i))
408                         continue;
409
410                 ost = OST_TGT(lod,i);
411                 LASSERT(ost);
412
413                 if (ost->ltd_qos.ltq_penalty <
414                                 ost->ltd_qos.ltq_penalty_per_obj)
415                         ost->ltd_qos.ltq_penalty = 0;
416                 else
417                         ost->ltd_qos.ltq_penalty -=
418                                 ost->ltd_qos.ltq_penalty_per_obj;
419
420                 lod_qos_calc_weight(lod, i);
421
422                 /* Recalc the total weight of usable osts */
423                 if (ost->ltd_qos.ltq_usable)
424                         *total_wt += ost->ltd_qos.ltq_weight;
425
426                 QOS_DEBUG("recalc tgt %d usable=%d avail="LPU64
427                           " ostppo="LPU64" ostp="LPU64" ossppo="LPU64
428                           " ossp="LPU64" wt="LPU64"\n",
429                           i, ost->ltd_qos.ltq_usable, TGT_BAVAIL(i) >> 10,
430                           ost->ltd_qos.ltq_penalty_per_obj >> 10,
431                           ost->ltd_qos.ltq_penalty >> 10,
432                           ost->ltd_qos.ltq_oss->lqo_penalty_per_obj >> 10,
433                           ost->ltd_qos.ltq_oss->lqo_penalty >> 10,
434                           ost->ltd_qos.ltq_weight >> 10);
435         }
436
437         RETURN(0);
438 }
439
440 #define LOV_QOS_EMPTY ((__u32)-1)
441 /* compute optimal round-robin order, based on OSTs per OSS */
442 static int lod_qos_calc_rr(struct lod_device *lod, struct ost_pool *src_pool,
443                            struct lov_qos_rr *lqr)
444 {
445         struct lov_qos_oss  *oss;
446         struct lod_tgt_desc *ost;
447         unsigned placed, real_count;
448         int i, rc;
449         ENTRY;
450
451         if (!lqr->lqr_dirty) {
452                 LASSERT(lqr->lqr_pool.op_size);
453                 RETURN(0);
454         }
455
456         /* Do actual allocation. */
457         down_write(&lod->lod_qos.lq_rw_sem);
458
459         /*
460          * Check again. While we were sleeping on @lq_rw_sem something could
461          * change.
462          */
463         if (!lqr->lqr_dirty) {
464                 LASSERT(lqr->lqr_pool.op_size);
465                 up_write(&lod->lod_qos.lq_rw_sem);
466                 RETURN(0);
467         }
468
469         real_count = src_pool->op_count;
470
471         /* Zero the pool array */
472         /* alloc_rr is holding a read lock on the pool, so nobody is adding/
473            deleting from the pool. The lq_rw_sem insures that nobody else
474            is reading. */
475         lqr->lqr_pool.op_count = real_count;
476         rc = lod_ost_pool_extend(&lqr->lqr_pool, real_count);
477         if (rc) {
478                 up_write(&lod->lod_qos.lq_rw_sem);
479                 RETURN(rc);
480         }
481         for (i = 0; i < lqr->lqr_pool.op_count; i++)
482                 lqr->lqr_pool.op_array[i] = LOV_QOS_EMPTY;
483
484         /* Place all the OSTs from 1 OSS at the same time. */
485         placed = 0;
486         cfs_list_for_each_entry(oss, &lod->lod_qos.lq_oss_list, lqo_oss_list) {
487                 int j = 0;
488
489                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
490                         int next;
491
492                         if (!cfs_bitmap_check(lod->lod_ost_bitmap,
493                                                 src_pool->op_array[i]))
494                                 continue;
495
496                         ost = OST_TGT(lod,src_pool->op_array[i]);
497                         LASSERT(ost && ost->ltd_ost);
498                         if (ost->ltd_qos.ltq_oss != oss)
499                                 continue;
500
501                         /* Evenly space these OSTs across arrayspace */
502                         next = j * lqr->lqr_pool.op_count / oss->lqo_ost_count;
503                         while (lqr->lqr_pool.op_array[next] != LOV_QOS_EMPTY)
504                                 next = (next + 1) % lqr->lqr_pool.op_count;
505
506                         lqr->lqr_pool.op_array[next] = src_pool->op_array[i];
507                         j++;
508                         placed++;
509                 }
510         }
511
512         lqr->lqr_dirty = 0;
513         up_write(&lod->lod_qos.lq_rw_sem);
514
515         if (placed != real_count) {
516                 /* This should never happen */
517                 LCONSOLE_ERROR_MSG(0x14e, "Failed to place all OSTs in the "
518                                    "round-robin list (%d of %d).\n",
519                                    placed, real_count);
520                 for (i = 0; i < lqr->lqr_pool.op_count; i++) {
521                         LCONSOLE(D_WARNING, "rr #%d ost idx=%d\n", i,
522                                  lqr->lqr_pool.op_array[i]);
523                 }
524                 lqr->lqr_dirty = 1;
525                 RETURN(-EAGAIN);
526         }
527
528 #if 0
529         for (i = 0; i < lqr->lqr_pool.op_count; i++)
530                 QOS_CONSOLE("rr #%d ost idx=%d\n", i, lqr->lqr_pool.op_array[i]);
531 #endif
532
533         RETURN(0);
534 }
535
536 /**
537  * A helper function to:
538  *   create in-core lu object on the specified OSP
539  *   declare creation of the object
540  * IMPORTANT: at this stage object is anonymouos - it has no fid assigned
541  *            this is a workaround till we have natural FIDs on OST
542  *
543  *            at this point we want to declare (reserve) object for us as
544  *            we can't block at execution (when create method is called).
545  *            otherwise we'd block whole transaction batch
546  */
547 static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env,
548                                                    struct lod_device *d,
549                                                    int ost_idx,
550                                                    struct thandle *th)
551 {
552         struct lod_tgt_desc *ost;
553         struct lu_object *o, *n;
554         struct lu_device *nd;
555         struct dt_object *dt;
556         int               rc;
557         ENTRY;
558
559         LASSERT(d);
560         LASSERT(ost_idx >= 0);
561         LASSERT(ost_idx < d->lod_osts_size);
562         ost = OST_TGT(d,ost_idx);
563         LASSERT(ost);
564         LASSERT(ost->ltd_ost);
565
566         nd = &ost->ltd_ost->dd_lu_dev;
567
568         /*
569          * allocate anonymous object with zero fid, real fid
570          * will be assigned by OSP within transaction
571          * XXX: to be fixed with fully-functional OST fids
572          */
573         o = lu_object_anon(env, nd, NULL);
574         if (IS_ERR(o))
575                 GOTO(out, dt = ERR_PTR(PTR_ERR(o)));
576
577         n = lu_object_locate(o->lo_header, nd->ld_type);
578         if (unlikely(n == NULL)) {
579                 CERROR("can't find slice\n");
580                 lu_object_put(env, o);
581                 GOTO(out, dt = ERR_PTR(-EINVAL));
582         }
583
584         dt = container_of(n, struct dt_object, do_lu);
585
586         rc = dt_declare_create(env, dt, NULL, NULL, NULL, th);
587         if (rc) {
588                 CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n",
589                        ost_idx, rc);
590                 lu_object_put(env, o);
591                 dt = ERR_PTR(rc);
592         }
593
594 out:
595         RETURN(dt);
596 }
597
598 static int min_stripe_count(int stripe_cnt, int flags)
599 {
600         return (flags & LOV_USES_DEFAULT_STRIPE ?
601                         stripe_cnt - (stripe_cnt / 4) : stripe_cnt);
602 }
603
604 #define LOV_CREATE_RESEED_MULT 30
605 #define LOV_CREATE_RESEED_MIN  2000
606
607 static int inline lod_qos_dev_is_full(struct obd_statfs *msfs)
608 {
609         __u64 used;
610         int   bs = msfs->os_bsize;
611
612         LASSERT(((bs - 1) & bs) == 0);
613
614         /* the minimum of 0.1% used blocks and 1GB bytes. */
615         used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
616                         1 << (31 - ffs(bs)));
617         return (msfs->os_bavail < used);
618 }
619
620 int lod_ea_store_resize(struct lod_thread_info *info, int size);
621
622 static inline int lod_qos_ost_in_use_clear(const struct lu_env *env, int stripes)
623 {
624         struct lod_thread_info *info = lod_env_info(env);
625
626         if (info->lti_ea_store_size < sizeof(int) * stripes)
627                 lod_ea_store_resize(info, stripes * sizeof(int));
628         if (info->lti_ea_store_size < sizeof(int) * stripes) {
629                 CERROR("can't allocate memory for ost-in-use array\n");
630                 return -ENOMEM;
631         }
632         memset(info->lti_ea_store, 0, sizeof(int) * stripes);
633         return 0;
634 }
635
636 static inline void lod_qos_ost_in_use(const struct lu_env *env, int idx, int ost)
637 {
638         struct lod_thread_info *info = lod_env_info(env);
639         int *osts = info->lti_ea_store;
640
641         LASSERT(info->lti_ea_store_size >= idx * sizeof(int));
642         osts[idx] = ost;
643 }
644
645 static int lod_qos_is_ost_used(const struct lu_env *env, int ost, int stripes)
646 {
647         struct lod_thread_info *info = lod_env_info(env);
648         int *osts = info->lti_ea_store;
649         int j;
650
651         for (j = 0; j < stripes; j++) {
652                 if (osts[j] == ost)
653                         return 1;
654         }
655         return 0;
656 }
657
658 /* Allocate objects on OSTs with round-robin algorithm */
659 static int lod_alloc_rr(const struct lu_env *env, struct lod_object *lo,
660                         struct dt_object **stripe, int flags,
661                         struct thandle *th)
662 {
663         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
664         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
665         struct pool_desc  *pool = NULL;
666         struct ost_pool   *osts;
667         struct lov_qos_rr *lqr;
668         struct dt_object  *o;
669         unsigned           array_idx;
670         int                i, rc;
671         int                ost_start_idx_temp;
672         int                speed = 0;
673         int                stripe_idx = 0;
674         int                stripe_cnt = lo->ldo_stripenr;
675         int                stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
676         __u32              ost_idx;
677         ENTRY;
678
679         if (lo->ldo_pool)
680                 pool = lod_find_pool(m, lo->ldo_pool);
681
682         if (pool != NULL) {
683                 down_read(&pool_tgt_rw_sem(pool));
684                 osts = &(pool->pool_obds);
685                 lqr = &(pool->pool_rr);
686         } else {
687                 osts = &(m->lod_pool_info);
688                 lqr = &(m->lod_qos.lq_rr);
689         }
690
691         rc = lod_qos_calc_rr(m, osts, lqr);
692         if (rc)
693                 GOTO(out, rc);
694
695         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
696         if (rc)
697                 GOTO(out, rc);
698
699         if (--lqr->lqr_start_count <= 0) {
700                 lqr->lqr_start_idx = cfs_rand() % osts->op_count;
701                 lqr->lqr_start_count =
702                         (LOV_CREATE_RESEED_MIN / max(osts->op_count, 1U) +
703                          LOV_CREATE_RESEED_MULT) * max(osts->op_count, 1U);
704         } else if (stripe_cnt_min >= osts->op_count ||
705                         lqr->lqr_start_idx > osts->op_count) {
706                 /* If we have allocated from all of the OSTs, slowly
707                  * precess the next start if the OST/stripe count isn't
708                  * already doing this for us. */
709                 lqr->lqr_start_idx %= osts->op_count;
710                 if (stripe_cnt > 1 && (osts->op_count % stripe_cnt) != 1)
711                         ++lqr->lqr_offset_idx;
712         }
713         down_read(&m->lod_qos.lq_rw_sem);
714         ost_start_idx_temp = lqr->lqr_start_idx;
715
716 repeat_find:
717         array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) %
718                         osts->op_count;
719
720         QOS_DEBUG("pool '%s' want %d startidx %d startcnt %d offset %d "
721                   "active %d count %d arrayidx %d\n",
722                   lo->ldo_pool ? lo->ldo_pool : "",
723                   stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count,
724                   lqr->lqr_offset_idx, osts->op_count, osts->op_count,
725                   array_idx);
726
727         for (i = 0; i < osts->op_count && stripe_idx < lo->ldo_stripenr;
728              i++, array_idx = (array_idx + 1) % osts->op_count) {
729                 ++lqr->lqr_start_idx;
730                 ost_idx = lqr->lqr_pool.op_array[array_idx];
731
732                 QOS_DEBUG("#%d strt %d act %d strp %d ary %d idx %d\n",
733                           i, lqr->lqr_start_idx, /* XXX: active*/ 0,
734                           stripe_idx, array_idx, ost_idx);
735
736                 if ((ost_idx == LOV_QOS_EMPTY) ||
737                     !cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
738                         continue;
739
740                 /* Fail Check before osc_precreate() is called
741                    so we can only 'fail' single OSC. */
742                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
743                         continue;
744
745                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
746                 if (rc) {
747                         /* this OSP doesn't feel well */
748                         continue;
749                 }
750
751                 /*
752                  * skip full devices
753                  */
754                 if (lod_qos_dev_is_full(sfs)) {
755                         QOS_DEBUG("#%d is full\n", ost_idx);
756                         continue;
757                 }
758
759                 /*
760                  * We expect number of precreated objects in f_ffree at
761                  * the first iteration, skip OSPs with no objects ready
762                  */
763                 if (sfs->os_fprecreated == 0 && speed == 0) {
764                         QOS_DEBUG("#%d: precreation is empty\n", ost_idx);
765                         continue;
766                 }
767
768                 /*
769                  * try to use another OSP if this one is degraded
770                  */
771                 if (sfs->os_state == OS_STATE_DEGRADED && speed < 2) {
772                         QOS_DEBUG("#%d: degraded\n", ost_idx);
773                         continue;
774                 }
775
776                 /*
777                  * do not put >1 objects on a single OST
778                  */
779                 if (speed && lod_qos_is_ost_used(env, ost_idx, stripe_idx))
780                         continue;
781
782                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
783                 if (IS_ERR(o)) {
784                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
785                                ost_idx, (int) PTR_ERR(o));
786                         rc = PTR_ERR(o);
787                         continue;
788                 }
789
790                 /*
791                  * We've successfuly declared (reserved) an object
792                  */
793                 lod_qos_ost_in_use(env, stripe_idx, ost_idx);
794                 stripe[stripe_idx] = o;
795                 stripe_idx++;
796
797         }
798         if ((speed < 2) && (stripe_idx < stripe_cnt_min)) {
799                 /* Try again, allowing slower OSCs */
800                 speed++;
801                 lqr->lqr_start_idx = ost_start_idx_temp;
802                 goto repeat_find;
803         }
804
805         up_read(&m->lod_qos.lq_rw_sem);
806
807         if (stripe_idx) {
808                 lo->ldo_stripenr = stripe_idx;
809                 /* at least one stripe is allocated */
810                 rc = 0;
811         } else {
812                 /* nobody provided us with a single object */
813                 rc = -ENOSPC;
814         }
815
816 out:
817         if (pool != NULL) {
818                 up_read(&pool_tgt_rw_sem(pool));
819                 /* put back ref got by lod_find_pool() */
820                 lod_pool_putref(pool);
821         }
822
823         RETURN(rc);
824 }
825
826 /* alloc objects on osts with specific stripe offset */
827 static int lod_alloc_specific(const struct lu_env *env, struct lod_object *lo,
828                               struct dt_object **stripe, int flags,
829                               struct thandle *th)
830 {
831         struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
832         struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
833         struct dt_object  *o;
834         unsigned           ost_idx, array_idx, ost_count;
835         int                i, rc, stripe_num = 0;
836         int                speed = 0;
837         struct pool_desc  *pool = NULL;
838         struct ost_pool   *osts;
839         ENTRY;
840
841         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
842         if (rc)
843                 GOTO(out, rc);
844
845         if (lo->ldo_pool)
846                 pool = lod_find_pool(m, lo->ldo_pool);
847
848         if (pool != NULL) {
849                 down_read(&pool_tgt_rw_sem(pool));
850                 osts = &(pool->pool_obds);
851         } else {
852                 osts = &(m->lod_pool_info);
853         }
854
855         ost_count = osts->op_count;
856
857 repeat_find:
858         /* search loi_ost_idx in ost array */
859         array_idx = 0;
860         for (i = 0; i < ost_count; i++) {
861                 if (osts->op_array[i] == lo->ldo_def_stripe_offset) {
862                         array_idx = i;
863                         break;
864                 }
865         }
866         if (i == ost_count) {
867                 CERROR("Start index %d not found in pool '%s'\n",
868                        lo->ldo_def_stripe_offset,
869                        lo->ldo_pool ? lo->ldo_pool : "");
870                 GOTO(out, rc = -EINVAL);
871         }
872
873         for (i = 0; i < ost_count;
874                         i++, array_idx = (array_idx + 1) % ost_count) {
875                 ost_idx = osts->op_array[array_idx];
876
877                 if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx))
878                         continue;
879
880                 /* Fail Check before osc_precreate() is called
881                    so we can only 'fail' single OSC. */
882                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) && ost_idx == 0)
883                         continue;
884
885                 /*
886                  * do not put >1 objects on a single OST
887                  */
888                 if (lod_qos_is_ost_used(env, ost_idx, stripe_num))
889                         continue;
890
891                 /* Drop slow OSCs if we can, but not for requested start idx.
892                  *
893                  * This means "if OSC is slow and it is not the requested
894                  * start OST, then it can be skipped, otherwise skip it only
895                  * if it is inactive/recovering/out-of-space." */
896
897                 rc = lod_statfs_and_check(env, m, ost_idx, sfs);
898                 if (rc) {
899                         /* this OSP doesn't feel well */
900                         continue;
901                 }
902
903                 /*
904                  * We expect number of precreated objects in f_ffree at
905                  * the first iteration, skip OSPs with no objects ready
906                  * don't apply this logic to OST specified with stripe_offset
907                  */
908                 if (i != 0 && sfs->os_fprecreated == 0 && speed == 0)
909                         continue;
910
911                 o = lod_qos_declare_object_on(env, m, ost_idx, th);
912                 if (IS_ERR(o)) {
913                         CDEBUG(D_OTHER, "can't declare new object on #%u: %d\n",
914                                ost_idx, (int) PTR_ERR(o));
915                         continue;
916                 }
917
918                 /*
919                  * We've successfuly declared (reserved) an object
920                  */
921                 stripe[stripe_num] = o;
922                 stripe_num++;
923
924                 /* We have enough stripes */
925                 if (stripe_num == lo->ldo_stripenr)
926                         GOTO(out, rc = 0);
927         }
928         if (speed < 2) {
929                 /* Try again, allowing slower OSCs */
930                 speed++;
931                 goto repeat_find;
932         }
933
934         /* If we were passed specific striping params, then a failure to
935          * meet those requirements is an error, since we can't reallocate
936          * that memory (it might be part of a larger array or something).
937          *
938          * We can only get here if lsm_stripe_count was originally > 1.
939          */
940         CERROR("can't lstripe objid "DFID": have %d want %u\n",
941                PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num,
942                lo->ldo_stripenr);
943         rc = -EFBIG;
944 out:
945         if (pool != NULL) {
946                 up_read(&pool_tgt_rw_sem(pool));
947                 /* put back ref got by lod_find_pool() */
948                 lod_pool_putref(pool);
949         }
950
951         RETURN(rc);
952 }
953
954 static inline int lod_qos_is_usable(struct lod_device *lod)
955 {
956 #ifdef FORCE_QOS
957         /* to be able to debug QoS code */
958         return 1;
959 #endif
960
961         /* Detect -EAGAIN early, before expensive lock is taken. */
962         if (!lod->lod_qos.lq_dirty && lod->lod_qos.lq_same_space)
963                 return 0;
964
965         if (lod->lod_desc.ld_active_tgt_count < 2)
966                 return 0;
967
968         return 1;
969 }
970
971 /* Alloc objects on OSTs with optimization based on:
972    - free space
973    - network resources (shared OSS's)
974  */
975 static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo,
976                          struct dt_object **stripe, int flags,
977                          struct thandle *th)
978 {
979         struct lod_device   *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
980         struct obd_statfs   *sfs = &lod_env_info(env)->lti_osfs;
981         struct lod_tgt_desc *ost;
982         struct dt_object    *o;
983         __u64                total_weight = 0;
984         int                  nfound, good_osts, i, rc = 0;
985         int                  stripe_cnt = lo->ldo_stripenr;
986         int                  stripe_cnt_min;
987         struct pool_desc    *pool = NULL;
988         struct ost_pool    *osts;
989         ENTRY;
990
991         stripe_cnt_min = min_stripe_count(stripe_cnt, flags);
992         if (stripe_cnt_min < 1)
993                 RETURN(-EINVAL);
994
995         if (lo->ldo_pool)
996                 pool = lod_find_pool(m, lo->ldo_pool);
997
998         if (pool != NULL) {
999                 down_read(&pool_tgt_rw_sem(pool));
1000                 osts = &(pool->pool_obds);
1001         } else {
1002                 osts = &(m->lod_pool_info);
1003         }
1004
1005         /* Detect -EAGAIN early, before expensive lock is taken. */
1006         if (!lod_qos_is_usable(m))
1007                 GOTO(out_nolock, rc = -EAGAIN);
1008
1009         /* Do actual allocation, use write lock here. */
1010         down_write(&m->lod_qos.lq_rw_sem);
1011
1012         /*
1013          * Check again, while we were sleeping on @lq_rw_sem things could
1014          * change.
1015          */
1016         if (!lod_qos_is_usable(m))
1017                 GOTO(out, rc = -EAGAIN);
1018
1019         rc = lod_qos_calc_ppo(m);
1020         if (rc)
1021                 GOTO(out, rc);
1022
1023         rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr);
1024         if (rc)
1025                 GOTO(out, rc);
1026
1027         good_osts = 0;
1028         /* Find all the OSTs that are valid stripe candidates */
1029         for (i = 0; i < osts->op_count; i++) {
1030                 if (!cfs_bitmap_check(m->lod_ost_bitmap, osts->op_array[i]))
1031                         continue;
1032
1033                 rc = lod_statfs_and_check(env, m, osts->op_array[i], sfs);
1034                 if (rc) {
1035                         /* this OSP doesn't feel well */
1036                         continue;
1037                 }
1038
1039                 /*
1040                  * skip full devices
1041                  */
1042                 if (lod_qos_dev_is_full(sfs))
1043                         continue;
1044
1045                 /* Fail Check before osc_precreate() is called
1046                    so we can only 'fail' single OSC. */
1047                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_PRECREATE) &&
1048                                    osts->op_array[i] == 0)
1049                         continue;
1050
1051                 ost = OST_TGT(m,osts->op_array[i]);
1052                 ost->ltd_qos.ltq_usable = 1;
1053                 lod_qos_calc_weight(m, osts->op_array[i]);
1054                 total_weight += ost->ltd_qos.ltq_weight;
1055
1056                 good_osts++;
1057         }
1058
1059         QOS_DEBUG("found %d good osts\n", good_osts);
1060
1061         if (good_osts < stripe_cnt_min)
1062                 GOTO(out, rc = -EAGAIN);
1063
1064         /* We have enough osts */
1065         if (good_osts < stripe_cnt)
1066                 stripe_cnt = good_osts;
1067
1068         /* Find enough OSTs with weighted random allocation. */
1069         nfound = 0;
1070         while (nfound < stripe_cnt) {
1071                 __u64 rand, cur_weight;
1072
1073                 cur_weight = 0;
1074                 rc = -ENOSPC;
1075
1076                 if (total_weight) {
1077 #if BITS_PER_LONG == 32
1078                         rand = cfs_rand() % (unsigned)total_weight;
1079                         /* If total_weight > 32-bit, first generate the high
1080                          * 32 bits of the random number, then add in the low
1081                          * 32 bits (truncated to the upper limit, if needed) */
1082                         if (total_weight > 0xffffffffULL)
1083                                 rand = (__u64)(cfs_rand() %
1084                                         (unsigned)(total_weight >> 32)) << 32;
1085                         else
1086                                 rand = 0;
1087
1088                         if (rand == (total_weight & 0xffffffff00000000ULL))
1089                                 rand |= cfs_rand() % (unsigned)total_weight;
1090                         else
1091                                 rand |= cfs_rand();
1092
1093 #else
1094                         rand = ((__u64)cfs_rand() << 32 | cfs_rand()) %
1095                                 total_weight;
1096 #endif
1097                 } else {
1098                         rand = 0;
1099                 }
1100
1101                 /* On average, this will hit larger-weighted osts more often.
1102                    0-weight osts will always get used last (only when rand=0) */
1103                 for (i = 0; i < osts->op_count; i++) {
1104                         int idx = osts->op_array[i];
1105
1106                         if (!cfs_bitmap_check(m->lod_ost_bitmap, idx))
1107                                 continue;
1108
1109                         ost = OST_TGT(m,idx);
1110
1111                         if (!ost->ltd_qos.ltq_usable)
1112                                 continue;
1113
1114                         cur_weight += ost->ltd_qos.ltq_weight;
1115                         QOS_DEBUG("stripe_cnt=%d nfound=%d cur_weight="LPU64
1116                                   " rand="LPU64" total_weight="LPU64"\n",
1117                                   stripe_cnt, nfound, cur_weight, rand,
1118                                   total_weight);
1119
1120                         if (cur_weight < rand)
1121                                 continue;
1122
1123                         QOS_DEBUG("stripe=%d to idx=%d\n", nfound, idx);
1124
1125                         /*
1126                          * do not put >1 objects on a single OST
1127                          */
1128                         if (lod_qos_is_ost_used(env, idx, nfound))
1129                                 continue;
1130                         lod_qos_ost_in_use(env, nfound, idx);
1131
1132                         o = lod_qos_declare_object_on(env, m, idx, th);
1133                         if (IS_ERR(o)) {
1134                                 QOS_DEBUG("can't declare object on #%u: %d\n",
1135                                           idx, (int) PTR_ERR(o));
1136                                 continue;
1137                         }
1138                         stripe[nfound++] = o;
1139                         lod_qos_used(m, osts, idx, &total_weight);
1140                         rc = 0;
1141                         break;
1142                 }
1143
1144                 if (rc) {
1145                         /* no OST found on this iteration, give up */
1146                         break;
1147                 }
1148         }
1149
1150         if (unlikely(nfound != stripe_cnt)) {
1151                 /*
1152                  * when the decision to use weighted algorithm was made
1153                  * we had enough appropriate OSPs, but this state can
1154                  * change anytime (no space on OST, broken connection, etc)
1155                  * so it's possible OSP won't be able to provide us with
1156                  * an object due to just changed state
1157                  */
1158                 LCONSOLE_INFO("wanted %d, found %d\n", stripe_cnt, nfound);
1159                 for (i = 0; i < nfound; i++) {
1160                         LASSERT(stripe[i] != NULL);
1161                         lu_object_put(env, &stripe[i]->do_lu);
1162                         stripe[i] = NULL;
1163                 }
1164
1165                 /* makes sense to rebalance next time */
1166                 m->lod_qos.lq_dirty = 1;
1167                 m->lod_qos.lq_same_space = 0;
1168
1169                 rc = -EAGAIN;
1170         }
1171
1172 out:
1173         up_write(&m->lod_qos.lq_rw_sem);
1174
1175 out_nolock:
1176         if (pool != NULL) {
1177                 up_read(&pool_tgt_rw_sem(pool));
1178                 /* put back ref got by lod_find_pool() */
1179                 lod_pool_putref(pool);
1180         }
1181
1182         RETURN(rc);
1183 }
1184
1185 /* Find the max stripecount we should use */
1186 static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic,
1187                                __u16 stripe_count)
1188 {
1189         __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
1190
1191         if (!stripe_count)
1192                 stripe_count = lod->lod_desc.ld_default_stripe_count;
1193         if (stripe_count > lod->lod_desc.ld_active_tgt_count)
1194                 stripe_count = lod->lod_desc.ld_active_tgt_count;
1195         if (!stripe_count)
1196                 stripe_count = 1;
1197
1198         /* stripe count is based on whether OSD can handle larger EA sizes */
1199         if (lod->lod_osd_max_easize > 0)
1200                 max_stripes = lov_mds_md_stripecnt(lod->lod_osd_max_easize,
1201                                                    magic);
1202
1203         return (stripe_count < max_stripes) ? stripe_count : max_stripes;
1204 }
1205
1206 static int lod_use_defined_striping(const struct lu_env *env,
1207                                     struct lod_object *mo,
1208                                     const struct lu_buf *buf)
1209 {
1210         struct lod_device      *d = lu2lod_dev(lod2lu_obj(mo)->lo_dev);
1211         struct lov_mds_md_v1   *v1 = buf->lb_buf;
1212         struct lov_mds_md_v3   *v3 = buf->lb_buf;
1213         struct lov_ost_data_v1 *objs;
1214         __u32                   magic;
1215         int                     rc;
1216         ENTRY;
1217
1218         rc = lod_verify_striping(d, buf, 1);
1219         if (rc)
1220                 RETURN(rc);
1221
1222         magic = le32_to_cpu(v1->lmm_magic);
1223         if (magic == LOV_MAGIC_V1_DEF) {
1224                 objs = &v1->lmm_objects[0];
1225         } else if (magic == LOV_MAGIC_V3_DEF) {
1226                 objs = &v3->lmm_objects[0];
1227                 lod_object_set_pool(mo, v3->lmm_pool_name);
1228         } else {
1229                 GOTO(out, rc = -EINVAL);
1230         }
1231
1232         /*
1233          * LOD shouldn't be aware of recovery at all,
1234          * but this track recovery status (to some extent)
1235          * to be do additional checks like this one
1236          */
1237         LASSERT(d->lod_recovery_completed == 0);
1238
1239         mo->ldo_stripe_size = le32_to_cpu(v1->lmm_stripe_size);
1240         mo->ldo_stripenr = le16_to_cpu(v1->lmm_stripe_count);
1241         mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
1242         LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic));
1243
1244         rc = lod_initialize_objects(env, mo, objs);
1245
1246 out:
1247         RETURN(rc);
1248 }
1249
1250 static int lod_qos_parse_config(const struct lu_env *env,
1251                                 struct lod_object *lo,
1252                                 const struct lu_buf *buf)
1253 {
1254         struct lod_device     *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1255         struct lov_user_md_v1 *v1 = NULL;
1256         struct lov_user_md_v3 *v3 = NULL;
1257         struct pool_desc      *pool;
1258         __u32                  magic;
1259         int                    rc;
1260         ENTRY;
1261
1262         if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0)
1263                 RETURN(0);
1264
1265         v1 = buf->lb_buf;
1266         magic = v1->lmm_magic;
1267
1268         if (magic == __swab32(LOV_USER_MAGIC_V1)) {
1269                 lustre_swab_lov_user_md_v1(v1);
1270                 magic = v1->lmm_magic;
1271         } else if (magic == __swab32(LOV_USER_MAGIC_V3)) {
1272                 v3 = buf->lb_buf;
1273                 lustre_swab_lov_user_md_v3(v3);
1274                 magic = v3->lmm_magic;
1275         }
1276
1277         if (unlikely(magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)) {
1278                 /* try to use as fully defined striping */
1279                 rc = lod_use_defined_striping(env, lo, buf);
1280                 RETURN(rc);
1281         }
1282
1283         if (unlikely(buf->lb_len < sizeof(*v1))) {
1284                 CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1285                 RETURN(-EINVAL);
1286         }
1287
1288         if (v1->lmm_pattern != 0 && v1->lmm_pattern != LOV_PATTERN_RAID0) {
1289                 CERROR("invalid pattern: %x\n", v1->lmm_pattern);
1290                 RETURN(-EINVAL);
1291         }
1292
1293         if (v1->lmm_stripe_size)
1294                 lo->ldo_stripe_size = v1->lmm_stripe_size;
1295         if (lo->ldo_stripe_size & (LOV_MIN_STRIPE_SIZE - 1))
1296                 lo->ldo_stripe_size = LOV_MIN_STRIPE_SIZE;
1297
1298         if (v1->lmm_stripe_count)
1299                 lo->ldo_stripenr = v1->lmm_stripe_count;
1300
1301         if ((v1->lmm_stripe_offset >= d->lod_desc.ld_tgt_count) &&
1302             (v1->lmm_stripe_offset != (typeof(v1->lmm_stripe_offset))(-1))) {
1303                 CERROR("invalid offset: %x\n", v1->lmm_stripe_offset);
1304                 RETURN(-EINVAL);
1305         }
1306         lo->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1307
1308         CDEBUG(D_OTHER, "lsm: %u size, %u stripes, %u offset\n",
1309                v1->lmm_stripe_size, v1->lmm_stripe_count,
1310                v1->lmm_stripe_offset);
1311
1312         if (v1->lmm_magic == LOV_MAGIC_V3) {
1313                 if (buf->lb_len < sizeof(*v3)) {
1314                         CERROR("wrong size: %u\n", (unsigned) buf->lb_len);
1315                         RETURN(-EINVAL);
1316                 }
1317
1318                 v3 = buf->lb_buf;
1319                 lod_object_set_pool(lo, v3->lmm_pool_name);
1320
1321                 /* In the function below, .hs_keycmp resolves to
1322                  * pool_hashkey_keycmp() */
1323                 /* coverity[overrun-buffer-val] */
1324                 pool = lod_find_pool(d, v3->lmm_pool_name);
1325                 if (pool != NULL) {
1326                         if (lo->ldo_def_stripe_offset !=
1327                             (typeof(v1->lmm_stripe_offset))(-1)) {
1328                                 rc = lo->ldo_def_stripe_offset;
1329                                 rc = lod_check_index_in_pool(rc, pool);
1330                                 if (rc < 0) {
1331                                         lod_pool_putref(pool);
1332                                         CERROR("invalid offset\n");
1333                                         RETURN(-EINVAL);
1334                                 }
1335                         }
1336
1337                         if (lo->ldo_stripenr > pool_tgt_count(pool))
1338                                 lo->ldo_stripenr= pool_tgt_count(pool);
1339
1340                         lod_pool_putref(pool);
1341                 }
1342         } else
1343                 lod_object_set_pool(lo, NULL);
1344
1345         RETURN(0);
1346 }
1347
1348 /*
1349  * buf should be NULL or contain striping settings
1350  */
1351 int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
1352                         struct lu_attr *attr, const struct lu_buf *buf,
1353                         struct thandle *th)
1354 {
1355         struct lod_device      *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
1356         struct dt_object      **stripe;
1357         int                     stripe_len;
1358         int                     flag = LOV_USES_ASSIGNED_STRIPE;
1359         int                     i, rc;
1360         ENTRY;
1361
1362         LASSERT(lo);
1363
1364         /* no OST available */
1365         /* XXX: should we be waiting a bit to prevent failures during
1366          * cluster initialization? */
1367         if (d->lod_ostnr == 0)
1368                 GOTO(out, rc = -EIO);
1369
1370         /*
1371          * by this time, the object's ldo_stripenr and ldo_stripe_size
1372          * contain default value for striping: taken from the parent
1373          * or from filesystem defaults
1374          *
1375          * in case the caller is passing lovea with new striping config,
1376          * we may need to parse lovea and apply new configuration
1377          */
1378         rc = lod_qos_parse_config(env, lo, buf);
1379         if (rc)
1380                 GOTO(out, rc);
1381
1382         if (likely(lo->ldo_stripe == NULL)) {
1383                 /*
1384                  * no striping has been created so far
1385                  */
1386                 LASSERT(lo->ldo_stripenr > 0);
1387                 lo->ldo_stripenr = lod_get_stripecnt(d, LOV_MAGIC,
1388                                 lo->ldo_stripenr);
1389
1390                 stripe_len = lo->ldo_stripenr;
1391                 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len);
1392                 if (stripe == NULL)
1393                         GOTO(out, rc = -ENOMEM);
1394
1395                 lod_getref(&d->lod_ost_descs);
1396                 /* XXX: support for non-0 files w/o objects */
1397                 if (lo->ldo_def_stripe_offset >= d->lod_desc.ld_tgt_count) {
1398                         lod_qos_statfs_update(env, d);
1399                         rc = lod_alloc_qos(env, lo, stripe, flag, th);
1400                         if (rc == -EAGAIN)
1401                                 rc = lod_alloc_rr(env, lo, stripe, flag, th);
1402                 } else {
1403                         rc = lod_alloc_specific(env, lo, stripe, flag, th);
1404                 }
1405                 lod_putref(d, &d->lod_ost_descs);
1406
1407                 if (rc < 0) {
1408                         for (i = 0; i < stripe_len; i++)
1409                                 if (stripe[i] != NULL)
1410                                         lu_object_put(env, &stripe[i]->do_lu);
1411
1412                         OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len);
1413                 } else {
1414                         lo->ldo_stripe = stripe;
1415                         lo->ldo_stripes_allocated = stripe_len;
1416                 }
1417         } else {
1418                 /*
1419                  * lod_qos_parse_config() found supplied buf as a predefined
1420                  * striping (not a hint), so it allocated all the object
1421                  * now we need to create them
1422                  */
1423                 for (i = 0; i < lo->ldo_stripenr; i++) {
1424                         struct dt_object  *o;
1425
1426                         o = lo->ldo_stripe[i];
1427                         LASSERT(o);
1428
1429                         rc = dt_declare_create(env, o, attr, NULL, NULL, th);
1430                         if (rc) {
1431                                 CERROR("can't declare create: %d\n", rc);
1432                                 break;
1433                         }
1434                 }
1435         }
1436
1437 out:
1438         RETURN(rc);
1439 }
1440