Whamcloud - gitweb
- fixed ctx using in fld_client (bug found by Tom). In few words, ctx cannot be saved...
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_split.c
5  *  Lustre splitting dir 
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Alex thomas <alex@clusterfs.com>
9  *           Wang Di     <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #ifndef EXPORT_SYMTAB
31 # define EXPORT_SYMTAB
32 #endif
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include "cmm_internal.h"
40 #include "mdc_internal.h"
41
42 #define CMM_NO_SPLIT_EXPECTED   0
43 #define CMM_EXPECT_SPLIT        1
44 #define CMM_NO_SPLITTABLE       2
45
46 #define SPLIT_SIZE 8*1024
47
48 static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
49 {
50        return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
51 }
52
53 static int cmm_expect_splitting(const struct lu_context *ctx,
54                                 struct md_object *mo, struct md_attr *ma)
55 {
56         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
57         struct lu_fid *fid = NULL;
58         int rc = CMM_EXPECT_SPLIT;
59         ENTRY;
60
61         if (cmm->cmm_tgt_count == 0)
62                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
63
64         if (ma->ma_attr.la_size < SPLIT_SIZE)
65                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
66
67         if (ma->ma_lmv_size)
68                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
69                    
70         OBD_ALLOC_PTR(fid);
71         rc = cmm_root_get(ctx, &cmm->cmm_md_dev, fid);
72         if (rc)
73                 GOTO(cleanup, rc);
74         
75         if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
76                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); 
77
78 cleanup:
79         if (fid)
80                 OBD_FREE_PTR(fid);
81         RETURN(rc);
82 }
83
/* Size of an LMV EA holding @stripes fids.
 * Fix: the argument must be parenthesized - the call site passes
 * "cmm_tgt_count + 1", which previously expanded to
 * "... + cmm_tgt_count + 1 * sizeof(struct lu_fid)" and undersized
 * the allocation. */
