1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDS
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
38 #include "mds_internal.h"
43 * - magic in mea struct
44 * - error handling is entirely missing
/*
 * Connect this MDS to the LMV device named lmv_name.
 *
 * Visible steps: the device is looked up with class_name2obd(), connected
 * with obd_connect(), the connection is turned into an export with
 * class_conn2export(), obd is registered as an observer of the LMV, and
 * the mdsize and mdsnum keys are queried through obd_get_info().
 * mds_max_mdsize is raised when the reported mdsize is larger.
 *
 * When the lookup fails, mds_lmv_obd is replaced by ERR_PTR(-ENOTCONN);
 * next to the connect error message it is replaced by ERR_PTR(rc).  The
 * check at the top returns such a stored error on a later call.
 *
 * NOTE(review): mds_num is assigned from mdsize, presumably because that
 * variable is reused as the output of the mdsnum query -- confirm.
 * NOTE(review): the condition under which the final cleanup statements
 * run is not evident from these lines -- confirm before relying on it.
 */
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
49 struct mds_obd *mds = &obd->u.mds;
50 struct lustre_handle conn = {0,};
/* an error pointer stored earlier is reported again */
55 if (IS_ERR(mds->mds_lmv_obd))
56 RETURN(PTR_ERR(mds->mds_lmv_obd));
/* resolve the LMV device by name */
61 mds->mds_lmv_obd = class_name2obd(lmv_name);
62 if (!mds->mds_lmv_obd) {
63 CERROR("MDS cannot locate LMV %s\n",
65 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
/* connect using the uuid of this obd */
69 rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
71 CERROR("MDS cannot connect to LMV %s (%d)\n",
73 mds->mds_lmv_obd = ERR_PTR(rc);
/* keep the export that belongs to the new connection */
76 mds->mds_lmv_exp = class_conn2export(&conn);
77 if (mds->mds_lmv_exp == NULL)
78 CERROR("can't get export!\n");
/* register obd as observer of the LMV device */
80 rc = obd_register_observer(mds->mds_lmv_obd, obd);
82 CERROR("MDS cannot register as observer of LMV %s (%d)\n",
87 /* retrieve size of EA */
88 rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize",
/* only ever grow the recorded maximum */
92 if (mdsize > mds->mds_max_mdsize)
93 mds->mds_max_mdsize = mdsize;
95 /* find our number in LMV cluster */
96 rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum",
100 mds->mds_num = mdsize;
108 /* FIXME: cleanups here! */
109 obd_disconnect(mds->mds_lmv_exp, 0);
110 mds->mds_lmv_exp = NULL;
111 mds->mds_lmv_obd = ERR_PTR(rc);
/*
 * Post-setup step: when an LMV export exists, pass the maximum metadata
 * size and the maximum cookie size recorded on this MDS to that export
 * through obd_init_ea_size().
 */
115 int mds_lmv_postsetup(struct obd_device *obd)
117 struct mds_obd *mds = &obd->u.mds;
119 if (mds->mds_lmv_exp)
120 obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
121 mds->mds_max_cookiesize);
/*
 * Tear down the LMV connection of this MDS.
 *
 * The teardown is entered only when mds_lmv_obd is not an error pointer
 * and an export is present.  It unregisters the observer, disconnects the
 * export with the flags supplied by the caller, and the export and device
 * pointers are set to NULL.  class_export_put() is the reference drop that
 * the existing comment describes for a failed obd_disconnect().
 */
125 int mds_lmv_disconnect(struct obd_device *obd, int flags)
127 struct mds_obd *mds = &obd->u.mds;
131 if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
/* stop observing the LMV device */
133 obd_register_observer(mds->mds_lmv_obd, NULL);
135 rc = obd_disconnect(mds->mds_lmv_exp, flags);
136 /* if obd_disconnect fails (probably because the
137 * export was disconnected by class_disconnect_exports)
138 * then we just need to drop our ref. */
140 class_export_put(mds->mds_lmv_exp);
141 mds->mds_lmv_exp = NULL;
142 mds->mds_lmv_obd = NULL;
/*
 * Read the LMV attribute (struct mea) stored on inode.
 *
 * obd_alloc_diskmd() on the LMV export provides the buffer: its return
 * value is stored in *mea_size and the buffer pointer in *mea.
 * fsfilt_get_md() is then called to fill the buffer from the inode.
 *
 * Allocation failure is not handled; *mea is only LASSERTed non-NULL.
 *
 * NOTE(review): the OBD_FREE() at the end is presumably conditional on
 * the result of fsfilt_get_md() -- confirm which results free the buffer
 * and what the caller sees in *mea afterwards.
 */
149 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
150 struct mea **mea, int *mea_size)
152 struct mds_obd *mds = &obd->u.mds;
/* checked first: is an LMV device configured at all */
156 if (!mds->mds_lmv_obd)
159 /* first calculate mea size */
160 *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
161 (struct lov_mds_md **) mea);
162 /* FIXME: error handling here */
163 LASSERT(*mea != NULL);
166 rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
168 /* FIXME: error handling here */
170 OBD_FREE(*mea, *mea_size);
/*
 * Record sizing helpers for buffered directory entries.
 * DIR_ROUND is DIR_PAD - 1 and is used as a mask.  DIR_REC_LEN() adds 12
 * to the name length, adds DIR_ROUND and clears the DIR_ROUND bits, which
 * rounds the sum up to a multiple of DIR_PAD (assuming DIR_PAD is a power
 * of two -- TODO confirm).
 */
187 #define DIR_ROUND (DIR_PAD - 1)
188 #define DIR_REC_LEN(name_len) (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
190 /* this struct holds dir entries for particular MDS to be flushed */
/* list of pages with buffered entries; dc_new_page_to_cache() links each
 * new page in through page->list */
192 struct list_head list;
/* brw page descriptor; flush_buffer_onto_mds() sets its count to
 * PAGE_SIZE before writing a page out */
197 struct brw_page brwc;
/* State for one directory-split pass; handed to filldir() as its opaque
 * buffer argument by scan_and_distribute(). */
200 struct dirsplit_control {
/* obd device; its u.mds gives the local MDS number and the LMV export */
201 struct obd_device *obd;
/* parent dentry used for name lookups while filling generation numbers */
203 struct dentry *dentry;
/* array of caches indexed by MDS number */
205 struct dir_cache *cache;
/*
 * Add a freshly allocated page to the page list of dirc and make it the
 * current fill target: cur is set to the start of the page and free to
 * PAGE_SIZE.
 *
 * When the list is not empty and more than sizeof(__u16) bytes are still
 * free, the entry at the current position is used to mark the end of the
 * old page first (see the inline comment).
 */
208 static int dc_new_page_to_cache(struct dir_cache * dirc)
212 if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
213 /* current page became full, mark the end */
214 struct dir_entry *de = dirc->cur;
218 page = alloc_page(GFP_KERNEL);
/* append the page and restart filling at its first byte */
221 list_add_tail(&page->list, &dirc->list);
222 dirc->cur = page_address(page);
223 dirc->free = PAGE_SIZE;
/*
 * Walk the dir_entry records packed into the page at buf and fill in the
 * mds, ino and generation fields of each record.
 *
 * Every name is looked up under dc->dentry.  A dentry with an inode gets
 * the local MDS number plus the inode number and generation of that inode.
 * A dentry flagged DCACHE_CROSS_REF gets the values stored in the dentry
 * itself.  The walk ends at buf + PAGE_SIZE or at the first record whose
 * namelen is zero, and steps by DIR_REC_LEN(namelen).
 */
227 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
229 struct mds_obd *mds = &dc->obd->u.mds;
230 struct dir_entry *de;
231 struct dentry *dentry;
234 end = buf + PAGE_SIZE;
235 de = (struct dir_entry *) buf;
236 while ((char *) de < end && de->namelen) {
237 /* lookup an inode */
238 LASSERT(de->namelen <= 255);
239 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
240 if (IS_ERR(dentry)) {
241 CERROR("can't lookup %*s: %d\n", de->namelen,
242 de->name, (int) PTR_ERR(dentry));
/* local object: identity comes from the inode */
245 if (dentry->d_inode != NULL) {
246 de->mds = mds->mds_num;
247 de->ino = dentry->d_inode->i_ino;
248 de->generation = dentry->d_inode->i_generation;
249 } else if (dentry->d_flags & DCACHE_CROSS_REF) {
250 de->mds = dentry->d_mdsnum;
251 de->ino = dentry->d_inum;
252 de->generation = dentry->d_generation;
254 CERROR("can't lookup %*s\n", de->namelen, de->name);
/* advance to the next packed record */
260 de = (struct dir_entry *)
261 ((char *) de + DIR_REC_LEN(de->namelen));
/*
 * Write out the entries buffered for MDS number mdsnum.
 *
 * If more than sizeof(__u16) bytes are free in the current page, the end
 * of the data is marked at the current position.  Then, for each page on
 * the cache list: generation numbers are filled in with
 * retrieve_generation_numbers(), the brw count is set to PAGE_SIZE,
 * oa.o_mds is set to mdsnum, obd_brw() is issued as OBD_BRW_WRITE on the
 * LMV export, and the page is removed from the list.
 *
 * The return values of retrieve_generation_numbers() and obd_brw() are not
 * captured on the lines shown.
 */
266 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
268 struct mds_obd *mds = &dc->obd->u.mds;
269 struct dir_cache *ca;
270 struct list_head *cur, *tmp;
/* cache slot that belongs to the target MDS */
272 ca = dc->cache + mdsnum;
274 if (ca->free > sizeof(__u16)) {
275 /* current page became full, mark the end */
276 struct dir_entry *de = ca->cur;
/* the safe iterator is needed because pages are unlinked in the loop */
280 list_for_each_safe(cur, tmp, &ca->list) {
283 page = list_entry(cur, struct page, list);
284 LASSERT(page != NULL);
286 retrieve_generation_numbers(dc, page_address(page));
290 ca->brwc.count = PAGE_SIZE;
292 ca->oa.o_mds = mdsnum;
293 obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
294 (struct lov_stripe_md *) dc->mea,
297 list_del(&page->list);
/*
 * Directory-read callback passed to vfs_readdir() by scan_and_distribute();
 * __buf is the struct dirsplit_control of the current split.
 *
 * The dot and dot-dot entries are skipped.  mea_name2idx() selects the
 * target MDS for the name; the inline comment says that entries mapping to
 * the local MDS are not moved.  Other entries are appended to the cache of
 * the target MDS, and a new page is requested first when the current page
 * has fewer than DIR_REC_LEN(namlen) bytes free.  The name and its length
 * are stored in the record; the generation number is filled in later.
 */
303 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
304 ino_t ino, unsigned int d_type)
306 struct dirsplit_control *dc = __buf;
307 struct mds_obd *mds = &dc->obd->u.mds;
308 struct dir_cache *ca;
309 struct dir_entry *de;
314 if (name[0] == '.' && (namlen == 1 ||
315 (namlen == 2 && name[1] == '.'))) {
316 /* skip special entries */
/* pick the MDS that should own this name */
321 newmds = mea_name2idx(dc->mea, (char *) name, namlen);
323 if (newmds == mds->mds_num) {
324 /* this entry remains on the current MDS, skip moving */
/* temporary NUL-terminated copy of the name, freed again below */
328 OBD_ALLOC(n, namlen + 1);
329 memcpy(n, name, namlen);
330 n[namlen] = (char) 0;
332 OBD_FREE(n, namlen + 1);
334 /* check for space in buffer for new entry */
335 ca = dc->cache + newmds;
336 if (DIR_REC_LEN(namlen) > ca->free) {
337 int err = dc_new_page_to_cache(ca);
341 /* insert found entry into buffer to be flushed later */
342 /* NOTE: we'll fill in the generation number later, because
343 * it's stored in the inode, thus we need to look up the entry,
344 * but the directory is locked for readdir(), so we delay this */
348 de->namelen = namlen;
349 memcpy(de->name, name, namlen);
/* consume the padded record size from the current page */
350 ca->cur += DIR_REC_LEN(namlen);
351 ca->free -= DIR_REC_LEN(namlen);
/*
 * Read through the directory behind dentry and hand its entries to the
 * per-MDS caches, then flush those caches.
 *
 * The directory is opened read-only through a name of the form
 * __iopen__/0x<inode number>.  One struct dir_cache per mea->mea_count is
 * allocated and initialised, vfs_readdir() feeds every entry to filldir(),
 * and each cache whose cached field is non-zero is flushed with
 * flush_buffer_onto_mds().  The cache array and the name buffer are freed
 * at the end.  A failed filp_open() is logged, the name buffer is freed
 * and the PTR_ERR() value is returned.
 *
 * Allocation failure of the cache array is not handled (LASSERT only).
 */
357 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
360 struct inode *dir = dentry->d_inode;
361 struct dirsplit_control dc;
/* NOTE(review): this reserves 10 + 1 bytes after the prefix, but the
 * format below also emits 0x and up to 16 hex digits where long is
 * 64-bit -- confirm the buffer cannot overflow, or size it for the
 * worst case */
366 nlen = strlen("__iopen__/") + 10 + 1;
367 OBD_ALLOC(file_name, nlen);
370 i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
372 file = filp_open(file_name, O_RDONLY, 0);
374 CERROR("can't open directory %s: %d\n",
375 file_name, (int) PTR_ERR(file));
376 OBD_FREE(file_name, nlen);
377 RETURN(PTR_ERR(file));
380 memset(&dc, 0, sizeof(dc));
/* one cache slot per MDS listed in the mea */
385 OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
386 LASSERT(dc.cache != NULL);
387 for (i = 0; i < mea->mea_count; i++) {
388 INIT_LIST_HEAD(&dc.cache[i].list);
389 dc.cache[i].free = 0;
390 dc.cache[i].cached = 0;
393 err = vfs_readdir(file, filldir, &dc);
/* push out whatever was collected for each MDS */
397 for (i = 0; i < mea->mea_count; i++) {
398 if (dc.cache[i].cached)
399 flush_buffer_onto_mds(&dc, i);
402 OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
403 OBD_FREE(file_name, nlen);
/* Size threshold (64 * 1024) that mds_try_to_split_dir() compares against
 * the i_size of a directory when deciding whether it is large. */
408 #define MAX_DIR_SIZE (64 * 1024)
411 * must not be called on directories that have already been split
/*
 * Try to split a large directory over nstripes MDSes.
 *
 * Early checks look at whether an LMV device is configured, whether the
 * directory is the root (inode number compared with mds_rootfid.id) and
 * whether i_size is below MAX_DIR_SIZE; the existing comments state that
 * the root is not split and that only large directories are.
 *
 * The split then proceeds as follows:
 *   - obd_size_diskmd() gives mea_size and obd_alloc_diskmd() provides the
 *     mea, whose mea_count is set to nstripes;
 *   - an obdo is filled from the directory inode (id, generation, type,
 *     times, uid, gid, mode, group) and obd_create() on the LMV export
 *     creates the directory objects on the slave MDSes;
 *   - fsfilt_set_md() stores the mea on the directory between
 *     fsfilt_start() and fsfilt_commit();
 *   - scan_and_distribute() moves the entries.
 *
 * Several failures are not handled (see the FIXME comments); a failed
 * fsfilt_start() trips an LASSERT.
 *
 * NOTE(review): the condition guarding the final obd_free_diskmd() is not
 * evident from these lines -- confirm who owns *mea on return.
 */
413 int mds_try_to_split_dir(struct obd_device *obd,
414 struct dentry *dentry, struct mea **mea, int nstripes)
416 struct inode *dir = dentry->d_inode;
417 struct mds_obd *mds = &obd->u.mds;
418 struct mea *tmea = NULL;
419 struct obdo *oa = NULL;
420 int rc, mea_size = 0;
425 if (!mds->mds_lmv_obd)
428 /* don't split root directory */
429 if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
432 /* we want to split only large dirs. this may be already
433 * splitted dir or a slave dir created during splitting */
434 if (dir->i_size < MAX_DIR_SIZE)
437 /* check whether the directory is marked non-splittable */
441 CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
442 obd->obd_name, mds->mds_num, dir->i_ino,
443 (unsigned long) dir->i_generation);
/* size of the on-disk mea as reported by the LMV export */
447 mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
449 /* FIXME: Actually we may only want to allocate enough space for
450 * necessary amount of stripes, but on the other hand with this
451 * approach of allocating maximal possible amount of MDS slots,
452 * it would be easier to split the dir over more MDSes */
453 rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
456 (*mea)->mea_count = nstripes;
458 #warning "we have to take EX lock on a dir for splitting"
460 /* 1) create directory objects on slave MDS'es */
461 /* FIXME: should this be OBD method? */
463 /* FIXME: error handling here */
/* describe the directory in the obdo handed to obd_create() */
465 oa->o_id = dir->i_ino;
466 oa->o_generation = dir->i_generation;
467 obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
468 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
469 OBD_MD_FLUID | OBD_MD_FLGID);
470 oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
471 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
472 oa->o_mode = dir->i_mode;
473 CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
474 obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
476 rc = obd_create(mds->mds_lmv_exp, oa,
477 (struct lov_stripe_md **) mea, NULL);
478 /* FIXME: error handling here */
480 CDEBUG(D_OTHER, "%d dirobjects created\n",
481 (int) (*mea)->mea_count);
483 /* 2) update dir attribute */
485 handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
486 LASSERT(!IS_ERR(handle));
487 rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
489 fsfilt_commit(obd, dir, handle, 0);
494 /* 3) read through the dir and distribute it over objects */
495 scan_and_distribute(obd, dentry, *mea);
498 obd_free_diskmd(mds->mds_lmv_exp,
499 (struct lov_mds_md **) mea);
/*
 * Allocate a temporary page for one local niobuf.
 *
 * A single page is requested with alloc_pages(GFP_HIGHUSER, 0).  The error
 * path logs the shortage, stores -ENOMEM in lnb->rc and returns it.
 * Otherwise the page is poisoned with 0xf1 and its index is set to
 * lnb->offset shifted right by PAGE_SHIFT.
 */
503 static int filter_start_page_write(struct inode *inode,
504 struct niobuf_local *lnb)
506 struct page *page = alloc_pages(GFP_HIGHUSER, 0);
508 CERROR("no memory for a temp page\n");
509 RETURN(lnb->rc = -ENOMEM);
511 POISON_PAGE(page, 0xf1);
512 page->index = lnb->offset >> PAGE_SHIFT;
/* Forward declarations of two helpers; their definitions are not part of
 * this section of the file. */
518 struct dentry *filter_fid2dentry(struct obd_device *obd,
519 struct dentry *dir_dentry,
520 obd_gr group, obd_id id);
521 void f_dput(struct dentry *dentry);
/*
 * Prepare local buffers for a bulk transfer to this MDS (the log messages
 * refer to the write path).
 *
 * Exactly one object with at least one buffer is expected (LASSERT).  The
 * res array is zeroed, then the target dentry is resolved with
 * mds_fid2dentry() from a fid built out of obj->ioo_id and obj->ioo_gr
 * (the group value is placed in the generation field).  A dentry without
 * an inode leads to -ENOENT.  For each of the obj->ioo_bufcnt buffers the
 * local niobuf receives the dentry, the remote offset and flags, and a
 * page from filter_start_page_write(); lnb->len is summed into tot_bytes.
 * A page error is logged with CDEBUG, and the __free_pages() call follows
 * that message.
 *
 * After each of the two phases, taking longer than 15 * HZ jiffies is
 * reported with CERROR.
 *
 * A failed dentry lookup is not handled gracefully (LASSERT only).
 */
523 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
524 int objcount, struct obd_ioobj *obj,
525 int niocount, struct niobuf_remote *nb,
526 struct niobuf_local *res,
527 struct obd_trans_info *oti)
529 struct mds_obd *mds = &exp->exp_obd->u.mds;
530 struct niobuf_remote *rnb;
531 struct niobuf_local *lnb = NULL;
532 int rc = 0, i, tot_bytes = 0;
533 unsigned long now = jiffies;
534 struct dentry *dentry;
537 LASSERT(objcount == 1);
538 LASSERT(obj->ioo_bufcnt > 0);
540 memset(res, 0, niocount * sizeof(*res));
/* build the fid of the target from the ioobj */
542 fid.id = obj->ioo_id;
543 fid.generation = obj->ioo_gr;
544 dentry = mds_fid2dentry(mds, &fid, NULL);
545 LASSERT(!IS_ERR(dentry));
547 if (dentry->d_inode == NULL) {
548 CERROR("trying to BRW to non-existent file "LPU64"\n",
551 GOTO(cleanup, rc = -ENOENT);
554 if (time_after(jiffies, now + 15 * HZ))
555 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
557 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
/* pair every remote buffer with a local one */
560 for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
562 lnb->dentry = dentry;
563 lnb->offset = rnb->offset;
565 lnb->flags = rnb->flags;
567 rc = filter_start_page_write(dentry->d_inode, lnb);
569 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
570 LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
571 i, obj->ioo_bufcnt, dentry, rc);
573 __free_pages(lnb->page, 0);
577 tot_bytes += lnb->len;
580 if (time_after(jiffies, now + 15 * HZ))
581 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
583 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
/*
 * Commit a bulk transfer: insert the received directory entries into the
 * target directory.
 *
 * Requires objcount == 1 and current->journal_info == NULL (LASSERT).
 * Each of the obj->ioo_bufcnt pages in res is mapped with kmap() and read
 * as packed dir_entry records, up to lnb->len bytes or the first record
 * with namelen equal to zero.  Every record is passed to
 * fsfilt_add_dir_entry() for res->dentry together with its name, inode
 * number, generation and MDS number.  A final loop frees all pages with
 * __free_page().
 *
 * Entries are not yet removed from the original directory (see the FIXME).
 */
591 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
592 int objcount, struct obd_ioobj *obj, int niocount,
593 struct niobuf_local *res, struct obd_trans_info *oti,
596 struct obd_device *obd = exp->exp_obd;
597 struct niobuf_local *lnb;
598 struct inode *inode = NULL;
599 int rc = 0, i, cleanup_phase = 0, err, entries = 0;
602 LASSERT(objcount == 1);
603 LASSERT(current->journal_info == NULL);
606 inode = res->dentry->d_inode;
608 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
610 struct dir_entry *de;
612 buf = kmap(lnb->page);
613 LASSERT(buf != NULL);
/* only lnb->len bytes of the page carry records */
614 end = buf + lnb->len;
615 de = (struct dir_entry *) buf;
616 while ((char *) de < end && de->namelen) {
617 err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
618 de->namelen, de->ino,
619 de->generation, de->mds);
620 /* FIXME: remove entries from the original dir */
621 #warning "removing entries from the original dir"
/* step to the next packed record */
623 de = (struct dir_entry *)
624 ((char *) de + DIR_REC_LEN(de->namelen));
/* release the temporary pages */
630 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
631 __free_page(lnb->page);
637 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len)
639 struct mds_obd *mds = &obd->u.mds;
640 struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
643 i = raw_name2idx(lmv->count, name, len);