Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_split.c
5  *  Lustre splitting dir
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Alex thomas <alex@clusterfs.com>
9  *           Wang Di     <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #ifndef EXPORT_SYMTAB
31 # define EXPORT_SYMTAB
32 #endif
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include <lustre_idl.h>
40 #include "cmm_internal.h"
41 #include "mdc_internal.h"
42
/* Return codes of cmm_expect_splitting(). */
#define CMM_NO_SPLIT_EXPECTED   0
#define CMM_EXPECT_SPLIT        1
#define CMM_NO_SPLITTABLE       2

enum {
        /* Directory size (bytes) at which splitting is attempted. */
        SPLIT_SIZE =  12*1024
};
50
51 static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
52 {
53        return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
54 }
55
56 static int cmm_expect_splitting(const struct lu_context *ctx,
57                                 struct md_object *mo, struct md_attr *ma)
58 {
59         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
60         struct lu_fid *fid = NULL;
61         int rc = CMM_EXPECT_SPLIT;
62         ENTRY;
63
64         if (cmm->cmm_tgt_count == 0)
65                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
66
67         if (ma->ma_attr.la_size < SPLIT_SIZE)
68                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
69
70         if (ma->ma_lmv_size)
71                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
72         OBD_ALLOC_PTR(fid);
73         rc = cmm_child_ops(cmm)->mdo_root_get(ctx, cmm->cmm_child, 
74                                               fid);
75         if (rc)
76                 GOTO(cleanup, rc);
77
78         rc = CMM_EXPECT_SPLIT;
79
80         if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
81                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
82
83 cleanup:
84         if (fid)
85                 OBD_FREE_PTR(fid);
86         RETURN(rc);
87 }
88
/* Byte size of an lmv_stripe_md EA describing @stripes stripe fids. */
#define cmm_md_size(stripes)                            \
       (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))

/*
 * Allocate one new fid on every remote MDC target of @cmm (@count must equal
 * cmm_tgt_count) into @fid[0..count-1], publishing any newly started fid
 * sequence through the FLD client so its location can be resolved later.
 * Returns 0 on success or a negative errno.
 */
static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
                         struct lu_fid *fid, int count)
{
        struct  mdc_device *mc, *tmp;
        int rc = 0, i = 0;