#define cmm_md_size(stripes)                                            \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
86
87 static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
88                          struct lu_fid *fid, int count)
89 {
90         struct  mdc_device *mc, *tmp;
91         int rc = 0, i = 0;
92         
93         LASSERT(count == cmm->cmm_tgt_count);
94         /* FIXME: this spin_lock maybe not proper, 
95          * because fid_alloc may need RPC */
96         spin_lock(&cmm->cmm_tgt_guard);
97         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
98                                  mc_linkage) {
99                 LASSERT(cmm->cmm_local_num != mc->mc_num);
100                 
101                 rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
102                 if (rc > 0) {
103                         struct lu_site *ls;
104
105                         ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
106                         rc = fld_client_create(ls->ls_client_fld,
107                                                fid_seq(&fid[i]),
108                                                mc->mc_num, ctx);
109                 }
110                 if (rc < 0) {
111                         spin_unlock(&cmm->cmm_tgt_guard);
112                         RETURN(rc);
113                 }
114         }
115         spin_unlock(&cmm->cmm_tgt_guard);
116         LASSERT(i == count);
117         if (rc == 1)
118                 rc = 0;
119         RETURN(rc);
120 }
121
122 struct cmm_object *cmm_object_find(const struct lu_context *ctxt,
123                                    struct cmm_device *d,
124                                    const struct lu_fid *f)
125 {
126         struct lu_object *o;
127         struct cmm_object *m;
128         ENTRY;
129
130         o = lu_object_find(ctxt, d->cmm_md_dev.md_lu_dev.ld_site, f);
131         if (IS_ERR(o))
132                 m = (struct cmm_object *)o;
133         else
134                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
135                                d->cmm_md_dev.md_lu_dev.ld_type));
136         RETURN(m);
137 }
138
139 static inline void cmm_object_put(const struct lu_context *ctxt,
140                                   struct cmm_object *o)
141 {
142         lu_object_put(ctxt, &o->cmo_obj.mo_lu);
143 }
144
145 static int cmm_creat_remote_obj(const struct lu_context *ctx, 
146                                 struct cmm_device *cmm,
147                                 struct lu_fid *fid, struct md_attr *ma)
148 {
149         struct cmm_object *obj;
150         struct md_create_spec *spec;
151         int rc;
152         ENTRY;
153
154         obj = cmm_object_find(ctx, cmm, fid);
155         if (IS_ERR(obj))
156                 RETURN(PTR_ERR(obj));
157
158         OBD_ALLOC_PTR(spec);
159         spec->u.sp_pfid = fid; 
160         rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj), 
161                               spec, ma);
162         OBD_FREE_PTR(spec);
163
164         cmm_object_put(ctx, obj);
165         RETURN(rc);
166 }
167
168 static int cmm_create_slave_objects(const struct lu_context *ctx,
169                                     struct md_object *mo, struct md_attr *ma)
170 {
171         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
172         struct lmv_stripe_md *lmv = NULL;
173         int lmv_size, i, rc;
174         struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
175         ENTRY;
176
177         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
178
179         /* This lmv will be free after finish splitting. */
180         OBD_ALLOC(lmv, lmv_size);
181         if (!lmv)
182                 RETURN(-ENOMEM);
183
184         lmv->mea_master = -1;
185         lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
186         lmv->mea_count = cmm->cmm_tgt_count;
187
188         lmv->mea_ids[0] = *lf;
189
190         rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
191         if (rc)
192                 GOTO(cleanup, rc);
193
194         for (i = 1; i < cmm->cmm_tgt_count; i ++) {
195                 rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
196                 if (rc)
197                         GOTO(cleanup, rc);
198         }
199
200         rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
201                           MDS_LMV_MD_NAME, 0);
202
203         ma->ma_lmv_size = lmv_size;
204         ma->ma_lmv = lmv;
205 cleanup:
206         RETURN(rc);
207 }
208
209 static int cmm_send_split_pages(const struct lu_context *ctx, 
210                                 struct md_object *mo, struct lu_rdpg *rdpg, 
211                                 struct lu_fid *fid, __u32 hash_end)
212 {
213         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
214         struct cmm_object *obj;
215         int rc = 0, i;
216         ENTRY;
217
218         obj = cmm_object_find(ctx, cmm, fid);
219         if (IS_ERR(obj))
220                 RETURN(PTR_ERR(obj));
221
222         for (i = 0; i < rdpg->rp_npages; i++) {
223                 rc = mdc_send_page(cmm, ctx, md_object_next(&obj->cmo_obj),
224                                    rdpg->rp_pages[i], hash_end);
225                 if (rc)
226                         break;
227         }
228         cmm_object_put(ctx, obj);
229         RETURN(rc);
230 }
231
232 static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
233                              struct lu_rdpg *rdpg, struct lu_fid *lf, 
234                              __u32 end)
235 {
236         int rc, i;
237         ENTRY;
238
239         /* Read splitted page and send them to the slave master */
240         do {
241                 /* init page with '0' */
242                 for (i = 0; i < rdpg->rp_npages; i++) {
243                         memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
244                         kunmap(rdpg->rp_pages[i]);
245                 }
246
247                 rc = mo_readpage(ctx, md_object_next(mo), rdpg);
248                 
249                 /* -E2BIG means it already reach the end of the dir */
250                 if (rc == -E2BIG)                         
251                         RETURN(0);
252                 if (rc)
253                         RETURN(rc);
254
255                 rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
256
257         } while (rc == 0);
258
259         /* it means already finish splitting this segment */
260         if (rc == -E2BIG)
261                 rc = 0;
262         RETURN(rc);
263 }
264
#if 0
/* Currently compiled out: walks every dirent in the given pages and
 * removes each name from the master directory (presumably intended to
 * run after entries were copied to a slave — confirm before enabling).
 * Kept for reference. */
static int cmm_remove_entries(const struct lu_context *ctx,
                              struct md_object *mo, struct lu_rdpg *rdpg)
{
        struct lu_dirpage *dp;
        struct lu_dirent  *ent;
        int rc = 0, i;
        ENTRY;

