Whamcloud - gitweb
- added error handling in various places of lmv code.
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDS
29
30 #include <linux/module.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_fsfilt.h>
37
38 #include "mds_internal.h"
39
40
41 /*
42  * TODO:
43  *   - magic in mea struct
44  *   - error handling is totally missed
45  */
46
47 int mds_lmv_connect(struct obd_device *obd, char * lmv_name)
48 {
49         struct mds_obd *mds = &obd->u.mds;
50         struct lustre_handle conn = {0};
51         int rc, valsize, value;
52         ENTRY;
53
54         if (IS_ERR(mds->mds_lmv_obd))
55                 RETURN(PTR_ERR(mds->mds_lmv_obd));
56
57         if (mds->mds_lmv_obd)
58                 RETURN(0);
59
60         mds->mds_lmv_obd = class_name2obd(lmv_name);
61         if (!mds->mds_lmv_obd) {
62                 CERROR("MDS cannot locate LMV %s\n",
63                        lmv_name);
64                 mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
65                 RETURN(-ENOTCONN);
66         }
67
68         rc = obd_connect(&conn, mds->mds_lmv_obd, &obd->obd_uuid);
69         if (rc) {
70                 CERROR("MDS cannot connect to LMV %s (%d)\n",
71                        lmv_name, rc);
72                 mds->mds_lmv_obd = ERR_PTR(rc);
73                 RETURN(rc);
74         }
75         mds->mds_lmv_exp = class_conn2export(&conn);
76         if (mds->mds_lmv_exp == NULL)
77                 CERROR("can't get export!\n");
78
79         rc = obd_register_observer(mds->mds_lmv_obd, obd);
80         if (rc) {
81                 CERROR("MDS cannot register as observer of LMV %s, "
82                        "rc = %d\n", lmv_name, rc);
83                 GOTO(err_discon, rc);
84         }
85
86         /* retrieve size of EA */
87         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"), "mdsize", 
88                           &valsize, &value);
89         if (rc) 
90                 GOTO(err_reg, rc);
91
92         if (value > mds->mds_max_mdsize)
93                 mds->mds_max_mdsize = value;
94
95         /* find our number in LMV cluster */
96         rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"), "mdsnum", 
97                           &valsize, &value);
98         if (rc) 
99                 GOTO(err_reg, rc);
100         
101         mds->mds_num = value;
102
103         rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
104                           "inter_mds", 0, NULL);
105         if (rc)
106                 GOTO(err_reg, rc);
107         
108         RETURN(0);
109
110 err_reg:
111         obd_register_observer(mds->mds_lmv_obd, NULL);
112 err_discon:
113         obd_disconnect(mds->mds_lmv_exp, 0);
114         mds->mds_lmv_exp = NULL;
115         mds->mds_lmv_obd = ERR_PTR(rc);
116         RETURN(rc);
117 }
118
119 int mds_lmv_postsetup(struct obd_device *obd)
120 {
121         struct mds_obd *mds = &obd->u.mds;
122         int rc = 0;
123         ENTRY;
124
125         if (mds->mds_lmv_exp)
126                 rc = obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
127                                       mds->mds_max_cookiesize);
128         
129         RETURN(rc);
130 }
131
132 int mds_lmv_disconnect(struct obd_device *obd, int flags)
133 {
134         struct mds_obd *mds = &obd->u.mds;
135         int rc = 0;
136         ENTRY;
137
138         if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
139                 obd_register_observer(mds->mds_lmv_obd, NULL);
140
141                 /* if obd_disconnect fails (probably because the export was
142                  * disconnected by class_disconnect_exports) then we just need
143                  * to drop our ref. */
144                 rc = obd_disconnect(mds->mds_lmv_exp, flags);
145                 if (rc)
146                         class_export_put(mds->mds_lmv_exp);
147                 
148                 mds->mds_lmv_exp = NULL;
149                 mds->mds_lmv_obd = NULL;
150         }
151
152         RETURN(rc);
153 }
154
155 int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
156                      struct mea **mea, int *mea_size)
157 {
158         struct mds_obd *mds = &obd->u.mds;
159         int rc;
160         ENTRY;
161
162         if (!mds->mds_lmv_obd)
163                 RETURN(0);
164
165         /* first calculate mea size */
166         *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
167                                      (struct lov_mds_md **)mea);
168         if (*mea_size < 0 || *mea == NULL)
169                 return *mea_size < 0 ? *mea_size : -EINVAL;
170
171         down(&inode->i_sem);
172         rc = fsfilt_get_md(obd, inode, *mea, *mea_size);
173         up(&inode->i_sem);
174
175         if (rc <= 0) {
176                 OBD_FREE(*mea, *mea_size);
177                 *mea = NULL;
178         } else
179                 rc = 0;
180                         
181         RETURN(rc);
182 }
183
184 struct dir_entry {
185         __u16   namelen;
186         __u16   mds;
187         __u32   ino;
188         __u32   generation;
189         char    name[0];
190 };
191
192 #define DIR_PAD                 4
193 #define DIR_ROUND               (DIR_PAD - 1)
194 #define DIR_REC_LEN(name_len)   (((name_len) + 12 + DIR_ROUND) & ~DIR_ROUND)
195
196 /* this struct holds dir entries for particular MDS to be flushed */
197 struct dir_cache {
198         struct list_head list;
199         void *cur;
200         int free;
201         int cached;
202         struct obdo oa;
203         struct brw_page brwc;
204 };
205
206 struct dirsplit_control {
207         struct obd_device *obd;
208         struct inode *dir;
209         struct dentry *dentry;
210         struct mea *mea;
211         struct dir_cache *cache;
212 };
213
214 static int dc_new_page_to_cache(struct dir_cache * dirc)
215 {
216         struct page *page;
217
218         if (!list_empty(&dirc->list) && dirc->free > sizeof(__u16)) {
219                 /* current page became full, mark the end */
220                 struct dir_entry *de = dirc->cur;
221                 de->namelen = 0;
222         }
223
224         page = alloc_page(GFP_KERNEL);
225         if (page == NULL)
226                 return -ENOMEM;
227         list_add_tail(&page->list, &dirc->list);
228         dirc->cur = page_address(page);
229         dirc->free = PAGE_SIZE;
230         return 0;
231 }
232
233 static int retrieve_generation_numbers(struct dirsplit_control *dc, void *buf)
234 {
235         struct mds_obd *mds = &dc->obd->u.mds;
236         struct dir_entry *de;
237         struct dentry *dentry;
238         char * end;
239         
240         end = buf + PAGE_SIZE;
241         de = (struct dir_entry *) buf;
242         while ((char *) de < end && de->namelen) {
243                 /* lookup an inode */
244                 LASSERT(de->namelen <= 255);
245                 dentry = ll_lookup_one_len(de->name, dc->dentry, de->namelen);
246                 if (IS_ERR(dentry)) {
247                         CERROR("can't lookup %*s: %d\n", de->namelen,
248                                de->name, (int) PTR_ERR(dentry));
249                         goto next;
250                 }
251                 if (dentry->d_inode != NULL) {
252                         de->mds = mds->mds_num;
253                         de->ino = dentry->d_inode->i_ino;
254                         de->generation = dentry->d_inode->i_generation;
255                 } else if (dentry->d_flags & DCACHE_CROSS_REF) {
256                         de->mds = dentry->d_mdsnum;
257                         de->ino = dentry->d_inum;
258                         de->generation = dentry->d_generation;
259                 } else {
260                         CERROR("can't lookup %*s\n", de->namelen, de->name);
261                         goto next;
262                 }
263                 l_dput(dentry);
264
265 next:
266                 de = (struct dir_entry *)
267                         ((char *) de + DIR_REC_LEN(de->namelen));
268         }
269         return 0;
270 }
271
272 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
273 {
274         struct mds_obd *mds = &dc->obd->u.mds;
275         struct list_head *cur, *tmp;
276         struct dir_cache *ca;
277         int rc;
278         ENTRY; 
279         ca = dc->cache + mdsnum;
280
281         if (ca->free > sizeof(__u16)) {
282                 /* current page became full, mark the end */
283                 struct dir_entry *de = ca->cur;
284                 de->namelen = 0;
285         }
286
287         list_for_each_safe(cur, tmp, &ca->list) {
288                 struct page *page;
289
290                 page = list_entry(cur, struct page, list);
291                 LASSERT(page != NULL);
292
293                 retrieve_generation_numbers(dc, page_address(page));
294
295                 ca->brwc.pg = page;
296                 ca->brwc.off = 0;
297                 ca->brwc.count = PAGE_SIZE;
298                 ca->brwc.flag = 0;
299                 ca->oa.o_mds = mdsnum;
300                 rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
301                              (struct lov_stripe_md *) dc->mea,
302                              1, &ca->brwc, NULL);
303                 if (rc)
304                         RETURN(rc);
305
306         }
307         RETURN(0);
308 }
309
310 static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
311 {
312         struct list_head *cur, *tmp;
313         struct dentry *dentry;
314         struct dir_cache *ca;
315         struct dir_entry *de;
316         struct page *page;
317         char *buf, *end;
318         int rc;
319         ENTRY; 
320
321         ca = dc->cache + mdsnum;
322         list_for_each_safe(cur, tmp, &ca->list) {
323                 page = list_entry(cur, struct page, list);
324                 buf = page_address(page);
325                 end = buf + PAGE_SIZE;
326
327                 de = (struct dir_entry *) buf;
328                 while ((char *) de < end && de->namelen) {
329                         /* lookup an inode */
330                         LASSERT(de->namelen <= 255);
331
332                         dentry = ll_lookup_one_len(de->name, dc->dentry,
333                                                    de->namelen);
334                         if (IS_ERR(dentry)) {
335                                 CERROR("can't lookup %*s: %d\n", de->namelen,
336                                                 de->name, (int) PTR_ERR(dentry));
337                                 goto next;
338                         }
339                         LASSERT(dentry->d_inode != NULL);
340                         rc = fsfilt_del_dir_entry(dc->obd, dentry);
341                         l_dput(dentry);
342 next:
343                         de = (struct dir_entry *)
344                                 ((char *) de + DIR_REC_LEN(de->namelen));
345                 }
346         }
347         RETURN(0);
348 }
349
350 static int filldir(void * __buf, const char * name, int namlen, loff_t offset,
351                    ino_t ino, unsigned int d_type)
352 {
353         struct dirsplit_control *dc = __buf;
354         struct mds_obd *mds = &dc->obd->u.mds;
355         struct dir_cache *ca;
356         struct dir_entry *de;
357         int newmds;
358         char *n;
359         ENTRY;
360
361         if (name[0] == '.' && (namlen == 1 ||
362                                 (namlen == 2 && name[1] == '.'))) {
363                 /* skip special entries */
364                 RETURN(0);
365         }
366
367         LASSERT(dc != NULL);
368         newmds = mea_name2idx(dc->mea, (char *) name, namlen);
369
370         if (newmds == mds->mds_num) {
371                 /* this entry remains on the current MDS, skip moving */
372                 RETURN(0);
373         }
374         
375         OBD_ALLOC(n, namlen + 1);
376         memcpy(n, name, namlen);
377         n[namlen] = (char) 0;
378         
379         OBD_FREE(n, namlen + 1);
380
381         /* check for space in buffer for new entry */
382         ca = dc->cache + newmds;
383         if (DIR_REC_LEN(namlen) > ca->free) {
384                 int err = dc_new_page_to_cache(ca);
385                 LASSERT(err == 0);
386         }
387         
388         /* insert found entry into buffer to be flushed later */
389         /* NOTE: we'll fill generations number later, because we
390          * it's stored in inode, thus we need to lookup an entry,
391          * but directory is locked for readdir(), so we delay this */
392         de = ca->cur;
393         de->ino = ino;
394         de->mds = d_type;
395         de->namelen = namlen;
396         memcpy(de->name, name, namlen);
397         ca->cur += DIR_REC_LEN(namlen);
398         ca->free -= DIR_REC_LEN(namlen);
399         ca->cached++;
400
401         RETURN(0);
402 }
403
404 int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
405                                 struct mea *mea)
406 {
407         struct inode *dir = dentry->d_inode;
408         struct dirsplit_control dc;
409         struct file * file;
410         int err, i, nlen;
411         char *file_name;
412
413         nlen = strlen("__iopen__/") + 10 + 1;
414         OBD_ALLOC(file_name, nlen);
415         if (!file_name)
416                 RETURN(-ENOMEM);
417         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
418
419         file = filp_open(file_name, O_RDONLY, 0);
420         if (IS_ERR(file)) {
421                 CERROR("can't open directory %s: %d\n",
422                                 file_name, (int) PTR_ERR(file));
423                 OBD_FREE(file_name, nlen);
424                 RETURN(PTR_ERR(file));
425         }
426
427         memset(&dc, 0, sizeof(dc));
428         dc.obd = obd;
429         dc.dir = dir;
430         dc.dentry = dentry;
431         dc.mea = mea;
432         OBD_ALLOC(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
433         LASSERT(dc.cache != NULL);
434         for (i = 0; i < mea->mea_count; i++) {
435                 INIT_LIST_HEAD(&dc.cache[i].list);
436                 dc.cache[i].free = 0;
437                 dc.cache[i].cached = 0;
438         }
439
440         err = vfs_readdir(file, filldir, &dc);
441         filp_close(file, 0);
442         if (err)
443                 GOTO(cleanup, err);
444
445         for (i = 0; i < mea->mea_count; i++) {
446                 if (!dc.cache[i].cached)
447                         continue;
448                 err = flush_buffer_onto_mds(&dc, i);
449                 if (err)
450                         GOTO(cleanup, err);
451         }
452
453         for (i = 0; i < mea->mea_count; i++) {
454                 if (!dc.cache[i].cached)
455                         continue;
456                 err = remove_entries_from_orig_dir(&dc, i);
457                 if (err)
458                         GOTO(cleanup, err);
459         }
460
461 cleanup:
462         for (i = 0; i < mea->mea_count; i++) {
463                 struct list_head *cur, *tmp;
464                 if (!dc.cache[i].cached)
465                         continue;
466                 list_for_each_safe(cur, tmp, &dc.cache[i].list) {
467                         struct page *page;
468                         page = list_entry(cur, struct page, list);
469                         list_del(&page->list);
470                         __free_page(page);
471                 }
472         }
473         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
474         OBD_FREE(file_name, nlen);
475
476         RETURN(err);
477 }
478
479 #define MAX_DIR_SIZE    (64 * 1024)
480
481 int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
482 {
483         struct mds_obd *mds = &obd->u.mds;
484         struct mea *mea = NULL;
485         int rc, size;
486
487         /* clustered MD ? */
488         if (!mds->mds_lmv_obd)
489                 RETURN(MDS_NO_SPLITTABLE);
490
491         /* inode exist? */
492         if (dentry->d_inode == NULL)
493                 return MDS_NO_SPLITTABLE;
494
495         /* a dir can be splitted only */
496         if (!S_ISDIR(dentry->d_inode->i_mode))
497                 return MDS_NO_SPLITTABLE;
498
499         /* don't split root directory */
500         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
501                 return MDS_NO_SPLITTABLE;
502
503         /* large enough to be splitted? */
504         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
505                 return MDS_NO_SPLIT_EXPECTED;
506
507         mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
508         if (mea) {
509                 /* already splitted or slave object: shouldn't be splitted */
510                 rc = MDS_NO_SPLITTABLE;
511         } else {
512                 /* may be splitted */
513                 rc = MDS_EXPECT_SPLIT;
514         }
515
516         if (mea)
517                 OBD_FREE(mea, size);
518         RETURN(rc);
519 }
520
521 /*
522  * must not be called on already splitted directories.
523  */
524 int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
525                          struct mea **mea, int nstripes)
526 {
527         struct inode *dir = dentry->d_inode;
528         struct mds_obd *mds = &obd->u.mds;
529         struct mea *tmea = NULL;
530         struct obdo *oa = NULL;
531         int rc, mea_size = 0;
532         void *handle;
533         ENTRY;
534
535         /* TODO: optimization possible - we already may have mea here */
536         if (mds_splitting_expected(obd, dentry) != MDS_EXPECT_SPLIT)
537                 RETURN(0);
538         
539         LASSERT(mea == NULL || *mea == NULL);
540
541         CDEBUG(D_OTHER, "%s: split directory %u/%lu/%lu\n",
542                obd->obd_name, mds->mds_num, dir->i_ino,
543                (unsigned long) dir->i_generation);
544
545         if (mea == NULL)
546                 mea = &tmea;
547         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
548
549         /* FIXME: Actually we may only want to allocate enough space for
550          * necessary amount of stripes, but on the other hand with this
551          * approach of allocating maximal possible amount of MDS slots,
552          * it would be easier to split the dir over more MDSes */
553         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
554         if (rc < 0) {
555                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
556                 RETURN(rc);
557         }
558         if (*mea == NULL)
559                 RETURN(-EINVAL);
560
561         (*mea)->mea_count = nstripes;
562        
563         /* 1) create directory objects on slave MDS'es */
564         /* FIXME: should this be OBD method? */
565         oa = obdo_alloc();
566         if (!oa)
567                 RETURN(-ENOMEM);
568
569         oa->o_id = dir->i_ino;
570         oa->o_generation = dir->i_generation;
571
572         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
573                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
574                         OBD_MD_FLUID | OBD_MD_FLGID);
575
576         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
577         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
578         oa->o_mode = dir->i_mode;
579
580         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
581                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
582                         
583         rc = obd_create(mds->mds_lmv_exp, oa,
584                         (struct lov_stripe_md **)mea, NULL);
585         if (rc)
586                 GOTO(err_oa, rc);
587
588         CDEBUG(D_OTHER, "%d dirobjects created\n", (int)(*mea)->mea_count);
589
590         /* 2) update dir attribute */
591         down(&dir->i_sem);
592         
593         handle = fsfilt_start(obd, dir, FSFILT_OP_SETATTR, NULL);
594         if (IS_ERR(handle)) {
595                 up(&dir->i_sem);
596                 CERROR("fsfilt_start() failed, error %d.\n",
597                        PTR_ERR(handle));
598                 GOTO(err_oa, rc = PTR_ERR(handle));
599         }
600         
601         rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
602         if (rc) {
603                 up(&dir->i_sem);
604                 CERROR("fsfilt_set_md() failed, error %d.\n", rc);
605                 GOTO(err_oa, rc);
606         }
607         
608         rc = fsfilt_commit(obd, mds->mds_sb, dir, handle, 0);
609         if (rc) {
610                 up(&dir->i_sem);
611                 CERROR("fsfilt_commit() failed, error %d.\n", rc);
612                 GOTO(err_oa, rc);
613         }
614         
615         up(&dir->i_sem);
616         obdo_free(oa);
617
618         /* 3) read through the dir and distribute it over objects */
619         rc = scan_and_distribute(obd, dentry, *mea);
620         if (rc) {
621                 CERROR("scan_and_distribute() failed, error %d.\n",
622                        rc);
623                 RETURN(rc);
624         }
625
626         if (mea == &tmea)
627                 obd_free_diskmd(mds->mds_lmv_exp,
628                                 (struct lov_mds_md **)mea);
629         RETURN(1);
630
631 err_oa:
632         obdo_free(oa);
633         RETURN(rc);
634 }
635
636 static int filter_start_page_write(struct inode *inode,
637                                    struct niobuf_local *lnb)
638 {
639         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
640         if (page == NULL) {
641                 CERROR("no memory for a temp page\n");
642                 RETURN(lnb->rc = -ENOMEM);
643         }
644         POISON_PAGE(page, 0xf1);
645         page->index = lnb->offset >> PAGE_SHIFT;
646         lnb->page = page;
647
648         return 0;
649 }
650
651 struct dentry *filter_fid2dentry(struct obd_device *obd,
652                                  struct dentry *dir_dentry,
653                                  obd_gr group, obd_id id);
654
655 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
656                 int objcount, struct obd_ioobj *obj,
657                 int niocount, struct niobuf_remote *nb,
658                 struct niobuf_local *res,
659                 struct obd_trans_info *oti)
660 {
661         struct mds_obd *mds = &exp->exp_obd->u.mds;
662         struct niobuf_remote *rnb;
663         struct niobuf_local *lnb = NULL;
664         int rc = 0, i, tot_bytes = 0;
665         unsigned long now = jiffies;
666         struct dentry *dentry;
667         struct ll_fid fid;
668         ENTRY;
669         LASSERT(objcount == 1);
670         LASSERT(obj->ioo_bufcnt > 0);
671
672         memset(res, 0, niocount * sizeof(*res));
673
674         fid.id = obj->ioo_id;
675         fid.generation = obj->ioo_gr;
676         dentry = mds_fid2dentry(mds, &fid, NULL);
677         LASSERT(!IS_ERR(dentry));
678
679         if (dentry->d_inode == NULL) {
680                 CERROR("trying to BRW to non-existent file "LPU64"\n",
681                        obj->ioo_id);
682                 l_dput(dentry);
683                 GOTO(cleanup, rc = -ENOENT);
684         }
685
686         if (time_after(jiffies, now + 15 * HZ))
687                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
688         else
689                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
690                        (jiffies - now));
691
692         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
693              i++, lnb++, rnb++) {
694                 lnb->dentry = dentry;
695                 lnb->offset = rnb->offset;
696                 lnb->len    = rnb->len;
697                 lnb->flags  = rnb->flags;
698
699                 rc = filter_start_page_write(dentry->d_inode, lnb);
700                 if (rc) {
701                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
702                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
703                                i, obj->ioo_bufcnt, dentry, rc);
704                         while (lnb-- > res)
705                                 __free_pages(lnb->page, 0);
706                         l_dput(dentry);
707                         GOTO(cleanup, rc);
708                 }
709                 tot_bytes += lnb->len;
710         }
711
712         if (time_after(jiffies, now + 15 * HZ))
713                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
714         else
715                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
716                        (jiffies - now));
717
718         EXIT;
719 cleanup:
720         return rc;
721 }
722
723 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
724                  int objcount, struct obd_ioobj *obj, int niocount,
725                  struct niobuf_local *res, struct obd_trans_info *oti,
726                  int retcode)
727 {
728         struct obd_device *obd = exp->exp_obd;
729         struct niobuf_local *lnb;
730         struct inode *inode = NULL;
731         int rc = 0, i, cleanup_phase = 0, err, entries = 0;
732         ENTRY;
733
734         LASSERT(objcount == 1);
735         LASSERT(current->journal_info == NULL);
736
737         cleanup_phase = 1;
738         inode = res->dentry->d_inode;
739
740         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
741                 char *end, *buf;
742                 struct dir_entry *de;
743
744                 buf = kmap(lnb->page);
745                 LASSERT(buf != NULL);
746                 end = buf + lnb->len;
747                 de = (struct dir_entry *) buf;
748                 while ((char *) de < end && de->namelen) {
749                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
750                                                    de->namelen, de->ino,
751                                                    de->generation, de->mds);
752                         LASSERT(err == 0);
753                         de = (struct dir_entry *)
754                                 ((char *) de + DIR_REC_LEN(de->namelen));
755                         entries++;
756                 }
757                 kunmap(lnb->page);
758         }
759
760         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++)
761                 __free_page(lnb->page);
762         l_dput(res->dentry);
763
764         RETURN(rc);
765 }
766
767 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
768 {
769         struct lmv_obd *lmv;
770         struct mds_obd *mds = &obd->u.mds;
771         int i = mds->mds_num;
772
773         if (flags & REC_REINT_CREATE) { 
774                 i = mds->mds_num;
775         } else if (mds->mds_lmv_exp) {
776                 lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
777                 i = raw_name2idx(lmv->desc.ld_tgt_count, name, len);
778         }
779         RETURN(i);
780 }
781
782 int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
783                         struct lustre_handle **rlockh)
784 {
785         struct mds_obd *mds = &obd->u.mds;
786         struct mdc_op_data op_data;
787         struct lookup_intent it;
788         struct mea *mea = NULL;
789         int mea_size, rc;
790
791         LASSERT(rlockh != NULL);
792         LASSERT(dentry != NULL);
793         LASSERT(dentry->d_inode != NULL);
794
795         /* clustered MD ? */
796         if (!mds->mds_lmv_obd)
797                 return 0;
798
799         /* a dir can be splitted only */
800         if (!S_ISDIR(dentry->d_inode->i_mode))
801                 return 0;
802
803         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
804         if (rc)
805                 return rc;
806
807         if (mea == NULL)
808                 return 0;
809         if (mea->mea_count == 0) {
810                 /* this is slave object */
811                 GOTO(cleanup, rc = 0);
812         }
813                 
814         CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
815                (unsigned long) dentry->d_inode->i_ino,
816                (unsigned long) dentry->d_inode->i_generation);
817
818         OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
819         if (*rlockh == NULL)
820                 GOTO(cleanup, rc = -ENOMEM);
821         memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
822
823         memset(&op_data, 0, sizeof(op_data));
824         op_data.mea1 = mea;
825         it.it_op = IT_UNLINK;
826         rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
827                         *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
828                         NULL);
829 cleanup:
830         OBD_FREE(mea, mea_size);
831         RETURN(rc);
832 }
833
834 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
835                         struct lustre_handle *lockh)
836 {
837         struct mds_obd *mds = &obd->u.mds;
838         struct mea *mea = NULL;
839         int mea_size, rc, i;
840
841         if (lockh == NULL)
842                 return;
843
844         LASSERT(mds->mds_lmv_obd != NULL);
845         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
846
847         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
848         if (rc) {
849                 CERROR("locks are leaked\n");
850                 return;
851         }
852         LASSERT(mea_size != 0);
853         LASSERT(mea != NULL);
854         LASSERT(mea->mea_count != 0);
855
856         CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
857                (unsigned long) dentry->d_inode->i_ino,
858                (unsigned long) dentry->d_inode->i_generation);
859
860         for (i = 0; i < mea->mea_count; i++) {
861                 if (lockh[i].cookie != 0)
862                         ldlm_lock_decref(lockh + i, LCK_EX);
863         }
864
865         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
866         OBD_FREE(mea, mea_size);
867         return;
868 }
869
870 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
871 {
872         struct mds_obd *mds = &obd->u.mds;
873         struct ptlrpc_request *req = NULL;
874         struct mdc_op_data op_data;
875         struct mea *mea = NULL;
876         int mea_size, rc;
877
878         /* clustered MD ? */
879         if (!mds->mds_lmv_obd)
880                 return 0;
881
882         /* a dir can be splitted only */
883         if (!S_ISDIR(dentry->d_inode->i_mode))
884                 RETURN(0);
885
886         rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
887         if (rc)
888                 RETURN(rc);
889
890         if (mea == NULL)
891                 return 0;
892         if (mea->mea_count == 0)
893                 GOTO(cleanup, rc = 0);
894
895         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
896                (unsigned long) dentry->d_inode->i_ino,
897                (unsigned long) dentry->d_inode->i_generation);
898
899         memset(&op_data, 0, sizeof(op_data));
900         op_data.mea1 = mea;
901         rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
902         LASSERT(req == NULL);
903 cleanup:
904         OBD_FREE(mea, mea_size);
905         RETURN(rc);
906 }
907
908 struct ide_tracking {
909         int entries;
910         int empty;
911 };
912
913 int mds_ide_filldir(void *__buf, const char *name, int namelen,
914                     loff_t offset, ino_t ino, unsigned int d_type)
915 {
916         struct ide_tracking *it = __buf;
917
918         if (ino == 0)
919                 return 0;
920
921         it->entries++;
922         if (it->entries > 2)
923                 goto noempty;
924         if (namelen > 2)
925                 goto noempty;
926         if (name[0] == '.' && namelen == 1)
927                 return 0;
928         if (name[0] == '.' && name[1] == '.' && namelen == 2)
929                 return 0;
930 noempty:
931         it->empty = 0;
932         return -ENOTEMPTY;
933 }
934
935 int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
936 {
937         struct ide_tracking it;
938         struct file * file;
939         char *file_name;
940         int nlen, i, rc;
941         
942         it.entries = 0;
943         it.empty = 1;
944
945         nlen = strlen("__iopen__/") + 10 + 1;
946         OBD_ALLOC(file_name, nlen);
947         if (!file_name)
948                 RETURN(-ENOMEM);
949         i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
950
951         file = filp_open(file_name, O_RDONLY, 0);
952         if (IS_ERR(file)) {
953                 CERROR("can't open directory %s: %d\n",
954                        file_name, (int) PTR_ERR(file));
955                 GOTO(cleanup, rc = PTR_ERR(file));
956         }
957
958         rc = vfs_readdir(file, mds_ide_filldir, &it);
959         filp_close(file, 0);
960
961         if (it.empty && rc == 0)
962                 rc = 1;
963         else
964                 rc = 0;
965
966 cleanup:
967         OBD_FREE(file_name, nlen);
968         return rc;
969 }
970
971 int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
972                              struct lustre_handle *lockh)
973 {
974         struct obd_device *obd = req->rq_export->exp_obd;
975         struct dentry *dentry = NULL;
976         struct lvfs_run_ctxt saved;
977         int cleanup_phase = 0;
978         struct mds_body *body;
979         struct lvfs_ucred uc;
980         int rc, update_mode;
981         ENTRY;
982
983         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
984                                   lustre_swab_mds_body);
985         if (body == NULL) {
986                 CERROR("Can't swab mds_body\n");
987                 GOTO(cleanup, rc = -EFAULT);
988         }
989         CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
990                (unsigned long) body->fid1.id,
991                (unsigned long) body->fid1.generation);
992         dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
993                                        &update_mode, NULL, 0,
994                                        MDS_INODELOCK_UPDATE);
995         if (IS_ERR(dentry)) {
996                 CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
997                 GOTO(cleanup, rc = PTR_ERR(dentry));
998         }
999         cleanup_phase = 1;
1000
1001         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
1002
1003         uc.luc_fsuid = body->fsuid;
1004         uc.luc_fsgid = body->fsgid;
1005         uc.luc_cap = body->capability;
1006         uc.luc_suppgid1 = body->suppgid;
1007         uc.luc_suppgid2 = -1;
1008         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1009
1010         rc = 0;
1011         if (!mds_is_dir_empty(obd, dentry))
1012                 rc = -ENOTEMPTY;
1013
1014 cleanup:
1015         switch(cleanup_phase) {
1016         case 1:
1017                 if (rc)
1018                         ldlm_lock_decref(lockh, LCK_EX);
1019                 l_dput(dentry);
1020                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1021         default:
1022                 break;
1023         }
1024         RETURN(rc);
1025 }
1026