Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / lov / lov_qos.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #else
29 #include <liblustre.h>
30 #endif
31
32 #include <linux/obd_class.h>
33 #include <linux/obd_lov.h>
34
35 #include "lov_internal.h"
36
37 void qos_shrink_lsm(struct lov_request_set *set)
38 {
39         struct lov_stripe_md *lsm = set->set_md;
40         struct lov_stripe_md *lsm_new;
41         /* XXX LOV STACKING call into osc for sizes */
42         unsigned oldsize, newsize;
43
44         if (set->set_oti && set->set_cookies && set->set_cookie_sent) {
45                 struct llog_cookie *cookies;
46                 oldsize = lsm->lsm_stripe_count * sizeof(*cookies);
47                 newsize = set->set_count * sizeof(*cookies);
48
49                 cookies = set->set_cookies;
50                 oti_alloc_cookies(set->set_oti, set->set_count);
51                 if (set->set_oti->oti_logcookies) {
52                         memcpy(set->set_oti->oti_logcookies, cookies, newsize);
53                         OBD_FREE(cookies, oldsize);
54                         set->set_cookies = set->set_oti->oti_logcookies;
55                 } else {
56                         CWARN("'leaking' %d bytes\n", oldsize - newsize);
57                 }
58         }
59
60         CWARN("using fewer stripes for object "LPX64": old %u new %u\n",
61               lsm->lsm_object_id, lsm->lsm_stripe_count, set->set_count);
62
63         oldsize = lov_stripe_md_size(lsm->lsm_stripe_count);
64         newsize = lov_stripe_md_size(set->set_count);
65         OBD_ALLOC(lsm_new, newsize);
66         if (lsm_new != NULL) {
67                 memcpy(lsm_new, lsm, newsize);
68                 lsm_new->lsm_stripe_count = set->set_count;
69                 OBD_FREE(lsm, oldsize);
70                 set->set_md = lsm_new;
71         } else {
72                 CWARN("'leaking' %d bytes\n", oldsize - newsize);
73         }
74 }
75
76 #define LOV_CREATE_RESEED_INTERVAL 1000
77 /* FIXME use real qos data to prepare the lov create request */
78 int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea)
79 {
80         static int ost_start_idx, ost_start_count;
81         unsigned ost_idx, ost_count = lov->desc.ld_tgt_count;
82         struct lov_stripe_md *lsm = set->set_md;
83         struct obdo *src_oa = set->set_oa;
84         int i, rc = 0;
85         ENTRY;
86
87         LASSERT(src_oa->o_valid & OBD_MD_FLID);
88         
89         lsm->lsm_object_id = src_oa->o_id;
90         lsm->lsm_object_gr = src_oa->o_gr;
91         if (!lsm->lsm_stripe_size)
92                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
93         if (!lsm->lsm_pattern) {
94                 lsm->lsm_pattern = lov->desc.ld_pattern ?
95                         lov->desc.ld_pattern : LOV_PATTERN_RAID0;
96         }
97
98         if (newea || lsm->lsm_oinfo[0].loi_ost_idx >= ost_count) {
99                 if (--ost_start_count <= 0) {
100                         ost_start_idx = ll_insecure_random_int();
101                         ost_start_count = LOV_CREATE_RESEED_INTERVAL;
102                 } else if (lsm->lsm_stripe_count >=
103                            lov->desc.ld_active_tgt_count) {
104                         /* If we allocate from all of the stripes, make the
105                          * next file start on the next OST. */
106                         ++ost_start_idx;
107                 }
108                 ost_idx = ost_start_idx % ost_count;
109         } else {
110                 ost_idx = lsm->lsm_oinfo[0].loi_ost_idx;
111         }
112
113         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
114                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
115
116         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
117                 struct lov_tgt_desc *tgt = lov->tgts + ost_idx;
118                 struct lov_request *req;
119                 
120                 ++ost_start_idx;
121                 if (!lov_tgt_active(lov, tgt, 0)) {
122                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
123                         continue;
124                 }
125
126                 OBD_ALLOC(req, sizeof(*req));
127                 if (req == NULL) {
128                         lov_tgt_decref(lov, tgt);
129                         GOTO(out, rc = -ENOMEM);
130                 }
131                 
132                 req->rq_buflen = sizeof(*req->rq_md);
133                 OBD_ALLOC(req->rq_md, req->rq_buflen);
134                 if (req->rq_md == NULL) {
135                         OBD_FREE(req, sizeof(*req));
136                         lov_tgt_decref(lov, tgt);
137                         GOTO(out, rc = -ENOMEM);
138                 }
139                 
140                 req->rq_oa = obdo_alloc();
141                 if (req->rq_oa == NULL) {
142                         OBD_FREE(req->rq_md, sizeof(*req->rq_md));
143                         OBD_FREE(req, sizeof(*req));
144                         lov_tgt_decref(lov, tgt);
145                         GOTO(out, rc = -ENOMEM);
146                 }
147                 
148                 req->rq_idx = ost_idx;
149                 req->rq_gen = tgt->ltd_gen;
150                 req->rq_stripe = i;
151                 /* create data objects with "parent" OA */
152                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
153
154                 /* XXX When we start creating objects on demand, we need to
155                  *     make sure that we always create the object on the
156                  *     stripe which holds the existing file size.
157                  */
158                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
159                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
160                                               &req->rq_oa->o_size) < 0 &&
161                             req->rq_oa->o_size)
162                                 req->rq_oa->o_size--;
163
164                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
165                                i, req->rq_oa->o_size, src_oa->o_size);
166                 }
167
168                 lov_set_add_req(req, set);
169
170                 /* If we have allocated enough objects, we are OK */
171                 if (set->set_count == lsm->lsm_stripe_count)
172                         GOTO(out, rc = 0);
173         }
174         
175         if (set->set_count == 0)
176                 GOTO(out, rc = -EIO);
177         
178         /* If we were passed specific striping params, then a failure to
179          * meet those requirements is an error, since we can't reallocate
180          * that memory (it might be part of a larger array or something).
181          *
182          * We can only get here if lsm_stripe_count was originally > 1.
183          */
184         if (!newea) {
185                 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
186                        lsm->lsm_object_id, set->set_count, 
187                        lsm->lsm_stripe_count, rc);
188                 rc = rc ? rc : -EFBIG;
189         } else {
190                 qos_shrink_lsm(set);
191                 rc = 0;
192         }
193 out:
194         RETURN(rc);
195 }