        LASSERT(count == cmm->cmm_tgt_count);
        /* FIXME: this spin_lock maybe not proper,
         * because fid_alloc may need RPC */
        spin_lock(&cmm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                 mc_linkage) {
                /* The local MDS must never appear in the remote target list. */
                LASSERT(cmm->cmm_local_num != mc->mc_num);

                /* rc > 0 means a new sequence was started for this target
                 * and its seq->mds mapping must be recorded in FLD. */
                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
                if (rc > 0) {
                        struct lu_site *ls;

                        ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
                        rc = fld_client_create(ls->ls_client_fld,
                                               fid_seq(&fid[i]),
                                               mc->mc_num, ctx);
                }
                if (rc < 0) {
                        spin_unlock(&cmm->cmm_tgt_guard);
                        RETURN(rc);
                }
                i++;
        }
        spin_unlock(&cmm->cmm_tgt_guard);
        LASSERT(i == count);
        /* A leftover rc == 1 only meant "new sequence"; not an error. */
        if (rc == 1)
                rc = 0;
        RETURN(rc);
}
127
128 struct cmm_object *cmm_object_find(const struct lu_context *ctxt,
129                                    struct cmm_device *d,
130                                    const struct lu_fid *f)
131 {
132         struct lu_object *o;
133         struct cmm_object *m;
134         ENTRY;
135
136         o = lu_object_find(ctxt, d->cmm_md_dev.md_lu_dev.ld_site, f);
137         if (IS_ERR(o))
138                 m = (struct cmm_object *)o;
139         else
140                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
141                                d->cmm_md_dev.md_lu_dev.ld_type));
142         RETURN(m);
143 }
144
145 static inline void cmm_object_put(const struct lu_context *ctxt,
146                                   struct cmm_object *o)
147 {
148         lu_object_put(ctxt, &o->cmo_obj.mo_lu);
149 }
150
151 static int cmm_creat_remote_obj(const struct lu_context *ctx,
152                                 struct cmm_device *cmm,
153                                 struct lu_fid *fid, struct md_attr *ma,
154                                 const struct lmv_stripe_md *lmv,
155                                 int lmv_size)
156 {
157         struct cmm_object *obj;
158         struct md_create_spec *spec;
159         int rc;
160         ENTRY;
161
162         obj = cmm_object_find(ctx, cmm, fid);
163         if (IS_ERR(obj))
164                 RETURN(PTR_ERR(obj));
165
166         OBD_ALLOC_PTR(spec);
167
168         spec->u.sp_ea.fid = fid;
169         spec->u.sp_ea.eadata = lmv;
170         spec->u.sp_ea.eadatalen = lmv_size;
171         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
172         rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj),
173                               spec, ma);
174         OBD_FREE_PTR(spec);
175
176         cmm_object_put(ctx, obj);
177         RETURN(rc);
178 }
179
180 static int cmm_create_slave_objects(const struct lu_context *ctx,
181                                     struct md_object *mo, struct md_attr *ma)
182 {
183         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
184         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
185         int lmv_size, i, rc;
186         struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
187         ENTRY;
188
189         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
190
191         /* This lmv will be free after finish splitting. */
192         OBD_ALLOC(lmv, lmv_size);
193         if (!lmv)
194                 RETURN(-ENOMEM);
195
196         lmv->mea_master = cmm->cmm_local_num;
197         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
198         lmv->mea_count = cmm->cmm_tgt_count + 1;
199
200         lmv->mea_ids[0] = *lf;
201
202         rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
203         if (rc)
204                 GOTO(cleanup, rc);
205
206         OBD_ALLOC_PTR(slave_lmv);
207         if (!slave_lmv)
208                 GOTO(cleanup, rc = -ENOMEM);
209
210         slave_lmv->mea_master = cmm->cmm_local_num;
211         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
212         slave_lmv->mea_count = 0;
213         for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
214                 rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma, 
215                                           slave_lmv, sizeof(slave_lmv));
216                 if (rc)
217                         GOTO(cleanup, rc);
218         }
219
220         ma->ma_lmv_size = lmv_size;
221         ma->ma_lmv = lmv;
222 cleanup:
223         if (slave_lmv)
224                 OBD_FREE_PTR(slave_lmv);
225         RETURN(rc);
226 }
227
228 static int cmm_send_split_pages(const struct lu_context *ctx,
229                                 struct md_object *mo, struct lu_rdpg *rdpg,
230                                 struct lu_fid *fid, int len)
231 {
232         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
233         struct cmm_object *obj;
234         int rc = 0;
235         ENTRY;
236
237         obj = cmm_object_find(ctx, cmm, fid);
238         if (IS_ERR(obj))
239                 RETURN(PTR_ERR(obj));
240
241         rc = mdc_send_page(cmm, ctx, md_object_next(&obj->cmo_obj),
242                            rdpg->rp_pages[0], len);
243         cmm_object_put(ctx, obj);
244         RETURN(rc);
245 }
246
247 static int cmm_remove_entries(const struct lu_context *ctx,
248                               struct md_object *mo, struct lu_rdpg *rdpg,
249                               __u32 hash_end, __u32 *len)
250 {
251         struct lu_dirpage *dp;
252         struct lu_dirent  *ent;
253         int rc = 0, i;
254         ENTRY;
255
256         kmap(rdpg->rp_pages[0]);
257         dp = page_address(rdpg->rp_pages[0]);
258         for (ent = lu_dirent_start(dp); ent != NULL;
259                           ent = lu_dirent_next(ent)) {
260                 if (ent->lde_hash < hash_end) {
261                         if (strncmp(ent->lde_name, ".", ent->lde_namelen) &&
262                             strncmp(ent->lde_name, "..", ent->lde_namelen)) {
263                                 char *name;
264                                 /* FIXME: Here we allocate name for each name,
265                                  * maybe stupid, but can not find better way.
266                                  * will find better way */
267                                 OBD_ALLOC(name, ent->lde_namelen + 1);
268                                 memcpy(name, ent->lde_name, ent->lde_namelen);
269                                 rc = mdo_name_remove(ctx, md_object_next(mo),
270                                                      name);
271                                 OBD_FREE(name, ent->lde_namelen + 1);
272                         }
273                         if (rc) {
274                                 /* FIXME: Do not know why it return -ENOENT
275                                  * in some case 
276                                  * */
277                                 if (rc != -ENOENT)
278                                         GOTO(unmap, rc);
279                         }
280                 } else {
281                         if (ent != lu_dirent_start(dp))
282                                 *len = (int)((__u32)ent - (__u32)dp);
283                         else
284                                 *len = 0;
285                         GOTO(unmap, rc);
286                 }
287         }
288         *len = CFS_PAGE_SIZE;
289 unmap:
290         kunmap(rdpg->rp_pages[i]);
291         RETURN(rc);
292 }
293
/*
 * Move all entries of directory @mo whose hash lies in
 * [rdpg->rp_hash, @end) to the slave object @lf: repeatedly read one page of
 * entries, remove them locally, and send the removed portion to the slave,
 * advancing rp_hash to the page's end hash until @end is reached.
 * Returns 0 or a negative errno.
 */