        for (i = 0; i < rdpg->rp_npages; i++) {
                kmap(rdpg->rp_pages[i]);
                dp = page_address(rdpg->rp_pages[i]);
                /* iterate dirents within this page */
                for (ent = lu_dirent_start(dp); ent != NULL;
                                  ent = lu_dirent_next(ent)) {
                        rc = mdo_name_remove(ctx, md_object_next(mo),
                                             ent->lde_name);
                        if (rc) {
                                /* unmap before bailing out */
                                kunmap(rdpg->rp_pages[i]);
                                RETURN(rc);
                        }
                }
                kunmap(rdpg->rp_pages[i]);
        }
        RETURN(rc);
}
#endif
/* Upper bound of the directory name-hash space used for segmenting. */
#define MAX_HASH_SIZE 0x3fffffff
/* Number of pages read and shipped per readpage/send round. */
#define SPLIT_PAGE_COUNT 1
/*
 * Divide the hash space into cmm_tgt_count segments and move the entries
 * of segments 1..tgt_count-1 to their slave objects (segment 0 stays on
 * the master).  Returns 0 on success or a negative errno.
 */
static int cmm_scan_and_split(const struct lu_context *ctx,
                              struct md_object *mo, struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        __u32 hash_segement;
        struct lu_rdpg   *rdpg = NULL;
        int rc = 0, i;

        OBD_ALLOC_PTR(rdpg);
        if (!rdpg)
                RETURN(-ENOMEM);

        rdpg->rp_npages = SPLIT_PAGE_COUNT;
        rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;

        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
        if (rdpg->rp_pages == NULL)
                GOTO(free_rdpg, rc = -ENOMEM);

        /* OBD_ALLOC zeroes rp_pages, so the cleanup loop below can rely
         * on NULL to mark slots that were never allocated. */
        for (i = 0; i < rdpg->rp_npages; i++) {
                rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
                if (rdpg->rp_pages[i] == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
        }

        /* caller guarantees cmm_tgt_count != 0 (see cmm_expect_splitting) */
        hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
        for (i = 1; i < cmm->cmm_tgt_count; i++) {
                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
                __u32 hash_end;

                /* segment i covers [i*seg, (i+1)*seg) of the hash space */
                rdpg->rp_hash = i * hash_segement;
                hash_end = rdpg->rp_hash + hash_segement;

                rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end);
                if (rc)
                        GOTO(cleanup, rc);
        }
cleanup:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_pages(rdpg->rp_pages[i], 0);
        if (rdpg->rp_pages)
                OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
                                         sizeof rdpg->rp_pages[0]);
free_rdpg:
        if (rdpg)
                OBD_FREE_PTR(rdpg);

        RETURN(rc);
}
343
344 int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
345 {
346         struct md_attr *ma;
347         int rc = 0;
348         ENTRY;
349
350         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
351        
352         OBD_ALLOC_PTR(ma);
353         if (ma == NULL)
354                 RETURN(-ENOMEM);
355
356         ma->ma_need = MA_INODE;
357         rc = mo_attr_get(ctx, mo, ma);
358         if (rc)
359                 GOTO(cleanup, ma);
360
361         /* step1: checking whether the dir need to be splitted */
362         rc = cmm_expect_splitting(ctx, mo, ma);
363         if (rc != CMM_EXPECT_SPLIT)
364                 GOTO(cleanup, rc = 0);
365
366         /* step2: create slave objects */
367         rc = cmm_create_slave_objects(ctx, mo, ma);
368         if (rc)
369                 GOTO(cleanup, ma);
370
371         /* step3: scan and split the object */
372         rc = cmm_scan_and_split(ctx, mo, ma);
373
374 cleanup:
375         if (ma->ma_lmv_size && ma->ma_lmv)
376                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
377         
378         OBD_FREE_PTR(ma);
379
380         RETURN(rc);
381 }