/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  lustre/cmm/cmm_split.c
 *  Lustre directory splitting
 *
 *  Copyright (c) 2006 Cluster File Systems, Inc.
 *   Author: Alex Thomas <alex@clusterfs.com>
 *           Wang Di     <wangdi@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_MDS

#include <obd_class.h>
#include <lustre_fid.h>
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
#include "cmm_internal.h"
#include "mdc_internal.h"

#define CMM_NO_SPLIT_EXPECTED   0
#define CMM_EXPECT_SPLIT        1
#define CMM_NO_SPLITTABLE       2

enum {
        SPLIT_SIZE = 64*1024
};

static inline struct lu_fid *cmm2_fid(struct cmm_object *obj)
{
        return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
}

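/*
 * Decide whether this directory should be split.  A split is expected
 * only when remote targets exist, the directory has grown to SPLIT_SIZE
 * or beyond, it carries no LMV EA yet (i.e. it is not already split),
 * and it is not the filesystem root.
 */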
static int cmm_expect_splitting(const struct lu_env *env,
                                struct md_object *mo,
                                struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lu_fid *fid = NULL;
        int rc = CMM_EXPECT_SPLIT;
        ENTRY;

        if (cmm->cmm_tgt_count == 0)
                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);

        if (ma->ma_attr.la_size < SPLIT_SIZE)
                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);

        if (ma->ma_lmv_size)
                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);

        OBD_ALLOC_PTR(fid);
        if (fid == NULL)
                GOTO(cleanup, rc = -ENOMEM);

        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
        if (rc)
                GOTO(cleanup, rc);

        rc = CMM_EXPECT_SPLIT;

        if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);

cleanup:
        if (fid)
                OBD_FREE_PTR(fid);
        RETURN(rc);
}

/* Size of an LMV EA holding 'stripes' fids. */
#define cmm_md_size(stripes)                                    \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))

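/*
 * Allocate one new fid from every remote target for use by the split
 * slaves.  When fid allocation starts a new sequence, the sequence is
 * registered with the FLD client (presumably so that later fid location
 * lookups resolve to the owning MDS).
 */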
static int cmm_alloc_fid(const struct lu_env *env, struct cmm_device *cmm,
                         struct lu_fid *fid, int count)
{
        struct mdc_device *mc, *tmp;
        int rc = 0, i = 0;

        LASSERT(count == cmm->cmm_tgt_count);
        /* FIXME: holding this spin_lock may not be appropriate,
         * because fid allocation may require an RPC. */
        spin_lock(&cmm->cmm_tgt_guard);
        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                 mc_linkage) {
                LASSERT(cmm->cmm_local_num != mc->mc_num);

                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
                if (rc > 0) {
                        /* A new sequence was allocated; register it with
                         * the FLD. */
                        struct lu_site *ls;

                        ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
                        rc = fld_client_create(ls->ls_client_fld,
                                               fid_seq(&fid[i]),
                                               mc->mc_num, env);
                }
                if (rc < 0) {
                        spin_unlock(&cmm->cmm_tgt_guard);
                        RETURN(rc);
                }
                i++;
        }
        spin_unlock(&cmm->cmm_tgt_guard);
        LASSERT(i == count);
        /* A positive rc from fid allocation is not an error; normalize
         * it to 0. */
        if (rc == 1)
                rc = 0;
        RETURN(rc);
}

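/*
 * Find a cmm_object by fid.  Errors are propagated as an ERR_PTR value,
 * so callers must check the result with IS_ERR().
 */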
struct cmm_object *cmm_object_find(const struct lu_env *env,
                                   struct cmm_device *d,
                                   const struct lu_fid *f,
                                   struct lustre_capa *capa)
{
        struct lu_object *o;
        struct cmm_object *m;
        ENTRY;

        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f,
                           capa);
        if (IS_ERR(o))
                m = (struct cmm_object *)o;
        else
                m = lu2cmm_obj(lu_object_locate(o->lo_header,
                               d->cmm_md_dev.md_lu_dev.ld_type));
        RETURN(m);
}

static inline void cmm_object_put(const struct lu_env *env,
                                  struct cmm_object *o)
{
        lu_object_put(env, &o->cmo_obj.mo_lu);
}

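/*
 * Create a single slave object on the remote MDS that owns 'fid'.  The
 * object is created with MDS_CREATE_SLAVE_OBJ set and carries the given
 * slave LMV EA.
 */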
static int cmm_creat_remote_obj(const struct lu_env *env,
                                struct cmm_device *cmm,
                                struct lu_fid *fid, struct md_attr *ma,
                                const struct lmv_stripe_md *lmv,
                                int lmv_size)
{
        struct cmm_object *obj;
        struct md_create_spec *spec;
        int rc;
        ENTRY;

        /* XXX: capabilities do not work with split, so pass a NULL
         * capability here. */
        obj = cmm_object_find(env, cmm, fid, NULL);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        OBD_ALLOC_PTR(spec);
        if (spec == NULL) {
                cmm_object_put(env, obj);
                RETURN(-ENOMEM);
        }

        spec->u.sp_ea.fid = fid;
        spec->u.sp_ea.eadata = lmv;
        spec->u.sp_ea.eadatalen = lmv_size;
        spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
        rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
                              spec, ma);
        OBD_FREE_PTR(spec);

        cmm_object_put(env, obj);
        RETURN(rc);
}

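/*
 * Create all slave objects for the split: allocate a fid per remote
 * target, fill in the master LMV EA (slot 0 holds the master's own fid),
 * and create one slave object per allocated fid.  On success the LMV EA
 * is returned through ma->ma_lmv and freed by the caller.
 */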
static int cmm_create_slave_objects(const struct lu_env *env,
                                    struct md_object *mo, struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
        int lmv_size, i, rc;
        struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
        ENTRY;

        lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);

        /* This lmv is handed to the caller through ma->ma_lmv and freed
         * there once splitting is finished. */
        OBD_ALLOC(lmv, lmv_size);
        if (!lmv)
                RETURN(-ENOMEM);

        lmv->mea_master = cmm->cmm_local_num;
        lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
        lmv->mea_count = cmm->cmm_tgt_count + 1;

        /* Slot 0 belongs to the master object itself. */
        lmv->mea_ids[0] = *lf;

        rc = cmm_alloc_fid(env, cmm, &lmv->mea_ids[1],
                           cmm->cmm_tgt_count);
        if (rc)
                GOTO(cleanup, rc);

        OBD_ALLOC_PTR(slave_lmv);
        if (!slave_lmv)
                GOTO(cleanup, rc = -ENOMEM);

        slave_lmv->mea_master = cmm->cmm_local_num;
        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
        slave_lmv->mea_count = 0;
        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
                rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
                                          slave_lmv, sizeof(*slave_lmv));
                if (rc)
                        GOTO(cleanup, rc);
        }

        ma->ma_lmv_size = lmv_size;
        ma->ma_lmv = lmv;
cleanup:
        if (rc != 0 && lmv != NULL)
                OBD_FREE(lmv, lmv_size);
        if (slave_lmv)
                OBD_FREE_PTR(slave_lmv);
        RETURN(rc);
}

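/*
 * Send one page of directory entries to the slave object 'fid'; 'len' is
 * the number of bytes in the page that are actually in use.
 */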
static int cmm_send_split_pages(const struct lu_env *env,
                                struct md_object *mo, struct lu_rdpg *rdpg,
                                struct lu_fid *fid, int len)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct cmm_object *obj;
        int rc = 0;
        ENTRY;

        obj = cmm_object_find(env, cmm, fid, NULL);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
                           rdpg->rp_pages[0], len);
        cmm_object_put(env, obj);
        RETURN(rc);
}

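/*
 * Remove a single entry from the master directory before it is
 * re-inserted on a slave.  "." and ".." are skipped.  For directories the
 * highest hash bit is set in lde_hash so the slave can distinguish
 * directories from other entries.
 */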
static int cmm_remove_dir_ent(const struct lu_env *env, struct md_object *mo,
                              struct lu_dirent *ent)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct cmm_object *obj;
        char *name;
        int is_dir, rc;
        ENTRY;

        if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
            !strncmp(ent->lde_name, "..", ent->lde_namelen))
                RETURN(0);

        obj = cmm_object_find(env, cmm, &ent->lde_fid, NULL);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
        OBD_ALLOC(name, ent->lde_namelen + 1);
        if (!name)
                GOTO(cleanup, rc = -ENOMEM);

        memcpy(name, ent->lde_name, ent->lde_namelen);
        rc = mdo_name_remove(env, md_object_next(mo),
                             name, is_dir);
        OBD_FREE(name, ent->lde_namelen + 1);
        if (rc)
                GOTO(cleanup, rc);

        /* This entry will be transferred to the slave MDS and inserted
         * there, so the slave must be able to tell whether the object is
         * a directory.  Use the highest bit of the hash to carry that
         * flag, since the hash itself never uses it. */
        if (is_dir)
                ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
cleanup:
        cmm_object_put(env, obj);

        RETURN(rc);
}

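/*
 * Remove every entry in the page whose hash is below 'hash_end'.  On
 * return *len is the number of bytes of the page that were consumed: a
 * whole page, zero, or the offset of the first entry left in place.
 */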
static int cmm_remove_entries(const struct lu_env *env,
                              struct md_object *mo, struct lu_rdpg *rdpg,
                              __u32 hash_end, __u32 *len)
{
        struct lu_dirpage *dp;
        struct lu_dirent  *ent;
        int rc = 0;
        ENTRY;

        kmap(rdpg->rp_pages[0]);
        dp = page_address(rdpg->rp_pages[0]);
        for (ent = lu_dirent_start(dp); ent != NULL;
             ent = lu_dirent_next(ent)) {
                if (ent->lde_hash < hash_end) {
                        rc = cmm_remove_dir_ent(env, mo, ent);
                        if (rc) {
                                CERROR("Cannot remove %.*s: rc = %d\n",
                                       (int)ent->lde_namelen,
                                       ent->lde_name, rc);
                                GOTO(unmap, rc);
                        }
                } else {
                        /* Cast through char * so the pointer difference
                         * is correct on both 32- and 64-bit kernels. */
                        if (ent != lu_dirent_start(dp))
                                *len = (__u32)((char *)ent - (char *)dp);
                        else
                                *len = 0;
                        GOTO(unmap, rc);
                }
        }
        *len = CFS_PAGE_SIZE;
unmap:
        kunmap(rdpg->rp_pages[0]);
        RETURN(rc);
}

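/*
 * Move all entries hashed below 'end' from the master directory to the
 * slave object 'lf', one page at a time, advancing rdpg->rp_hash until
 * the readdir page reports a hash boundary at or past 'end'.
 */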
static int cmm_split_entries(const struct lu_env *env,
                             struct md_object *mo, struct lu_rdpg *rdpg,
                             struct lu_fid *lf, __u32 end)
{
        int rc, done = 0;
        ENTRY;

        LASSERTF(rdpg->rp_npages == 1,
                 "only one page can be split at a time: npages %d\n",
                 rdpg->rp_npages);
        /* Read pages of entries and send them to the slave object */
        do {
                struct lu_dirpage *ldp;
                __u32  len = 0;

                /* Zero the page before reading into it */
                memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
                kunmap(rdpg->rp_pages[0]);

                rc = mo_readpage(env, md_object_next(mo), rdpg);
                /* -E2BIG means the end of the directory was reached;
                 * -ERANGE is tolerated and the page is processed anyway. */
                if (rc != 0 && rc != -ERANGE) {
                        if (rc == -E2BIG)
                                rc = 0;
                        RETURN(rc);
                }

                /* Remove the old entries */
                rc = cmm_remove_entries(env, mo, rdpg, end, &len);
                if (rc)
                        RETURN(rc);

                /* Send page to slave object */
                if (len > 0) {
                        rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
                        if (rc)
                                RETURN(rc);
                }

                kmap(rdpg->rp_pages[0]);
                ldp = page_address(rdpg->rp_pages[0]);
                if (ldp->ldp_hash_end >= end)
                        done = 1;
                rdpg->rp_hash = ldp->ldp_hash_end;
                kunmap(rdpg->rp_pages[0]);
        } while (!done);

        RETURN(rc);
}

#define SPLIT_PAGE_COUNT 1

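/*
 * Split the master directory: the hash space is divided evenly between
 * the master and the remote targets, and the entries falling into each
 * target's hash segment are moved to that target's slave object.
 */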
static int cmm_scan_and_split(const struct lu_env *env,
                              struct md_object *mo, struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        __u32 hash_segment;
        struct lu_rdpg *rdpg = NULL;
        int rc = 0, i;
        ENTRY;

        OBD_ALLOC_PTR(rdpg);
        if (!rdpg)
                RETURN(-ENOMEM);

        rdpg->rp_npages = SPLIT_PAGE_COUNT;
        rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;

        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
        if (rdpg->rp_pages == NULL)
                GOTO(free_rdpg, rc = -ENOMEM);

        for (i = 0; i < rdpg->rp_npages; i++) {
                rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
                if (rdpg->rp_pages[i] == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
        }

        hash_segment = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
                struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
                __u32 hash_end;

                rdpg->rp_hash = i * hash_segment;
                hash_end = rdpg->rp_hash + hash_segment;
                rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
                if (rc)
                        GOTO(cleanup, rc);
        }
cleanup:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_pages(rdpg->rp_pages[i], 0);
        if (rdpg->rp_pages)
                OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
                                         sizeof rdpg->rp_pages[0]);
free_rdpg:
        if (rdpg)
                OBD_FREE_PTR(rdpg);

        RETURN(rc);
}

static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
                                  ssize_t len)
{
        struct lu_buf *buf;

        buf = &cmm_env_info(env)->cmi_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}

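/*
 * Entry point for directory splitting, called on a directory that may
 * have grown too large.  The split runs in four steps: decide whether a
 * split is warranted, create the slave objects, move the entries over,
 * and store the resulting LMV EA ("mea") on the master object.  Returns
 * 0 when no split is needed.
 */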
int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct md_attr *ma;
        struct lu_buf *buf;
        int rc = 0;
        ENTRY;

        LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));

        OBD_ALLOC_PTR(ma);
        if (ma == NULL)
                RETURN(-ENOMEM);

        ma->ma_need = MA_INODE|MA_LMV;
        rc = mo_attr_get(env, mo, ma);
        if (rc)
                GOTO(cleanup, rc);

        /* Step 1: check whether the directory needs to be split. */
        rc = cmm_expect_splitting(env, mo, ma);
        if (rc != CMM_EXPECT_SPLIT)
                GOTO(cleanup, rc = 0);

        /* Disable transactions for the split: this one operation spans
         * many transactions, which conflicts with the current recovery
         * design.  If transactions cannot be disabled, skip the split. */
        rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
        if (rc)
                GOTO(cleanup, rc = 0);

        /* Step 2: create the slave objects. */
        rc = cmm_create_slave_objects(env, mo, ma);
        if (rc)
                GOTO(cleanup, rc);

        /* Step 3: scan the directory and move entries to the slaves. */
        rc = cmm_scan_and_split(env, mo, ma);
        if (rc)
                GOTO(cleanup, rc);

        buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
        /* Step 4: store the mea on the master object. */
        rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
        if (rc == -ERESTART)
                CWARN("Dir "DFID" has been split\n",
                      PFID(lu_object_fid(&mo->mo_lu)));
cleanup:
        if (ma->ma_lmv_size && ma->ma_lmv)
                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);

        OBD_FREE_PTR(ma);
        RETURN(rc);
}