/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  lustre/cmm/cmm_split.c
 *  Lustre directory splitting
 *
 *  Copyright (c) 2006 Cluster File Systems, Inc.
 *   Author: Alex Thomas <alex@clusterfs.com>
 *           Wang Di     <wangdi@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_MDS

#include <obd_class.h>
#include <lustre_fid.h>
#include <lustre_mds.h>
#include "cmm_internal.h"
#include "mdc_internal.h"

struct cmm_thread_info {
        struct md_attr   cti_ma;
};

struct lu_context_key cmm_thread_key;
struct cmm_thread_info *cmm_ctx_info(const struct lu_context *ctx)
{
        struct cmm_thread_info *info;

        info = lu_context_key_get(ctx, &cmm_thread_key);
        LASSERT(info != NULL);
        return info;
}

/* Verdicts returned by cmm_expect_splitting(). */
#define CMM_NO_SPLIT_EXPECTED   0
#define CMM_EXPECT_SPLIT        1
#define CMM_NO_SPLITTABLE       2

/* Directories larger than this size (in bytes) are candidates for
 * splitting. */
#define SPLIT_SIZE (64 * 1024)

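/*
 * Decide whether directory @mo should be split. Splitting only makes
 * sense when there is more than one target MDS, the directory has grown
 * past SPLIT_SIZE, and it does not already carry LMV stripe metadata
 * (i.e. it has not been split before).
 */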
static int cmm_expect_splitting(const struct lu_context *ctx,
                                struct md_object *mo, struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        ENTRY;

        /* With a single target there is nowhere to split to. */
        if (cmm->cmm_tgt_count == 1)
                RETURN(CMM_NO_SPLIT_EXPECTED);

        if (ma->ma_attr.la_size < SPLIT_SIZE)
                RETURN(CMM_NO_SPLIT_EXPECTED);

        /* Existing LMV metadata means the directory is already split. */
        if (ma->ma_lmv_size)
                RETURN(CMM_NO_SPLIT_EXPECTED);

        RETURN(CMM_EXPECT_SPLIT);
}

static inline struct lu_fid *cmm2_fid(struct cmm_object *obj)
{
        return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
}

/* Size of an LMV EA holding @stripes stripe fids. */
#define cmm_md_size(stripes)                            \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))

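/*
 * Allocate one new fid on every remote target. The local MDS is skipped,
 * so @count targets yield count - 1 fids in the @fid array.
 */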
static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
                         struct lu_fid *fid, int count)
{
        struct mdc_device *mc, *tmp;
        int rc = 0, i = 0;
        ENTRY;

        LASSERT(count == cmm->cmm_tgt_count);
        /* FIXME: holding a spin_lock here may not be appropriate,
         * because fid allocation may require an RPC. */
        spin_lock(&cmm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                 mc_linkage) {
                /* Do not allocate a fid on the local target. */
                if (cmm->cmm_local_num == mc->mc_num)
                        continue;

                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
                if (rc < 0) {
                        spin_unlock(&cmm->cmm_tgt_guard);
                        RETURN(rc);
                }
        }
        spin_unlock(&cmm->cmm_tgt_guard);
        LASSERT(i + 1 == count);
        /* obd_fid_alloc() apparently returns 1 on some success paths
         * (e.g. when a new sequence is allocated); normalize to 0. */
        if (rc == 1)
                rc = 0;
        RETURN(rc);
}

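/*
 * Look up the cmm-layer object for fid @f. On failure a PTR_ERR-encoded
 * pointer is returned, which callers check with IS_ERR().
 */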
struct cmm_object *cmm_object_find(const struct lu_context *ctxt,
                                   struct cmm_device *d,
                                   const struct lu_fid *f)
{
        struct lu_object *o;
        struct cmm_object *m;
        ENTRY;

        o = lu_object_find(ctxt, d->cmm_md_dev.md_lu_dev.ld_site, f);
        if (IS_ERR(o))
                m = (struct cmm_object *)o;
        else
                m = lu2cmm_obj(lu_object_locate(o->lo_header,
                               d->cmm_md_dev.md_lu_dev.ld_type));
        RETURN(m);
}

static inline void cmm_object_put(const struct lu_context *ctxt,
                                  struct cmm_object *o)
{
        lu_object_put(ctxt, &o->cmo_obj.mo_lu);
}

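/*
 * Create one slave object with fid @fid on a remote MDS. Note that @fid
 * is also recorded as sp_pfid in the creation spec.
 */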
static int cmm_create_remote_obj(const struct lu_context *ctx,
                                 struct cmm_device *cmm,
                                 struct lu_fid *fid, struct md_attr *ma)
{
        struct cmm_object *obj;
        struct md_create_spec *spec;
        int rc;
        ENTRY;

        obj = cmm_object_find(ctx, cmm, fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        OBD_ALLOC_PTR(spec);
        if (spec == NULL)
                GOTO(cleanup, rc = -ENOMEM);

        spec->u.sp_pfid = fid;
        rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj),
                              spec, ma);
        OBD_FREE_PTR(spec);
cleanup:
        cmm_object_put(ctx, obj);
        RETURN(rc);
}

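/*
 * Build the LMV EA for the master object: slot 0 holds the master fid,
 * the remaining slots receive freshly allocated fids on which the slave
 * objects are created. The EA is stored in the MDS_LMV_MD_NAME xattr
 * and handed back through @ma for the caller to free.
 */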
static int cmm_create_slave_objects(const struct lu_context *ctx,
                                    struct md_object *mo, struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lmv_stripe_md *lmv = NULL;
        int lmv_size, i, rc;
        struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
        ENTRY;

        lmv_size = cmm_md_size(cmm->cmm_tgt_count);

        /* This lmv is freed by the caller once splitting is finished. */
        OBD_ALLOC(lmv, lmv_size);
        if (!lmv)
                RETURN(-ENOMEM);

        lmv->mea_master = -1;
        lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
        lmv->mea_count = cmm->cmm_tgt_count;

        /* Slot 0 is the fid of the master object itself. */
        lmv->mea_ids[0] = *lf;

        rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
        if (rc)
                GOTO(cleanup, rc);

        for (i = 1; i < cmm->cmm_tgt_count; i++) {
                rc = cmm_create_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
                if (rc)
                        GOTO(cleanup, rc);
        }

        rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
                          MDS_LMV_MD_NAME, 0);
        if (rc)
                GOTO(cleanup, rc);

        ma->ma_lmv_size = lmv_size;
        ma->ma_lmv = lmv;
        RETURN(0);
cleanup:
        /* On error the lmv is not handed to the caller, free it here. */
        OBD_FREE(lmv, lmv_size);
        RETURN(rc);
}

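/*
 * Ship the pages collected in @rdpg to the slave object identified by
 * @fid, one mdc_send_page() call per page.
 */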
static int cmm_send_split_pages(const struct lu_context *ctx,
                                struct md_object *mo, struct lu_rdpg *rdpg,
                                struct lu_fid *fid)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct cmm_object *obj;
        int rc = 0, i;
        ENTRY;

        obj = cmm_object_find(ctx, cmm, fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        for (i = 0; i < rdpg->rp_npages; i++) {
                rc = mdc_send_page(ctx, md_object_next(&obj->cmo_obj),
                                   rdpg->rp_pages[i]);
                if (rc)
                        GOTO(cleanup, rc);
        }
cleanup:
        cmm_object_put(ctx, obj);
        RETURN(rc);
}

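/*
 * Walk the hash range described by @rdpg: repeatedly read a batch of
 * directory pages from the master and forward them to the slave @lf,
 * until the last page read covers the end of the range.
 */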
static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
                             struct lu_rdpg *rdpg, struct lu_fid *lf)
{
        struct lu_dirpage *dp;
        __u32 hash_end;
        int rc, i;
        ENTRY;

        /* Zero the pages before reusing them. */
        for (i = 0; i < rdpg->rp_npages; i++) {
                memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
                kunmap(rdpg->rp_pages[i]);
        }

        /* Read the pages being split off and send them to the slave. */
        do {
                rc = mo_readpage(ctx, md_object_next(mo), rdpg);
                if (rc)
                        RETURN(rc);

                rc = cmm_send_split_pages(ctx, mo, rdpg, lf);
                if (rc)
                        RETURN(rc);

                dp = kmap(rdpg->rp_pages[0]);
                hash_end = dp->ldp_hash_end;
                kunmap(rdpg->rp_pages[0]);
                /* A hash end of ~0u marks the last directory page; note
                 * hash_end is a __u32, so it must not be compared with
                 * the wider ~0ul. */
                if (hash_end == ~0u)
                        break;
        } while (hash_end < rdpg->rp_hash_end);

        RETURN(rc);
}

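/*
 * Remove every entry named in the pages of @rdpg from the master
 * directory, i.e. the entries that are being migrated to a slave.
 */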
static int cmm_remove_entries(const struct lu_context *ctx,
                              struct md_object *mo, struct lu_rdpg *rdpg)
{
        struct lu_dirpage *dp;
        struct lu_dirent  *ent;
        int rc = 0, i;
        ENTRY;

        for (i = 0; i < rdpg->rp_npages; i++) {
                kmap(rdpg->rp_pages[i]);
                dp = page_address(rdpg->rp_pages[i]);
                for (ent = lu_dirent_start(dp); ent != NULL;
                     ent = lu_dirent_next(ent)) {
                        rc = mdo_name_remove(ctx, md_object_next(mo),
                                             ent->lde_name);
                        if (rc) {
                                kunmap(rdpg->rp_pages[i]);
                                RETURN(rc);
                        }
                }
                kunmap(rdpg->rp_pages[i]);
        }
        RETURN(rc);
}

/* The directory hash space [0, MAX_HASH_SIZE] is carved into one segment
 * per target. */
#define MAX_HASH_SIZE 0x3fffffff
#define SPLIT_PAGE_COUNT 1
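/*
 * For every remote target i, set up the hash range of segment i, then
 * remove those entries from the master and stream them to slave fid
 * mea_ids[i], using a small SPLIT_PAGE_COUNT-page buffer.
 */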
static int cmm_scan_and_split(const struct lu_context *ctx,
                              struct md_object *mo, struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        __u32 hash_segment;
        struct lu_rdpg   *rdpg = NULL;
        int rc = 0, i;
        ENTRY;

        OBD_ALLOC_PTR(rdpg);
        if (!rdpg)
                RETURN(-ENOMEM);

        rdpg->rp_npages = SPLIT_PAGE_COUNT;
        rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;

        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
        if (rdpg->rp_pages == NULL)
                GOTO(free_rdpg, rc = -ENOMEM);

        for (i = 0; i < rdpg->rp_npages; i++) {
                /* Zeroed pages, so the first cmm_remove_entries() pass
                 * sees empty dirpages rather than uninitialized data. */
                rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL | __GFP_ZERO, 0);
                if (rdpg->rp_pages[i] == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
        }

        hash_segment = MAX_HASH_SIZE / cmm->cmm_tgt_count;
        /* Segment 0 stays on the master; segments 1..tgt_count-1 move to
         * the slaves. */
        for (i = 1; i < cmm->cmm_tgt_count; i++) {
                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];

                rdpg->rp_hash = i * hash_segment;
                rdpg->rp_hash_end = rdpg->rp_hash + hash_segment;
                rc = cmm_remove_entries(ctx, mo, rdpg);
                if (rc)
                        GOTO(cleanup, rc);

                rc = cmm_split_entries(ctx, mo, rdpg, lf);
                if (rc)
                        GOTO(cleanup, rc);
        }
cleanup:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_pages(rdpg->rp_pages[i], 0);
        OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
                                 sizeof rdpg->rp_pages[0]);
free_rdpg:
        OBD_FREE_PTR(rdpg);

        RETURN(rc);
}

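/*
 * Entry point for directory splitting: fetch the directory attributes,
 * decide whether a split is warranted, create the slave objects, and
 * move the hashed entry ranges onto them.
 */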
int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
{
        struct md_attr *ma;
        int rc = 0;
        ENTRY;

        LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));

        OBD_ALLOC_PTR(ma);
        if (ma == NULL)
                RETURN(-ENOMEM);

        ma->ma_need = MA_INODE;
        rc = mo_attr_get(ctx, mo, ma);
        if (rc)
                GOTO(cleanup, rc);

        /* step 1: check whether the directory needs to be split */
        rc = cmm_expect_splitting(ctx, mo, ma);
        if (rc != CMM_EXPECT_SPLIT)
                GOTO(cleanup, rc = 0);

        /* step 2: create the slave objects */
        rc = cmm_create_slave_objects(ctx, mo, ma);
        if (rc)
                GOTO(cleanup, rc);

        /* step 3: scan the directory and split its entries */
        rc = cmm_scan_and_split(ctx, mo, ma);

cleanup:
        if (ma->ma_lmv_size && ma->ma_lmv)
                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);

        OBD_FREE_PTR(ma);

        RETURN(rc);
}