Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0,};
51         int valsize, mdsize;
52         int rc;
53         ENTRY;
54
55         if (IS_ERR(mds->mds_lmv_obd))
56                 RETURN(PTR_ERR(mds->mds_lmv_obd));
57
58         if (mds->mds_lmv_obd)
59                 RETURN(0);
60
61         mds->mds_lmv_obd = class_name2obd(lmv_name);
62         if (!mds->mds_lmv_obd) {
63                 CERROR("MDS cannot locate LMV %s\n",
64                        lmv_name);
65                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
66                 RETURN(-ENOTCONN);
67         }
68
69         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
70         if (rc) {
71                 CERROR("MDS cannot connect to LMV %s (%d)\n",
72                        lmv_name, rc);
73                 mds->mds_lmv_obd = ERR_PTR(rc);
74                 RETURN(rc);
75         }
76         mds->mds_lmv_exp = class_conn2export(&conn);
77         if (mds->mds_lmv_exp == NULL)
78                 CERROR("can't get export!\n");
79
80         rc = obd_register_observer(mds->mds_lmv_obd, obd);
81         if (rc) {
82                 CERROR("MDS cannot register as observer of LMV %s (%d)\n",
83                        lmv_name, rc);
84                 GOTO(err_discon, rc);
85         }
86
87         /* retrieve size of EA */
88         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
89                           &valsize, &mdsize);
90         if (rc) 
91                 GOTO(err_reg, rc);
92         if (mdsize > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = mdsize;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &mdsize);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         mds->mds_num = mdsize;
101
102         RETURN(0);
103
104 err_reg:
105         RETURN(rc);
106
107 err_discon:
108         /* FIXME: cleanups here! */
109         obd_disconnect(mds->mds_lmv_exp, 0);
110         mds->mds_lmv_exp = NULL;
111         mds->mds_lmv_obd = ERR_PTR(rc);
112         RETURN(rc);
113 }
114
115 int mds_lmv_postsetup(struct obd_device *obd)
116 {
117         struct mds_obd *mds = &obd->u.mds;
118         ENTRY;
119         if (mds->mds_lmv_exp)
120                 obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0);
121         RETURN(0);
122 }
123
124 int mds_lmv_disconnect(struct obd_device *obd, int flags)
125 {
126         struct mds_obd *mds = &obd->u.mds;
127         int rc = 0;
128         ENTRY;
129
130         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
131
132                 obd_register_observer(mds->mds_lmv_obd, NULL);
133
134                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
135                 /* if obd_disconnect fails (probably because the
136                  * export was disconnected by class_disconnect_exports)
137                  * then we just need to drop our ref. */
138                 if (rc != 0)
139                         class_export_put(mds->mds_lmv_exp);
140                 mds->mds_lmv_exp = NULL;
141                 mds->mds_lmv_obd = NULL;
142         }
143
144         RETURN(rc);
145 }
146
147
148 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
149                         struct mea **mea, int *mea_size)
150 {
151         struct mds_obd *mds = &obd->u.mds;
152         int rc;
153         ENTRY;
154
155         if (!mds->mds_lmv_obd)
156                 RETURN(0);
157
158         /* first calculate mea size */
159         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
160                                      (struct lov_mds_md **) mea);
161         /* FIXME: error handling here */
162         LASSERT(*mea != NULL);
163
164         down(&inode->i_sem);
165         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
166         up(&inode->i_sem);
167         /* FIXME: error handling here */
168         if (rc <= 0) {
169                 OBD_FREE(*mea, *mea_size);
170                 *mea = NULL;
171                 *mea_size = 0;
172         }
173         if (rc > 0)
174                 rc = 0;
175         RETURN(rc);
176 }
177
178 struct dir_entry {
179         __u16   namelen;
180         __u16   mds;
181         __u32   ino;
182         __u32   generation;
183         char    name[0];
184 };
185
186 #define DIR_PAD                 4
187 #define DIR_ROUND               (DIR_PAD - 1)
188 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
189
190 /* this struct holds dir entries for particular MDS to be flushed */
191 struct dir_cache {
192         struct list_head list;
193         void *cur;
194         int free;
195         int cached;
196         struct obdo oa;
197         struct brw_page brwc;
198 };
199
200 struct dirsplit_control {
201         struct obd_device *obd;
202         struct inode *dir;
203         struct dentry *dentry;
204         struct mea *mea;
205         struct dir_cache *cache;
206 };
207
208 static int dc_new_page_to_cache(struct dir_cache * dirc)
209 {
210         struct page *page;
211
212         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
213                 /* current page became full, mark the end */
214                 struct dir_entry *de = dirc->cur;
215                 de->namelen = 0;
216         }
217
218         page = alloc_page(GFP_KERNEL);
219         if (page == NULL)
220                 return -ENOMEM;
221         list_add_tail(&page->list, &dirc->list);
222         dirc->cur = page_address(page);
223         dirc->free = PAGE_SIZE;
224         return 0;
225 }
226
227 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
228 {
229         struct dir_entry *de;
230         struct dentry *dentry;
231         char * end;
232         
233         end = buf + PAGE_SIZE;
234         de = (struct dir_entry *) buf;
235         while ((char *) de < end && de->namelen) {
236                 LASSERT(de->namelen <= 255);
237                 /* lookup an inode */
238                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
239                 if (IS_ERR(dentry)) {
240                         CERROR("can't lookup '%*s'/%u in %lu: %d\n",
241                                 (int) de->namelen, de->name,
242                                 (unsigned) de->namelen,
243                                 (unsigned long) dc->dentry->d_inode->i_ino,
244                                 (int) PTR_ERR(dentry));
245                 }
246                 LASSERT(!IS_ERR(dentry));
247                 LASSERT(dentry->d_inode != NULL);
248                 de->generation = dentry->d_inode->i_generation;
249                 l_dput(dentry);
250                 de = (struct dir_entry *)
251                         ((char *) de + DIR_REC_LEN(de->namelen));
252         }
253         return 0;
254 }
255
256 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
257 {
258         struct mds_obd *mds = &dc->obd->u.mds;
259         struct dir_cache *ca;
260         struct list_head *cur, *tmp;
261         ENTRY; 
262         ca = dc->cache + mdsnum;
263
264         if (ca->free > sizeof(__u16)) {
265                 /* current page became full, mark the end */
266                 struct dir_entry *de = ca->cur;
267                 de->namelen = 0;
268         }
269
270         list_for_each_safe(cur, tmp, &ca->list) {
271                 struct page *page;
272
273                 page = list_entry(cur, struct page, list);
274                 LASSERT(page != NULL);
275
276                 retrieve_generation_numbers(dc, page_address(page));
277
278                 ca->brwc.pg = page;
279                 ca->brwc.off = 0;
280                 ca->brwc.count = PAGE_SIZE;
281                 ca->brwc.flag = 0;
282                 ca->oa.o_mds = mdsnum;
283                 obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
284                                 (struct lov_stripe_md *) dc->mea,
285                                 1, &ca->brwc, NULL);
286
287                 list_del(&page->list);
288                 __free_page(page);
289         }
290         RETURN(0);
291 }
292
293 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
294                    ino_t ino, unsigned int d_type)
295 {
296         struct dirsplit_control *dc = __buf;
297         struct mds_obd *mds = &dc->obd->u.mds;
298         struct dir_cache *ca;
299         struct dir_entry *de;
300         int newmds;
301         char *n;
302         ENTRY;
303
304         if (name[0] == '.' && (namlen == 1 ||
305                                 (namlen == 2 && name[1] == '.'))) {
306                 /* skip special entries */
307                 RETURN(0);
308         }
309
310         LASSERT(dc != NULL);
311         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
312
313         if (newmds == mds->mds_num) {
314                 /* this entry remains on the current MDS, skip moving */
315                 RETURN(0);
316         }
317         
318         OBD_ALLOC(n, namlen + 1);
319         memcpy(n, name, namlen);
320         n[namlen] = (char) 0;
321         
322         OBD_FREE(n, namlen + 1);
323
324         /* check for space in buffer for new entry */
325         ca = dc->cache + newmds;
326         if (DIR_REC_LEN(namlen) > ca->free) {
327                 int err = dc_new_page_to_cache(ca);
328                 LASSERT(err == 0);
329         }
330         
331         /* insert found entry into buffer to be flushed later */
332         /* NOTE: we'll fill generations number later, because we
333          * it's stored in inode, thus we need to lookup an entry,
334          * but directory is locked for readdir(), so we delay this */
335         de = ca->cur;
336         de->ino = ino;
337         de->mds = d_type;
338         de->namelen = namlen;
339         memcpy(de->name, name, namlen);
340         ca->cur += DIR_REC_LEN(namlen);
341         ca->free -= DIR_REC_LEN(namlen);
342         ca->cached++;
343
344         RETURN(0);
345 }
346
347 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
348                                 struct mea *mea)
349 {
350         struct inode *dir = dentry->d_inode;
351         struct dirsplit_control dc;
352         struct file * file;
353         int err, i, nlen;
354         char *file_name;
355
356         nlen = strlen("__iopen__/") + 10 + 1;
357         OBD_ALLOC(file_name, nlen);
358         if (!file_name)
359                 RETURN(-ENOMEM);
360         i = sprintf(file_name, "__iopen__/%u",
361                         (unsigned) dentry->d_inode->i_ino);
362
363         file = filp_open(file_name, O_RDONLY, 0);
364         if (IS_ERR(file)) {
365                 CERROR("can't open directory %s: %d\n",
366                                 file_name, (int) PTR_ERR(file));
367                 OBD_FREE(file_name, nlen);
368                 RETURN(PTR_ERR(file));
369         }
370
371         memset(&dc, 0, sizeof(dc));
372         dc.obd = obd;
373         dc.dir = dir;
374         dc.dentry = dentry;
375         dc.mea = mea;
376         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
377         LASSERT(dc.cache != NULL);
378         for (i = 0; i < mea->mea_count; i++) {
379                 INIT_LIST_HEAD(&dc.cache[i].list);
380                 dc.cache[i].free = 0;
381                 dc.cache[i].cached = 0;
382         }
383
384         err = vfs_readdir(file, filldir, &dc);
385         
386         filp_close(file, 0);
387
388         for (i = 0; i < mea->mea_count; i++) {
389                 if (dc.cache[i].cached)
390                         flush_buffer_onto_mds(&dc, i);
391         }
392
393         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
394         OBD_FREE(file_name, nlen);
395
396         return 0;
397 }
398
399 #define MAX_DIR_SIZE    (32 * 1024)
400
401 /*
402  * must not be called on already splitted directories
403  */
404 int mds_try_to_split_dir(struct obd_device *obd,
405                          struct dentry *dentry, struct mea **mea, int nstripes)
406 {
407         ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}};
408         struct ldlm_res_id res_id = { .name = {0} };
409         struct inode *dir = dentry->d_inode;
410         struct mds_obd *mds = &obd->u.mds;
411         struct lustre_handle lockh;
412         struct mea *tmea = NULL;
413         struct obdo *oa = NULL;
414         int rc, flags = 0;
415         int mea_size = 0;
416         void *handle;
417         ENTRY;
418
419         /* clustered MD ? */
420         if (!mds->mds_lmv_obd)
421                 RETURN(0);
422
423         /* don't split root directory */
424         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
425                 RETURN(0);
426
427 #if 0
428         if (dir->i_size < MAX_DIR_SIZE)
429                 RETURN(0);
430 #endif
431
432         /* check is directory marked non-splittable */
433         if (mea && *mea)
434                 RETURN(0);
435
436         CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n",
437                obd->obd_name, dir->i_ino,
438                (unsigned long) dir->i_generation, mea);
439
440         if (mea == NULL)
441                 mea = &tmea;
442         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
443
444         /* FIXME: Actually we may only want to allocate enough space for
445            necessary amount of stripes, but on the other hand with this approach
446            of allocating maximal possible amount of MDS slots, it would be
447            easier to split the dir over more MDSes */
448         rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea);
449         if (!(*mea))
450                 RETURN(-ENOMEM);
451         (*mea)->mea_count = nstripes;
452         
453         /* convert lock on the dir in order tox
454          * invalidate client's attributes -bzzz */
455         res_id.name[0] = dir->i_ino;
456         res_id.name[1] = dir->i_generation;
457         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
458                               LDLM_IBITS, &policy, LCK_PW, &flags,
459                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
460                               NULL, 0, NULL, &lockh);
461         if (rc != ELDLM_OK) {
462                 CERROR("error: rc = %d\n", rc);
463         }
464
465         /* 1) create directory objects on slave MDS'es */
466         /* FIXME: should this be OBD method? */
467         oa = obdo_alloc();
468         /* FIXME: error handling here */
469         LASSERT(oa != NULL);
470         oa->o_id = dir->i_ino;
471         oa->o_generation = dir->i_generation;
472         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
473                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
474                         OBD_MD_FLUID | OBD_MD_FLGID);
475         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
476         oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
477         oa->o_mode = dir->i_mode;
478         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
479                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
480                         
481         rc = obd_create(mds->mds_lmv_exp, oa,
482                         (struct lov_stripe_md **) mea, NULL);
483         /* FIXME: error handling here */
484         LASSERT(rc == 0);
485         CDEBUG(D_OTHER, "%d dirobjects created\n",
486                (int) (*mea)->mea_count);
487
488         /* 2) update dir attribute */
489         down(&dir->i_sem);
490         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
491         LASSERT(!IS_ERR(handle));
492         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
493         LASSERT(rc == 0);
494         fsfilt_commit(obd, dir, handle, 0);
495         LASSERT(rc == 0);
496         up(&dir->i_sem);
497         obdo_free(oa);
498
499         ldlm_lock_decref(&lockh, LCK_PW);
500
501         /* 3) read through the dir and distribute it over objects */
502         scan_and_distribute(obd, dentry, *mea);
503
504         if (mea == &tmea)
505                 obd_free_diskmd(mds->mds_lmv_exp,
506                                 (struct lov_mds_md **) mea);
507         RETURN(1);
508 }
509
510 static int filter_start_page_write(struct inode *inode,
511                                    struct niobuf_local *lnb)
512 {
513         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
514         if (page == NULL) {
515                 CERROR("no memory for a temp page\n");
516                 RETURN(lnb->rc = -ENOMEM);
517         }
518         POISON_PAGE(page, 0xf1);
519         page->index = lnb->offset >> PAGE_SHIFT;
520         lnb->page = page;
521
522         return 0;
523 }
524
525 struct dentry *filter_fid2dentry(struct obd_device *obd,
526                                  struct dentry *dir_dentry,
527                                  obd_gr group, obd_id id);
528 void f_dput(struct dentry *dentry);
529
530 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
531                 int objcount, struct obd_ioobj *obj,
532                 int niocount, struct niobuf_remote *nb,
533                 struct niobuf_local *res,
534                 struct obd_trans_info *oti)
535 {
536         struct mds_obd *mds = &exp->exp_obd->u.mds;
537         struct niobuf_remote *rnb;
538         struct niobuf_local *lnb = NULL;
539         int rc = 0, i, tot_bytes = 0;
540         unsigned long now = jiffies;
541         struct dentry *dentry;
542         struct ll_fid fid;
543         ENTRY;
544         LASSERT(objcount == 1);
545         LASSERT(obj->ioo_bufcnt > 0);
546
547         memset(res, 0, niocount * sizeof(*res));
548
549         fid.id = obj->ioo_id;
550         fid.generation = obj->ioo_gr;
551         dentry = mds_fid2dentry(mds, &fid, NULL);
552         LASSERT(!IS_ERR(dentry));
553
554         if (dentry->d_inode == NULL) {
555                 CERROR("trying to BRW to non-existent file "LPU64"\n",
556                        obj->ioo_id);
557                 f_dput(dentry);
558                 GOTO(cleanup, rc = -ENOENT);
559         }
560
561         if (time_after(jiffies, now + 15 * HZ))
562                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
563         else
564                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
565                        (jiffies - now));
566
567         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
568              i++, lnb++, rnb++) {
569                 lnb->dentry = dentry;
570                 lnb->offset = rnb->offset;
571                 lnb->len    = rnb->len;
572                 lnb->flags  = rnb->flags;
573
574                 rc = filter_start_page_write(dentry->d_inode, lnb);
575                 if (rc) {
576                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
577                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
578                                i, obj->ioo_bufcnt, dentry, rc);
579                         while (lnb-- > res)
580                                 __free_pages(lnb->page, 0);
581                         f_dput(dentry);
582                         GOTO(cleanup, rc);
583                 }
584                 tot_bytes += lnb->len;
585         }
586
587         if (time_after(jiffies, now + 15 * HZ))
588                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
589         else
590                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
591                        (jiffies - now));
592
593         EXIT;
594 cleanup:
595         return rc;
596 }
597
598 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
599                  int objcount, struct obd_ioobj *obj, int niocount,
600                  struct niobuf_local *res, struct obd_trans_info *oti,
601                  int retcode)
602 {
603         struct obd_device *obd = exp->exp_obd;
604         struct niobuf_local *lnb;
605         struct inode *inode = NULL;
606         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
607         ENTRY;
608
609         LASSERT(objcount == 1);
610         LASSERT(current->journal_info == NULL);
611
612         cleanup_phase = 1;
613         inode = res->dentry->d_inode;
614
615         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
616                 char *end, *buf;
617                 struct dir_entry *de;
618
619                 buf = kmap(lnb->page);
620                 LASSERT(buf != NULL);
621                 end = buf + lnb->len;
622                 de = (struct dir_entry *) buf;
623                 while ((char *) de < end && de->namelen) {
624                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
625                                                    de->namelen, de->ino,
626                                                    de->generation, de->mds);
627                         /* FIXME: remove entries from the original dir */
628 #warning "removing entries from the original dir"
629                         LASSERT(err == 0);
630                         de = (struct dir_entry *)
631                                 ((char *) de + DIR_REC_LEN(de->namelen));
632                         entries++;
633                 }
634                 kunmap(buf);
635         }
636
637         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
638                 __free_page(lnb->page);
639         f_dput(res->dentry);
640
641         RETURN(rc);
642 }
643