Whamcloud - gitweb
- raw_name2idx declaration to avoid warnings
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0,};
51         int valsize, mdsize;
52         int rc;
53         ENTRY;
54
55         if (IS_ERR(mds->mds_lmv_obd))
56                 RETURN(PTR_ERR(mds->mds_lmv_obd));
57
58         if (mds->mds_lmv_obd)
59                 RETURN(0);
60
61         mds->mds_lmv_obd = class_name2obd(lmv_name);
62         if (!mds->mds_lmv_obd) {
63                 CERROR("MDS cannot locate LMV %s\n",
64                        lmv_name);
65                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
66                 RETURN(-ENOTCONN);
67         }
68
69         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
70         if (rc) {
71                 CERROR("MDS cannot connect to LMV %s (%d)\n",
72                        lmv_name, rc);
73                 mds->mds_lmv_obd = ERR_PTR(rc);
74                 RETURN(rc);
75         }
76         mds->mds_lmv_exp = class_conn2export(&conn);
77         if (mds->mds_lmv_exp == NULL)
78                 CERROR("can't get export!\n");
79
80         rc = obd_register_observer(mds->mds_lmv_obd, obd);
81         if (rc) {
82                 CERROR("MDS cannot register as observer of LMV %s (%d)\n",
83                        lmv_name, rc);
84                 GOTO(err_discon, rc);
85         }
86
87         /* retrieve size of EA */
88         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
89                           &valsize, &mdsize);
90         if (rc) 
91                 GOTO(err_reg, rc);
92         if (mdsize > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = mdsize;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &mdsize);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         mds->mds_num = mdsize;
101
102         RETURN(0);
103
104 err_reg:
105         RETURN(rc);
106
107 err_discon:
108         /* FIXME: cleanups here! */
109         obd_disconnect(mds->mds_lmv_exp, 0);
110         mds->mds_lmv_exp = NULL;
111         mds->mds_lmv_obd = ERR_PTR(rc);
112         RETURN(rc);
113 }
114
115 int mds_lmv_postsetup(struct obd_device *obd)
116 {
117         struct mds_obd *mds = &obd->u.mds;
118         ENTRY;
119         if (mds->mds_lmv_exp)
120                 obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
121                                  mds->mds_max_cookiesize);
122         RETURN(0);
123 }
124
125 int mds_lmv_disconnect(struct obd_device *obd, int flags)
126 {
127         struct mds_obd *mds = &obd->u.mds;
128         int rc = 0;
129         ENTRY;
130
131         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
132
133                 obd_register_observer(mds->mds_lmv_obd, NULL);
134
135                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
136                 /* if obd_disconnect fails (probably because the
137                  * export was disconnected by class_disconnect_exports)
138                  * then we just need to drop our ref. */
139                 if (rc != 0)
140                         class_export_put(mds->mds_lmv_exp);
141                 mds->mds_lmv_exp = NULL;
142                 mds->mds_lmv_obd = NULL;
143         }
144
145         RETURN(rc);
146 }
147
148
149 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
150                         struct mea **mea, int *mea_size)
151 {
152         struct mds_obd *mds = &obd->u.mds;
153         int rc;
154         ENTRY;
155
156         if (!mds->mds_lmv_obd)
157                 RETURN(0);
158
159         /* first calculate mea size */
160         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
161                                      (struct lov_mds_md **) mea);
162         /* FIXME: error handling here */
163         LASSERT(*mea != NULL);
164
165         down(&inode->i_sem);
166         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
167         up(&inode->i_sem);
168         /* FIXME: error handling here */
169         if (rc <= 0) {
170                 OBD_FREE(*mea, *mea_size);
171                 *mea = NULL;
172                 *mea_size = 0;
173         }
174         if (rc > 0)
175                 rc = 0;
176         RETURN(rc);
177 }
178
179 struct dir_entry {
180         __u16   namelen;
181         __u16   mds;
182         __u32   ino;
183         __u32   generation;
184         char    name[0];
185 };
186
187 #define DIR_PAD                 4
188 #define DIR_ROUND               (DIR_PAD - 1)
189 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
190
191 /* this struct holds dir entries for particular MDS to be flushed */
192 struct dir_cache {
193         struct list_head list;
194         void *cur;
195         int free;
196         int cached;
197         struct obdo oa;
198         struct brw_page brwc;
199 };
200
201 struct dirsplit_control {
202         struct obd_device *obd;
203         struct inode *dir;
204         struct dentry *dentry;
205         struct mea *mea;
206         struct dir_cache *cache;
207 };
208
209 static int dc_new_page_to_cache(struct dir_cache * dirc)
210 {
211         struct page *page;
212
213         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
214                 /* current page became full, mark the end */
215                 struct dir_entry *de = dirc->cur;
216                 de->namelen = 0;
217         }
218
219         page = alloc_page(GFP_KERNEL);
220         if (page == NULL)
221                 return -ENOMEM;
222         list_add_tail(&page->list, &dirc->list);
223         dirc->cur = page_address(page);
224         dirc->free = PAGE_SIZE;
225         return 0;
226 }
227
228 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
229 {
230         struct dir_entry *de;
231         struct dentry *dentry;
232         char * end;
233         
234         end = buf + PAGE_SIZE;
235         de = (struct dir_entry *) buf;
236         while ((char *) de < end && de->namelen) {
237                 LASSERT(de->namelen <= 255);
238                 /* lookup an inode */
239                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
240                 if (IS_ERR(dentry)) {
241                         CERROR("can't lookup '%*s'/%u in %lu: %d\n",
242                                 (int) de->namelen, de->name,
243                                 (unsigned) de->namelen,
244                                 (unsigned long) dc->dentry->d_inode->i_ino,
245                                 (int) PTR_ERR(dentry));
246                 }
247                 LASSERT(!IS_ERR(dentry));
248                 LASSERT(dentry->d_inode != NULL);
249                 de->generation = dentry->d_inode->i_generation;
250                 l_dput(dentry);
251                 de = (struct dir_entry *)
252                         ((char *) de + DIR_REC_LEN(de->namelen));
253         }
254         return 0;
255 }
256
257 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
258 {
259         struct mds_obd *mds = &dc->obd->u.mds;
260         struct dir_cache *ca;
261         struct list_head *cur, *tmp;
262         ENTRY; 
263         ca = dc->cache + mdsnum;
264
265         if (ca->free > sizeof(__u16)) {
266                 /* current page became full, mark the end */
267                 struct dir_entry *de = ca->cur;
268                 de->namelen = 0;
269         }
270
271         list_for_each_safe(cur, tmp, &ca->list) {
272                 struct page *page;
273
274                 page = list_entry(cur, struct page, list);
275                 LASSERT(page != NULL);
276
277                 retrieve_generation_numbers(dc, page_address(page));
278
279                 ca->brwc.pg = page;
280                 ca->brwc.off = 0;
281                 ca->brwc.count = PAGE_SIZE;
282                 ca->brwc.flag = 0;
283                 ca->oa.o_mds = mdsnum;
284                 obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
285                                 (struct lov_stripe_md *) dc->mea,
286                                 1, &ca->brwc, NULL);
287
288                 list_del(&page->list);
289                 __free_page(page);
290         }
291         RETURN(0);
292 }
293
294 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
295                    ino_t ino, unsigned int d_type)
296 {
297         struct dirsplit_control *dc = __buf;
298         struct mds_obd *mds = &dc->obd->u.mds;
299         struct dir_cache *ca;
300         struct dir_entry *de;
301         int newmds;
302         char *n;
303         ENTRY;
304
305         if (name[0] == '.' && (namlen == 1 ||
306                                 (namlen == 2 && name[1] == '.'))) {
307                 /* skip special entries */
308                 RETURN(0);
309         }
310
311         LASSERT(dc != NULL);
312         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
313
314         if (newmds == mds->mds_num) {
315                 /* this entry remains on the current MDS, skip moving */
316                 RETURN(0);
317         }
318         
319         OBD_ALLOC(n, namlen + 1);
320         memcpy(n, name, namlen);
321         n[namlen] = (char) 0;
322         
323         OBD_FREE(n, namlen + 1);
324
325         /* check for space in buffer for new entry */
326         ca = dc->cache + newmds;
327         if (DIR_REC_LEN(namlen) > ca->free) {
328                 int err = dc_new_page_to_cache(ca);
329                 LASSERT(err == 0);
330         }
331         
332         /* insert found entry into buffer to be flushed later */
333         /* NOTE: we'll fill generations number later, because we
334          * it's stored in inode, thus we need to lookup an entry,
335          * but directory is locked for readdir(), so we delay this */
336         de = ca->cur;
337         de->ino = ino;
338         de->mds = d_type;
339         de->namelen = namlen;
340         memcpy(de->name, name, namlen);
341         ca->cur += DIR_REC_LEN(namlen);
342         ca->free -= DIR_REC_LEN(namlen);
343         ca->cached++;
344
345         RETURN(0);
346 }
347
348 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
349                                 struct mea *mea)
350 {
351         struct inode *dir = dentry->d_inode;
352         struct dirsplit_control dc;
353         struct file * file;
354         int err, i, nlen;
355         char *file_name;
356
357         nlen = strlen("__iopen__/") + 10 + 1;
358         OBD_ALLOC(file_name, nlen);
359         if (!file_name)
360                 RETURN(-ENOMEM);
361         i = sprintf(file_name, "__iopen__/%u",
362                         (unsigned) dentry->d_inode->i_ino);
363
364         file = filp_open(file_name, O_RDONLY, 0);
365         if (IS_ERR(file)) {
366                 CERROR("can't open directory %s: %d\n",
367                                 file_name, (int) PTR_ERR(file));
368                 OBD_FREE(file_name, nlen);
369                 RETURN(PTR_ERR(file));
370         }
371
372         memset(&dc, 0, sizeof(dc));
373         dc.obd = obd;
374         dc.dir = dir;
375         dc.dentry = dentry;
376         dc.mea = mea;
377         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
378         LASSERT(dc.cache != NULL);
379         for (i = 0; i < mea->mea_count; i++) {
380                 INIT_LIST_HEAD(&dc.cache[i].list);
381                 dc.cache[i].free = 0;
382                 dc.cache[i].cached = 0;
383         }
384
385         err = vfs_readdir(file, filldir, &dc);
386         
387         filp_close(file, 0);
388
389         for (i = 0; i < mea->mea_count; i++) {
390                 if (dc.cache[i].cached)
391                         flush_buffer_onto_mds(&dc, i);
392         }
393
394         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
395         OBD_FREE(file_name, nlen);
396
397         return 0;
398 }
399
400 #define MAX_DIR_SIZE    (64 * 1024)
401
402 /*
403  * must not be called on already splitted directories
404  */
405 int mds_try_to_split_dir(struct obd_device *obd,
406                          struct dentry *dentry, struct mea **mea, int nstripes)
407 {
408         struct inode *dir = dentry->d_inode;
409         struct mds_obd *mds = &obd->u.mds;
410         struct mea *tmea = NULL;
411         struct obdo *oa = NULL;
412         int rc, mea_size = 0;
413         void *handle;
414         ENTRY;
415
416         /* clustered MD ? */
417         if (!mds->mds_lmv_obd)
418                 RETURN(0);
419
420         /* don't split root directory */
421         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
422                 RETURN(0);
423
424 #if 1
425         if (dir->i_size < MAX_DIR_SIZE)
426                 RETURN(0);
427 #endif
428
429         /* check is directory marked non-splittable */
430         if (mea && *mea)
431                 RETURN(0);
432
433         CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n",
434                obd->obd_name, dir->i_ino,
435                (unsigned long) dir->i_generation, mea);
436
437         if (mea == NULL)
438                 mea = &tmea;
439         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
440
441         /* FIXME: Actually we may only want to allocate enough space for
442            necessary amount of stripes, but on the other hand with this approach
443            of allocating maximal possible amount of MDS slots, it would be
444            easier to split the dir over more MDSes */
445         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
446         if (!(*mea))
447                 RETURN(-ENOMEM);
448         (*mea)->mea_count = nstripes;
449        
450 #warning "we have to take EX lock on a dir for splitting"
451         
452         /* 1) create directory objects on slave MDS'es */
453         /* FIXME: should this be OBD method? */
454         oa = obdo_alloc();
455         /* FIXME: error handling here */
456         LASSERT(oa != NULL);
457         oa->o_id = dir->i_ino;
458         oa->o_generation = dir->i_generation;
459         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
460                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
461                         OBD_MD_FLUID | OBD_MD_FLGID);
462         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
463         oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
464         oa->o_mode = dir->i_mode;
465         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
466                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
467                         
468         rc = obd_create(mds->mds_lmv_exp, oa,
469                         (struct lov_stripe_md **) mea, NULL);
470         /* FIXME: error handling here */
471         LASSERT(rc == 0);
472         CDEBUG(D_OTHER, "%d dirobjects created\n",
473                (int) (*mea)->mea_count);
474
475         /* 2) update dir attribute */
476         down(&dir->i_sem);
477         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
478         LASSERT(!IS_ERR(handle));
479         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
480         LASSERT(rc == 0);
481         fsfilt_commit(obd, dir, handle, 0);
482         LASSERT(rc == 0);
483         up(&dir->i_sem);
484         obdo_free(oa);
485
486         /* 3) read through the dir and distribute it over objects */
487         scan_and_distribute(obd, dentry, *mea);
488
489         if (mea == &tmea)
490                 obd_free_diskmd(mds->mds_lmv_exp,
491                                 (struct lov_mds_md **) mea);
492         RETURN(1);
493 }
494
495 static int filter_start_page_write(struct inode *inode,
496                                    struct niobuf_local *lnb)
497 {
498         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
499         if (page == NULL) {
500                 CERROR("no memory for a temp page\n");
501                 RETURN(lnb->rc = -ENOMEM);
502         }
503         POISON_PAGE(page, 0xf1);
504         page->index = lnb->offset >> PAGE_SHIFT;
505         lnb->page = page;
506
507         return 0;
508 }
509
510 struct dentry *filter_fid2dentry(struct obd_device *obd,
511                                  struct dentry *dir_dentry,
512                                  obd_gr group, obd_id id);
513 void f_dput(struct dentry *dentry);
514
515 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
516                 int objcount, struct obd_ioobj *obj,
517                 int niocount, struct niobuf_remote *nb,
518                 struct niobuf_local *res,
519                 struct obd_trans_info *oti)
520 {
521         struct mds_obd *mds = &exp->exp_obd->u.mds;
522         struct niobuf_remote *rnb;
523         struct niobuf_local *lnb = NULL;
524         int rc = 0, i, tot_bytes = 0;
525         unsigned long now = jiffies;
526         struct dentry *dentry;
527         struct ll_fid fid;
528         ENTRY;
529         LASSERT(objcount == 1);
530         LASSERT(obj->ioo_bufcnt > 0);
531
532         memset(res, 0, niocount * sizeof(*res));
533
534         fid.id = obj->ioo_id;
535         fid.generation = obj->ioo_gr;
536         dentry = mds_fid2dentry(mds, &fid, NULL);
537         LASSERT(!IS_ERR(dentry));
538
539         if (dentry->d_inode == NULL) {
540                 CERROR("trying to BRW to non-existent file "LPU64"\n",
541                        obj->ioo_id);
542                 f_dput(dentry);
543                 GOTO(cleanup, rc = -ENOENT);
544         }
545
546         if (time_after(jiffies, now + 15 * HZ))
547                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
548         else
549                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
550                        (jiffies - now));
551
552         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
553              i++, lnb++, rnb++) {
554                 lnb->dentry = dentry;
555                 lnb->offset = rnb->offset;
556                 lnb->len    = rnb->len;
557                 lnb->flags  = rnb->flags;
558
559                 rc = filter_start_page_write(dentry->d_inode, lnb);
560                 if (rc) {
561                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
562                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
563                                i, obj->ioo_bufcnt, dentry, rc);
564                         while (lnb-- > res)
565                                 __free_pages(lnb->page, 0);
566                         f_dput(dentry);
567                         GOTO(cleanup, rc);
568                 }
569                 tot_bytes += lnb->len;
570         }
571
572         if (time_after(jiffies, now + 15 * HZ))
573                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
574         else
575                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
576                        (jiffies - now));
577
578         EXIT;
579 cleanup:
580         return rc;
581 }
582
583 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
584                  int objcount, struct obd_ioobj *obj, int niocount,
585                  struct niobuf_local *res, struct obd_trans_info *oti,
586                  int retcode)
587 {
588         struct obd_device *obd = exp->exp_obd;
589         struct niobuf_local *lnb;
590         struct inode *inode = NULL;
591         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
592         ENTRY;
593
594         LASSERT(objcount == 1);
595         LASSERT(current->journal_info == NULL);
596
597         cleanup_phase = 1;
598         inode = res->dentry->d_inode;
599
600         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
601                 char *end, *buf;
602                 struct dir_entry *de;
603
604                 buf = kmap(lnb->page);
605                 LASSERT(buf != NULL);
606                 end = buf + lnb->len;
607                 de = (struct dir_entry *) buf;
608                 while ((char *) de < end && de->namelen) {
609                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
610                                                    de->namelen, de->ino,
611                                                    de->generation, de->mds);
612                         /* FIXME: remove entries from the original dir */
613 #warning "removing entries from the original dir"
614                         LASSERT(err == 0);
615                         de = (struct dir_entry *)
616                                 ((char *) de + DIR_REC_LEN(de->namelen));
617                         entries++;
618                 }
619                 kunmap(buf);
620         }
621
622         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
623                 __free_page(lnb->page);
624         f_dput(res->dentry);
625
626         RETURN(rc);
627 }
628
629 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len)
630 {
631         struct mds_obd *mds = &obd->u.mds;
632         struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
633         int i;
634
635         i = raw_name2idx(lmv->count, name, len);
636         RETURN(i);
637 }
638