Whamcloud - gitweb
- moved dir entries are deleted from the original dir (master object)
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0,};
51         int valsize, mdsize;
52         int rc;
53         ENTRY;
54
55         if (IS_ERR(mds->mds_lmv_obd))
56                 RETURN(PTR_ERR(mds->mds_lmv_obd));
57
58         if (mds->mds_lmv_obd)
59                 RETURN(0);
60
61         mds->mds_lmv_obd = class_name2obd(lmv_name);
62         if (!mds->mds_lmv_obd) {
63                 CERROR("MDS cannot locate LMV %s\n",
64                        lmv_name);
65                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
66                 RETURN(-ENOTCONN);
67         }
68
69         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
70         if (rc) {
71                 CERROR("MDS cannot connect to LMV %s (%d)\n",
72                        lmv_name, rc);
73                 mds->mds_lmv_obd = ERR_PTR(rc);
74                 RETURN(rc);
75         }
76         mds->mds_lmv_exp = class_conn2export(&conn);
77         if (mds->mds_lmv_exp == NULL)
78                 CERROR("can't get export!\n");
79
80         rc = obd_register_observer(mds->mds_lmv_obd, obd);
81         if (rc) {
82                 CERROR("MDS cannot register as observer of LMV %s (%d)\n",
83                        lmv_name, rc);
84                 GOTO(err_discon, rc);
85         }
86
87         /* retrieve size of EA */
88         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
89                           &valsize, &mdsize);
90         if (rc) 
91                 GOTO(err_reg, rc);
92         if (mdsize > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = mdsize;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &mdsize);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         mds->mds_num = mdsize;
101
102         rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
103                                 "inter_mds", 0, NULL);
104         if (rc)
105                 GOTO(err_reg, rc);
106         RETURN(0);
107
108 err_reg:
109         RETURN(rc);
110
111 err_discon:
112         /* FIXME: cleanups here! */
113         obd_disconnect(mds->mds_lmv_exp, 0);
114         mds->mds_lmv_exp = NULL;
115         mds->mds_lmv_obd = ERR_PTR(rc);
116         RETURN(rc);
117 }
118
119 int mds_lmv_postsetup(struct obd_device *obd)
120 {
121         struct mds_obd *mds = &obd->u.mds;
122         ENTRY;
123         if (mds->mds_lmv_exp)
124                 obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
125                                  mds->mds_max_cookiesize);
126         RETURN(0);
127 }
128
129 int mds_lmv_disconnect(struct obd_device *obd, int flags)
130 {
131         struct mds_obd *mds = &obd->u.mds;
132         int rc = 0;
133         ENTRY;
134
135         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
136
137                 obd_register_observer(mds->mds_lmv_obd, NULL);
138
139                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
140                 /* if obd_disconnect fails (probably because the
141                  * export was disconnected by class_disconnect_exports)
142                  * then we just need to drop our ref. */
143                 if (rc != 0)
144                         class_export_put(mds->mds_lmv_exp);
145                 mds->mds_lmv_exp = NULL;
146                 mds->mds_lmv_obd = NULL;
147         }
148
149         RETURN(rc);
150 }
151
152
153 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
154                         struct mea **mea, int *mea_size)
155 {
156         struct mds_obd *mds = &obd->u.mds;
157         int rc;
158         ENTRY;
159
160         if (!mds->mds_lmv_obd)
161                 RETURN(0);
162
163         /* first calculate mea size */
164         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
165                                      (struct lov_mds_md **)mea);
166         /* FIXME: error handling here */
167         LASSERT(*mea != NULL);
168
169         down(&inode->i_sem);
170         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
171         up(&inode->i_sem);
172         /* FIXME: error handling here */
173         if (rc <= 0) {
174                 OBD_FREE(*mea, *mea_size);
175                 *mea = NULL;
176         }
177         if (rc > 0)
178                 rc = 0;
179                         
180         RETURN(rc);
181 }
182
183 struct dir_entry {
184         __u16   namelen;
185         __u16   mds;
186         __u32   ino;
187         __u32   generation;
188         char    name[0];
189 };
190
191 #define DIR_PAD                 4
192 #define DIR_ROUND               (DIR_PAD - 1)
193 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
194
195 /* this struct holds dir entries for particular MDS to be flushed */
196 struct dir_cache {
197         struct list_head list;
198         void *cur;
199         int free;
200         int cached;
201         struct obdo oa;
202         struct brw_page brwc;
203 };
204
205 struct dirsplit_control {
206         struct obd_device *obd;
207         struct inode *dir;
208         struct dentry *dentry;
209         struct mea *mea;
210         struct dir_cache *cache;
211 };
212
213 static int dc_new_page_to_cache(struct dir_cache * dirc)
214 {
215         struct page *page;
216
217         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
218                 /* current page became full, mark the end */
219                 struct dir_entry *de = dirc->cur;
220                 de->namelen = 0;
221         }
222
223         page = alloc_page(GFP_KERNEL);
224         if (page == NULL)
225                 return -ENOMEM;
226         list_add_tail(&page->list, &dirc->list);
227         dirc->cur = page_address(page);
228         dirc->free = PAGE_SIZE;
229         return 0;
230 }
231
232 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
233 {
234         struct mds_obd *mds = &dc->obd->u.mds;
235         struct dir_entry *de;
236         struct dentry *dentry;
237         char * end;
238         
239         end = buf + PAGE_SIZE;
240         de = (struct dir_entry *) buf;
241         while ((char *) de < end && de->namelen) {
242                 /* lookup an inode */
243                 LASSERT(de->namelen <= 255);
244                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
245                 if (IS_ERR(dentry)) {
246                         CERROR("can't lookup %*s: %d\n", de->namelen,
247                                de->name, (int) PTR_ERR(dentry));
248                         goto next;
249                 }
250                 if (dentry->d_inode != NULL) {
251                         de->mds = mds->mds_num;
252                         de->ino = dentry->d_inode->i_ino;
253                         de->generation = dentry->d_inode->i_generation;
254                 } else if (dentry->d_flags & DCACHE_CROSS_REF) {
255                         de->mds = dentry->d_mdsnum;
256                         de->ino = dentry->d_inum;
257                         de->generation = dentry->d_generation;
258                 } else {
259                         CERROR("can't lookup %*s\n", de->namelen, de->name);
260                         goto next;
261                 }
262                 l_dput(dentry);
263
264 next:
265                 de = (struct dir_entry *)
266                         ((char *) de + DIR_REC_LEN(de->namelen));
267         }
268         return 0;
269 }
270
271 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
272 {
273         struct mds_obd *mds = &dc->obd->u.mds;
274         struct list_head *cur, *tmp;
275         struct dir_cache *ca;
276         int rc;
277         ENTRY; 
278         ca = dc->cache + mdsnum;
279
280         if (ca->free > sizeof(__u16)) {
281                 /* current page became full, mark the end */
282                 struct dir_entry *de = ca->cur;
283                 de->namelen = 0;
284         }
285
286         list_for_each_safe(cur, tmp, &ca->list) {
287                 struct page *page;
288
289                 page = list_entry(cur, struct page, list);
290                 LASSERT(page != NULL);
291
292                 retrieve_generation_numbers(dc, page_address(page));
293
294                 ca->brwc.pg = page;
295                 ca->brwc.off = 0;
296                 ca->brwc.count = PAGE_SIZE;
297                 ca->brwc.flag = 0;
298                 ca->oa.o_mds = mdsnum;
299                 rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
300                              (struct lov_stripe_md *) dc->mea,
301                              1, &ca->brwc, NULL);
302                 if (rc)
303                         RETURN(rc);
304
305         }
306         RETURN(0);
307 }
308
309 static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
310 {
311         struct list_head *cur, *tmp;
312         struct dentry *dentry;
313         struct dir_cache *ca;
314         struct dir_entry *de;
315         struct page *page;
316         char *buf, *end;
317         int rc;
318         ENTRY; 
319
320         ca = dc->cache + mdsnum;
321         list_for_each_safe(cur, tmp, &ca->list) {
322                 page = list_entry(cur, struct page, list);
323                 buf = page_address(page);
324                 end = buf + PAGE_SIZE;
325
326                 de = (struct dir_entry *) buf;
327                 while ((char *) de < end && de->namelen) {
328                         /* lookup an inode */
329                         LASSERT(de->namelen <= 255);
330
331                         dentry = ll_lookup_one_len(de->name, dc->dentry,
332                                                    de->namelen);
333                         if (IS_ERR(dentry)) {
334                                 CERROR("can't lookup %*s: %d\n", de->namelen,
335                                                 de->name, (int) PTR_ERR(dentry));
336                                 goto next;
337                         }
338                         LASSERT(dentry->d_inode != NULL);
339                         rc = fsfilt_del_dir_entry(dc->obd, dentry);
340                         l_dput(dentry);
341 next:
342                         de = (struct dir_entry *)
343                                 ((char *) de + DIR_REC_LEN(de->namelen));
344                 }
345         }
346         RETURN(0);
347 }
348
349 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
350                    ino_t ino, unsigned int d_type)
351 {
352         struct dirsplit_control *dc = __buf;
353         struct mds_obd *mds = &dc->obd->u.mds;
354         struct dir_cache *ca;
355         struct dir_entry *de;
356         int newmds;
357         char *n;
358         ENTRY;
359
360         if (name[0] == '.' && (namlen == 1 ||
361                                 (namlen == 2 && name[1] == '.'))) {
362                 /* skip special entries */
363                 RETURN(0);
364         }
365
366         LASSERT(dc != NULL);
367         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
368
369         if (newmds == mds->mds_num) {
370                 /* this entry remains on the current MDS, skip moving */
371                 RETURN(0);
372         }
373         
374         OBD_ALLOC(n, namlen + 1);
375         memcpy(n, name, namlen);
376         n[namlen] = (char) 0;
377         
378         OBD_FREE(n, namlen + 1);
379
380         /* check for space in buffer for new entry */
381         ca = dc->cache + newmds;
382         if (DIR_REC_LEN(namlen) > ca->free) {
383                 int err = dc_new_page_to_cache(ca);
384                 LASSERT(err == 0);
385         }
386         
387         /* insert found entry into buffer to be flushed later */
388         /* NOTE: we'll fill generations number later, because we
389          * it's stored in inode, thus we need to lookup an entry,
390          * but directory is locked for readdir(), so we delay this */
391         de = ca->cur;
392         de->ino = ino;
393         de->mds = d_type;
394         de->namelen = namlen;
395         memcpy(de->name, name, namlen);
396         ca->cur += DIR_REC_LEN(namlen);
397         ca->free -= DIR_REC_LEN(namlen);
398         ca->cached++;
399
400         RETURN(0);
401 }
402
403 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
404                                 struct mea *mea)
405 {
406         struct inode *dir = dentry->d_inode;
407         struct dirsplit_control dc;
408         struct file * file;
409         int err, i, nlen;
410         char *file_name;
411
412         nlen = strlen("__iopen__/") + 10 + 1;
413         OBD_ALLOC(file_name, nlen);
414         if (!file_name)
415                 RETURN(-ENOMEM);
416         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
417
418         file = filp_open(file_name, O_RDONLY, 0);
419         if (IS_ERR(file)) {
420                 CERROR("can't open directory %s: %d\n",
421                                 file_name, (int) PTR_ERR(file));
422                 OBD_FREE(file_name, nlen);
423                 RETURN(PTR_ERR(file));
424         }
425
426         memset(&dc, 0, sizeof(dc));
427         dc.obd = obd;
428         dc.dir = dir;
429         dc.dentry = dentry;
430         dc.mea = mea;
431         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
432         LASSERT(dc.cache != NULL);
433         for (i = 0; i < mea->mea_count; i++) {
434                 INIT_LIST_HEAD(&dc.cache[i].list);
435                 dc.cache[i].free = 0;
436                 dc.cache[i].cached = 0;
437         }
438
439         err = vfs_readdir(file, filldir, &dc);
440         filp_close(file, 0);
441         if (err)
442                 GOTO(cleanup, err);
443
444         for (i = 0; i < mea->mea_count; i++) {
445                 if (!dc.cache[i].cached)
446                         continue;
447                 err = flush_buffer_onto_mds(&dc, i);
448                 if (err)
449                         GOTO(cleanup, err);
450         }
451
452         for (i = 0; i < mea->mea_count; i++) {
453                 if (!dc.cache[i].cached)
454                         continue;
455                 err = remove_entries_from_orig_dir(&dc, i);
456                 if (err)
457                         GOTO(cleanup, err);
458         }
459
460 cleanup:
461         for (i = 0; i < mea->mea_count; i++) {
462                 struct list_head *cur, *tmp;
463                 if (!dc.cache[i].cached)
464                         continue;
465                 list_for_each_safe(cur, tmp, &dc.cache[i].list) {
466                         struct page *page;
467                         page = list_entry(cur, struct page, list);
468                         list_del(&page->list);
469                         __free_page(page);
470                 }
471         }
472         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
473         OBD_FREE(file_name, nlen);
474
475         RETURN(err);
476 }
477
478 #define MAX_DIR_SIZE    (64 * 1024)
479
480 int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
481 {
482         struct mds_obd *mds = &obd->u.mds;
483         struct mea *mea = NULL;
484         int rc, size;
485
486         /* clustered MD ? */
487         if (!mds->mds_lmv_obd)
488                 RETURN(0);
489
490         /* inode exist? */
491         if (dentry->d_inode == NULL)
492                 return 0;
493
494         /* a dir can be splitted only */
495         if (!S_ISDIR(dentry->d_inode->i_mode))
496                 return 0;
497
498         /* large enough to be splitted? */
499         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
500                 return 0;
501
502         /* don't split root directory */
503         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
504                 return 0;
505
506         mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
507         if (mea) {
508                 /* already splitted or slave object: shouldn't be splitted */
509                 rc = 0;
510         } else {
511                 /* may be splitted */
512                 rc = 1;
513         }
514
515         if (mea)
516                 OBD_FREE(mea, size);
517         RETURN(rc);
518 }
519
520 /*
521  * must not be called on already splitted directories
522  */
523 int mds_try_to_split_dir(struct obd_device *obd,
524                          struct dentry *dentry, struct mea **mea, int nstripes)
525 {
526         struct inode *dir = dentry->d_inode;
527         struct mds_obd *mds = &obd->u.mds;
528         struct mea *tmea = NULL;
529         struct obdo *oa = NULL;
530         int rc, mea_size = 0;
531         void *handle;
532         ENTRY;
533
534         /* TODO: optimization possible - we already may have mea here */
535         if (!mds_splitting_expected(obd, dentry))
536                 RETURN(0);
537         LASSERT(mea == NULL || *mea == NULL);
538
539         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
540                obd->obd_name, mds->mds_num, dir->i_ino,
541                (unsigned long) dir->i_generation);
542
543         if (mea == NULL)
544                 mea = &tmea;
545         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
546
547         /* FIXME: Actually we may only want to allocate enough space for
548          * necessary amount of stripes, but on the other hand with this
549          * approach of allocating maximal possible amount of MDS slots,
550          * it would be easier to split the dir over more MDSes */
551         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
552         if (!(*mea))
553                 RETURN(-ENOMEM);
554         (*mea)->mea_count = nstripes;
555        
556         /* 1) create directory objects on slave MDS'es */
557         /* FIXME: should this be OBD method? */
558         oa = obdo_alloc();
559         /* FIXME: error handling here */
560         LASSERT(oa != NULL);
561         oa->o_id = dir->i_ino;
562         oa->o_generation = dir->i_generation;
563         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
564                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
565                         OBD_MD_FLUID | OBD_MD_FLGID);
566         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
567         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
568         oa->o_mode = dir->i_mode;
569         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
570                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
571                         
572         rc = obd_create(mds->mds_lmv_exp, oa,
573                         (struct lov_stripe_md **) mea, NULL);
574         /* FIXME: error handling here */
575         LASSERT(rc == 0);
576         CDEBUG(D_OTHER, "%d dirobjects created\n",
577                (int) (*mea)->mea_count);
578
579         /* 2) update dir attribute */
580         down(&dir->i_sem);
581         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
582         LASSERT(!IS_ERR(handle));
583         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
584         LASSERT(rc == 0);
585         fsfilt_commit(obd, mds->mds_sb, dir, handle, 0);
586         LASSERT(rc == 0);
587         up(&dir->i_sem);
588         obdo_free(oa);
589
590         /* 3) read through the dir and distribute it over objects */
591         scan_and_distribute(obd, dentry, *mea);
592
593         if (mea == &tmea)
594                 obd_free_diskmd(mds->mds_lmv_exp,
595                                 (struct lov_mds_md **) mea);
596         RETURN(1);
597 }
598
599 static int filter_start_page_write(struct inode *inode,
600                                    struct niobuf_local *lnb)
601 {
602         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
603         if (page == NULL) {
604                 CERROR("no memory for a temp page\n");
605                 RETURN(lnb->rc = -ENOMEM);
606         }
607         POISON_PAGE(page, 0xf1);
608         page->index = lnb->offset >> PAGE_SHIFT;
609         lnb->page = page;
610
611         return 0;
612 }
613
614 struct dentry *filter_fid2dentry(struct obd_device *obd,
615                                  struct dentry *dir_dentry,
616                                  obd_gr group, obd_id id);
617
618 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
619                 int objcount, struct obd_ioobj *obj,
620                 int niocount, struct niobuf_remote *nb,
621                 struct niobuf_local *res,
622                 struct obd_trans_info *oti)
623 {
624         struct mds_obd *mds = &exp->exp_obd->u.mds;
625         struct niobuf_remote *rnb;
626         struct niobuf_local *lnb = NULL;
627         int rc = 0, i, tot_bytes = 0;
628         unsigned long now = jiffies;
629         struct dentry *dentry;
630         struct ll_fid fid;
631         ENTRY;
632         LASSERT(objcount == 1);
633         LASSERT(obj->ioo_bufcnt > 0);
634
635         memset(res, 0, niocount * sizeof(*res));
636
637         fid.id = obj->ioo_id;
638         fid.generation = obj->ioo_gr;
639         dentry = mds_fid2dentry(mds, &fid, NULL);
640         LASSERT(!IS_ERR(dentry));
641
642         if (dentry->d_inode == NULL) {
643                 CERROR("trying to BRW to non-existent file "LPU64"\n",
644                        obj->ioo_id);
645                 l_dput(dentry);
646                 GOTO(cleanup, rc = -ENOENT);
647         }
648
649         if (time_after(jiffies, now + 15 * HZ))
650                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
651         else
652                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
653                        (jiffies - now));
654
655         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
656              i++, lnb++, rnb++) {
657                 lnb->dentry = dentry;
658                 lnb->offset = rnb->offset;
659                 lnb->len    = rnb->len;
660                 lnb->flags  = rnb->flags;
661
662                 rc = filter_start_page_write(dentry->d_inode, lnb);
663                 if (rc) {
664                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
665                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
666                                i, obj->ioo_bufcnt, dentry, rc);
667                         while (lnb-- > res)
668                                 __free_pages(lnb->page, 0);
669                         l_dput(dentry);
670                         GOTO(cleanup, rc);
671                 }
672                 tot_bytes += lnb->len;
673         }
674
675         if (time_after(jiffies, now + 15 * HZ))
676                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
677         else
678                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
679                        (jiffies - now));
680
681         EXIT;
682 cleanup:
683         return rc;
684 }
685
686 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
687                  int objcount, struct obd_ioobj *obj, int niocount,
688                  struct niobuf_local *res, struct obd_trans_info *oti,
689                  int retcode)
690 {
691         struct obd_device *obd = exp->exp_obd;
692         struct niobuf_local *lnb;
693         struct inode *inode = NULL;
694         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
695         ENTRY;
696
697         LASSERT(objcount == 1);
698         LASSERT(current->journal_info == NULL);
699
700         cleanup_phase = 1;
701         inode = res->dentry->d_inode;
702
703         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
704                 char *end, *buf;
705                 struct dir_entry *de;
706
707                 buf = kmap(lnb->page);
708                 LASSERT(buf != NULL);
709                 end = buf + lnb->len;
710                 de = (struct dir_entry *) buf;
711                 while ((char *) de < end && de->namelen) {
712                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
713                                                    de->namelen, de->ino,
714                                                    de->generation, de->mds);
715                         LASSERT(err == 0);
716                         de = (struct dir_entry *)
717                                 ((char *) de + DIR_REC_LEN(de->namelen));
718                         entries++;
719                 }
720                 kunmap(buf);
721         }
722
723         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
724                 __free_page(lnb->page);
725         l_dput(res->dentry);
726
727         RETURN(rc);
728 }
729
730 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
731 {
732         struct lmv_obd *lmv;
733         struct mds_obd *mds = &obd->u.mds;
734         int i = mds->mds_num;
735
736         if (flags & REC_REINT_CREATE) { 
737                 i = mds->mds_num;
738         } else if (mds->mds_lmv_exp) {
739                 lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
740                 i = raw_name2idx(lmv->desc.ld_tgt_count, name, len);
741         }
742         RETURN(i);
743 }
744
745 int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
746                         struct lustre_handle **rlockh)
747 {
748         struct mds_obd *mds = &obd->u.mds;
749         struct mdc_op_data op_data;
750         struct lookup_intent it;
751         struct mea *mea = NULL;
752         int mea_size, rc;
753
754         LASSERT(rlockh != NULL);
755         LASSERT(dentry != NULL);
756         LASSERT(dentry->d_inode != NULL);
757
758         /* clustered MD ? */
759         if (!mds->mds_lmv_obd)
760                 return 0;
761
762         /* a dir can be splitted only */
763         if (!S_ISDIR(dentry->d_inode->i_mode))
764                 return 0;
765
766         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
767         if (rc)
768                 return rc;
769
770         if (mea == NULL)
771                 return 0;
772         if (mea->mea_count == 0) {
773                 /* this is slave object */
774                 GOTO(cleanup, rc = 0);
775         }
776                 
777         CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
778                (unsigned long) dentry->d_inode->i_ino,
779                (unsigned long) dentry->d_inode->i_generation);
780
781         OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
782         if (*rlockh == NULL)
783                 GOTO(cleanup, rc = -ENOMEM);
784         memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
785
786         memset(&op_data, 0, sizeof(op_data));
787         op_data.mea1 = mea;
788         it.it_op = IT_UNLINK;
789         rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
790                         *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
791                         NULL);
792 cleanup:
793         OBD_FREE(mea, mea_size);
794         RETURN(rc);
795 }
796
797 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
798                         struct lustre_handle *lockh)
799 {
800         struct mds_obd *mds = &obd->u.mds;
801         struct mea *mea = NULL;
802         int mea_size, rc, i;
803
804         if (lockh == NULL)
805                 return;
806
807         LASSERT(mds->mds_lmv_obd != NULL);
808         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
809
810         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
811         if (rc) {
812                 CERROR("locks are leaked\n");
813                 return;
814         }
815         LASSERT(mea_size != 0);
816         LASSERT(mea != NULL);
817         LASSERT(mea->mea_count != 0);
818
819         CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
820                (unsigned long) dentry->d_inode->i_ino,
821                (unsigned long) dentry->d_inode->i_generation);
822
823         for (i = 0; i < mea->mea_count; i++) {
824                 if (lockh[i].cookie != 0)
825                         ldlm_lock_decref(lockh + i, LCK_EX);
826         }
827
828         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
829         OBD_FREE(mea, mea_size);
830         return;
831 }
832
833 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
834 {
835         struct mds_obd *mds = &obd->u.mds;
836         struct ptlrpc_request *req = NULL;
837         struct mdc_op_data op_data;
838         struct mea *mea = NULL;
839         int mea_size, rc;
840
841         /* clustered MD ? */
842         if (!mds->mds_lmv_obd)
843                 return 0;
844
845         /* a dir can be splitted only */
846         if (!S_ISDIR(dentry->d_inode->i_mode))
847                 RETURN(0);
848
849         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
850         if (rc)
851                 RETURN(rc);
852
853         if (mea == NULL)
854                 return 0;
855         if (mea->mea_count == 0)
856                 GOTO(cleanup, rc = 0);
857
858         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
859                (unsigned long) dentry->d_inode->i_ino,
860                (unsigned long) dentry->d_inode->i_generation);
861
862         memset(&op_data, 0, sizeof(op_data));
863         op_data.mea1 = mea;
864         rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
865         LASSERT(req == NULL);
866 cleanup:
867         OBD_FREE(mea, mea_size);
868         RETURN(rc);
869 }
870
871 struct ide_tracking {
872         int entries;
873         int empty;
874 };
875
876 int mds_ide_filldir(void *__buf, const char *name, int namelen,
877                     loff_t offset, ino_t ino, unsigned int d_type)
878 {
879         struct ide_tracking *it = __buf;
880
881         if (ino == 0)
882                 return 0;
883
884         it->entries++;
885         if (it->entries > 2)
886                 goto noempty;
887         if (namelen > 2)
888                 goto noempty;
889         if (name[0] == '.' && namelen == 1)
890                 return 0;
891         if (name[0] == '.' && name[1] == '.' && namelen == 2)
892                 return 0;
893 noempty:
894         it->empty = 0;
895         return -ENOTEMPTY;
896 }
897
898 int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
899 {
900         struct ide_tracking it;
901         struct file * file;
902         char *file_name;
903         int nlen, i, rc;
904         
905         it.entries = 0;
906         it.empty = 1;
907
908         nlen = strlen("__iopen__/") + 10 + 1;
909         OBD_ALLOC(file_name, nlen);
910         if (!file_name)
911                 RETURN(-ENOMEM);
912         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
913
914         file = filp_open(file_name, O_RDONLY, 0);
915         if (IS_ERR(file)) {
916                 CERROR("can't open directory %s: %d\n",
917                        file_name, (int) PTR_ERR(file));
918                 GOTO(cleanup, rc = PTR_ERR(file));
919         }
920
921         rc = vfs_readdir(file, mds_ide_filldir, &it);
922         filp_close(file, 0);
923
924         if (it.empty && rc == 0)
925                 rc = 1;
926         else
927                 rc = 0;
928
929 cleanup:
930         OBD_FREE(file_name, nlen);
931         return rc;
932 }
933
934 int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
935                              struct lustre_handle *lockh)
936 {
937         struct obd_device *obd = req->rq_export->exp_obd;
938         struct dentry *dentry = NULL;
939         struct lvfs_run_ctxt saved;
940         int cleanup_phase = 0;
941         struct mds_body *body;
942         struct lvfs_ucred uc;
943         int rc, update_mode;
944         ENTRY;
945
946         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
947                                   lustre_swab_mds_body);
948         if (body == NULL) {
949                 CERROR("Can't swab mds_body\n");
950                 GOTO(cleanup, rc = -EFAULT);
951         }
952         CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
953                (unsigned long) body->fid1.id,
954                (unsigned long) body->fid1.generation);
955         dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
956                                        &update_mode, NULL, 0,
957                                        MDS_INODELOCK_UPDATE);
958         if (IS_ERR(dentry)) {
959                 CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
960                 GOTO(cleanup, rc = PTR_ERR(dentry));
961         }
962         cleanup_phase = 1;
963
964         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
965
966         uc.luc_fsuid = body->fsuid;
967         uc.luc_fsgid = body->fsgid;
968         uc.luc_cap = body->capability;
969         uc.luc_suppgid1 = body->suppgid;
970         uc.luc_suppgid2 = -1;
971         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
972
973         rc = 0;
974         if (!mds_is_dir_empty(obd, dentry))
975                 rc = -ENOTEMPTY;
976
977 cleanup:
978         switch(cleanup_phase) {
979         case 1:
980                 if (rc)
981                         ldlm_lock_decref(lockh, LCK_EX);
982                 l_dput(dentry);
983                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
984         default:
985                 break;
986         }
987         RETURN(rc);
988 }
989