Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / lov / lov_qos.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #else
32 #include <liblustre.h>
33 #endif
34
35 #include <linux/obd_class.h>
36 #include <linux/obd_lov.h>
37
38 #include "lov_internal.h"
39
40 void qos_shrink_lsm(struct lov_request_set *set)
41 {
42         struct lov_stripe_md *lsm = set->set_md, *lsm_new;
43         /* XXX LOV STACKING call into osc for sizes */
44         unsigned oldsize, newsize;
45
46         if (set->set_oti && set->set_cookies && set->set_cookie_sent) {
47                 struct llog_cookie *cookies;
48                 oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
49                 newsize = set->set_count * sizeof(*cookies);
50
51                 cookies = set->set_cookies;
52                 oti_alloc_cookies(set->set_oti, set->set_count);
53                 if (set->set_oti->oti_logcookies) {
54                         memcpy(set->set_oti->oti_logcookies, cookies, newsize);
55                         OBD_FREE(cookies, oldsize);
56                         set->set_cookies = set->set_oti->oti_logcookies;
57                 } else {
58                         CWARN("'leaking' %d bytes\n", oldsize - newsize);
59                 }
60         }
61
62         CWARN("using fewer stripes for object "LPX64": old %u new %u\n",
63               lsm->lsm_object_id, lsm->lsm_stripe_count, set->set_count);
64
65         oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
66         newsize = lov_stripe_md_size(set->set_count);
67         OBD_ALLOC(lsm_new, newsize);
68         if (lsm_new != NULL) {
69                 memcpy(lsm_new, lsm, newsize);
70                 lsm_new->lsm_stripe_count = set->set_count;
71                 OBD_FREE(lsm, oldsize);
72                 set->set_md = lsm_new;
73         } else {
74                 CWARN("'leaking' %d bytes\n", oldsize - newsize);
75         }
76 }
77
78 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
79 {
80         struct lov_stripe_md *lsm = set->set_md;
81         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
82         unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
83         int stripe, i, rc = -EIO;
84         ENTRY;
85
86         ost_idx = (req->rq_idx + 1) % ost_count; 
87         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
88                 if (lov->tgts[ost_idx].active == 0) {
89                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
90                         continue;
91                 }
92                 /* check if objects has been created on this ost */
93                 for (stripe = req->rq_stripe; stripe >= 0; stripe--) {
94                         if (ost_idx == lsm->lsm_oinfo[stripe].loi_ost_idx)
95                                 break;
96                 }
97
98                 if (stripe < 0) {
99                         req->rq_idx = ost_idx;
100                         rc = obd_create(lov->tgts[ost_idx].ltd_exp, req->rq_oa, 
101                                         &req->rq_md, set->set_oti);
102                         if (!rc)
103                                 break;
104                 }
105         }
106         RETURN(rc);
107 }
108
109 #define LOV_CREATE_RESEED_MULT 4
110 #define LOV_CREATE_RESEED_MIN  1000
111 /* FIXME use real qos data to prepare the lov create request */
112 int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea)
113 {
114         static int ost_start_idx, ost_start_count;
115         unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
116         unsigned ost_active_count = lov->desc.ld_active_tgt_count;
117         struct lov_stripe_md *lsm = set->set_md;
118         struct obdo *src_oa = set->set_oa;
119         int i, rc = 0;
120         ENTRY;
121
122         LASSERT(src_oa->o_valid & OBD_MD_FLID);
123
124         lsm->lsm_object_id = src_oa->o_id;
125         if (!lsm->lsm_stripe_size)
126                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
127         if (!lsm->lsm_pattern) {
128                 lsm->lsm_pattern = lov->desc.ld_pattern ?
129                         lov->desc.ld_pattern : LOV_PATTERN_RAID0;
130         }
131
132         if (newea || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) {
133                 if (--ost_start_count <= 0) {
134                         ost_start_idx = ll_insecure_random_int();
135                         ost_start_count =
136                           (LOV_CREATE_RESEED_MIN / max(ost_active_count, 1U) +
137                            LOV_CREATE_RESEED_MULT) * max(ost_active_count, 1U);
138                 } else if (lsm->lsm_stripe_count >= ost_active_count) {
139                         /* If we allocate from all of the stripes, make the
140                          * next file start on the next OST. */
141                         ++ost_start_idx;
142                 }
143                 ost_idx = ost_start_idx % ost_count;
144         } else {
145                 ost_idx = lsm->lsm_oinfo[0].loi_ost_idx;
146         }
147
148         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
149                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
150
151         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
152                 struct lov_request *req;
153
154                 ++ost_start_idx;
155                 if (lov->tgts[ost_idx].active == 0) {
156                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
157                         continue;
158                 }
159
160                 OBD_ALLOC(req, sizeof(*req));
161                 if (req == NULL)
162                         GOTO(out, rc = -ENOMEM);
163
164                 req->rq_buflen = sizeof(*req->rq_md);
165                 OBD_ALLOC(req->rq_md, req->rq_buflen);
166                 if (req->rq_md == NULL)
167                         GOTO(out, rc = -ENOMEM);
168
169                 req->rq_oa = obdo_alloc();
170                 if (req->rq_oa == NULL)
171                         GOTO(out, rc = -ENOMEM);
172
173                 req->rq_idx = ost_idx;
174                 req->rq_stripe = i;
175                 /* create data objects with "parent" OA */
176                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
177
178                 /* XXX When we start creating objects on demand, we need to
179                  *     make sure that we always create the object on the
180                  *     stripe which holds the existing file size.
181                  */
182                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
183                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
184                                               &req->rq_oa->o_size) < 0 &&
185                             req->rq_oa->o_size)
186                                 req->rq_oa->o_size--;
187
188                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
189                                i, req->rq_oa->o_size, src_oa->o_size);
190                 }
191
192                 lov_set_add_req(req, set);
193
194                 /* If we have allocated enough objects, we are OK */
195                 if (set->set_count == lsm->lsm_stripe_count)
196                         GOTO(out, rc = 0);
197         }
198
199         if (set->set_count == 0)
200                 GOTO(out, rc = -EIO);
201
202         /* If we were passed specific striping params, then a failure to
203          * meet those requirements is an error, since we can't reallocate
204          * that memory (it might be part of a larger array or something).
205          *
206          * We can only get here if lsm_stripe_count was originally > 1.
207          */
208         if (!newea) {
209                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
210                        lsm->lsm_object_id, set->set_count,
211                        lsm->lsm_stripe_count, rc);
212                 rc = rc ? rc : -EFBIG;
213         } else {
214                 qos_shrink_lsm(set);
215                 rc = 0;
216         }
217 out:
218         RETURN(rc);
219 }