Whamcloud - gitweb
- mds_preprw() handles failed mds_fid2dentry() properly
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0};
51         int rc, valsize, value;
52         ENTRY;
53
54         if (IS_ERR(mds->mds_lmv_obd))
55                 RETURN(PTR_ERR(mds->mds_lmv_obd));
56
57         if (mds->mds_lmv_obd)
58                 RETURN(0);
59
60         mds->mds_lmv_obd = class_name2obd(lmv_name);
61         if (!mds->mds_lmv_obd) {
62                 CERROR("MDS cannot locate LMV %s\n",
63                        lmv_name);
64                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
65                 RETURN(-ENOTCONN);
66         }
67
68         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
69         if (rc) {
70                 CERROR("MDS cannot connect to LMV %s (%d)\n",
71                        lmv_name, rc);
72                 mds->mds_lmv_obd = ERR_PTR(rc);
73                 RETURN(rc);
74         }
75         mds->mds_lmv_exp = class_conn2export(&conn);
76         if (mds->mds_lmv_exp == NULL)
77                 CERROR("can't get export!\n");
78
79         rc = obd_register_observer(mds->mds_lmv_obd, obd);
80         if (rc) {
81                 CERROR("MDS cannot register as observer of LMV %s, "
82                        "rc = %d\n", lmv_name, rc);
83                 GOTO(err_discon, rc);
84         }
85
86         /* retrieve size of EA */
87         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
88                           &valsize, &value);
89         if (rc) 
90                 GOTO(err_reg, rc);
91
92         if (value > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = value;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &value);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         
101         mds->mds_num = value;
102
103         rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
104                           "inter_mds", 0, NULL);
105         if (rc)
106                 GOTO(err_reg, rc);
107         
108         RETURN(0);
109
110 err_reg:
111         obd_register_observer(mds->mds_lmv_obd, NULL);
112 err_discon:
113         obd_disconnect(mds->mds_lmv_exp, 0);
114         mds->mds_lmv_exp = NULL;
115         mds->mds_lmv_obd = ERR_PTR(rc);
116         RETURN(rc);
117 }
118
119 int mds_lmv_postsetup(struct obd_device *obd)
120 {
121         struct mds_obd *mds = &obd->u.mds;
122         int rc = 0;
123         ENTRY;
124
125         if (mds->mds_lmv_exp)
126                 rc = obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
127                                       mds->mds_max_cookiesize);
128         
129         RETURN(rc);
130 }
131
132 int mds_lmv_disconnect(struct obd_device *obd, int flags)
133 {
134         struct mds_obd *mds = &obd->u.mds;
135         int rc = 0;
136         ENTRY;
137
138         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
139                 obd_register_observer(mds->mds_lmv_obd, NULL);
140
141                 /* if obd_disconnect fails (probably because the export was
142                  * disconnected by class_disconnect_exports) then we just need
143                  * to drop our ref. */
144                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
145                 if (rc)
146                         class_export_put(mds->mds_lmv_exp);
147                 
148                 mds->mds_lmv_exp = NULL;
149                 mds->mds_lmv_obd = NULL;
150         }
151
152         RETURN(rc);
153 }
154
155 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
156                      struct mea **mea, int *mea_size)
157 {
158         struct mds_obd *mds = &obd->u.mds;
159         int rc;
160         ENTRY;
161
162         if (!mds->mds_lmv_obd)
163                 RETURN(0);
164
165         /* first calculate mea size */
166         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
167                                      (struct lov_mds_md **)mea);
168         if (*mea_size < 0 || *mea == NULL)
169                 return *mea_size < 0 ? *mea_size : -EINVAL;
170
171         down(&inode->i_sem);
172         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
173         up(&inode->i_sem);
174
175         if (rc <= 0) {
176                 OBD_FREE(*mea, *mea_size);
177                 *mea = NULL;
178         } else
179                 rc = 0;
180                         
181         RETURN(rc);
182 }
183
184 struct dir_entry {
185         __u16   namelen;
186         __u16   mds;
187         __u32   ino;
188         __u32   generation;
189         char    name[0];
190 };
191
192 #define DIR_PAD                 4
193 #define DIR_ROUND               (DIR_PAD - 1)
194 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
195
196 /* this struct holds dir entries for particular MDS to be flushed */
197 struct dir_cache {
198         struct list_head list;
199         void *cur;
200         int free;
201         int cached;
202         struct obdo oa;
203         struct brw_page brwc;
204 };
205
206 struct dirsplit_control {
207         struct obd_device *obd;
208         struct inode *dir;
209         struct dentry *dentry;
210         struct mea *mea;
211         struct dir_cache *cache;
212 };
213
214 static int dc_new_page_to_cache(struct dir_cache * dirc)
215 {
216         struct page *page;
217
218         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
219                 /* current page became full, mark the end */
220                 struct dir_entry *de = dirc->cur;
221                 de->namelen = 0;
222         }
223
224         page = alloc_page(GFP_KERNEL);
225         if (page == NULL)
226                 return -ENOMEM;
227         list_add_tail(&page->list, &dirc->list);
228         dirc->cur = page_address(page);
229         dirc->free = PAGE_SIZE;
230         return 0;
231 }
232
233 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
234 {
235         struct mds_obd *mds = &dc->obd->u.mds;
236         struct dir_entry *de;
237         struct dentry *dentry;
238         char * end;
239         
240         end = buf + PAGE_SIZE;
241         de = (struct dir_entry *) buf;
242         while ((char *) de < end && de->namelen) {
243                 /* lookup an inode */
244                 LASSERT(de->namelen <= 255);
245                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
246                 if (IS_ERR(dentry)) {
247                         CERROR("can't lookup %*s: %d\n", de->namelen,
248                                de->name, (int) PTR_ERR(dentry));
249                         goto next;
250                 }
251                 if (dentry->d_inode != NULL) {
252                         de->mds = mds->mds_num;
253                         de->ino = dentry->d_inode->i_ino;
254                         de->generation = dentry->d_inode->i_generation;
255                 } else if (dentry->d_flags & DCACHE_CROSS_REF) {
256                         de->mds = dentry->d_mdsnum;
257                         de->ino = dentry->d_inum;
258                         de->generation = dentry->d_generation;
259                 } else {
260                         CERROR("can't lookup %*s\n", de->namelen, de->name);
261                         goto next;
262                 }
263                 l_dput(dentry);
264
265 next:
266                 de = (struct dir_entry *)
267                         ((char *) de + DIR_REC_LEN(de->namelen));
268         }
269         return 0;
270 }
271
272 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
273 {
274         struct mds_obd *mds = &dc->obd->u.mds;
275         struct list_head *cur, *tmp;
276         struct dir_cache *ca;
277         int rc;
278         ENTRY; 
279         ca = dc->cache + mdsnum;
280
281         if (ca->free > sizeof(__u16)) {
282                 /* current page became full, mark the end */
283                 struct dir_entry *de = ca->cur;
284                 de->namelen = 0;
285         }
286
287         list_for_each_safe(cur, tmp, &ca->list) {
288                 struct page *page;
289
290                 page = list_entry(cur, struct page, list);
291                 LASSERT(page != NULL);
292
293                 retrieve_generation_numbers(dc, page_address(page));
294
295                 ca->brwc.pg = page;
296                 ca->brwc.off = 0;
297                 ca->brwc.count = PAGE_SIZE;
298                 ca->brwc.flag = 0;
299                 ca->oa.o_mds = mdsnum;
300                 rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
301                              (struct lov_stripe_md *) dc->mea,
302                              1, &ca->brwc, NULL);
303                 if (rc)
304                         RETURN(rc);
305
306         }
307         RETURN(0);
308 }
309
310 static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
311 {
312         struct list_head *cur, *tmp;
313         struct dentry *dentry;
314         struct dir_cache *ca;
315         struct dir_entry *de;
316         struct page *page;
317         char *buf, *end;
318         int rc;
319         ENTRY; 
320
321         ca = dc->cache + mdsnum;
322         list_for_each_safe(cur, tmp, &ca->list) {
323                 page = list_entry(cur, struct page, list);
324                 buf = page_address(page);
325                 end = buf + PAGE_SIZE;
326
327                 de = (struct dir_entry *) buf;
328                 while ((char *) de < end && de->namelen) {
329                         /* lookup an inode */
330                         LASSERT(de->namelen <= 255);
331
332                         dentry = ll_lookup_one_len(de->name, dc->dentry,
333                                                    de->namelen);
334                         if (IS_ERR(dentry)) {
335                                 CERROR("can't lookup %*s: %d\n", de->namelen,
336                                                 de->name, (int) PTR_ERR(dentry));
337                                 goto next;
338                         }
339                         rc = fsfilt_del_dir_entry(dc->obd, dentry);
340                         l_dput(dentry);
341 next:
342                         de = (struct dir_entry *)
343                                 ((char *) de + DIR_REC_LEN(de->namelen));
344                 }
345         }
346         RETURN(0);
347 }
348
349 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
350                    ino_t ino, unsigned int d_type)
351 {
352         struct dirsplit_control *dc = __buf;
353         struct mds_obd *mds = &dc->obd->u.mds;
354         struct dir_cache *ca;
355         struct dir_entry *de;
356         int newmds;
357         char *n;
358         ENTRY;
359
360         if (name[0] == '.' && (namlen == 1 ||
361                                 (namlen == 2 && name[1] == '.'))) {
362                 /* skip special entries */
363                 RETURN(0);
364         }
365
366         LASSERT(dc != NULL);
367         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
368
369         if (newmds == mds->mds_num) {
370                 /* this entry remains on the current MDS, skip moving */
371                 RETURN(0);
372         }
373         
374         OBD_ALLOC(n, namlen + 1);
375         memcpy(n, name, namlen);
376         n[namlen] = (char) 0;
377         
378         OBD_FREE(n, namlen + 1);
379
380         /* check for space in buffer for new entry */
381         ca = dc->cache + newmds;
382         if (DIR_REC_LEN(namlen) > ca->free) {
383                 int err = dc_new_page_to_cache(ca);
384                 LASSERT(err == 0);
385         }
386         
387         /* insert found entry into buffer to be flushed later */
388         /* NOTE: we'll fill generations number later, because we
389          * it's stored in inode, thus we need to lookup an entry,
390          * but directory is locked for readdir(), so we delay this */
391         de = ca->cur;
392         de->ino = ino;
393         de->mds = d_type;
394         de->namelen = namlen;
395         memcpy(de->name, name, namlen);
396         ca->cur += DIR_REC_LEN(namlen);
397         ca->free -= DIR_REC_LEN(namlen);
398         ca->cached++;
399
400         RETURN(0);
401 }
402
403 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
404                                 struct mea *mea)
405 {
406         struct inode *dir = dentry->d_inode;
407         struct dirsplit_control dc;
408         struct file * file;
409         int err, i, nlen;
410         char *file_name;
411
412         nlen = strlen("__iopen__/") + 10 + 1;
413         OBD_ALLOC(file_name, nlen);
414         if (!file_name)
415                 RETURN(-ENOMEM);
416         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
417
418         file = filp_open(file_name, O_RDONLY, 0);
419         if (IS_ERR(file)) {
420                 CERROR("can't open directory %s: %d\n",
421                                 file_name, (int) PTR_ERR(file));
422                 OBD_FREE(file_name, nlen);
423                 RETURN(PTR_ERR(file));
424         }
425
426         memset(&dc, 0, sizeof(dc));
427         dc.obd = obd;
428         dc.dir = dir;
429         dc.dentry = dentry;
430         dc.mea = mea;
431         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
432         LASSERT(dc.cache != NULL);
433         for (i = 0; i < mea->mea_count; i++) {
434                 INIT_LIST_HEAD(&dc.cache[i].list);
435                 dc.cache[i].free = 0;
436                 dc.cache[i].cached = 0;
437         }
438
439         err = vfs_readdir(file, filldir, &dc);
440         filp_close(file, 0);
441         if (err)
442                 GOTO(cleanup, err);
443
444         for (i = 0; i < mea->mea_count; i++) {
445                 if (!dc.cache[i].cached)
446                         continue;
447                 err = flush_buffer_onto_mds(&dc, i);
448                 if (err)
449                         GOTO(cleanup, err);
450         }
451
452         for (i = 0; i < mea->mea_count; i++) {
453                 if (!dc.cache[i].cached)
454                         continue;
455                 err = remove_entries_from_orig_dir(&dc, i);
456                 if (err)
457                         GOTO(cleanup, err);
458         }
459
460 cleanup:
461         for (i = 0; i < mea->mea_count; i++) {
462                 struct list_head *cur, *tmp;
463                 if (!dc.cache[i].cached)
464                         continue;
465                 list_for_each_safe(cur, tmp, &dc.cache[i].list) {
466                         struct page *page;
467                         page = list_entry(cur, struct page, list);
468                         list_del(&page->list);
469                         __free_page(page);
470                 }
471         }
472         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
473         OBD_FREE(file_name, nlen);
474
475         RETURN(err);
476 }
477
478 #define MAX_DIR_SIZE    (64 * 1024)
479
480 int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
481 {
482         struct mds_obd *mds = &obd->u.mds;
483         struct mea *mea = NULL;
484         int rc, size;
485
486         /* clustered MD ? */
487         if (!mds->mds_lmv_obd)
488                 RETURN(MDS_NO_SPLITTABLE);
489
490         /* inode exist? */
491         if (dentry->d_inode == NULL)
492                 return MDS_NO_SPLITTABLE;
493
494         /* a dir can be splitted only */
495         if (!S_ISDIR(dentry->d_inode->i_mode))
496                 return MDS_NO_SPLITTABLE;
497
498         /* don't split root directory */
499         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
500                 return MDS_NO_SPLITTABLE;
501
502         /* large enough to be splitted? */
503         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
504                 return MDS_NO_SPLIT_EXPECTED;
505
506         mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
507         if (mea) {
508                 /* already splitted or slave object: shouldn't be splitted */
509                 rc = MDS_NO_SPLITTABLE;
510         } else {
511                 /* may be splitted */
512                 rc = MDS_EXPECT_SPLIT;
513         }
514
515         if (mea)
516                 OBD_FREE(mea, size);
517         RETURN(rc);
518 }
519
520 /*
521  * must not be called on already splitted directories.
522  */
523 int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
524                          struct mea **mea, int nstripes)
525 {
526         struct inode *dir = dentry->d_inode;
527         struct mds_obd *mds = &obd->u.mds;
528         struct mea *tmea = NULL;
529         struct obdo *oa = NULL;
530         int rc, mea_size = 0;
531         void *handle;
532         ENTRY;
533
534         /* TODO: optimization possible - we already may have mea here */
535         rc = mds_splitting_expected(obd, dentry);
536         if (rc == MDS_NO_SPLITTABLE)
537                 return 0;
538         if (rc == MDS_NO_SPLIT_EXPECTED && nstripes == 0)
539                 return 0;
540         
541         LASSERT(mea == NULL || *mea == NULL);
542
543         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
544                obd->obd_name, mds->mds_num, dir->i_ino,
545                (unsigned long) dir->i_generation);
546
547         if (mea == NULL)
548                 mea = &tmea;
549         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
550
551         /* FIXME: Actually we may only want to allocate enough space for
552          * necessary amount of stripes, but on the other hand with this
553          * approach of allocating maximal possible amount of MDS slots,
554          * it would be easier to split the dir over more MDSes */
555         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
556         if (rc < 0) {
557                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
558                 RETURN(rc);
559         }
560         if (*mea == NULL)
561                 RETURN(-EINVAL);
562
563         (*mea)->mea_count = nstripes;
564        
565         /* 1) create directory objects on slave MDS'es */
566         /* FIXME: should this be OBD method? */
567         oa = obdo_alloc();
568         if (!oa)
569                 RETURN(-ENOMEM);
570
571         oa->o_id = dir->i_ino;
572         oa->o_generation = dir->i_generation;
573
574         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
575                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
576                         OBD_MD_FLUID | OBD_MD_FLGID);
577
578         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
579         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
580         oa->o_mode = dir->i_mode;
581
582         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
583                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
584                         
585         rc = obd_create(mds->mds_lmv_exp, oa,
586                         (struct lov_stripe_md **)mea, NULL);
587         if (rc)
588                 GOTO(err_oa, rc);
589
590         CDEBUG(D_OTHER, "%d dirobjects created\n", (int)(*mea)->mea_count);
591
592         /* 2) update dir attribute */
593         down(&dir->i_sem);
594         
595         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
596         if (IS_ERR(handle)) {
597                 up(&dir->i_sem);
598                 CERROR("fsfilt_start() failed: %d\n", (int) PTR_ERR(handle));
599                 GOTO(err_oa, rc = PTR_ERR(handle));
600         }
601         
602         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
603         if (rc) {
604                 up(&dir->i_sem);
605                 CERROR("fsfilt_set_md() failed, error %d.\n", rc);
606                 GOTO(err_oa, rc);
607         }
608         
609         rc = fsfilt_commit(obd, mds->mds_sb, dir, handle, 0);
610         if (rc) {
611                 up(&dir->i_sem);
612                 CERROR("fsfilt_commit() failed, error %d.\n", rc);
613                 GOTO(err_oa, rc);
614         }
615         
616         up(&dir->i_sem);
617         obdo_free(oa);
618
619         /* 3) read through the dir and distribute it over objects */
620         rc = scan_and_distribute(obd, dentry, *mea);
621         if (mea == &tmea)
622                 obd_free_diskmd(mds->mds_lmv_exp, (struct lov_mds_md **)mea);
623         if (rc) {
624                 CERROR("scan_and_distribute() failed, error %d.\n", rc);
625                 RETURN(rc);
626         }
627
628         RETURN(1);
629
630 err_oa:
631         obdo_free(oa);
632         RETURN(rc);
633 }
634
635 static int filter_start_page_write(struct inode *inode,
636                                    struct niobuf_local *lnb)
637 {
638         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
639         if (page == NULL) {
640                 CERROR("no memory for a temp page\n");
641                 RETURN(lnb->rc = -ENOMEM);
642         }
643         POISON_PAGE(page, 0xf1);
644         page->index = lnb->offset >> PAGE_SHIFT;
645         lnb->page = page;
646
647         return 0;
648 }
649
650 struct dentry *filter_fid2dentry(struct obd_device *obd,
651                                  struct dentry *dir_dentry,
652                                  obd_gr group, obd_id id);
653
654 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
655                 int objcount, struct obd_ioobj *obj,
656                 int niocount, struct niobuf_remote *nb,
657                 struct niobuf_local *res,
658                 struct obd_trans_info *oti)
659 {
660         struct mds_obd *mds = &exp->exp_obd->u.mds;
661         struct niobuf_remote *rnb;
662         struct niobuf_local *lnb = NULL;
663         int rc = 0, i, tot_bytes = 0;
664         unsigned long now = jiffies;
665         struct dentry *dentry;
666         struct ll_fid fid;
667         ENTRY;
668         LASSERT(objcount == 1);
669         LASSERT(obj->ioo_bufcnt > 0);
670
671         memset(res, 0, niocount * sizeof(*res));
672
673         fid.id = obj->ioo_id;
674         fid.generation = obj->ioo_gr;
675         dentry = mds_fid2dentry(mds, &fid, NULL);
676         if (IS_ERR(dentry)) {
677                 CERROR("can't get dentry for %lu/%lu: %d\n",
678                        (unsigned long) fid.id,
679                        (unsigned long) fid.generation, (int) PTR_ERR(dentry));
680                 GOTO(cleanup, rc = (int) PTR_ERR(dentry));
681         }
682
683         if (dentry->d_inode == NULL) {
684                 CERROR("trying to BRW to non-existent file "LPU64"\n",
685                        obj->ioo_id);
686                 l_dput(dentry);
687                 GOTO(cleanup, rc = -ENOENT);
688         }
689
690         if (time_after(jiffies, now + 15 * HZ))
691                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
692         else
693                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
694                        (jiffies - now));
695
696         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
697              i++, lnb++, rnb++) {
698                 lnb->dentry = dentry;
699                 lnb->offset = rnb->offset;
700                 lnb->len    = rnb->len;
701                 lnb->flags  = rnb->flags;
702
703                 rc = filter_start_page_write(dentry->d_inode, lnb);
704                 if (rc) {
705                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
706                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
707                                i, obj->ioo_bufcnt, dentry, rc);
708                         while (lnb-- > res)
709                                 __free_pages(lnb->page, 0);
710                         l_dput(dentry);
711                         GOTO(cleanup, rc);
712                 }
713                 tot_bytes += lnb->len;
714         }
715
716         if (time_after(jiffies, now + 15 * HZ))
717                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
718         else
719                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
720                        (jiffies - now));
721
722         EXIT;
723 cleanup:
724         return rc;
725 }
726
727 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
728                  int objcount, struct obd_ioobj *obj, int niocount,
729                  struct niobuf_local *res, struct obd_trans_info *oti,
730                  int retcode)
731 {
732         struct obd_device *obd = exp->exp_obd;
733         struct niobuf_local *lnb;
734         struct inode *inode = NULL;
735         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
736         ENTRY;
737
738         LASSERT(objcount == 1);
739         LASSERT(current->journal_info == NULL);
740
741         cleanup_phase = 1;
742         inode = res->dentry->d_inode;
743
744         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
745                 char *end, *buf;
746                 struct dir_entry *de;
747
748                 buf = kmap(lnb->page);
749                 LASSERT(buf != NULL);
750                 end = buf + lnb->len;
751                 de = (struct dir_entry *) buf;
752                 while ((char *) de < end && de->namelen) {
753                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
754                                                    de->namelen, de->ino,
755                                                    de->generation, de->mds);
756                         LASSERT(err == 0);
757                         de = (struct dir_entry *)
758                                 ((char *) de + DIR_REC_LEN(de->namelen));
759                         entries++;
760                 }
761                 kunmap(lnb->page);
762         }
763
764         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
765                 __free_page(lnb->page);
766         l_dput(res->dentry);
767
768         RETURN(rc);
769 }
770
771 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
772 {
773         struct lmv_obd *lmv;
774         struct mds_obd *mds = &obd->u.mds;
775         int i = mds->mds_num;
776
777         if (flags & REC_REINT_CREATE) { 
778                 i = mds->mds_num;
779         } else if (mds->mds_lmv_exp) {
780                 lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
781                 i = raw_name2idx(lmv->desc.ld_tgt_count, name, len);
782         }
783         RETURN(i);
784 }
785
786 int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
787                         struct lustre_handle **rlockh)
788 {
789         struct mds_obd *mds = &obd->u.mds;
790         struct mdc_op_data op_data;
791         struct lookup_intent it;
792         struct mea *mea = NULL;
793         int mea_size, rc;
794
795         LASSERT(rlockh != NULL);
796         LASSERT(dentry != NULL);
797         LASSERT(dentry->d_inode != NULL);
798
799         /* clustered MD ? */
800         if (!mds->mds_lmv_obd)
801                 return 0;
802
803         /* a dir can be splitted only */
804         if (!S_ISDIR(dentry->d_inode->i_mode))
805                 return 0;
806
807         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
808         if (rc)
809                 return rc;
810
811         if (mea == NULL)
812                 return 0;
813         if (mea->mea_count == 0) {
814                 /* this is slave object */
815                 GOTO(cleanup, rc = 0);
816         }
817                 
818         CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
819                (unsigned long) dentry->d_inode->i_ino,
820                (unsigned long) dentry->d_inode->i_generation);
821
822         OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
823         if (*rlockh == NULL)
824                 GOTO(cleanup, rc = -ENOMEM);
825         memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
826
827         memset(&op_data, 0, sizeof(op_data));
828         op_data.mea1 = mea;
829         it.it_op = IT_UNLINK;
830         rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
831                         *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
832                         NULL);
833 cleanup:
834         OBD_FREE(mea, mea_size);
835         RETURN(rc);
836 }
837
838 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
839                         struct lustre_handle *lockh)
840 {
841         struct mds_obd *mds = &obd->u.mds;
842         struct mea *mea = NULL;
843         int mea_size, rc, i;
844
845         if (lockh == NULL)
846                 return;
847
848         LASSERT(mds->mds_lmv_obd != NULL);
849         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
850
851         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
852         if (rc) {
853                 CERROR("locks are leaked\n");
854                 return;
855         }
856         LASSERT(mea_size != 0);
857         LASSERT(mea != NULL);
858         LASSERT(mea->mea_count != 0);
859
860         CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
861                (unsigned long) dentry->d_inode->i_ino,
862                (unsigned long) dentry->d_inode->i_generation);
863
864         for (i = 0; i < mea->mea_count; i++) {
865                 if (lockh[i].cookie != 0)
866                         ldlm_lock_decref(lockh + i, LCK_EX);
867         }
868
869         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
870         OBD_FREE(mea, mea_size);
871         return;
872 }
873
874 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
875 {
876         struct mds_obd *mds = &obd->u.mds;
877         struct ptlrpc_request *req = NULL;
878         struct mdc_op_data op_data;
879         struct mea *mea = NULL;
880         int mea_size, rc;
881
882         /* clustered MD ? */
883         if (!mds->mds_lmv_obd)
884                 return 0;
885
886         /* a dir can be splitted only */
887         if (!S_ISDIR(dentry->d_inode->i_mode))
888                 RETURN(0);
889
890         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
891         if (rc)
892                 RETURN(rc);
893
894         if (mea == NULL)
895                 return 0;
896         if (mea->mea_count == 0)
897                 GOTO(cleanup, rc = 0);
898
899         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
900                (unsigned long) dentry->d_inode->i_ino,
901                (unsigned long) dentry->d_inode->i_generation);
902
903         memset(&op_data, 0, sizeof(op_data));
904         op_data.mea1 = mea;
905         rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
906         LASSERT(req == NULL);
907 cleanup:
908         OBD_FREE(mea, mea_size);
909         RETURN(rc);
910 }
911
912 struct ide_tracking {
913         int entries;
914         int empty;
915 };
916
917 int mds_ide_filldir(void *__buf, const char *name, int namelen,
918                     loff_t offset, ino_t ino, unsigned int d_type)
919 {
920         struct ide_tracking *it = __buf;
921
922         if (ino == 0)
923                 return 0;
924
925         it->entries++;
926         if (it->entries > 2)
927                 goto noempty;
928         if (namelen > 2)
929                 goto noempty;
930         if (name[0] == '.' && namelen == 1)
931                 return 0;
932         if (name[0] == '.' && name[1] == '.' && namelen == 2)
933                 return 0;
934 noempty:
935         it->empty = 0;
936         return -ENOTEMPTY;
937 }
938
939 int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
940 {
941         struct ide_tracking it;
942         struct file * file;
943         char *file_name;
944         int nlen, i, rc;
945         
946         it.entries = 0;
947         it.empty = 1;
948
949         nlen = strlen("__iopen__/") + 10 + 1;
950         OBD_ALLOC(file_name, nlen);
951         if (!file_name)
952                 RETURN(-ENOMEM);
953         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
954
955         file = filp_open(file_name, O_RDONLY, 0);
956         if (IS_ERR(file)) {
957                 CERROR("can't open directory %s: %d\n",
958                        file_name, (int) PTR_ERR(file));
959                 GOTO(cleanup, rc = PTR_ERR(file));
960         }
961
962         rc = vfs_readdir(file, mds_ide_filldir, &it);
963         filp_close(file, 0);
964
965         if (it.empty && rc == 0)
966                 rc = 1;
967         else
968                 rc = 0;
969
970 cleanup:
971         OBD_FREE(file_name, nlen);
972         return rc;
973 }
974
975 int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
976                              struct lustre_handle *lockh)
977 {
978         struct obd_device *obd = req->rq_export->exp_obd;
979         struct dentry *dentry = NULL;
980         struct lvfs_run_ctxt saved;
981         int cleanup_phase = 0;
982         struct mds_body *body;
983         struct lvfs_ucred uc;
984         int rc, update_mode;
985         ENTRY;
986
987         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
988                                   lustre_swab_mds_body);
989         if (body == NULL) {
990                 CERROR("Can't swab mds_body\n");
991                 GOTO(cleanup, rc = -EFAULT);
992         }
993         CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
994                (unsigned long) body->fid1.id,
995                (unsigned long) body->fid1.generation);
996         dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
997                                        &update_mode, NULL, 0,
998                                        MDS_INODELOCK_UPDATE);
999         if (IS_ERR(dentry)) {
1000                 CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
1001                 GOTO(cleanup, rc = PTR_ERR(dentry));
1002         }
1003         cleanup_phase = 1;
1004
1005         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
1006
1007         uc.luc_fsuid = body->fsuid;
1008         uc.luc_fsgid = body->fsgid;
1009         uc.luc_cap = body->capability;
1010         uc.luc_suppgid1 = body->suppgid;
1011         uc.luc_suppgid2 = -1;
1012         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1013
1014         rc = 0;
1015         if (!mds_is_dir_empty(obd, dentry))
1016                 rc = -ENOTEMPTY;
1017
1018 cleanup:
1019         switch(cleanup_phase) {
1020         case 1:
1021                 if (rc)
1022                         ldlm_lock_decref(lockh, LCK_EX);
1023                 l_dput(dentry);
1024                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1025         default:
1026                 break;
1027         }
1028         RETURN(rc);
1029 }
1030