Whamcloud - gitweb
b=3920
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0};
51         int rc, valsize, value;
52         ENTRY;
53
54         if (IS_ERR(mds->mds_lmv_obd))
55                 RETURN(PTR_ERR(mds->mds_lmv_obd));
56
57         if (mds->mds_lmv_obd)
58                 RETURN(0);
59
60         mds->mds_lmv_obd = class_name2obd(lmv_name);
61         if (!mds->mds_lmv_obd) {
62                 CERROR("MDS cannot locate LMV %s\n",
63                        lmv_name);
64                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
65                 RETURN(-ENOTCONN);
66         }
67
68         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
69         if (rc) {
70                 CERROR("MDS cannot connect to LMV %s (%d)\n",
71                        lmv_name, rc);
72                 mds->mds_lmv_obd = ERR_PTR(rc);
73                 RETURN(rc);
74         }
75         mds->mds_lmv_exp = class_conn2export(&conn);
76         if (mds->mds_lmv_exp == NULL)
77                 CERROR("can't get export!\n");
78
79         rc = obd_register_observer(mds->mds_lmv_obd, obd);
80         if (rc) {
81                 CERROR("MDS cannot register as observer of LMV %s, "
82                        "rc = %d\n", lmv_name, rc);
83                 GOTO(err_discon, rc);
84         }
85
86         /* retrieve size of EA */
87         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
88                           &valsize, &value);
89         if (rc) 
90                 GOTO(err_reg, rc);
91
92         if (value > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = value;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &value);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         
101         mds->mds_num = value;
102
103         rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
104                           "inter_mds", 0, NULL);
105         if (rc)
106                 GOTO(err_reg, rc);
107         
108         RETURN(0);
109
110 err_reg:
111         obd_register_observer(mds->mds_lmv_obd, NULL);
112 err_discon:
113         obd_disconnect(mds->mds_lmv_exp, 0);
114         mds->mds_lmv_exp = NULL;
115         mds->mds_lmv_obd = ERR_PTR(rc);
116         RETURN(rc);
117 }
118
119 int mds_lmv_postsetup(struct obd_device *obd)
120 {
121         struct mds_obd *mds = &obd->u.mds;
122         int rc = 0;
123         ENTRY;
124
125         if (mds->mds_lmv_exp)
126                 rc = obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
127                                       mds->mds_max_cookiesize);
128         
129         RETURN(rc);
130 }
131
132 int mds_lmv_disconnect(struct obd_device *obd, int flags)
133 {
134         struct mds_obd *mds = &obd->u.mds;
135         int rc = 0;
136         ENTRY;
137
138         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
139                 obd_register_observer(mds->mds_lmv_obd, NULL);
140
141                 /* if obd_disconnect fails (probably because the export was
142                  * disconnected by class_disconnect_exports) then we just need
143                  * to drop our ref. */
144                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
145                 if (rc)
146                         class_export_put(mds->mds_lmv_exp);
147                 
148                 mds->mds_lmv_exp = NULL;
149                 mds->mds_lmv_obd = NULL;
150         }
151
152         RETURN(rc);
153 }
154
155 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
156                      struct mea **mea, int *mea_size)
157 {
158         struct mds_obd *mds = &obd->u.mds;
159         int rc;
160         ENTRY;
161
162         if (!mds->mds_lmv_obd)
163                 RETURN(0);
164
165         /* first calculate mea size */
166         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
167                                      (struct lov_mds_md **)mea);
168         if (*mea_size < 0 || *mea == NULL)
169                 return *mea_size < 0 ? *mea_size : -EINVAL;
170
171         down(&inode->i_sem);
172         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
173         up(&inode->i_sem);
174
175         if (rc <= 0) {
176                 OBD_FREE(*mea, *mea_size);
177                 *mea = NULL;
178         } else
179                 rc = 0;
180                         
181         RETURN(rc);
182 }
183
184 struct dir_entry {
185         __u16   namelen;
186         __u16   mds;
187         __u32   ino;
188         __u32   generation;
189         char    name[0];
190 };
191
192 #define DIR_PAD                 4
193 #define DIR_ROUND               (DIR_PAD - 1)
194 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
195
196 /* this struct holds dir entries for particular MDS to be flushed */
197 struct dir_cache {
198         struct list_head list;
199         void *cur;
200         int free;
201         int cached;
202         struct obdo oa;
203         struct brw_page brwc;
204 };
205
206 struct dirsplit_control {
207         struct obd_device *obd;
208         struct inode *dir;
209         struct dentry *dentry;
210         struct mea *mea;
211         struct dir_cache *cache;
212 };
213
214 static int dc_new_page_to_cache(struct dir_cache * dirc)
215 {
216         struct page *page;
217
218         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
219                 /* current page became full, mark the end */
220                 struct dir_entry *de = dirc->cur;
221                 de->namelen = 0;
222         }
223
224         page = alloc_page(GFP_KERNEL);
225         if (page == NULL)
226                 return -ENOMEM;
227         list_add_tail(&page->list, &dirc->list);
228         dirc->cur = page_address(page);
229         dirc->free = PAGE_SIZE;
230         return 0;
231 }
232
233 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
234 {
235         struct mds_obd *mds = &dc->obd->u.mds;
236         struct dir_entry *de;
237         struct dentry *dentry;
238         char * end;
239         
240         end = buf + PAGE_SIZE;
241         de = (struct dir_entry *) buf;
242         while ((char *) de < end && de->namelen) {
243                 /* lookup an inode */
244                 LASSERT(de->namelen <= 255);
245                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
246                 if (IS_ERR(dentry)) {
247                         CERROR("can't lookup %*s: %d\n", de->namelen,
248                                de->name, (int) PTR_ERR(dentry));
249                         goto next;
250                 }
251                 if (dentry->d_inode != NULL) {
252                         de->mds = mds->mds_num;
253                         de->ino = dentry->d_inode->i_ino;
254                         de->generation = dentry->d_inode->i_generation;
255                 } else if (dentry->d_flags & DCACHE_CROSS_REF) {
256                         de->mds = dentry->d_mdsnum;
257                         de->ino = dentry->d_inum;
258                         de->generation = dentry->d_generation;
259                 } else {
260                         CERROR("can't lookup %*s\n", de->namelen, de->name);
261                         goto next;
262                 }
263                 l_dput(dentry);
264
265 next:
266                 de = (struct dir_entry *)
267                         ((char *) de + DIR_REC_LEN(de->namelen));
268         }
269         return 0;
270 }
271
272 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
273 {
274         struct mds_obd *mds = &dc->obd->u.mds;
275         struct list_head *cur, *tmp;
276         struct dir_cache *ca;
277         int rc;
278         ENTRY; 
279         ca = dc->cache + mdsnum;
280
281         if (ca->free > sizeof(__u16)) {
282                 /* current page became full, mark the end */
283                 struct dir_entry *de = ca->cur;
284                 de->namelen = 0;
285         }
286
287         list_for_each_safe(cur, tmp, &ca->list) {
288                 struct page *page;
289
290                 page = list_entry(cur, struct page, list);
291                 LASSERT(page != NULL);
292
293                 retrieve_generation_numbers(dc, page_address(page));
294
295                 ca->brwc.pg = page;
296                 ca->brwc.off = 0;
297                 ca->brwc.count = PAGE_SIZE;
298                 ca->brwc.flag = 0;
299                 ca->oa.o_mds = mdsnum;
300                 rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
301                              (struct lov_stripe_md *) dc->mea,
302                              1, &ca->brwc, NULL);
303                 if (rc)
304                         RETURN(rc);
305
306         }
307         RETURN(0);
308 }
309
310 static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
311 {
312         struct list_head *cur, *tmp;
313         struct dentry *dentry;
314         struct dir_cache *ca;
315         struct dir_entry *de;
316         struct page *page;
317         char *buf, *end;
318         int rc;
319         ENTRY; 
320
321         ca = dc->cache + mdsnum;
322         list_for_each_safe(cur, tmp, &ca->list) {
323                 page = list_entry(cur, struct page, list);
324                 buf = page_address(page);
325                 end = buf + PAGE_SIZE;
326
327                 de = (struct dir_entry *) buf;
328                 while ((char *) de < end && de->namelen) {
329                         /* lookup an inode */
330                         LASSERT(de->namelen <= 255);
331
332                         dentry = ll_lookup_one_len(de->name, dc->dentry,
333                                                    de->namelen);
334                         if (IS_ERR(dentry)) {
335                                 CERROR("can't lookup %*s: %d\n", de->namelen,
336                                                 de->name, (int) PTR_ERR(dentry));
337                                 goto next;
338                         }
339                         rc = fsfilt_del_dir_entry(dc->obd, dentry);
340                         l_dput(dentry);
341 next:
342                         de = (struct dir_entry *)
343                                 ((char *) de + DIR_REC_LEN(de->namelen));
344                 }
345         }
346         RETURN(0);
347 }
348
349 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
350                    ino_t ino, unsigned int d_type)
351 {
352         struct dirsplit_control *dc = __buf;
353         struct mds_obd *mds = &dc->obd->u.mds;
354         struct dir_cache *ca;
355         struct dir_entry *de;
356         int newmds;
357         char *n;
358         ENTRY;
359
360         if (name[0] == '.' && (namlen == 1 ||
361                                 (namlen == 2 && name[1] == '.'))) {
362                 /* skip special entries */
363                 RETURN(0);
364         }
365
366         LASSERT(dc != NULL);
367         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
368
369         if (newmds == mds->mds_num) {
370                 /* this entry remains on the current MDS, skip moving */
371                 RETURN(0);
372         }
373         
374         OBD_ALLOC(n, namlen + 1);
375         memcpy(n, name, namlen);
376         n[namlen] = (char) 0;
377         
378         OBD_FREE(n, namlen + 1);
379
380         /* check for space in buffer for new entry */
381         ca = dc->cache + newmds;
382         if (DIR_REC_LEN(namlen) > ca->free) {
383                 int err = dc_new_page_to_cache(ca);
384                 LASSERT(err == 0);
385         }
386         
387         /* insert found entry into buffer to be flushed later */
388         /* NOTE: we'll fill generations number later, because we
389          * it's stored in inode, thus we need to lookup an entry,
390          * but directory is locked for readdir(), so we delay this */
391         de = ca->cur;
392         de->ino = ino;
393         de->mds = d_type;
394         de->namelen = namlen;
395         memcpy(de->name, name, namlen);
396         ca->cur += DIR_REC_LEN(namlen);
397         ca->free -= DIR_REC_LEN(namlen);
398         ca->cached++;
399
400         RETURN(0);
401 }
402
403 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
404                                 struct mea *mea)
405 {
406         struct inode *dir = dentry->d_inode;
407         struct dirsplit_control dc;
408         struct file * file;
409         int err, i, nlen;
410         char *file_name;
411
412         nlen = strlen("__iopen__/") + 10 + 1;
413         OBD_ALLOC(file_name, nlen);
414         if (!file_name)
415                 RETURN(-ENOMEM);
416         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
417
418         file = filp_open(file_name, O_RDONLY, 0);
419         if (IS_ERR(file)) {
420                 CERROR("can't open directory %s: %d\n",
421                                 file_name, (int) PTR_ERR(file));
422                 OBD_FREE(file_name, nlen);
423                 RETURN(PTR_ERR(file));
424         }
425
426         memset(&dc, 0, sizeof(dc));
427         dc.obd = obd;
428         dc.dir = dir;
429         dc.dentry = dentry;
430         dc.mea = mea;
431         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
432         LASSERT(dc.cache != NULL);
433         for (i = 0; i < mea->mea_count; i++) {
434                 INIT_LIST_HEAD(&dc.cache[i].list);
435                 dc.cache[i].free = 0;
436                 dc.cache[i].cached = 0;
437         }
438
439         err = vfs_readdir(file, filldir, &dc);
440         filp_close(file, 0);
441         if (err)
442                 GOTO(cleanup, err);
443
444         for (i = 0; i < mea->mea_count; i++) {
445                 if (!dc.cache[i].cached)
446                         continue;
447                 err = flush_buffer_onto_mds(&dc, i);
448                 if (err)
449                         GOTO(cleanup, err);
450         }
451
452         for (i = 0; i < mea->mea_count; i++) {
453                 if (!dc.cache[i].cached)
454                         continue;
455                 err = remove_entries_from_orig_dir(&dc, i);
456                 if (err)
457                         GOTO(cleanup, err);
458         }
459
460 cleanup:
461         for (i = 0; i < mea->mea_count; i++) {
462                 struct list_head *cur, *tmp;
463                 if (!dc.cache[i].cached)
464                         continue;
465                 list_for_each_safe(cur, tmp, &dc.cache[i].list) {
466                         struct page *page;
467                         page = list_entry(cur, struct page, list);
468                         list_del(&page->list);
469                         __free_page(page);
470                 }
471         }
472         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
473         OBD_FREE(file_name, nlen);
474
475         RETURN(err);
476 }
477
478 #define MAX_DIR_SIZE    (64 * 1024)
479
480 int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
481 {
482         struct mds_obd *mds = &obd->u.mds;
483         struct mea *mea = NULL;
484         int rc, size;
485
486         /* clustered MD ? */
487         if (!mds->mds_lmv_obd)
488                 RETURN(MDS_NO_SPLITTABLE);
489
490         /* inode exist? */
491         if (dentry->d_inode == NULL)
492                 return MDS_NO_SPLITTABLE;
493
494         /* a dir can be splitted only */
495         if (!S_ISDIR(dentry->d_inode->i_mode))
496                 return MDS_NO_SPLITTABLE;
497
498         /* don't split root directory */
499         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
500                 return MDS_NO_SPLITTABLE;
501
502         /* large enough to be splitted? */
503         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
504                 return MDS_NO_SPLIT_EXPECTED;
505
506         mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
507         if (mea) {
508                 /* already splitted or slave object: shouldn't be splitted */
509                 rc = MDS_NO_SPLITTABLE;
510         } else {
511                 /* may be splitted */
512                 rc = MDS_EXPECT_SPLIT;
513         }
514
515         if (mea)
516                 OBD_FREE(mea, size);
517         RETURN(rc);
518 }
519
520 /*
521  * must not be called on already splitted directories.
522  */
523 int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
524                          struct mea **mea, int nstripes, int update_mode)
525 {
526         struct inode *dir = dentry->d_inode;
527         struct mds_obd *mds = &obd->u.mds;
528         struct mea *tmea = NULL;
529         struct obdo *oa = NULL;
530         int rc, mea_size = 0;
531         void *handle;
532         ENTRY;
533
534         if (update_mode != LCK_EX)
535                 return 0;
536         /* TODO: optimization possible - we already may have mea here */
537         rc = mds_splitting_expected(obd, dentry);
538         if (rc == MDS_NO_SPLITTABLE)
539                 return 0;
540         if (rc == MDS_NO_SPLIT_EXPECTED && nstripes == 0)
541                 return 0;
542         if (nstripes && nstripes == 1)
543                 return 0;
544         
545         LASSERT(mea == NULL || *mea == NULL);
546
547         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
548                obd->obd_name, mds->mds_num, dir->i_ino,
549                (unsigned long) dir->i_generation);
550
551         if (mea == NULL)
552                 mea = &tmea;
553         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
554
555         /* FIXME: Actually we may only want to allocate enough space for
556          * necessary amount of stripes, but on the other hand with this
557          * approach of allocating maximal possible amount of MDS slots,
558          * it would be easier to split the dir over more MDSes */
559         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
560         if (rc < 0) {
561                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
562                 RETURN(rc);
563         }
564         if (*mea == NULL)
565                 RETURN(-EINVAL);
566
567         (*mea)->mea_count = nstripes;
568        
569         /* 1) create directory objects on slave MDS'es */
570         /* FIXME: should this be OBD method? */
571         oa = obdo_alloc();
572         if (!oa)
573                 RETURN(-ENOMEM);
574
575         oa->o_id = dir->i_ino;
576         oa->o_generation = dir->i_generation;
577
578         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
579                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
580                         OBD_MD_FLUID | OBD_MD_FLGID);
581
582         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
583         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
584         oa->o_mode = dir->i_mode;
585
586         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
587                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
588                         
589         rc = obd_create(mds->mds_lmv_exp, oa,
590                         (struct lov_stripe_md **)mea, NULL);
591         if (rc)
592                 GOTO(err_oa, rc);
593
594         CDEBUG(D_OTHER, "%d dirobjects created\n", (int)(*mea)->mea_count);
595
596         /* 2) update dir attribute */
597         down(&dir->i_sem);
598         
599         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
600         if (IS_ERR(handle)) {
601                 up(&dir->i_sem);
602                 CERROR("fsfilt_start() failed: %d\n", (int) PTR_ERR(handle));
603                 GOTO(err_oa, rc = PTR_ERR(handle));
604         }
605         
606         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
607         if (rc) {
608                 up(&dir->i_sem);
609                 CERROR("fsfilt_set_md() failed, error %d.\n", rc);
610                 GOTO(err_oa, rc);
611         }
612         
613         rc = fsfilt_commit(obd, mds->mds_sb, dir, handle, 0);
614         if (rc) {
615                 up(&dir->i_sem);
616                 CERROR("fsfilt_commit() failed, error %d.\n", rc);
617                 GOTO(err_oa, rc);
618         }
619         
620         up(&dir->i_sem);
621         obdo_free(oa);
622
623         /* 3) read through the dir and distribute it over objects */
624         rc = scan_and_distribute(obd, dentry, *mea);
625         if (mea == &tmea)
626                 obd_free_diskmd(mds->mds_lmv_exp, (struct lov_mds_md **)mea);
627         if (rc) {
628                 CERROR("scan_and_distribute() failed, error %d.\n", rc);
629                 RETURN(rc);
630         }
631
632         RETURN(1);
633
634 err_oa:
635         obdo_free(oa);
636         RETURN(rc);
637 }
638
639 static int filter_start_page_write(struct inode *inode,
640                                    struct niobuf_local *lnb)
641 {
642         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
643         if (page == NULL) {
644                 CERROR("no memory for a temp page\n");
645                 RETURN(lnb->rc = -ENOMEM);
646         }
647         POISON_PAGE(page, 0xf1);
648         page->index = lnb->offset >> PAGE_SHIFT;
649         lnb->page = page;
650
651         return 0;
652 }
653
654 struct dentry *filter_fid2dentry(struct obd_device *obd,
655                                  struct dentry *dir_dentry,
656                                  obd_gr group, obd_id id);
657
658 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
659                 int objcount, struct obd_ioobj *obj,
660                 int niocount, struct niobuf_remote *nb,
661                 struct niobuf_local *res,
662                 struct obd_trans_info *oti)
663 {
664         struct mds_obd *mds = &exp->exp_obd->u.mds;
665         struct niobuf_remote *rnb;
666         struct niobuf_local *lnb = NULL;
667         int rc = 0, i, tot_bytes = 0;
668         unsigned long now = jiffies;
669         struct dentry *dentry;
670         struct ll_fid fid;
671         ENTRY;
672         LASSERT(objcount == 1);
673         LASSERT(obj->ioo_bufcnt > 0);
674
675         memset(res, 0, niocount * sizeof(*res));
676
677         fid.id = obj->ioo_id;
678         fid.generation = obj->ioo_gr;
679         dentry = mds_fid2dentry(mds, &fid, NULL);
680         if (IS_ERR(dentry)) {
681                 CERROR("can't get dentry for "LPU64"/%u: %d\n",
682                        fid.id, fid.generation, (int) PTR_ERR(dentry));
683                 GOTO(cleanup, rc = (int) PTR_ERR(dentry));
684         }
685
686         if (dentry->d_inode == NULL) {
687                 CERROR("trying to BRW to non-existent file "LPU64"\n",
688                        obj->ioo_id);
689                 l_dput(dentry);
690                 GOTO(cleanup, rc = -ENOENT);
691         }
692
693         if (time_after(jiffies, now + 15 * HZ))
694                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
695         else
696                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
697                        (jiffies - now));
698
699         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
700              i++, lnb++, rnb++) {
701                 lnb->dentry = dentry;
702                 lnb->offset = rnb->offset;
703                 lnb->len    = rnb->len;
704                 lnb->flags  = rnb->flags;
705
706                 rc = filter_start_page_write(dentry->d_inode, lnb);
707                 if (rc) {
708                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
709                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
710                                i, obj->ioo_bufcnt, dentry, rc);
711                         while (lnb-- > res)
712                                 __free_pages(lnb->page, 0);
713                         l_dput(dentry);
714                         GOTO(cleanup, rc);
715                 }
716                 tot_bytes += lnb->len;
717         }
718
719         if (time_after(jiffies, now + 15 * HZ))
720                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
721         else
722                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
723                        (jiffies - now));
724
725         EXIT;
726 cleanup:
727         return rc;
728 }
729
730 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
731                  int objcount, struct obd_ioobj *obj, int niocount,
732                  struct niobuf_local *res, struct obd_trans_info *oti,
733                  int retcode)
734 {
735         struct obd_device *obd = exp->exp_obd;
736         struct niobuf_local *lnb;
737         struct inode *inode = NULL;
738         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
739         ENTRY;
740
741         LASSERT(objcount == 1);
742         LASSERT(current->journal_info == NULL);
743
744         cleanup_phase = 1;
745         inode = res->dentry->d_inode;
746
747         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
748                 char *end, *buf;
749                 struct dir_entry *de;
750
751                 buf = kmap(lnb->page);
752                 LASSERT(buf != NULL);
753                 end = buf + lnb->len;
754                 de = (struct dir_entry *) buf;
755                 while ((char *) de < end && de->namelen) {
756                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
757                                                    de->namelen, de->ino,
758                                                    de->generation, de->mds);
759                         if (err) {
760                                 CERROR("can't add dir entry %*s->%u/%u/%u"
761                                        " to %lu/%u: %d\n",
762                                        de->namelen, de->name,
763                                        de->mds, (unsigned) de->ino,
764                                        (unsigned) de->generation,
765                                        res->dentry->d_inode->i_ino,
766                                        res->dentry->d_inode->i_generation,
767                                        err);
768                                 rc = err;
769                                 break;
770                         }
771                         LASSERT(err == 0);
772                         de = (struct dir_entry *)
773                                 ((char *) de + DIR_REC_LEN(de->namelen));
774                         entries++;
775                 }
776                 kunmap(lnb->page);
777         }
778
779         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
780                 __free_page(lnb->page);
781         l_dput(res->dentry);
782
783         RETURN(rc);
784 }
785
786 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
787 {
788         struct lmv_obd *lmv;
789         struct mds_obd *mds = &obd->u.mds;
790         int i = mds->mds_num;
791
792         if (flags & REC_REINT_CREATE) { 
793                 i = mds->mds_num;
794         } else if (mds->mds_lmv_exp) {
795                 lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
796                 i = raw_name2idx(lmv->desc.ld_tgt_count, name, len);
797         }
798         RETURN(i);
799 }
800
801 int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
802                         struct lustre_handle **rlockh)
803 {
804         struct mds_obd *mds = &obd->u.mds;
805         struct mdc_op_data op_data;
806         struct lookup_intent it;
807         struct mea *mea = NULL;
808         int mea_size, rc;
809
810         LASSERT(rlockh != NULL);
811         LASSERT(dentry != NULL);
812         LASSERT(dentry->d_inode != NULL);
813
814         /* clustered MD ? */
815         if (!mds->mds_lmv_obd)
816                 return 0;
817
818         /* a dir can be splitted only */
819         if (!S_ISDIR(dentry->d_inode->i_mode))
820                 return 0;
821
822         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
823         if (rc)
824                 return rc;
825
826         if (mea == NULL)
827                 return 0;
828         if (mea->mea_count == 0) {
829                 /* this is slave object */
830                 GOTO(cleanup, rc = 0);
831         }
832                 
833         CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
834                (unsigned long) dentry->d_inode->i_ino,
835                (unsigned long) dentry->d_inode->i_generation);
836
837         OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
838         if (*rlockh == NULL)
839                 GOTO(cleanup, rc = -ENOMEM);
840         memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
841
842         memset(&op_data, 0, sizeof(op_data));
843         op_data.mea1 = mea;
844         it.it_op = IT_UNLINK;
845         rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
846                         *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
847                         NULL);
848 cleanup:
849         OBD_FREE(mea, mea_size);
850         RETURN(rc);
851 }
852
853 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
854                         struct lustre_handle *lockh)
855 {
856         struct mds_obd *mds = &obd->u.mds;
857         struct mea *mea = NULL;
858         int mea_size, rc, i;
859
860         if (lockh == NULL)
861                 return;
862
863         LASSERT(mds->mds_lmv_obd != NULL);
864         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
865
866         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
867         if (rc) {
868                 CERROR("locks are leaked\n");
869                 return;
870         }
871         LASSERT(mea_size != 0);
872         LASSERT(mea != NULL);
873         LASSERT(mea->mea_count != 0);
874
875         CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
876                (unsigned long) dentry->d_inode->i_ino,
877                (unsigned long) dentry->d_inode->i_generation);
878
879         for (i = 0; i < mea->mea_count; i++) {
880                 if (lockh[i].cookie != 0)
881                         ldlm_lock_decref(lockh + i, LCK_EX);
882         }
883
884         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
885         OBD_FREE(mea, mea_size);
886         return;
887 }
888
889 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
890 {
891         struct mds_obd *mds = &obd->u.mds;
892         struct ptlrpc_request *req = NULL;
893         struct mdc_op_data op_data;
894         struct mea *mea = NULL;
895         int mea_size, rc;
896
897         /* clustered MD ? */
898         if (!mds->mds_lmv_obd)
899                 return 0;
900
901         /* a dir can be splitted only */
902         if (!S_ISDIR(dentry->d_inode->i_mode))
903                 RETURN(0);
904
905         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
906         if (rc)
907                 RETURN(rc);
908
909         if (mea == NULL)
910                 return 0;
911         if (mea->mea_count == 0)
912                 GOTO(cleanup, rc = 0);
913
914         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
915                (unsigned long) dentry->d_inode->i_ino,
916                (unsigned long) dentry->d_inode->i_generation);
917
918         memset(&op_data, 0, sizeof(op_data));
919         op_data.mea1 = mea;
920         rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
921         LASSERT(req == NULL);
922 cleanup:
923         OBD_FREE(mea, mea_size);
924         RETURN(rc);
925 }
926
927 struct ide_tracking {
928         int entries;
929         int empty;
930 };
931
932 int mds_ide_filldir(void *__buf, const char *name, int namelen,
933                     loff_t offset, ino_t ino, unsigned int d_type)
934 {
935         struct ide_tracking *it = __buf;
936
937         if (ino == 0)
938                 return 0;
939
940         it->entries++;
941         if (it->entries > 2)
942                 goto noempty;
943         if (namelen > 2)
944                 goto noempty;
945         if (name[0] == '.' && namelen == 1)
946                 return 0;
947         if (name[0] == '.' && name[1] == '.' && namelen == 2)
948                 return 0;
949 noempty:
950         it->empty = 0;
951         return -ENOTEMPTY;
952 }
953
954 int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
955 {
956         struct ide_tracking it;
957         struct file * file;
958         char *file_name;
959         int nlen, i, rc;
960         
961         it.entries = 0;
962         it.empty = 1;
963
964         nlen = strlen("__iopen__/") + 10 + 1;
965         OBD_ALLOC(file_name, nlen);
966         if (!file_name)
967                 RETURN(-ENOMEM);
968         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
969
970         file = filp_open(file_name, O_RDONLY, 0);
971         if (IS_ERR(file)) {
972                 CERROR("can't open directory %s: %d\n",
973                        file_name, (int) PTR_ERR(file));
974                 GOTO(cleanup, rc = PTR_ERR(file));
975         }
976
977         rc = vfs_readdir(file, mds_ide_filldir, &it);
978         filp_close(file, 0);
979
980         if (it.empty && rc == 0)
981                 rc = 1;
982         else
983                 rc = 0;
984
985 cleanup:
986         OBD_FREE(file_name, nlen);
987         return rc;
988 }
989
990 int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
991                              struct lustre_handle *lockh)
992 {
993         struct obd_device *obd = req->rq_export->exp_obd;
994         struct dentry *dentry = NULL;
995         struct lvfs_run_ctxt saved;
996         int cleanup_phase = 0;
997         struct mds_body *body;
998         struct lvfs_ucred uc;
999         int rc, update_mode;
1000         ENTRY;
1001
1002         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1003                                   lustre_swab_mds_body);
1004         if (body == NULL) {
1005                 CERROR("Can't swab mds_body\n");
1006                 GOTO(cleanup, rc = -EFAULT);
1007         }
1008         CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
1009                (unsigned long) body->fid1.id,
1010                (unsigned long) body->fid1.generation);
1011         dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
1012                                        &update_mode, NULL, 0,
1013                                        MDS_INODELOCK_UPDATE);
1014         if (IS_ERR(dentry)) {
1015                 CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
1016                 GOTO(cleanup, rc = PTR_ERR(dentry));
1017         }
1018         cleanup_phase = 1;
1019
1020         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
1021
1022         uc.luc_fsuid = body->fsuid;
1023         uc.luc_fsgid = body->fsgid;
1024         uc.luc_cap = body->capability;
1025         uc.luc_suppgid1 = body->suppgid;
1026         uc.luc_suppgid2 = -1;
1027         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1028
1029         rc = 0;
1030         if (!mds_is_dir_empty(obd, dentry))
1031                 rc = -ENOTEMPTY;
1032
1033 cleanup:
1034         switch(cleanup_phase) {
1035         case 1:
1036                 if (rc)
1037                         ldlm_lock_decref(lockh, LCK_EX);
1038                 l_dput(dentry);
1039                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1040         default:
1041                 break;
1042         }
1043         RETURN(rc);
1044 }
1045