Whamcloud - gitweb
* Compiles after merging b1_4
[fs/lustre-release.git] / lustre / lov / lov_qos.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #else
29 #include <liblustre.h>
30 #endif
31
32 #include <linux/obd_class.h>
33 #include <linux/obd_lov.h>
34
35 #include "lov_internal.h"
36
37 void qos_shrink_lsm(struct lov_request_set *set)
38 {
39         struct lov_stripe_md *lsm = set->set_md, *lsm_new;
40         /* XXX LOV STACKING call into osc for sizes */
41         unsigned oldsize, newsize;
42
43         if (set->set_oti && set->set_cookies && set->set_cookie_sent) {
44                 struct llog_cookie *cookies;
45                 oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
46                 newsize = set->set_count * sizeof(*cookies);
47
48                 cookies = set->set_cookies;
49                 oti_alloc_cookies(set->set_oti, set->set_count);
50                 if (set->set_oti->oti_logcookies) {
51                         memcpy(set->set_oti->oti_logcookies, cookies, newsize);
52                         OBD_FREE(cookies, oldsize);
53                         set->set_cookies = set->set_oti->oti_logcookies;
54                 } else {
55                         CWARN("'leaking' %d bytes\n", oldsize - newsize);
56                 }
57         }
58
59         CWARN("using fewer stripes for object "LPX64": old %u new %u\n",
60               lsm->lsm_object_id, lsm->lsm_stripe_count, set->set_count);
61
62         oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
63         newsize = lov_stripe_md_size(set->set_count);
64         OBD_ALLOC(lsm_new, newsize);
65         if (lsm_new != NULL) {
66                 memcpy(lsm_new, lsm, newsize);
67                 lsm_new->lsm_stripe_count = set->set_count;
68                 OBD_FREE(lsm, oldsize);
69                 set->set_md = lsm_new;
70         } else {
71                 CWARN("'leaking' %d bytes\n", oldsize - newsize);
72         }
73 }
74
75 int qos_remedy_create(struct lov_request_set *set, struct lov_request *req)
76 {
77         struct lov_stripe_md *lsm = set->set_md;
78         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
79         unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
80         int stripe, i, rc = -EIO;
81         ENTRY;
82
83         ost_idx = (req->rq_idx + 1) % ost_count; 
84         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
85                 if (lov->tgts[ost_idx].active == 0) {
86                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
87                         continue;
88                 }
89                 /* check if objects has been created on this ost */
90                 for (stripe = req->rq_stripe; stripe >= 0; stripe--) {
91                         if (ost_idx == lsm->lsm_oinfo[stripe].loi_ost_idx)
92                                 break;
93                 }
94
95                 if (stripe < 0) {
96                         req->rq_idx = ost_idx;
97                         rc = obd_create(lov->tgts[ost_idx].ltd_exp, req->rq_oa, 
98                                         &req->rq_md, set->set_oti);
99                         if (!rc)
100                                 break;
101                 }
102         }
103         RETURN(rc);
104 }
105
106 #define LOV_CREATE_RESEED_INTERVAL 1000
107 /* FIXME use real qos data to prepare the lov create request */
108 int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea)
109 {
110         static int ost_start_idx, ost_start_count;
111         unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
112         struct lov_stripe_md *lsm = set->set_md;
113         struct obdo *src_oa = set->set_oa;
114         int i, rc = 0;
115         ENTRY;
116
117         LASSERT(src_oa->o_valid & OBD_MD_FLID);
118
119         lsm->lsm_object_id = src_oa->o_id;
120         if (!lsm->lsm_stripe_size)
121                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
122         if (!lsm->lsm_pattern) {
123                 lsm->lsm_pattern = lov->desc.ld_pattern ?
124                         lov->desc.ld_pattern : LOV_PATTERN_RAID0;
125         }
126
127         if (newea || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) {
128                 if (--ost_start_count <= 0) {
129                         ost_start_idx = ll_insecure_random_int();
130                         ost_start_count = LOV_CREATE_RESEED_INTERVAL;
131                 } else if (lsm->lsm_stripe_count >=
132                            lov->desc.ld_active_tgt_count) {
133                         /* If we allocate from all of the stripes, make the
134                          * next file start on the next OST. */
135                         ++ost_start_idx;
136                 }
137                 ost_idx = ost_start_idx % ost_count;
138         } else {
139                 ost_idx = lsm->lsm_oinfo[0].loi_ost_idx;
140         }
141
142         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
143                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
144
145         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
146                 struct lov_request *req;
147
148                 ++ost_start_idx;
149                 if (lov->tgts[ost_idx].active == 0) {
150                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
151                         continue;
152                 }
153
154                 OBD_ALLOC(req, sizeof(*req));
155                 if (req == NULL)
156                         GOTO(out, rc = -ENOMEM);
157
158                 req->rq_buflen = sizeof(*req->rq_md);
159                 OBD_ALLOC(req->rq_md, req->rq_buflen);
160                 if (req->rq_md == NULL)
161                         GOTO(out, rc = -ENOMEM);
162
163                 req->rq_oa = obdo_alloc();
164                 if (req->rq_oa == NULL)
165                         GOTO(out, rc = -ENOMEM);
166
167                 req->rq_idx = ost_idx;
168                 req->rq_stripe = i;
169                 /* create data objects with "parent" OA */
170                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
171
172                 /* XXX When we start creating objects on demand, we need to
173                  *     make sure that we always create the object on the
174                  *     stripe which holds the existing file size.
175                  */
176                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
177                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
178                                               &req->rq_oa->o_size) < 0 &&
179                             req->rq_oa->o_size)
180                                 req->rq_oa->o_size--;
181
182                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
183                                i, req->rq_oa->o_size, src_oa->o_size);
184                 }
185
186                 lov_set_add_req(req, set);
187
188                 /* If we have allocated enough objects, we are OK */
189                 if (set->set_count == lsm->lsm_stripe_count)
190                         GOTO(out, rc = 0);
191         }
192
193         if (set->set_count == 0)
194                 GOTO(out, rc = -EIO);
195
196         /* If we were passed specific striping params, then a failure to
197          * meet those requirements is an error, since we can't reallocate
198          * that memory (it might be part of a larger array or something).
199          *
200          * We can only get here if lsm_stripe_count was originally > 1.
201          */
202         if (!newea) {
203                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
204                        lsm->lsm_object_id, set->set_count,
205                        lsm->lsm_stripe_count, rc);
206                 rc = rc ? rc : -EFBIG;
207         } else {
208                 qos_shrink_lsm(set);
209                 rc = 0;
210         }
211 out:
212         RETURN(rc);
213 }