Whamcloud - gitweb
- CERROR's for test #46 debugging
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0};
51         int rc, valsize, value;
52         ENTRY;
53
54         if (IS_ERR(mds->mds_lmv_obd))
55                 RETURN(PTR_ERR(mds->mds_lmv_obd));
56
57         if (mds->mds_lmv_obd)
58                 RETURN(0);
59
60         mds->mds_lmv_obd = class_name2obd(lmv_name);
61         if (!mds->mds_lmv_obd) {
62                 CERROR("MDS cannot locate LMV %s\n",
63                        lmv_name);
64                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
65                 RETURN(-ENOTCONN);
66         }
67
68         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
69         if (rc) {
70                 CERROR("MDS cannot connect to LMV %s (%d)\n",
71                        lmv_name, rc);
72                 mds->mds_lmv_obd = ERR_PTR(rc);
73                 RETURN(rc);
74         }
75         mds->mds_lmv_exp = class_conn2export(&conn);
76         if (mds->mds_lmv_exp == NULL)
77                 CERROR("can't get export!\n");
78
79         rc = obd_register_observer(mds->mds_lmv_obd, obd);
80         if (rc) {
81                 CERROR("MDS cannot register as observer of LMV %s, "
82                        "rc = %d\n", lmv_name, rc);
83                 GOTO(err_discon, rc);
84         }
85
86         /* retrieve size of EA */
87         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
88                           &valsize, &value);
89         if (rc) 
90                 GOTO(err_reg, rc);
91
92         if (value > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = value;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &value);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         
101         mds->mds_num = value;
102
103         rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
104                           "inter_mds", 0, NULL);
105         if (rc)
106                 GOTO(err_reg, rc);
107         
108         RETURN(0);
109
110 err_reg:
111         obd_register_observer(mds->mds_lmv_obd, NULL);
112 err_discon:
113         obd_disconnect(mds->mds_lmv_exp, 0);
114         mds->mds_lmv_exp = NULL;
115         mds->mds_lmv_obd = ERR_PTR(rc);
116         RETURN(rc);
117 }
118
119 int mds_lmv_postsetup(struct obd_device *obd)
120 {
121         struct mds_obd *mds = &obd->u.mds;
122         int rc = 0;
123         ENTRY;
124
125         if (mds->mds_lmv_exp)
126                 rc = obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
127                                       mds->mds_max_cookiesize);
128         
129         RETURN(rc);
130 }
131
132 int mds_lmv_disconnect(struct obd_device *obd, int flags)
133 {
134         struct mds_obd *mds = &obd->u.mds;
135         int rc = 0;
136         ENTRY;
137
138         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
139                 obd_register_observer(mds->mds_lmv_obd, NULL);
140
141                 /* if obd_disconnect fails (probably because the export was
142                  * disconnected by class_disconnect_exports) then we just need
143                  * to drop our ref. */
144                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
145                 if (rc)
146                         class_export_put(mds->mds_lmv_exp);
147                 
148                 mds->mds_lmv_exp = NULL;
149                 mds->mds_lmv_obd = NULL;
150         }
151
152         RETURN(rc);
153 }
154
155 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
156                      struct mea **mea, int *mea_size)
157 {
158         struct mds_obd *mds = &obd->u.mds;
159         int rc;
160         ENTRY;
161
162         if (!mds->mds_lmv_obd)
163                 RETURN(0);
164
165         /* first calculate mea size */
166         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
167                                      (struct lov_mds_md **)mea);
168         if (*mea_size < 0 || *mea == NULL)
169                 return *mea_size < 0 ? *mea_size : -EINVAL;
170
171         down(&inode->i_sem);
172         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
173         up(&inode->i_sem);
174
175         if (rc <= 0) {
176                 OBD_FREE(*mea, *mea_size);
177                 *mea = NULL;
178         } else
179                 rc = 0;
180                         
181         RETURN(rc);
182 }
183
184 struct dir_entry {
185         __u16   namelen;
186         __u16   mds;
187         __u32   ino;
188         __u32   generation;
189         char    name[0];
190 };
191
192 #define DIR_PAD                 4
193 #define DIR_ROUND               (DIR_PAD - 1)
194 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
195
196 /* this struct holds dir entries for particular MDS to be flushed */
197 struct dir_cache {
198         struct list_head list;
199         void *cur;
200         int free;
201         int cached;
202         struct obdo oa;
203         struct brw_page brwc;
204 };
205
206 struct dirsplit_control {
207         struct obd_device *obd;
208         struct inode *dir;
209         struct dentry *dentry;
210         struct mea *mea;
211         struct dir_cache *cache;
212 };
213
214 static int dc_new_page_to_cache(struct dir_cache * dirc)
215 {
216         struct page *page;
217
218         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
219                 /* current page became full, mark the end */
220                 struct dir_entry *de = dirc->cur;
221                 de->namelen = 0;
222         }
223
224         page = alloc_page(GFP_KERNEL);
225         if (page == NULL)
226                 return -ENOMEM;
227         list_add_tail(&page->list, &dirc->list);
228         dirc->cur = page_address(page);
229         dirc->free = PAGE_SIZE;
230         return 0;
231 }
232
233 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
234 {
235         struct mds_obd *mds = &dc->obd->u.mds;
236         struct dir_entry *de;
237         struct dentry *dentry;
238         char * end;
239         
240         end = buf + PAGE_SIZE;
241         de = (struct dir_entry *) buf;
242         while ((char *) de < end && de->namelen) {
243                 /* lookup an inode */
244                 LASSERT(de->namelen <= 255);
245                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
246                 if (IS_ERR(dentry)) {
247                         CERROR("can't lookup %*s: %d\n", de->namelen,
248                                de->name, (int) PTR_ERR(dentry));
249                         goto next;
250                 }
251                 if (dentry->d_inode != NULL) {
252                         de->mds = mds->mds_num;
253                         de->ino = dentry->d_inode->i_ino;
254                         de->generation = dentry->d_inode->i_generation;
255                 } else if (dentry->d_flags & DCACHE_CROSS_REF) {
256                         de->mds = dentry->d_mdsnum;
257                         de->ino = dentry->d_inum;
258                         de->generation = dentry->d_generation;
259                 } else {
260                         CERROR("can't lookup %*s\n", de->namelen, de->name);
261                         goto next;
262                 }
263                 l_dput(dentry);
264
265 next:
266                 de = (struct dir_entry *)
267                         ((char *) de + DIR_REC_LEN(de->namelen));
268         }
269         return 0;
270 }
271
272 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
273 {
274         struct mds_obd *mds = &dc->obd->u.mds;
275         struct list_head *cur, *tmp;
276         struct dir_cache *ca;
277         int rc;
278         ENTRY; 
279         ca = dc->cache + mdsnum;
280
281         if (ca->free > sizeof(__u16)) {
282                 /* current page became full, mark the end */
283                 struct dir_entry *de = ca->cur;
284                 de->namelen = 0;
285         }
286
287         list_for_each_safe(cur, tmp, &ca->list) {
288                 struct page *page;
289
290                 page = list_entry(cur, struct page, list);
291                 LASSERT(page != NULL);
292
293                 retrieve_generation_numbers(dc, page_address(page));
294
295                 ca->brwc.pg = page;
296                 ca->brwc.off = 0;
297                 ca->brwc.count = PAGE_SIZE;
298                 ca->brwc.flag = 0;
299                 ca->oa.o_mds = mdsnum;
300                 rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
301                              (struct lov_stripe_md *) dc->mea,
302                              1, &ca->brwc, NULL);
303                 if (rc)
304                         RETURN(rc);
305
306         }
307         RETURN(0);
308 }
309
310 static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
311 {
312         struct list_head *cur, *tmp;
313         struct dentry *dentry;
314         struct dir_cache *ca;
315         struct dir_entry *de;
316         struct page *page;
317         char *buf, *end;
318         int rc;
319         ENTRY; 
320
321         ca = dc->cache + mdsnum;
322         list_for_each_safe(cur, tmp, &ca->list) {
323                 page = list_entry(cur, struct page, list);
324                 buf = page_address(page);
325                 end = buf + PAGE_SIZE;
326
327                 de = (struct dir_entry *) buf;
328                 while ((char *) de < end && de->namelen) {
329                         /* lookup an inode */
330                         LASSERT(de->namelen <= 255);
331
332                         dentry = ll_lookup_one_len(de->name, dc->dentry,
333                                                    de->namelen);
334                         if (IS_ERR(dentry)) {
335                                 CERROR("can't lookup %*s: %d\n", de->namelen,
336                                                 de->name, (int) PTR_ERR(dentry));
337                                 goto next;
338                         }
339                         rc = fsfilt_del_dir_entry(dc->obd, dentry);
340                         l_dput(dentry);
341 next:
342                         de = (struct dir_entry *)
343                                 ((char *) de + DIR_REC_LEN(de->namelen));
344                 }
345         }
346         RETURN(0);
347 }
348
349 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
350                    ino_t ino, unsigned int d_type)
351 {
352         struct dirsplit_control *dc = __buf;
353         struct mds_obd *mds = &dc->obd->u.mds;
354         struct dir_cache *ca;
355         struct dir_entry *de;
356         int newmds;
357         char *n;
358         ENTRY;
359
360         if (name[0] == '.' && (namlen == 1 ||
361                                 (namlen == 2 && name[1] == '.'))) {
362                 /* skip special entries */
363                 RETURN(0);
364         }
365
366         LASSERT(dc != NULL);
367         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
368
369         if (newmds == mds->mds_num) {
370                 /* this entry remains on the current MDS, skip moving */
371                 RETURN(0);
372         }
373         
374         OBD_ALLOC(n, namlen + 1);
375         memcpy(n, name, namlen);
376         n[namlen] = (char) 0;
377         
378         OBD_FREE(n, namlen + 1);
379
380         /* check for space in buffer for new entry */
381         ca = dc->cache + newmds;
382         if (DIR_REC_LEN(namlen) > ca->free) {
383                 int err = dc_new_page_to_cache(ca);
384                 LASSERT(err == 0);
385         }
386         
387         /* insert found entry into buffer to be flushed later */
388         /* NOTE: we'll fill generations number later, because we
389          * it's stored in inode, thus we need to lookup an entry,
390          * but directory is locked for readdir(), so we delay this */
391         de = ca->cur;
392         de->ino = ino;
393         de->mds = d_type;
394         de->namelen = namlen;
395         memcpy(de->name, name, namlen);
396         ca->cur += DIR_REC_LEN(namlen);
397         ca->free -= DIR_REC_LEN(namlen);
398         ca->cached++;
399
400         RETURN(0);
401 }
402
403 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
404                                 struct mea *mea)
405 {
406         struct inode *dir = dentry->d_inode;
407         struct dirsplit_control dc;
408         struct file * file;
409         int err, i, nlen;
410         char *file_name;
411
412         nlen = strlen("__iopen__/") + 10 + 1;
413         OBD_ALLOC(file_name, nlen);
414         if (!file_name)
415                 RETURN(-ENOMEM);
416         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
417
418         file = filp_open(file_name, O_RDONLY, 0);
419         if (IS_ERR(file)) {
420                 CERROR("can't open directory %s: %d\n",
421                                 file_name, (int) PTR_ERR(file));
422                 OBD_FREE(file_name, nlen);
423                 RETURN(PTR_ERR(file));
424         }
425
426         memset(&dc, 0, sizeof(dc));
427         dc.obd = obd;
428         dc.dir = dir;
429         dc.dentry = dentry;
430         dc.mea = mea;
431         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
432         LASSERT(dc.cache != NULL);
433         for (i = 0; i < mea->mea_count; i++) {
434                 INIT_LIST_HEAD(&dc.cache[i].list);
435                 dc.cache[i].free = 0;
436                 dc.cache[i].cached = 0;
437         }
438
439         err = vfs_readdir(file, filldir, &dc);
440         filp_close(file, 0);
441         if (err)
442                 GOTO(cleanup, err);
443
444         for (i = 0; i < mea->mea_count; i++) {
445                 if (!dc.cache[i].cached)
446                         continue;
447                 err = flush_buffer_onto_mds(&dc, i);
448                 if (err)
449                         GOTO(cleanup, err);
450         }
451
452         for (i = 0; i < mea->mea_count; i++) {
453                 if (!dc.cache[i].cached)
454                         continue;
455                 err = remove_entries_from_orig_dir(&dc, i);
456                 if (err)
457                         GOTO(cleanup, err);
458         }
459
460 cleanup:
461         for (i = 0; i < mea->mea_count; i++) {
462                 struct list_head *cur, *tmp;
463                 if (!dc.cache[i].cached)
464                         continue;
465                 list_for_each_safe(cur, tmp, &dc.cache[i].list) {
466                         struct page *page;
467                         page = list_entry(cur, struct page, list);
468                         list_del(&page->list);
469                         __free_page(page);
470                 }
471         }
472         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
473         OBD_FREE(file_name, nlen);
474
475         RETURN(err);
476 }
477
478 #define MAX_DIR_SIZE    (64 * 1024)
479
480 int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
481 {
482         struct mds_obd *mds = &obd->u.mds;
483         struct mea *mea = NULL;
484         int rc, size;
485
486         /* clustered MD ? */
487         if (!mds->mds_lmv_obd)
488                 RETURN(MDS_NO_SPLITTABLE);
489
490         /* inode exist? */
491         if (dentry->d_inode == NULL)
492                 return MDS_NO_SPLITTABLE;
493
494         /* a dir can be splitted only */
495         if (!S_ISDIR(dentry->d_inode->i_mode))
496                 return MDS_NO_SPLITTABLE;
497
498         /* don't split root directory */
499         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
500                 return MDS_NO_SPLITTABLE;
501
502         /* large enough to be splitted? */
503         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
504                 return MDS_NO_SPLIT_EXPECTED;
505
506         mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
507         if (mea) {
508                 /* already splitted or slave object: shouldn't be splitted */
509                 rc = MDS_NO_SPLITTABLE;
510         } else {
511                 /* may be splitted */
512                 rc = MDS_EXPECT_SPLIT;
513         }
514
515         if (mea)
516                 OBD_FREE(mea, size);
517         RETURN(rc);
518 }
519
520 /*
521  * must not be called on already splitted directories.
522  */
523 int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
524                          struct mea **mea, int nstripes)
525 {
526         struct inode *dir = dentry->d_inode;
527         struct mds_obd *mds = &obd->u.mds;
528         struct mea *tmea = NULL;
529         struct obdo *oa = NULL;
530         int rc, mea_size = 0;
531         void *handle;
532         ENTRY;
533
534         /* TODO: optimization possible - we already may have mea here */
535         rc = mds_splitting_expected(obd, dentry);
536         if (rc == MDS_NO_SPLITTABLE)
537                 return 0;
538         if (rc == MDS_NO_SPLIT_EXPECTED && nstripes == 0)
539                 return 0;
540         
541         LASSERT(mea == NULL || *mea == NULL);
542
543         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
544                obd->obd_name, mds->mds_num, dir->i_ino,
545                (unsigned long) dir->i_generation);
546         CDEBUG(D_ERROR, "%s: split directory %u/%lu/%lu\n",
547                obd->obd_name, mds->mds_num, dir->i_ino,
548                (unsigned long) dir->i_generation);
549
550         if (mea == NULL)
551                 mea = &tmea;
552         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
553
554         /* FIXME: Actually we may only want to allocate enough space for
555          * necessary amount of stripes, but on the other hand with this
556          * approach of allocating maximal possible amount of MDS slots,
557          * it would be easier to split the dir over more MDSes */
558         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
559         if (rc < 0) {
560                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
561                 RETURN(rc);
562         }
563         if (*mea == NULL)
564                 RETURN(-EINVAL);
565
566         (*mea)->mea_count = nstripes;
567        
568         /* 1) create directory objects on slave MDS'es */
569         /* FIXME: should this be OBD method? */
570         oa = obdo_alloc();
571         if (!oa)
572                 RETURN(-ENOMEM);
573
574         oa->o_id = dir->i_ino;
575         oa->o_generation = dir->i_generation;
576
577         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
578                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
579                         OBD_MD_FLUID | OBD_MD_FLGID);
580
581         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
582         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
583         oa->o_mode = dir->i_mode;
584
585         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
586                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
587                         
588         rc = obd_create(mds->mds_lmv_exp, oa,
589                         (struct lov_stripe_md **)mea, NULL);
590         if (rc)
591                 GOTO(err_oa, rc);
592
593         CDEBUG(D_OTHER, "%d dirobjects created\n", (int)(*mea)->mea_count);
594
595         /* 2) update dir attribute */
596         down(&dir->i_sem);
597         
598         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
599         if (IS_ERR(handle)) {
600                 up(&dir->i_sem);
601                 CERROR("fsfilt_start() failed: %d\n", (int) PTR_ERR(handle));
602                 GOTO(err_oa, rc = PTR_ERR(handle));
603         }
604         
605         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
606         if (rc) {
607                 up(&dir->i_sem);
608                 CERROR("fsfilt_set_md() failed, error %d.\n", rc);
609                 GOTO(err_oa, rc);
610         }
611         
612         rc = fsfilt_commit(obd, mds->mds_sb, dir, handle, 0);
613         if (rc) {
614                 up(&dir->i_sem);
615                 CERROR("fsfilt_commit() failed, error %d.\n", rc);
616                 GOTO(err_oa, rc);
617         }
618         
619         up(&dir->i_sem);
620         obdo_free(oa);
621
622         /* 3) read through the dir and distribute it over objects */
623         rc = scan_and_distribute(obd, dentry, *mea);
624         if (mea == &tmea)
625                 obd_free_diskmd(mds->mds_lmv_exp, (struct lov_mds_md **)mea);
626         if (rc) {
627                 CERROR("scan_and_distribute() failed, error %d.\n", rc);
628                 RETURN(rc);
629         }
630
631         RETURN(1);
632
633 err_oa:
634         obdo_free(oa);
635         RETURN(rc);
636 }
637
638 static int filter_start_page_write(struct inode *inode,
639                                    struct niobuf_local *lnb)
640 {
641         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
642         if (page == NULL) {
643                 CERROR("no memory for a temp page\n");
644                 RETURN(lnb->rc = -ENOMEM);
645         }
646         POISON_PAGE(page, 0xf1);
647         page->index = lnb->offset >> PAGE_SHIFT;
648         lnb->page = page;
649
650         return 0;
651 }
652
653 struct dentry *filter_fid2dentry(struct obd_device *obd,
654                                  struct dentry *dir_dentry,
655                                  obd_gr group, obd_id id);
656
657 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
658                 int objcount, struct obd_ioobj *obj,
659                 int niocount, struct niobuf_remote *nb,
660                 struct niobuf_local *res,
661                 struct obd_trans_info *oti)
662 {
663         struct mds_obd *mds = &exp->exp_obd->u.mds;
664         struct niobuf_remote *rnb;
665         struct niobuf_local *lnb = NULL;
666         int rc = 0, i, tot_bytes = 0;
667         unsigned long now = jiffies;
668         struct dentry *dentry;
669         struct ll_fid fid;
670         ENTRY;
671         LASSERT(objcount == 1);
672         LASSERT(obj->ioo_bufcnt > 0);
673
674         memset(res, 0, niocount * sizeof(*res));
675
676         fid.id = obj->ioo_id;
677         fid.generation = obj->ioo_gr;
678         dentry = mds_fid2dentry(mds, &fid, NULL);
679         if (IS_ERR(dentry)) {
680                 CERROR("can't get dentry for "LPU64"/%u: %d\n",
681                        fid.id, fid.generation, (int) PTR_ERR(dentry));
682                 GOTO(cleanup, rc = (int) PTR_ERR(dentry));
683         }
684
685         if (dentry->d_inode == NULL) {
686                 CERROR("trying to BRW to non-existent file "LPU64"\n",
687                        obj->ioo_id);
688                 l_dput(dentry);
689                 GOTO(cleanup, rc = -ENOENT);
690         }
691
692         if (time_after(jiffies, now + 15 * HZ))
693                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
694         else
695                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
696                        (jiffies - now));
697
698         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
699              i++, lnb++, rnb++) {
700                 lnb->dentry = dentry;
701                 lnb->offset = rnb->offset;
702                 lnb->len    = rnb->len;
703                 lnb->flags  = rnb->flags;
704
705                 rc = filter_start_page_write(dentry->d_inode, lnb);
706                 if (rc) {
707                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
708                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
709                                i, obj->ioo_bufcnt, dentry, rc);
710                         while (lnb-- > res)
711                                 __free_pages(lnb->page, 0);
712                         l_dput(dentry);
713                         GOTO(cleanup, rc);
714                 }
715                 tot_bytes += lnb->len;
716         }
717
718         if (time_after(jiffies, now + 15 * HZ))
719                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
720         else
721                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
722                        (jiffies - now));
723
724         EXIT;
725 cleanup:
726         return rc;
727 }
728
729 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
730                  int objcount, struct obd_ioobj *obj, int niocount,
731                  struct niobuf_local *res, struct obd_trans_info *oti,
732                  int retcode)
733 {
734         struct obd_device *obd = exp->exp_obd;
735         struct niobuf_local *lnb;
736         struct inode *inode = NULL;
737         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
738         ENTRY;
739
740         LASSERT(objcount == 1);
741         LASSERT(current->journal_info == NULL);
742
743         cleanup_phase = 1;
744         inode = res->dentry->d_inode;
745
746         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
747                 char *end, *buf;
748                 struct dir_entry *de;
749
750                 buf = kmap(lnb->page);
751                 LASSERT(buf != NULL);
752                 end = buf + lnb->len;
753                 de = (struct dir_entry *) buf;
754                 while ((char *) de < end && de->namelen) {
755                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
756                                                    de->namelen, de->ino,
757                                                    de->generation, de->mds);
758                         if (err) {
759                                 CERROR("can't add dir entry %*s->%u/%u/%u"
760                                        " to %lu/%u: %d\n",
761                                        de->namelen, de->name,
762                                        de->mds, (unsigned) de->ino,
763                                        (unsigned) de->generation,
764                                        res->dentry->d_inode->i_ino,
765                                        res->dentry->d_inode->i_generation,
766                                        err);
767                                 rc = err;
768                                 break;
769                         }
770                         LASSERT(err == 0);
771                         de = (struct dir_entry *)
772                                 ((char *) de + DIR_REC_LEN(de->namelen));
773                         entries++;
774                 }
775                 kunmap(lnb->page);
776         }
777
778         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
779                 __free_page(lnb->page);
780         l_dput(res->dentry);
781
782         RETURN(rc);
783 }
784
785 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
786 {
787         struct lmv_obd *lmv;
788         struct mds_obd *mds = &obd->u.mds;
789         int i = mds->mds_num;
790
791         if (flags & REC_REINT_CREATE) { 
792                 i = mds->mds_num;
793         } else if (mds->mds_lmv_exp) {
794                 lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
795                 i = raw_name2idx(lmv->desc.ld_tgt_count, name, len);
796         }
797         RETURN(i);
798 }
799
800 int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
801                         struct lustre_handle **rlockh)
802 {
803         struct mds_obd *mds = &obd->u.mds;
804         struct mdc_op_data op_data;
805         struct lookup_intent it;
806         struct mea *mea = NULL;
807         int mea_size, rc;
808
809         LASSERT(rlockh != NULL);
810         LASSERT(dentry != NULL);
811         LASSERT(dentry->d_inode != NULL);
812
813         /* clustered MD ? */
814         if (!mds->mds_lmv_obd)
815                 return 0;
816
817         /* a dir can be splitted only */
818         if (!S_ISDIR(dentry->d_inode->i_mode))
819                 return 0;
820
821         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
822         if (rc)
823                 return rc;
824
825         if (mea == NULL)
826                 return 0;
827         if (mea->mea_count == 0) {
828                 /* this is slave object */
829                 GOTO(cleanup, rc = 0);
830         }
831                 
832         CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
833                (unsigned long) dentry->d_inode->i_ino,
834                (unsigned long) dentry->d_inode->i_generation);
835
836         OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
837         if (*rlockh == NULL)
838                 GOTO(cleanup, rc = -ENOMEM);
839         memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
840
841         memset(&op_data, 0, sizeof(op_data));
842         op_data.mea1 = mea;
843         it.it_op = IT_UNLINK;
844         rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
845                         *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
846                         NULL);
847 cleanup:
848         OBD_FREE(mea, mea_size);
849         RETURN(rc);
850 }
851
852 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
853                         struct lustre_handle *lockh)
854 {
855         struct mds_obd *mds = &obd->u.mds;
856         struct mea *mea = NULL;
857         int mea_size, rc, i;
858
859         if (lockh == NULL)
860                 return;
861
862         LASSERT(mds->mds_lmv_obd != NULL);
863         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
864
865         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
866         if (rc) {
867                 CERROR("locks are leaked\n");
868                 return;
869         }
870         LASSERT(mea_size != 0);
871         LASSERT(mea != NULL);
872         LASSERT(mea->mea_count != 0);
873
874         CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
875                (unsigned long) dentry->d_inode->i_ino,
876                (unsigned long) dentry->d_inode->i_generation);
877
878         for (i = 0; i < mea->mea_count; i++) {
879                 if (lockh[i].cookie != 0)
880                         ldlm_lock_decref(lockh + i, LCK_EX);
881         }
882
883         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
884         OBD_FREE(mea, mea_size);
885         return;
886 }
887
888 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
889 {
890         struct mds_obd *mds = &obd->u.mds;
891         struct ptlrpc_request *req = NULL;
892         struct mdc_op_data op_data;
893         struct mea *mea = NULL;
894         int mea_size, rc;
895
896         /* clustered MD ? */
897         if (!mds->mds_lmv_obd)
898                 return 0;
899
900         /* a dir can be splitted only */
901         if (!S_ISDIR(dentry->d_inode->i_mode))
902                 RETURN(0);
903
904         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
905         if (rc)
906                 RETURN(rc);
907
908         if (mea == NULL)
909                 return 0;
910         if (mea->mea_count == 0)
911                 GOTO(cleanup, rc = 0);
912
913         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
914                (unsigned long) dentry->d_inode->i_ino,
915                (unsigned long) dentry->d_inode->i_generation);
916
917         memset(&op_data, 0, sizeof(op_data));
918         op_data.mea1 = mea;
919         rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
920         LASSERT(req == NULL);
921 cleanup:
922         OBD_FREE(mea, mea_size);
923         RETURN(rc);
924 }
925
926 struct ide_tracking {
927         int entries;
928         int empty;
929 };
930
931 int mds_ide_filldir(void *__buf, const char *name, int namelen,
932                     loff_t offset, ino_t ino, unsigned int d_type)
933 {
934         struct ide_tracking *it = __buf;
935
936         if (ino == 0)
937                 return 0;
938
939         it->entries++;
940         if (it->entries > 2)
941                 goto noempty;
942         if (namelen > 2)
943                 goto noempty;
944         if (name[0] == '.' && namelen == 1)
945                 return 0;
946         if (name[0] == '.' && name[1] == '.' && namelen == 2)
947                 return 0;
948 noempty:
949         it->empty = 0;
950         return -ENOTEMPTY;
951 }
952
953 int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
954 {
955         struct ide_tracking it;
956         struct file * file;
957         char *file_name;
958         int nlen, i, rc;
959         
960         it.entries = 0;
961         it.empty = 1;
962
963         nlen = strlen("__iopen__/") + 10 + 1;
964         OBD_ALLOC(file_name, nlen);
965         if (!file_name)
966                 RETURN(-ENOMEM);
967         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
968
969         file = filp_open(file_name, O_RDONLY, 0);
970         if (IS_ERR(file)) {
971                 CERROR("can't open directory %s: %d\n",
972                        file_name, (int) PTR_ERR(file));
973                 GOTO(cleanup, rc = PTR_ERR(file));
974         }
975
976         rc = vfs_readdir(file, mds_ide_filldir, &it);
977         filp_close(file, 0);
978
979         if (it.empty && rc == 0)
980                 rc = 1;
981         else
982                 rc = 0;
983
984 cleanup:
985         OBD_FREE(file_name, nlen);
986         return rc;
987 }
988
989 int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
990                              struct lustre_handle *lockh)
991 {
992         struct obd_device *obd = req->rq_export->exp_obd;
993         struct dentry *dentry = NULL;
994         struct lvfs_run_ctxt saved;
995         int cleanup_phase = 0;
996         struct mds_body *body;
997         struct lvfs_ucred uc;
998         int rc, update_mode;
999         ENTRY;
1000
1001         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1002                                   lustre_swab_mds_body);
1003         if (body == NULL) {
1004                 CERROR("Can't swab mds_body\n");
1005                 GOTO(cleanup, rc = -EFAULT);
1006         }
1007         CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
1008                (unsigned long) body->fid1.id,
1009                (unsigned long) body->fid1.generation);
1010         dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
1011                                        &update_mode, NULL, 0,
1012                                        MDS_INODELOCK_UPDATE);
1013         if (IS_ERR(dentry)) {
1014                 CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
1015                 GOTO(cleanup, rc = PTR_ERR(dentry));
1016         }
1017         cleanup_phase = 1;
1018
1019         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
1020
1021         uc.luc_fsuid = body->fsuid;
1022         uc.luc_fsgid = body->fsgid;
1023         uc.luc_cap = body->capability;
1024         uc.luc_suppgid1 = body->suppgid;
1025         uc.luc_suppgid2 = -1;
1026         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1027
1028         rc = 0;
1029         if (!mds_is_dir_empty(obd, dentry))
1030                 rc = -ENOTEMPTY;
1031
1032 cleanup:
1033         switch(cleanup_phase) {
1034         case 1:
1035                 if (rc)
1036                         ldlm_lock_decref(lockh, LCK_EX);
1037                 l_dput(dentry);
1038                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1039         default:
1040                 break;
1041         }
1042         RETURN(rc);
1043 }
1044