static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
                             struct lu_rdpg *rdpg, struct lu_fid *lf,
                             __u32 end)
{
        int rc, done = 0;
        ENTRY;

        LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
                        "npages %d \n", rdpg->rp_npages);
        /* Read splitted page and send them to the slave master */
        do {
                struct lu_dirpage *ldp;
                __u32  len = 0;

                /* init page with '0' */
                memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
                kunmap(rdpg->rp_pages[0]);

                rc = mo_readpage(ctx, md_object_next(mo), rdpg);
                /* -E2BIG means it already reach the end of the dir */
                if (rc) {
                        if (rc == -E2BIG || rc == -ERANGE)
                                rc = 0;
                        RETURN(rc);
                }

                /* Remove the old entries */
                rc = cmm_remove_entries(ctx, mo, rdpg, end, &len);
                if (rc)
                        RETURN(rc);

                /* Send page to slave object */
                if (len > 0) {
                        rc = cmm_send_split_pages(ctx, mo, rdpg, lf, len);
                        if (rc)
                                RETURN(rc);
                }

                /* Continue from where this page ended; stop once the page's
                 * end hash has reached the segment boundary. */
                kmap(rdpg->rp_pages[0]);
                ldp = page_address(rdpg->rp_pages[0]);
                if (ldp->ldp_hash_end >= end) {
                        done = 1;
                }
                rdpg->rp_hash = ldp->ldp_hash_end;
                kunmap(rdpg->rp_pages[0]);
        } while (!done);

        RETURN(rc);
}
343 #define SPLIT_PAGE_COUNT 1
344 static int cmm_scan_and_split(const struct lu_context *ctx,
345                               struct md_object *mo, struct md_attr *ma)
346 {
347         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
348         __u32 hash_segement;
349         struct lu_rdpg   *rdpg = NULL;
350         int rc = 0, i;
351
352         OBD_ALLOC_PTR(rdpg);
353         if (!rdpg)
354                 RETURN(-ENOMEM);
355
356         rdpg->rp_npages = SPLIT_PAGE_COUNT;
357         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
358
359         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
360         if (rdpg->rp_pages == NULL)
361                 GOTO(free_rdpg, rc = -ENOMEM);
362
363         for (i = 0; i < rdpg->rp_npages; i++) {
364                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
365                 if (rdpg->rp_pages[i] == NULL)
366                         GOTO(cleanup, rc = -ENOMEM);
367         }
368
369         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
370         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
371                 struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
372                 __u32 hash_end;
373
374                 rdpg->rp_hash = i * hash_segement;
375                 hash_end = rdpg->rp_hash + hash_segement;
376                 rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end);
377                 if (rc)
378                         GOTO(cleanup, rc);
379         }
380 cleanup:
381         for (i = 0; i < rdpg->rp_npages; i++)
382                 if (rdpg->rp_pages[i] != NULL)
383                         __free_pages(rdpg->rp_pages[i], 0);
384         if (rdpg->rp_pages)
385                 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
386                                          sizeof rdpg->rp_pages[0]);
387 free_rdpg:
388         if (rdpg)
389                 OBD_FREE_PTR(rdpg);
390
391         RETURN(rc);
392 }
393
394 int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
395 {
396         struct md_attr *ma;
397         int rc = 0;
398         ENTRY;
399
400         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
401
402         OBD_ALLOC_PTR(ma);
403         if (ma == NULL)
404                 RETURN(-ENOMEM);
405
406         ma->ma_need = MA_INODE|MA_LMV;
407         rc = mo_attr_get(ctx, mo, ma);
408         if (rc)
409                 GOTO(cleanup, ma);
410
411         /* step1: checking whether the dir need to be splitted */
412         rc = cmm_expect_splitting(ctx, mo, ma);
413         if (rc != CMM_EXPECT_SPLIT)
414                 GOTO(cleanup, rc = 0);
415
416         /* step2: create slave objects */
417         rc = cmm_create_slave_objects(ctx, mo, ma);
418         if (rc)
419                 GOTO(cleanup, ma);
420
421         /* step3: scan and split the object */
422         rc = cmm_scan_and_split(ctx, mo, ma);
423         if (rc)
424                 GOTO(cleanup, ma);
425
426         /* step4: set mea to the master object */
427         rc = mo_xattr_set(ctx, md_object_next(mo), ma->ma_lmv, ma->ma_lmv_size,
428                           MDS_LMV_MD_NAME, 0);
429
430         if (rc == -ERESTART) 
431                 CWARN("Dir"DFID" has been split \n", 
432                                 PFID(lu_object_fid(&mo->mo_lu)));
433 cleanup:
434         if (ma->ma_lmv_size && ma->ma_lmv)
435                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
436
437         OBD_FREE_PTR(ma);
438         RETURN(rc);
439 }