1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDS
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
38 #include "mds_internal.h"
43 * - magic in mea struct
44 * - error handling is entirely missing
/*
 * mds_lmv_connect - connect this MDS to the LMV device called lmv_name.
 *
 * obd:      the MDS device; its u.mds state receives mds_lmv_obd,
 *           mds_lmv_exp, mds_max_mdsize and mds_num.
 * lmv_name: name handed to class_name2obd() to find the LMV device.
 *
 * On the failure paths below mds_lmv_obd is overwritten with an ERR_PTR()
 * value, and a call that finds such a value returns the encoded error
 * straight away.
 */
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
49 struct mds_obd *mds = &obd->u.mds;
50 struct lustre_handle conn = {0,};
/* An earlier failure left an error pointer behind: report that error again. */
55 if (IS_ERR(mds->mds_lmv_obd))
56 RETURN(PTR_ERR(mds->mds_lmv_obd));
/* Resolve the LMV device by name; a failed lookup is logged and recorded
 * as ERR_PTR(-ENOTCONN). */
61 mds->mds_lmv_obd = class_name2obd(lmv_name);
62 if (!mds->mds_lmv_obd) {
63 CERROR("MDS cannot locate LMV %s\n",
65 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
/* Connect to the LMV, identifying ourselves with this MDS's UUID; a failed
 * connect is logged and recorded as ERR_PTR(rc). */
69 rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
71 CERROR("MDS cannot connect to LMV %s (%d)\n",
73 mds->mds_lmv_obd = ERR_PTR(rc);
/* Translate the connection handle into the export used for all later
 * calls to the LMV; a NULL export is logged. */
76 mds->mds_lmv_exp = class_conn2export(&conn);
77 if (mds->mds_lmv_exp == NULL)
78 CERROR("can't get export!\n");
/* Register this MDS as an observer of the LMV device. */
80 rc = obd_register_observer(mds->mds_lmv_obd, obd);
82 CERROR("MDS cannot register as observer of LMV %s (%d)\n",
87 /* retrieve size of EA */
88 rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize",
/* Raise the recorded maximum EA size when mdsize exceeds it. */
92 if (mdsize > mds->mds_max_mdsize)
93 mds->mds_max_mdsize = mdsize;
95 /* find our number in LMV cluster */
96 rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum",
/* NOTE(review): mds_num is taken from the variable named mdsize; presumably
 * the "mdsnum" query reuses it as its output buffer - confirm. */
100 mds->mds_num = mdsize;
108 /* FIXME: cleanups here! */
/* Failure path: drop the connection, forget the export and record rc as an
 * error pointer in mds_lmv_obd (see the IS_ERR() check at the top). */
109 obd_disconnect(mds->mds_lmv_exp, 0);
110 mds->mds_lmv_exp = NULL;
111 mds->mds_lmv_obd = ERR_PTR(rc);
/*
 * mds_lmv_postsetup - pass the MDS's EA size limits on to the LMV.
 *
 * When an LMV export is present, obd_init_ea_size() is called on it with
 * the maximum EA size and maximum cookie size recorded in the MDS.
 * Nothing is done when no export is set.
 */
115 int mds_lmv_postsetup(struct obd_device *obd)
117 struct mds_obd *mds = &obd->u.mds;
119 if (mds->mds_lmv_exp)
120 obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
121 mds->mds_max_cookiesize);
/*
 * mds_lmv_disconnect - detach this MDS from its LMV device.
 *
 * obd:   the MDS device.
 * flags: passed through unchanged to obd_disconnect().
 *
 * The teardown is attempted only when mds_lmv_obd is not an error pointer
 * and an export is recorded.
 */
125 int mds_lmv_disconnect(struct obd_device *obd, int flags)
127 struct mds_obd *mds = &obd->u.mds;
131 if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
/* Stop observing the LMV (observer reset to NULL) before disconnecting. */
133 obd_register_observer(mds->mds_lmv_obd, NULL);
135 rc = obd_disconnect(mds->mds_lmv_exp, flags);
136 /* if obd_disconnect fails (probably because the
137 * export was disconnected by class_disconnect_exports)
138 * then we just need to drop our ref. */
140 class_export_put(mds->mds_lmv_exp);
/* Clear both LMV pointers kept in the MDS state. */
141 mds->mds_lmv_exp = NULL;
142 mds->mds_lmv_obd = NULL;
/*
 * mds_get_lmv_attr - read the mea (directory striping EA) of an inode.
 *
 * obd:      the MDS device.
 * inode:    inode whose EA is fetched with fsfilt_get_md().
 * mea:      out: buffer allocated by obd_alloc_diskmd().
 * mea_size: out: value returned by obd_alloc_diskmd(), used as the buffer
 *           size for both fsfilt_get_md() and OBD_FREE().
 *
 * NOTE(review): the buffer is released with OBD_FREE() inside this
 * function; presumably only when no EA could be read - confirm which
 * paths leave ownership of *mea with the caller.
 */
149 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
150 struct mea **mea, int *mea_size)
152 struct mds_obd *mds = &obd->u.mds;
/* Without an LMV device there is no striping information to fetch. */
156 if (!mds->mds_lmv_obd)
159 /* first calculate mea size */
160 *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
161 (struct lov_mds_md **) mea);
162 /* FIXME: error handling here */
/* An allocation failure is not handled; it trips this assertion. */
163 LASSERT(*mea != NULL);
/* Fill the freshly allocated buffer from the inode's on-disk EA. */
166 rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
168 /* FIXME: error handling here */
170 OBD_FREE(*mea, *mea_size);
/* Mask used to round a record length up to a multiple of DIR_PAD
 * (the masking is only correct if DIR_PAD is a power of two - confirm). */
188 #define DIR_ROUND (DIR_PAD - 1)
/* Size of one packed directory record holding a name of name_len bytes:
 * name_len plus 12 bytes, rounded up with DIR_ROUND.
 * NOTE(review): 12 presumably is the size of the fixed fields of
 * struct dir_entry that precede the name - confirm against its layout. */
189 #define DIR_REC_LEN(name_len) (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
191 /* this struct holds dir entries for particular MDS to be flushed */
/* pages queued for this MDS; dc_new_page_to_cache() appends new pages to
 * the tail and flush_buffer_onto_mds() removes them as it walks the list */
193 struct list_head list;
/* flush_buffer_onto_mds() sets brwc.count to PAGE_SIZE for every page;
 * presumably this is the page descriptor given to obd_brw() - confirm */
198 struct brw_page brwc;
/* State handed by scan_and_distribute() to vfs_readdir() and received by
 * the filldir() callback as its opaque buffer argument. */
201 struct dirsplit_control {
/* the MDS device; the callbacks reach the MDS state through obd->u.mds */
202 struct obd_device *obd;
/* parent dentry under which retrieve_generation_numbers() looks names up */
204 struct dentry *dentry;
/* array of per-MDS caches indexed by MDS number; scan_and_distribute()
 * allocates mea_count elements */
206 struct dir_cache *cache;
/*
 * dc_new_page_to_cache - start a fresh page in a per-MDS directory cache.
 *
 * If the cache already holds pages and the current one still has more than
 * sizeof(__u16) bytes free, the current position is used to mark the end
 * of that page. A new page is then allocated with GFP_KERNEL, appended to
 * the cache's page list, and made the current write target with PAGE_SIZE
 * bytes free.
 */
209 static int dc_new_page_to_cache(struct dir_cache * dirc)
213 if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
214 /* current page became full, mark the end */
215 struct dir_entry *de = dirc->cur;
219 page = alloc_page(GFP_KERNEL);
/* Queue the page and point the write cursor at its first byte. */
222 list_add_tail(&page->list, &dirc->list);
223 dirc->cur = page_address(page);
224 dirc->free = PAGE_SIZE;
/*
 * retrieve_generation_numbers - fill in de->generation for every entry
 * packed in one page.
 *
 * dc:  split state; dc->dentry is the parent used for the name lookups.
 * buf: start of a PAGE_SIZE buffer of packed struct dir_entry records.
 *
 * Records are walked until the end of the page or a record whose namelen
 * is zero. Each name is looked up and the i_generation of the resulting
 * inode is stored in the record. A failed lookup, or a dentry without an
 * inode, trips an assertion.
 */
228 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
230 struct dir_entry *de;
231 struct dentry *dentry;
/* buf is a void *, so this addition relies on GNU C pointer arithmetic. */
234 end = buf + PAGE_SIZE;
235 de = (struct dir_entry *) buf;
236 while ((char *) de < end && de->namelen) {
237 LASSERT(de->namelen <= 255);
238 /* lookup an inode */
239 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
/* NOTE(review): "%*s" sets a minimum field width, not a maximum length,
 * and filldir() copies the name without a visible NUL terminator;
 * "%.*s" is probably what was intended - confirm. */
240 if (IS_ERR(dentry)) {
241 CERROR("can't lookup '%*s'/%u in %lu: %d\n",
242 (int) de->namelen, de->name,
243 (unsigned) de->namelen,
244 (unsigned long) dc->dentry->d_inode->i_ino,
245 (int) PTR_ERR(dentry));
247 LASSERT(!IS_ERR(dentry));
248 LASSERT(dentry->d_inode != NULL);
249 de->generation = dentry->d_inode->i_generation;
/* Step to the next packed record. */
251 de = (struct dir_entry *)
252 ((char *) de + DIR_REC_LEN(de->namelen));
/*
 * flush_buffer_onto_mds - send the cached directory entries destined for
 * one MDS.
 *
 * dc:     split state holding the per-MDS caches and the mea.
 * mdsnum: index of the target MDS, used both to select the cache and as
 *         oa.o_mds.
 *
 * Each page queued in the cache first gets its generation numbers filled
 * in, is then written through the LMV export with obd_brw(), and is
 * finally unlinked from the cache list.
 */
257 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
259 struct mds_obd *mds = &dc->obd->u.mds;
260 struct dir_cache *ca;
261 struct list_head *cur, *tmp;
263 ca = dc->cache + mdsnum;
265 if (ca->free > sizeof(__u16)) {
266 /* current page became full, mark the end */
267 struct dir_entry *de = ca->cur;
/* The _safe iterator is needed because pages are unlinked inside the loop. */
271 list_for_each_safe(cur, tmp, &ca->list) {
274 page = list_entry(cur, struct page, list);
275 LASSERT(page != NULL);
/* Generations were left unset by filldir(); fill them in now. */
277 retrieve_generation_numbers(dc, page_address(page));
281 ca->brwc.count = PAGE_SIZE;
283 ca->oa.o_mds = mdsnum;
/* The value returned by obd_brw() is not stored or checked here. */
284 obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
285 (struct lov_stripe_md *) dc->mea,
288 list_del(&page->list);
/*
 * filldir - vfs_readdir() callback that sorts directory entries into the
 * per-MDS caches.
 *
 * __buf:  the struct dirsplit_control passed to vfs_readdir().
 * name:   entry name, namlen bytes long.
 * namlen: length of name.
 * offset, ino, d_type: remaining arguments of the callback signature.
 *
 * "." and ".." are skipped, as are entries that mea_name2idx() maps to
 * this MDS's own number. Every other entry is appended as a packed record
 * to the cache of its target MDS.
 */
294 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
295 ino_t ino, unsigned int d_type)
297 struct dirsplit_control *dc = __buf;
298 struct mds_obd *mds = &dc->obd->u.mds;
299 struct dir_cache *ca;
300 struct dir_entry *de;
305 if (name[0] == '.' && (namlen == 1 ||
306 (namlen == 2 && name[1] == '.'))) {
307 /* skip special entries */
/* Ask the mea which MDS this name belongs to. */
312 newmds = mea_name2idx(dc->mea, (char *) name, namlen);
314 if (newmds == mds->mds_num) {
315 /* this entry remains on the current MDS, skip moving */
/* Build a temporary NUL-terminated copy of the name and free it again;
 * presumably it only feeds a debug message - confirm. */
319 OBD_ALLOC(n, namlen + 1);
320 memcpy(n, name, namlen);
321 n[namlen] = (char) 0;
323 OBD_FREE(n, namlen + 1);
325 /* check for space in buffer for new entry */
326 ca = dc->cache + newmds;
327 if (DIR_REC_LEN(namlen) > ca->free) {
328 int err = dc_new_page_to_cache(ca);
332 /* insert found entry into buffer to be flushed later */
333 /* NOTE: we'll fill generations number later, because we
334 * it's stored in inode, thus we need to lookup an entry,
335 * but directory is locked for readdir(), so we delay this */
/* The name is stored without a terminating NUL; namelen carries its length. */
339 de->namelen = namlen;
340 memcpy(de->name, name, namlen);
/* Advance the write cursor by one padded record. */
341 ca->cur += DIR_REC_LEN(namlen);
342 ca->free -= DIR_REC_LEN(namlen);
/*
 * scan_and_distribute - read a directory and ship its entries to the MDSes
 * chosen by the mea.
 *
 * obd:    the MDS device.
 * dentry: the directory to scan.
 * mea:    striping description; mea->mea_count sizes the per-MDS caches.
 *
 * The directory is opened through the "__iopen__/<inode number>" path,
 * read with vfs_readdir() using filldir() as the callback, and every
 * per-MDS cache marked as cached is then flushed with
 * flush_buffer_onto_mds(). If the open fails, the error encoded in the
 * file pointer is returned.
 */
348 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
351 struct inode *dir = dentry->d_inode;
352 struct dirsplit_control dc;
/* Prefix, up to 10 decimal digits for the inode number printed with %u
 * (enough if unsigned is 32 bits wide), and the terminating NUL. */
357 nlen = strlen("__iopen__/") + 10 + 1;
358 OBD_ALLOC(file_name, nlen);
361 i = sprintf(file_name, "__iopen__/%u",
362 (unsigned) dentry->d_inode->i_ino);
364 file = filp_open(file_name, O_RDONLY, 0);
366 CERROR("can't open directory %s: %d\n",
367 file_name, (int) PTR_ERR(file));
368 OBD_FREE(file_name, nlen);
369 RETURN(PTR_ERR(file));
372 memset(&dc, 0, sizeof(dc));
/* One cache per MDS slot; an allocation failure trips the assertion. */
377 OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
378 LASSERT(dc.cache != NULL);
/* free starts at 0, so the first entry routed to a cache makes filldir()
 * allocate that cache's first page. */
379 for (i = 0; i < mea->mea_count; i++) {
380 INIT_LIST_HEAD(&dc.cache[i].list);
381 dc.cache[i].free = 0;
382 dc.cache[i].cached = 0;
385 err = vfs_readdir(file, filldir, &dc);
/* Send whatever was collected for each MDS. */
389 for (i = 0; i < mea->mea_count; i++) {
390 if (dc.cache[i].cached)
391 flush_buffer_onto_mds(&dc, i);
394 OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
395 OBD_FREE(file_name, nlen);
/* Size threshold of 64 KiB; mds_try_to_split_dir() compares the directory
 * inode's i_size against it. */
400 #define MAX_DIR_SIZE (64 * 1024)
403 * must not be called on directories that have already been split
/*
 * mds_try_to_split_dir - split a directory across several MDSes.
 *
 * obd:      the MDS device.
 * dentry:   the directory considered for splitting.
 * mea:      in/out: receives the mea allocated by obd_alloc_diskmd().
 * nstripes: stored in the new mea as its mea_count.
 *
 * Preconditions checked first: an LMV device is configured, the directory
 * is not the filesystem root, and its size is compared with MAX_DIR_SIZE.
 * The split then proceeds in the three numbered steps below. Error
 * handling is largely absent (see the FIXME comments).
 */
405 int mds_try_to_split_dir(struct obd_device *obd,
406 struct dentry *dentry, struct mea **mea, int nstripes)
408 struct inode *dir = dentry->d_inode;
409 struct mds_obd *mds = &obd->u.mds;
410 struct mea *tmea = NULL;
411 struct obdo *oa = NULL;
412 int rc, mea_size = 0;
417 if (!mds->mds_lmv_obd)
420 /* don't split root directory */
421 if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
425 if (dir->i_size < MAX_DIR_SIZE)
429 /* check is directory marked non-splittable */
433 CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n",
434 obd->obd_name, dir->i_ino,
435 (unsigned long) dir->i_generation, mea);
/* On-disk size of the mea; used later when the EA is stored. */
439 mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
441 /* FIXME: Actually we may only want to allocate enough space for
442 necessary amount of stripes, but on the other hand with this approach
443 of allocating maximal possible amount of MDS slots, it would be
444 easier to split the dir over more MDSes */
445 rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
448 (*mea)->mea_count = nstripes;
450 #warning "we have to take EX lock on a dir for splitting"
452 /* 1) create directory objects on slave MDS'es */
453 /* FIXME: should this be OBD method? */
455 /* FIXME: error handling here */
/* Describe the objects to create: id, generation, mode and the listed
 * type/time/owner attributes come from the directory inode; the group is
 * FILTER_GROUP_FIRST_MDS plus this MDS's number. */
457 oa->o_id = dir->i_ino;
458 oa->o_generation = dir->i_generation;
459 obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
460 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
461 OBD_MD_FLUID | OBD_MD_FLGID);
462 oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
463 oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
464 oa->o_mode = dir->i_mode;
465 CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
466 obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
468 rc = obd_create(mds->mds_lmv_exp, oa,
469 (struct lov_stripe_md **) mea, NULL);
470 /* FIXME: error handling here */
472 CDEBUG(D_OTHER, "%d dirobjects created\n",
473 (int) (*mea)->mea_count);
475 /* 2) update dir attribute */
/* Store the mea as the directory's EA inside a setattr transaction; a
 * failure to start the transaction trips the assertion. */
477 handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
478 LASSERT(!IS_ERR(handle));
479 rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
481 fsfilt_commit(obd, dir, handle, 0);
486 /* 3) read through the dir and distribute it over objects */
/* The value returned by scan_and_distribute() is not stored or checked here. */
487 scan_and_distribute(obd, dentry, *mea);
490 obd_free_diskmd(mds->mds_lmv_exp,
491 (struct lov_mds_md **) mea);
/*
 * filter_start_page_write - allocate the temporary page backing one local
 * niobuf.
 *
 * inode: inode the write is aimed at.
 * lnb:   local niobuf; its offset determines the page index, and its rc
 *        field is set to -ENOMEM on allocation failure.
 *
 * A single page (order 0) is allocated with GFP_HIGHUSER. If that fails,
 * the error is logged and -ENOMEM is returned. Otherwise the page is
 * poisoned with 0xf1 and its index is set to lnb->offset >> PAGE_SHIFT.
 */
495 static int filter_start_page_write(struct inode *inode,
496 struct niobuf_local *lnb)
498 struct page *page = alloc_pages(GFP_HIGHUSER, 0);
500 CERROR("no memory for a temp page\n");
501 RETURN(lnb->rc = -ENOMEM);
503 POISON_PAGE(page, 0xf1);
504 page->index = lnb->offset >> PAGE_SHIFT;
/* Prototypes declared locally rather than through a header.
 * NOTE(review): presumably both helpers are implemented by the filter
 * code - confirm, and consider moving these to a shared header. */
510 struct dentry *filter_fid2dentry(struct obd_device *obd,
511 struct dentry *dir_dentry,
512 obd_gr group, obd_id id);
513 void f_dput(struct dentry *dentry);
/*
 * mds_preprw - prepare local buffers for a bulk write to the MDS.
 *
 * cmd, exp, oa:  request context; exp gives access to the MDS state.
 * objcount, obj: exactly one I/O object is supported (asserted), and it
 *                must carry at least one buffer.
 * niocount, nb:  remote buffer descriptors.
 * res:           local buffer descriptors, zeroed here and then filled in,
 *                one per remote buffer.
 * oti:           transaction info.
 *
 * The target object is resolved to a dentry. A dentry without an inode is
 * logged and takes the cleanup path with rc = -ENOENT. A temporary page is
 * then set up for each buffer through filter_start_page_write(). Two
 * checkpoints log an error when more than 15 seconds have passed since
 * entry.
 */
515 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
516 int objcount, struct obd_ioobj *obj,
517 int niocount, struct niobuf_remote *nb,
518 struct niobuf_local *res,
519 struct obd_trans_info *oti)
521 struct mds_obd *mds = &exp->exp_obd->u.mds;
522 struct niobuf_remote *rnb;
523 struct niobuf_local *lnb = NULL;
524 int rc = 0, i, tot_bytes = 0;
525 unsigned long now = jiffies;
526 struct dentry *dentry;
529 LASSERT(objcount == 1);
530 LASSERT(obj->ioo_bufcnt > 0);
532 memset(res, 0, niocount * sizeof(*res));
/* The object's id and group fields serve as the fid's id and generation. */
534 fid.id = obj->ioo_id;
535 fid.generation = obj->ioo_gr;
536 dentry = mds_fid2dentry(mds, &fid, NULL);
537 LASSERT(!IS_ERR(dentry));
539 if (dentry->d_inode == NULL) {
540 CERROR("trying to BRW to non-existent file "LPU64"\n",
543 GOTO(cleanup, rc = -ENOENT);
546 if (time_after(jiffies, now + 15 * HZ))
547 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
549 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
/* Walk the remote and local descriptors in lock step. */
552 for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
554 lnb->dentry = dentry;
555 lnb->offset = rnb->offset;
557 lnb->flags = rnb->flags;
559 rc = filter_start_page_write(dentry->d_inode, lnb);
/* -ENOSPC is reported at the D_INODE debug level, any other error at
 * D_ERROR. */
561 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
562 LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
563 i, obj->ioo_bufcnt, dentry, rc);
565 __free_pages(lnb->page, 0);
569 tot_bytes += lnb->len;
572 if (time_after(jiffies, now + 15 * HZ))
573 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
575 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
/*
 * mds_commitrw - apply the directory entries received in a bulk write.
 *
 * cmd, exp, oa:  request context; exp->exp_obd is the MDS device.
 * objcount, obj: exactly one I/O object is supported (asserted);
 *                obj->ioo_bufcnt gives the number of buffers.
 * niocount, res: local buffers prepared by mds_preprw(); res->dentry is
 *                the directory receiving the entries.
 * oti:           transaction info.
 *
 * Each buffer page is mapped and read as a sequence of packed
 * struct dir_entry records, ending at lnb->len bytes or at a record whose
 * namelen is zero. Every record is added to the directory with
 * fsfilt_add_dir_entry(). All buffer pages are freed afterwards.
 */
583 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
584 int objcount, struct obd_ioobj *obj, int niocount,
585 struct niobuf_local *res, struct obd_trans_info *oti,
588 struct obd_device *obd = exp->exp_obd;
589 struct niobuf_local *lnb;
590 struct inode *inode = NULL;
591 int rc = 0, i, cleanup_phase = 0, err, entries = 0;
594 LASSERT(objcount == 1);
/* No journal transaction may already be open in this thread. */
595 LASSERT(current->journal_info == NULL);
598 inode = res->dentry->d_inode;
600 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
602 struct dir_entry *de;
604 buf = kmap(lnb->page);
605 LASSERT(buf != NULL);
606 end = buf + lnb->len;
607 de = (struct dir_entry *) buf;
608 while ((char *) de < end && de->namelen) {
/* Entries always go into the directory of the first buffer (res->dentry). */
609 err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
610 de->namelen, de->ino,
611 de->generation, de->mds);
612 /* FIXME: remove entries from the original dir */
613 #warning "removing entries from the original dir"
/* Step to the next packed record. */
615 de = (struct dir_entry *)
616 ((char *) de + DIR_REC_LEN(de->namelen));
/* Release the temporary pages. */
622 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
623 __free_page(lnb->page);
629 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len)
631 struct mds_obd *mds = &obd->u.mds;
632 struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
635 i = raw_name2idx(lmv->count, name, len);