1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Defines EXPORT_SYMTAB as an empty macro; nothing in this file tests it. */
26 # define EXPORT_SYMTAB
/* Selects S_MDS as this file's debug subsystem.
 * NOTE(review): presumably consumed by the CERROR()/CDEBUG() macros used
 * below -- confirm against their definition. */
28 #define DEBUG_SUBSYSTEM S_MDS
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
38 #include "mds_internal.h"
43 * - magic in mea struct
44 * - error handling is completely missing
/*
 * Connect the MDS device obd to the LMV device named lmv_name.
 *
 * The LMV is looked up with class_name2obd() and connected to with
 * obd_connect() using obd's own uuid.  The resulting export is stored in
 * mds->mds_lmv_exp and obd is registered as an observer of the LMV.  The
 * LMV is then asked, through obd_get_info(), for "mdsize" (used to raise
 * mds->mds_max_mdsize) and for "mdsnum" (stored in mds->mds_num).
 *
 * When the lookup or the connect fails, mds->mds_lmv_obd is overwritten
 * with an ERR_PTR() value; a later call finds that value on entry and
 * returns the encoded error straight away.
 *
 * obd:      MDS device; its u.mds member is updated.
 * lmv_name: name handed to class_name2obd() to find the LMV device.
 */
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
49 struct mds_obd *mds = &obd->u.mds;
50 struct lustre_handle conn = {0,};
/* an earlier attempt left an error encoded in mds_lmv_obd: report it again */
55 if (IS_ERR(mds->mds_lmv_obd))
56 RETURN(PTR_ERR(mds->mds_lmv_obd));
/* resolve the LMV device by name; a failed lookup is remembered as -ENOTCONN */
61 mds->mds_lmv_obd = class_name2obd(lmv_name);
62 if (!mds->mds_lmv_obd) {
63 CERROR("MDS cannot locate LMV %s\n",
65 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
/* connect under this MDS's own uuid; conn receives the connection handle */
69 rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
71 CERROR("MDS cannot connect to LMV %s (%d)\n",
73 mds->mds_lmv_obd = ERR_PTR(rc);
/* turn the connection handle into the export used for all later LMV calls */
76 mds->mds_lmv_exp = class_conn2export(&conn);
/* NOTE(review): a NULL export is only logged here, yet mds_lmv_exp is
 * passed to obd_get_info() below -- confirm this cannot happen. */
77 if (mds->mds_lmv_exp == NULL)
78 CERROR("can't get export!\n");
/* register obd as the observer of the LMV device */
80 rc = obd_register_observer(mds->mds_lmv_obd, obd);
82 CERROR("MDS cannot register as observer of LMV %s (%d)\n",
87 /* retrieve size of EA */
88 rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize",
/* mds_max_mdsize only ever grows here */
92 if (mdsize > mds->mds_max_mdsize)
93 mds->mds_max_mdsize = mdsize;
95 /* find our number in LMV cluster */
96 rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum",
/* NOTE(review): mds_num is taken from the variable mdsize, so mdsize
 * appears to be reused as the output buffer of the "mdsnum" query --
 * confirm against the full argument list of the call above. */
100 mds->mds_num = mdsize;
108 /* FIXME: cleanups here! */
/* failure cleanup: drop the export and remember rc in mds_lmv_obd */
109 obd_disconnect(mds->mds_lmv_exp, 0);
110 mds->mds_lmv_exp = NULL;
111 mds->mds_lmv_obd = ERR_PTR(rc);
/*
 * Hand the current mds->mds_max_mdsize of the MDS device obd to the LMV
 * export through obd_init_ea_size() (third argument 0).  Nothing is done
 * when no LMV export is set.
 */
115 int mds_lmv_postsetup(struct obd_device *obd)
117 struct mds_obd *mds = &obd->u.mds;
/* mds_lmv_exp is only set once mds_lmv_connect() obtained an export */
119 if (mds->mds_lmv_exp)
120 obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0);
/*
 * Undo mds_lmv_connect() for the MDS device obd.
 *
 * Acts only when mds->mds_lmv_obd does not hold an ERR_PTR() value and an
 * export is present: the observer registration is cleared, the export is
 * disconnected with the caller's flags, and both mds_lmv_exp and
 * mds_lmv_obd are reset to NULL.
 *
 * flags: passed unchanged to obd_disconnect().
 */
124 int mds_lmv_disconnect(struct obd_device *obd, int flags)
126 struct mds_obd *mds = &obd->u.mds;
130 if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
/* registering a NULL observer removes this MDS as observer of the LMV */
132 obd_register_observer(mds->mds_lmv_obd, NULL);
134 rc = obd_disconnect(mds->mds_lmv_exp, flags);
135 /* if obd_disconnect fails (probably because the
136 * export was disconnected by class_disconnect_exports)
137 * then we just need to drop our ref. */
139 class_export_put(mds->mds_lmv_exp);
/* forget the LMV so that a later mds_lmv_connect() starts from scratch */
140 mds->mds_lmv_exp = NULL;
141 mds->mds_lmv_obd = NULL;
/*
 * Read the LMV metadata (struct mea) stored with inode.
 *
 * A buffer is allocated with obd_alloc_diskmd(); its address is stored in
 * *mea and the value returned by obd_alloc_diskmd() in *mea_size.  The
 * buffer is then filled from the inode with fsfilt_get_md().
 *
 * obd:      MDS device; mds->mds_lmv_obd is checked first.
 * inode:    inode whose metadata is read.
 * mea:      out: receives the allocated buffer.
 * mea_size: out: receives the size of that buffer.
 *
 * NOTE(review): allocation failure is caught only by LASSERT(), and the
 * condition guarding OBD_FREE() below is not part of the lines shown
 * here -- check who owns *mea on each return path.
 */
148 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
149 struct mea **mea, int *mea_size)
151 struct mds_obd *mds = &obd->u.mds;
/* no LMV device configured */
155 if (!mds->mds_lmv_obd)
158 /* first calculate mea size */
159 *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
160 (struct lov_mds_md **) mea);
161 /* FIXME: error handling here */
162 LASSERT(*mea != NULL);
/* fill the freshly allocated buffer from the inode */
165 rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
167 /* FIXME: error handling here */
169 OBD_FREE(*mea, *mea_size);
/* Rounding mask derived from DIR_PAD (DIR_PAD is defined outside these lines). */
187 #define DIR_ROUND (DIR_PAD - 1)
/*
 * Size in bytes of one packed directory record whose name is name_len
 * bytes long: name_len + 12, rounded up by adding DIR_ROUND and masking
 * it off again.  The masking is a true round-up to a multiple of DIR_PAD
 * only when DIR_PAD is a power of two.
 * NOTE(review): 12 is presumably the size of the fixed fields that precede
 * the name in struct dir_entry -- confirm against its declaration.
 */
188 #define DIR_REC_LEN(name_len) (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
190 /* this struct holds dir entries for particular MDS to be flushed */
/* Members of struct dir_cache.  Other members used elsewhere in this file
 * (cur, free, cached, oa) are declared on lines not shown here. */
/* head of the list of buffer pages; pages are chained through page->list */
192 struct list_head list;
/* I/O page descriptor; flush_buffer_onto_mds() sets its count to PAGE_SIZE */
197 struct brw_page brwc;
/*
 * State shared between scan_and_distribute(), the filldir() callback and
 * flush_buffer_onto_mds() while one directory is redistributed.  A mea
 * member is also used by those functions but is declared on a line not
 * shown here.
 */
200 struct dirsplit_control {
/* MDS device doing the split; its u.mds supplies mds_num and mds_lmv_exp */
201 struct obd_device *obd;
/* parent used by retrieve_generation_numbers() to look up entry names */
203 struct dentry *dentry;
/* array of per-MDS buffers, indexed by target MDS number */
205 struct dir_cache *cache;
/*
 * Append a fresh page to the buffer list of dirc and make it the current
 * write position: dirc->cur points at the start of the new page and
 * dirc->free is reset to PAGE_SIZE.
 *
 * When the list already holds a page and more than sizeof(__u16) bytes are
 * left in it, the current position is first treated as the end of that
 * page (see the comment inside the branch).
 */
208 static int dc_new_page_to_cache(struct dir_cache * dirc)
212 if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
213 /* current page became full, mark the end */
214 struct dir_entry *de = dirc->cur;
/* GFP_KERNEL allocation: may sleep */
218 page = alloc_page(GFP_KERNEL);
/* queue the page at the tail so pages are flushed in fill order */
221 list_add_tail(&page->list, &dirc->list);
222 dirc->cur = page_address(page);
223 dirc->free = PAGE_SIZE;
/*
 * Fill in the generation field of every directory record packed in the
 * page at buf.
 *
 * Records are walked from the start of the page until the end of the page
 * is reached or a record with namelen == 0 is found.  Each name is looked
 * up under dc->dentry with ll_lookup_one_len() and the i_generation of the
 * resulting inode is copied into the record.
 *
 * A failed lookup is logged and then trips LASSERT(); a negative dentry
 * (no inode) trips LASSERT() as well.
 *
 * dc:  split state; dc->dentry is the directory the names live in.
 * buf: page-sized buffer of struct dir_entry records.
 */
227 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
229 struct dir_entry *de;
230 struct dentry *dentry;
/* records never extend past the page */
233 end = buf + PAGE_SIZE;
234 de = (struct dir_entry *) buf;
/* a zero namelen is the end-of-data marker inside a partially filled page */
235 while ((char *) de < end && de->namelen) {
236 LASSERT(de->namelen <= 255);
237 /* lookup an inode */
238 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
239 if (IS_ERR(dentry)) {
240 CERROR("can't lookup '%*s'/%u in %lu: %d\n",
241 (int) de->namelen, de->name,
242 (unsigned) de->namelen,
243 (unsigned long) dc->dentry->d_inode->i_ino,
244 (int) PTR_ERR(dentry));
246 LASSERT(!IS_ERR(dentry));
247 LASSERT(dentry->d_inode != NULL);
248 de->generation = dentry->d_inode->i_generation;
/* step to the next record; records are DIR_REC_LEN(namelen) bytes apart */
250 de = (struct dir_entry *)
251 ((char *) de + DIR_REC_LEN(de->namelen));
/*
 * Send every page buffered for MDS number mdsnum to that MDS.
 *
 * For each page queued in dc->cache[mdsnum] the generation numbers of the
 * packed records are filled in first, then the page is written through the
 * LMV export with obd_brw(OBD_BRW_WRITE, ...), with oa.o_mds set to mdsnum
 * and dc->mea passed as the stripe metadata; afterwards the page is
 * unlinked from the list.
 *
 * dc:     split state (device, directory dentry, mea, per-MDS caches).
 * mdsnum: index of the target MDS, also the index into dc->cache.
 */
256 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
258 struct mds_obd *mds = &dc->obd->u.mds;
259 struct dir_cache *ca;
260 struct list_head *cur, *tmp;
262 ca = dc->cache + mdsnum;
264 if (ca->free > sizeof(__u16)) {
265 /* current page became full, mark the end */
266 struct dir_entry *de = ca->cur;
/* the _safe iterator is required because pages are unlinked inside the loop */
270 list_for_each_safe(cur, tmp, &ca->list) {
273 page = list_entry(cur, struct page, list);
274 LASSERT(page != NULL);
/* generations are filled in only now; see the note in filldir() */
276 retrieve_generation_numbers(dc, page_address(page));
/* whole pages are always transferred */
280 ca->brwc.count = PAGE_SIZE;
282 ca->oa.o_mds = mdsnum;
283 obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
284 (struct lov_stripe_md *) dc->mea,
287 list_del(&page->list);
/*
 * Directory-entry callback handed to vfs_readdir() by
 * scan_and_distribute().
 *
 * "." and ".." are skipped.  For every other name the target MDS is
 * computed with mea_name2idx(); names that map to this MDS (mds->mds_num)
 * stay where they are.  All other names are packed as struct dir_entry
 * records into the per-MDS buffer dc->cache[newmds], which is extended by
 * a new page when the record does not fit in the space left.
 *
 * __buf:  the struct dirsplit_control passed to vfs_readdir().
 * name:   entry name, namlen bytes, copied without relying on a
 *         terminating NUL.
 * namlen: length of name.
 * offset, ino, d_type: not referenced in the lines shown here.
 */
293 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
294 ino_t ino, unsigned int d_type)
296 struct dirsplit_control *dc = __buf;
297 struct mds_obd *mds = &dc->obd->u.mds;
298 struct dir_cache *ca;
299 struct dir_entry *de;
304 if (name[0] == '.' && (namlen == 1 ||
305 (namlen == 2 && name[1] == '.'))) {
306 /* skip special entries */
/* which MDS should own this name after the split */
311 newmds = mea_name2idx(dc->mea, (char *) name, namlen);
313 if (newmds == mds->mds_num) {
314 /* this entry remains on the current MDS, skip moving */
/* temporary NUL-terminated copy of the name, freed again right below */
318 OBD_ALLOC(n, namlen + 1);
319 memcpy(n, name, namlen);
320 n[namlen] = (char) 0;
322 OBD_FREE(n, namlen + 1);
324 /* check for space in buffer for new entry */
325 ca = dc->cache + newmds;
326 if (DIR_REC_LEN(namlen) > ca->free) {
327 int err = dc_new_page_to_cache(ca);
331 /* insert found entry into buffer to be flushed later */
332 /* NOTE: we'll fill generations number later, because we
333 * it's stored in inode, thus we need to lookup an entry,
334 * but directory is locked for readdir(), so we delay this */
338 de->namelen = namlen;
339 memcpy(de->name, name, namlen);
/* consume the record's padded length from the current page */
340 ca->cur += DIR_REC_LEN(namlen);
341 ca->free -= DIR_REC_LEN(namlen);
/*
 * Read through the directory behind dentry and ship the entries that
 * belong to other MDSes to them.
 *
 * The directory is opened by inode number through the path
 * "__iopen__/<ino>" and read with vfs_readdir(), using filldir() to sort
 * entries into one buffer per MDS (mea->mea_count buffers).  Afterwards
 * every buffer whose cached flag is set is written out with
 * flush_buffer_onto_mds().
 *
 * obd:    MDS device performing the split.
 * dentry: directory being split.
 *
 * Returns PTR_ERR(file) when the directory cannot be opened.
 */
347 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
350 struct inode *dir = dentry->d_inode;
351 struct dirsplit_control dc;
/* prefix, up to 10 decimal digits of the (unsigned) inode number, and NUL */
356 nlen = strlen("__iopen__/") + 10 + 1;
357 OBD_ALLOC(file_name, nlen);
360 i = sprintf(file_name, "__iopen__/%u",
361 (unsigned) dentry->d_inode->i_ino);
363 file = filp_open(file_name, O_RDONLY, 0);
365 CERROR("can't open directory %s: %d\n",
366 file_name, (int) PTR_ERR(file));
367 OBD_FREE(file_name, nlen);
368 RETURN(PTR_ERR(file));
371 memset(&dc, 0, sizeof(dc));
/* one buffer per MDS in the mea; allocation failure trips the LASSERT */
376 OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
377 LASSERT(dc.cache != NULL);
378 for (i = 0; i < mea->mea_count; i++) {
379 INIT_LIST_HEAD(&dc.cache[i].list);
/* free == 0 makes the first record for an MDS allocate its first page */
380 dc.cache[i].free = 0;
381 dc.cache[i].cached = 0;
/* filldir() is invoked for the directory entries with &dc as its first argument */
384 err = vfs_readdir(file, filldir, &dc);
/* write out only the buffers that were marked as holding entries */
388 for (i = 0; i < mea->mea_count; i++) {
389 if (dc.cache[i].cached)
390 flush_buffer_onto_mds(&dc, i);
393 OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
394 OBD_FREE(file_name, nlen);
/* Size threshold in bytes (32 KiB) that mds_try_to_split_dir() compares
 * the directory's i_size against. */
399 #define MAX_DIR_SIZE (32 * 1024)
402 * must not be called on already split directories
/*
 * Split the directory behind dentry over nstripes MDSes.
 *
 * Checks made first: an LMV device must be configured, the directory must
 * not be the filesystem root, and its i_size is compared against
 * MAX_DIR_SIZE.  The split itself then:
 *   - allocates the mea with obd_alloc_diskmd() and sets its mea_count to
 *     nstripes;
 *   - takes an LCK_PW inodebits lock (MDS_INODELOCK_UPDATE) on the
 *     directory resource (i_ino, i_generation);
 *   - creates the directory objects on the other MDSes with obd_create(),
 *     using an obdo filled from the directory inode;
 *   - stores the mea on the directory with fsfilt_set_md() inside an
 *     fsfilt_start()/fsfilt_commit() transaction;
 *   - drops the lock and redistributes the entries with
 *     scan_and_distribute().
 *
 * obd:      MDS device.
 * dentry:   directory to split.
 * mea:      in/out: receives the allocated mea.
 * nstripes: number of stripes recorded in the mea.
 *
 * NOTE(review): several steps are marked FIXME for missing error
 * handling, and a failed fsfilt_start() trips LASSERT().
 */
404 int mds_try_to_split_dir(struct obd_device *obd,
405 struct dentry *dentry, struct mea **mea, int nstripes)
/* lock only the UPDATE bit of the directory's inodebits */
407 ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}};
408 struct ldlm_res_id res_id = { .name = {0} };
409 struct inode *dir = dentry->d_inode;
410 struct mds_obd *mds = &obd->u.mds;
411 struct lustre_handle lockh;
412 struct mea *tmea = NULL;
413 struct obdo *oa = NULL;
/* no LMV device configured */
420 if (!mds->mds_lmv_obd)
423 /* don't split root directory */
424 if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
/* size check against the MAX_DIR_SIZE threshold */
428 if (dir->i_size < MAX_DIR_SIZE)
432 /* check is directory marked non-splittable */
436 CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n",
437 obd->obd_name, dir->i_ino,
438 (unsigned long) dir->i_generation, mea);
/* on-disk size of the mea, used later as the size given to fsfilt_set_md() */
442 mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
444 /* FIXME: Actually we may only want to allocate enough space for
445 necessary amount of stripes, but on the other hand with this approach
446 of allocating maximal possible amount of MDS slots, it would be
447 easier to split the dir over more MDSes */
448 rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea);
451 (*mea)->mea_count = nstripes;
453 /* convert lock on the dir in order to
454 * invalidate client's attributes -bzzz */
/* the lock resource is named by inode number and generation */
455 res_id.name[0] = dir->i_ino;
456 res_id.name[1] = dir->i_generation;
457 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
458 LDLM_IBITS, &policy, LCK_PW, &flags,
459 mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
460 NULL, 0, NULL, &lockh);
461 if (rc != ELDLM_OK) {
462 CERROR("error: rc = %d\n", rc);
465 /* 1) create directory objects on slave MDS'es */
466 /* FIXME: should this be OBD method? */
468 /* FIXME: error handling here */
/* describe the new objects after the directory being split: same id and
 * generation, plus type, times, uid and gid copied from the inode */
470 oa->o_id = dir->i_ino;
471 oa->o_generation = dir->i_generation;
472 obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
473 OBD_MD_FLMTIME | OBD_MD_FLCTIME |
474 OBD_MD_FLUID | OBD_MD_FLGID);
/* the group is derived from this MDS's number */
475 oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
476 oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
477 oa->o_mode = dir->i_mode;
478 CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
479 obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
481 rc = obd_create(mds->mds_lmv_exp, oa,
482 (struct lov_stripe_md **) mea, NULL);
483 /* FIXME: error handling here */
485 CDEBUG(D_OTHER, "%d dirobjects created\n",
486 (int) (*mea)->mea_count);
488 /* 2) update dir attribute */
/* the mea is written inside its own filesystem transaction */
490 handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
491 LASSERT(!IS_ERR(handle));
492 rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
494 fsfilt_commit(obd, dir, handle, 0);
/* release the PW lock taken above before the directory is re-read */
499 ldlm_lock_decref(&lockh, LCK_PW);
501 /* 3) read through the dir and distribute it over objects */
502 scan_and_distribute(obd, dentry, *mea);
505 obd_free_diskmd(mds->mds_lmv_exp,
506 (struct lov_mds_md **) mea);
/*
 * Allocate a temporary page for one incoming buffer.
 *
 * A single page (order 0) is allocated with GFP_HIGHUSER, poisoned with
 * 0xf1, and given the page index that corresponds to lnb->offset.  On
 * allocation failure -ENOMEM is stored in lnb->rc and returned.
 *
 * inode: not referenced in the lines shown here.
 * lnb:   local buffer descriptor; offset is read, rc is set on failure.
 */
510 static int filter_start_page_write(struct inode *inode,
511 struct niobuf_local *lnb)
513 struct page *page = alloc_pages(GFP_HIGHUSER, 0);
515 CERROR("no memory for a temp page\n");
516 RETURN(lnb->rc = -ENOMEM);
/* fill the page with a recognisable pattern */
518 POISON_PAGE(page, 0xf1);
/* byte offset converted to a page index */
519 page->index = lnb->offset >> PAGE_SHIFT;
/* Prototypes declared locally; neither function is defined or called in
 * the lines shown here.
 * NOTE(review): probably better taken from a shared header -- confirm
 * whether one declares them. */
525 struct dentry *filter_fid2dentry(struct obd_device *obd,
526 struct dentry *dir_dentry,
527 obd_gr group, obd_id id);
528 void f_dput(struct dentry *dentry);
/*
 * Prepare local buffers for a bulk write of directory pages to this MDS.
 *
 * Exactly one object is accepted (objcount == 1) and it must carry at
 * least one buffer.  The target is resolved with mds_fid2dentry() from
 * obj->ioo_id (fid id) and obj->ioo_gr (fid generation).  For each of the
 * obj->ioo_bufcnt remote buffers the matching entry of res is filled in
 * (dentry, offset, flags) and a temporary page is obtained with
 * filter_start_page_write().  Stages that take longer than 15 seconds are
 * reported with CERROR().
 *
 * cmd, oa, oti: not referenced in the lines shown here.
 * exp:      export; exp->exp_obd is the MDS device.
 * objcount: must be 1.
 * obj:      object descriptor (ioo_id, ioo_gr, ioo_bufcnt).
 * niocount: number of entries of res that are zeroed up front.
 * nb:       remote buffer descriptors.
 * res:      out: local buffer descriptors.
 *
 * Fails with -ENOENT when the resolved dentry has no inode; an error from
 * mds_fid2dentry() itself trips LASSERT().
 */
530 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
531 int objcount, struct obd_ioobj *obj,
532 int niocount, struct niobuf_remote *nb,
533 struct niobuf_local *res,
534 struct obd_trans_info *oti)
536 struct mds_obd *mds = &exp->exp_obd->u.mds;
537 struct niobuf_remote *rnb;
538 struct niobuf_local *lnb = NULL;
539 int rc = 0, i, tot_bytes = 0;
/* start time for the slow-operation warnings below */
540 unsigned long now = jiffies;
541 struct dentry *dentry;
544 LASSERT(objcount == 1);
545 LASSERT(obj->ioo_bufcnt > 0);
547 memset(res, 0, niocount * sizeof(*res));
/* the object's group field carries the fid generation */
549 fid.id = obj->ioo_id;
550 fid.generation = obj->ioo_gr;
551 dentry = mds_fid2dentry(mds, &fid, NULL);
552 LASSERT(!IS_ERR(dentry));
/* a dentry without an inode means the target does not exist */
554 if (dentry->d_inode == NULL) {
555 CERROR("trying to BRW to non-existent file "LPU64"\n",
558 GOTO(cleanup, rc = -ENOENT);
561 if (time_after(jiffies, now + 15 * HZ))
562 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
564 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
/* walk remote and local descriptors in lock-step */
567 for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
569 lnb->dentry = dentry;
570 lnb->offset = rnb->offset;
572 lnb->flags = rnb->flags;
574 rc = filter_start_page_write(dentry->d_inode, lnb);
/* -ENOSPC is logged at D_INODE level, every other error at D_ERROR */
576 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
577 LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
578 i, obj->ioo_bufcnt, dentry, rc);
580 __free_pages(lnb->page, 0);
584 tot_bytes += lnb->len;
587 if (time_after(jiffies, now + 15 * HZ))
588 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
590 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
/*
 * Commit a bulk write of directory pages: insert the received entries
 * into the target directory and release the temporary pages.
 *
 * Every page in res is mapped with kmap() and read as a sequence of
 * struct dir_entry records, up to lnb->len bytes or until a record with
 * namelen == 0.  Each record is added to the directory res->dentry with
 * fsfilt_add_dir_entry().  Afterwards all obj->ioo_bufcnt pages are freed.
 *
 * cmd, oa, niocount, oti: not referenced in the lines shown here.
 * exp:      export; exp->exp_obd is the device given to
 *           fsfilt_add_dir_entry().
 * objcount: must be 1.
 * obj:      object descriptor; ioo_bufcnt is the number of buffers.
 * res:      local buffers prepared by mds_preprw().
 *
 * Must be called without an open journal transaction
 * (current->journal_info == NULL is asserted).
 */
598 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
599 int objcount, struct obd_ioobj *obj, int niocount,
600 struct niobuf_local *res, struct obd_trans_info *oti,
603 struct obd_device *obd = exp->exp_obd;
604 struct niobuf_local *lnb;
605 struct inode *inode = NULL;
606 int rc = 0, i, cleanup_phase = 0, err, entries = 0;
609 LASSERT(objcount == 1);
610 LASSERT(current->journal_info == NULL);
/* all buffers share the dentry of the first one */
613 inode = res->dentry->d_inode;
615 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
617 struct dir_entry *de;
619 buf = kmap(lnb->page);
620 LASSERT(buf != NULL);
/* only lnb->len bytes of the page carry records */
621 end = buf + lnb->len;
622 de = (struct dir_entry *) buf;
/* a zero namelen terminates the records of a partially filled page */
623 while ((char *) de < end && de->namelen) {
624 err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
625 de->namelen, de->ino,
626 de->generation, de->mds);
627 /* FIXME: remove entries from the original dir */
628 #warning "removing entries from the original dir"
/* records are DIR_REC_LEN(namelen) bytes apart */
630 de = (struct dir_entry *)
631 ((char *) de + DIR_REC_LEN(de->namelen));
/* release the temporary pages allocated in mds_preprw() */
637 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
638 __free_page(lnb->page);