Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_split.c
5  *  Lustre splitting dir
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Alex thomas <alex@clusterfs.com>
9  *           Wang Di     <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #ifndef EXPORT_SYMTAB
31 # define EXPORT_SYMTAB
32 #endif
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include <lustre/lustre_idl.h>
40 #include "cmm_internal.h"
41 #include "mdc_internal.h"
42
43 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
44                                   ssize_t len)
45 {
46         struct lu_buf *buf;
47
48         buf = &cmm_env_info(env)->cmi_buf;
49         buf->lb_buf = area;
50         buf->lb_len = len;
51         return buf;
52 }
53
54 int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
55                      const char *name)
56 {
57         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
58         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
59         int rc;
60         ENTRY;
61         
62         if (cmm->cmm_tgt_count == 0)
63                 RETURN(0);
64
65         /* Try to get the LMV EA size */
66         memset(ma, 0, sizeof(*ma));
67         ma->ma_need = MA_LMV;
68         rc = mo_attr_get(env, mp, ma);
69         if (rc)
70                 RETURN(rc);
71
72         if (ma->ma_valid & MA_LMV) {
73                 int stripe;
74
75                 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
76                 if (ma->ma_lmv == NULL)
77                         RETURN(-ENOMEM);
78
79                 /* Get LMV EA */
80                 ma->ma_need = MA_LMV;
81                 rc = mo_attr_get(env, mp, ma);
82                 
83                 /* Skip checking the slave dirs (mea_count is 0) */
84                 if (rc == 0 && ma->ma_lmv->mea_count != 0) {
85                         /* 
86                          * Get stripe by name to check the name belongs to
87                          * master dir, otherwise return the -ERESTART
88                          */
89                         stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
90                 
91                         /* Master stripe is always 0 */
92                         if (stripe != 0)
93                                 rc = -ERESTART;
94                 }
95                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
96         }
97         RETURN(rc);
98 }
99
100 int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
101                          struct md_attr *ma, int *split)
102 {
103         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
104         struct lu_fid root_fid;
105         int rc;
106         ENTRY;
107
108         /* 
109          * Check first most light things like tgt count and root fid. For some
110          * case this style should yeild better performance.
111          */
112         if (cmm->cmm_tgt_count == 0) {
113                 *split = CMM_NO_SPLIT_EXPECTED;
114                 RETURN(0);
115         }
116
117         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
118                                               &root_fid);
119         if (rc)
120                 RETURN(rc);
121
122         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
123                 *split = CMM_NOT_SPLITTABLE;
124                 RETURN(0);
125         }
126
127         /* MA_INODE is needed to check inode size. */
128         ma->ma_need = MA_INODE | MA_LMV;
129         rc = mo_attr_get(env, mo, ma);
130         if (rc)
131                 RETURN(rc);
132         
133         if (ma->ma_valid & MA_LMV) {
134                 *split = CMM_NOT_SPLITTABLE;
135                 RETURN(0);
136         }
137                 
138         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
139                 *split = CMM_NO_SPLIT_EXPECTED;
140                 RETURN(0);
141         }
142         
143         *split = CMM_EXPECT_SPLIT;
144         RETURN(0);
145 }
146
147 #define cmm_md_size(stripes) \
148        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
149
150 struct cmm_object *cmm_object_find(const struct lu_env *env,
151                                    struct cmm_device *d,
152                                    const struct lu_fid *f)
153 {
154         struct lu_object *o;
155         struct cmm_object *m;
156         ENTRY;
157
158         o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
159         if (IS_ERR(o))
160                 m = (struct cmm_object *)o;
161         else
162                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
163                                d->cmm_md_dev.md_lu_dev.ld_type));
164         RETURN(m);
165 }
166
167 static inline void cmm_object_put(const struct lu_env *env,
168                                   struct cmm_object *o)
169 {
170         lu_object_put(env, &o->cmo_obj.mo_lu);
171 }
172
173 static int cmm_object_create(const struct lu_env *env,
174                              struct cmm_device *cmm,
175                              struct lu_fid *fid,
176                              struct md_attr *ma,
177                              struct lmv_stripe_md *lmv,
178                              int lmv_size)
179 {
180         struct md_create_spec *spec;
181         struct cmm_object *obj;
182         int rc;
183         ENTRY;
184
185         obj = cmm_object_find(env, cmm, fid);
186         if (IS_ERR(obj))
187                 RETURN(PTR_ERR(obj));
188
189         OBD_ALLOC_PTR(spec);
190
191         spec->u.sp_ea.fid = fid;
192         spec->u.sp_ea.eadata = lmv;
193         spec->u.sp_ea.eadatalen = lmv_size;
194         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
195         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
196                               spec, ma);
197         OBD_FREE_PTR(spec);
198
199         cmm_object_put(env, obj);
200         RETURN(rc);
201 }
202
203 static int cmm_fid_alloc(const struct lu_env *env,
204                          struct cmm_device *cmm,
205                          struct mdc_device *mc,
206                          struct lu_fid *fid)
207 {
208         int rc;
209         ENTRY;
210
211         LASSERT(cmm != NULL);
212         LASSERT(mc != NULL);
213         LASSERT(fid != NULL);
214
215         down(&mc->mc_fid_sem);
216
217         /* Alloc new fid on @mc. */
218         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
219         if (rc > 0) {
220                 /* Setup FLD for new sequenceif needed. */
221                 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
222                                        mc->mc_num, env);
223                 if (rc)
224                         CERROR("Can't create fld entry, rc %d\n", rc);
225         }
226         up(&mc->mc_fid_sem);
227         
228         RETURN(rc);
229 }
230
231 static int cmm_slaves_create(const struct lu_env *env,
232                              struct md_object *mo,
233                              struct md_attr *ma)
234 {
235         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
236         struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
237         struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
238         struct mdc_device *mc, *tmp;
239         int lmv_size, i = 1, rc = 0;
240         ENTRY;
241
242         lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
243
244         /* This lmv will free after finish splitting. */
245         OBD_ALLOC(lmv, lmv_size);
246         if (!lmv)
247                 RETURN(-ENOMEM);
248
249         lmv->mea_master = cmm->cmm_local_num;
250         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
251         lmv->mea_count = cmm->cmm_tgt_count + 1;
252
253         /* Store master FID to local node idx number. */
254         lmv->mea_ids[0] = *lf;
255
256         OBD_ALLOC_PTR(slave_lmv);
257         if (!slave_lmv)
258                 GOTO(cleanup, rc = -ENOMEM);
259
260         slave_lmv->mea_master = cmm->cmm_local_num;
261         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
262         slave_lmv->mea_count = 0;
263
264         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
265                 /* Alloc fid for slave object. */
266                 rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
267                 if (rc) {
268                         CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
269                                mc->mc_num, rc);
270                         GOTO(cleanup, rc);
271                 }
272
273                 /* Create slave on remote MDT. */
274                 rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
275                                        slave_lmv, sizeof(*slave_lmv));
276                 if (rc)
277                         GOTO(cleanup, rc);
278                 i++;
279         }
280
281         ma->ma_lmv_size = lmv_size;
282         ma->ma_lmv = lmv;
283         EXIT;
284 cleanup:
285         if (slave_lmv)
286                 OBD_FREE_PTR(slave_lmv);
287         if (rc && lmv) {
288                 OBD_FREE(lmv, lmv_size);
289                 ma->ma_lmv = NULL;
290                 ma->ma_lmv_size = 0;
291         }
292         return rc;
293 }
294
295 static int cmm_send_split_pages(const struct lu_env *env,
296                                 struct md_object *mo,
297                                 struct lu_rdpg *rdpg,
298                                 struct lu_fid *fid, int len)
299 {
300         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
301         struct cmm_object *obj;
302         int rc = 0;
303         ENTRY;
304
305         obj = cmm_object_find(env, cmm, fid);
306         if (IS_ERR(obj))
307                 RETURN(PTR_ERR(obj));
308
309         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
310                            rdpg->rp_pages[0], len);
311         cmm_object_put(env, obj);
312         RETURN(rc);
313 }
314
315 static int cmm_remove_dir_ent(const struct lu_env *env,
316                               struct md_object *mo,
317                               struct lu_dirent *ent)
318 {
319         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
320         struct cmm_object *obj;
321         char *name;
322         int is_dir, rc;
323         ENTRY;
324
325         if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
326             !strncmp(ent->lde_name, "..", ent->lde_namelen))
327                 RETURN(0);
328
329         obj = cmm_object_find(env, cmm, &ent->lde_fid);
330         if (IS_ERR(obj))
331                 RETURN(PTR_ERR(obj));
332
333         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
334                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
335         else
336                 /* XXX: is this correct? */
337                 is_dir = 1;
338
339         OBD_ALLOC(name, ent->lde_namelen + 1);
340         if (!name)
341                 GOTO(cleanup, rc = -ENOMEM);
342
343         memcpy(name, ent->lde_name, ent->lde_namelen);
344         rc = mdo_name_remove(env, md_object_next(mo),
345                              name, is_dir);
346         OBD_FREE(name, ent->lde_namelen + 1);
347         if (rc)
348                 GOTO(cleanup, rc);
349
350         /*
351          * This ent will be transferred to slave MDS and insert it there, so in
352          * the slave MDS, we should know whether this object is dir or not, so
353          * use the highest bit of the hash to indicate that (because we do not
354          * use highest bit of hash).
355          */
356         if (is_dir)
357                 ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
358 cleanup:
359         cmm_object_put(env, obj);
360
361         RETURN(rc);
362 }
363
364 static int cmm_remove_entries(const struct lu_env *env,
365                               struct md_object *mo, struct lu_rdpg *rdpg,
366                               __u32 hash_end, __u32 *len)
367 {
368         struct lu_dirpage *dp;
369         struct lu_dirent  *ent;
370         int rc = 0;
371         ENTRY;
372
373         kmap(rdpg->rp_pages[0]);
374         dp = page_address(rdpg->rp_pages[0]);
375         for (ent = lu_dirent_start(dp); ent != NULL;
376              ent = lu_dirent_next(ent)) {
377                 if (ent->lde_hash < hash_end) {
378                         rc = cmm_remove_dir_ent(env, mo, ent);
379                         if (rc) {
380                                 CERROR("Can not del %s rc %d\n", ent->lde_name,
381                                                                  rc);
382                                 GOTO(unmap, rc);
383                         }
384                 } else {
385                         if (ent != lu_dirent_start(dp))
386                                 *len = (int)((__u32)ent - (__u32)dp);
387                         else
388                                 *len = 0;
389                         GOTO(unmap, rc);
390                 }
391         }
392         *len = CFS_PAGE_SIZE;
393         EXIT;
394 unmap:
395         kunmap(rdpg->rp_pages[0]);
396         return rc;
397 }
398
399 static int cmm_split_entries(const struct lu_env *env,
400                              struct md_object *mo, struct lu_rdpg *rdpg,
401                              struct lu_fid *lf, __u32 end)
402 {
403         int rc, done = 0;
404         ENTRY;
405
406         LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
407                  "npages %d\n", rdpg->rp_npages);
408
409         /* Read split page and send them to the slave master. */
410         do {
411                 struct lu_dirpage *ldp;
412                 __u32  len = 0;
413
414                 /* init page with '0' */
415                 memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
416                 kunmap(rdpg->rp_pages[0]);
417
418                 rc = mo_readpage(env, md_object_next(mo), rdpg);
419                 if (rc)
420                         RETURN(rc);
421
422                 /* Remove the old entries */
423                 rc = cmm_remove_entries(env, mo, rdpg, end, &len);
424                 if (rc)
425                         RETURN(rc);
426
427                 /* Send page to slave object */
428                 if (len > 0) {
429                         rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
430                         if (rc)
431                                 RETURN(rc);
432                 }
433
434                 kmap(rdpg->rp_pages[0]);
435                 ldp = page_address(rdpg->rp_pages[0]);
436                 if (ldp->ldp_hash_end >= end) {
437                         done = 1;
438                 }
439                 rdpg->rp_hash = ldp->ldp_hash_end;
440                 kunmap(rdpg->rp_pages[0]);
441         } while (!done);
442
443         RETURN(rc);
444 }
445
446 #define SPLIT_PAGE_COUNT 1
447
448 static int cmm_scan_and_split(const struct lu_env *env,
449                               struct md_object *mo,
450                               struct md_attr *ma)
451 {
452         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
453         struct lu_rdpg *rdpg = NULL;
454         __u32 hash_segement;
455         int rc = 0, i;
456
457         OBD_ALLOC_PTR(rdpg);
458         if (!rdpg)
459                 RETURN(-ENOMEM);
460
461         rdpg->rp_npages = SPLIT_PAGE_COUNT;
462         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
463
464         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
465         if (rdpg->rp_pages == NULL)
466                 GOTO(free_rdpg, rc = -ENOMEM);
467
468         for (i = 0; i < rdpg->rp_npages; i++) {
469                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
470                 if (rdpg->rp_pages[i] == NULL)
471                         GOTO(cleanup, rc = -ENOMEM);
472         }
473
474         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
475         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
476                 struct lu_fid *lf;
477                 __u32 hash_end;
478
479                 lf = &ma->ma_lmv->mea_ids[i];
480
481                 rdpg->rp_hash = i * hash_segement;
482                 hash_end = rdpg->rp_hash + hash_segement;
483                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
484                 if (rc)
485                         GOTO(cleanup, rc);
486         }
487         EXIT;
488 cleanup:
489         for (i = 0; i < rdpg->rp_npages; i++)
490                 if (rdpg->rp_pages[i] != NULL)
491                         __free_pages(rdpg->rp_pages[i], 0);
492         if (rdpg->rp_pages)
493                 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
494                          sizeof rdpg->rp_pages[0]);
495 free_rdpg:
496         if (rdpg)
497                 OBD_FREE_PTR(rdpg);
498
499         return rc;
500 }
501
502 int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
503 {
504         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
505         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
506         struct lu_buf *buf;
507         int rc = 0, split;
508         ENTRY;
509
510         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
511         memset(ma, 0, sizeof(*ma));
512
513         /* Step1: Checking whether the dir needs to be split. */
514         rc = cmm_expect_splitting(env, mo, ma, &split);
515         if (rc)
516                 GOTO(cleanup, rc);
517         
518         if (split != CMM_EXPECT_SPLIT)
519                 GOTO(cleanup, rc = 0);
520         
521         /*
522          * Disable trans for splitting, since there will be so many trans in
523          * this one ops, confilct with current recovery design.
524          */
525         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
526         if (rc)
527                 GOTO(cleanup, rc = 0);
528
529         /* Step2: Create slave objects (on slave MDTs) */
530         rc = cmm_slaves_create(env, mo, ma);
531         if (rc)
532                 GOTO(cleanup, ma);
533
534         /* Step3: Scan and split the object. */
535         rc = cmm_scan_and_split(env, mo, ma);
536         if (rc)
537                 GOTO(cleanup, ma);
538
539         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
540         
541         /* Step4: Set mea to the master object. */
542         rc = mo_xattr_set(env, md_object_next(mo), buf,
543                           MDS_LMV_MD_NAME, 0);
544         if (rc == 0) {
545                 CWARN("Dir "DFID" has been split\n",
546                       PFID(lu_object_fid(&mo->mo_lu)));
547                 rc = -ERESTART;
548         }
549         EXIT;
550 cleanup:
551         if (ma->ma_lmv_size && ma->ma_lmv)
552                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
553         
554         return rc;
555 }
556