Whamcloud - gitweb
several fixes: expiry timer adjusted.
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/module.h>
32 #include <linux/kmod.h>
33 #include <linux/version.h>
34 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
35 #include <linux/mount.h>
36 #endif
37 #include <linux/lustre_mds.h>
38 #include <linux/obd_class.h>
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_fsfilt.h>
42 #include <libcfs/list.h>
43
44 #include <linux/lustre_smfs.h>
45 #include "mds_internal.h"
46
47 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
48 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
49
50 #define LAST_RCVD "last_rcvd"
51 #define LOV_OBJID "lov_objid"
52 #define LAST_FID  "last_fid"
53 #define VIRT_FID  "virt_fid"
54 #define CAPA_KEYS  "capa_key"
55
56 struct fidmap_entry {
57         struct hlist_node fm_hash;
58         struct lustre_id  fm_id;
59 };
60
61 int mds_fidmap_init(struct obd_device *obd, int size)
62 {
63         struct mds_obd *mds = &obd->u.mds;
64         struct hlist_head *head;
65         int i = 0;
66         ENTRY;
67
68         LASSERT(size > sizeof(sizeof(struct hlist_head)));
69         mds->mds_fidmap_size = size / sizeof(struct hlist_head);
70
71         CWARN("allocating %lu fid mapping entries\n",
72               (unsigned long)mds->mds_fidmap_size);
73
74         OBD_ALLOC(mds->mds_fidmap_table, size);
75         if (!mds->mds_fidmap_table)
76                 RETURN(-ENOMEM);
77
78         i = mds->mds_fidmap_size;
79         head = mds->mds_fidmap_table;
80         do {
81                 INIT_HLIST_HEAD(head);
82                 head++;
83                 i--;
84         } while(i);
85
86         RETURN(0);
87 }
88
89 int mds_fidmap_cleanup(struct obd_device *obd)
90 {
91         struct hlist_node *node = NULL, *tmp = NULL;
92         struct mds_obd *mds = &obd->u.mds;
93         struct fidmap_entry *entry;
94         struct hlist_head *head;
95         int i = 0;
96         ENTRY;
97
98         spin_lock(&mds->mds_fidmap_lock);
99         for (i = 0, head = mds->mds_fidmap_table;
100              i < mds->mds_fidmap_size; i++, head++) {
101                 hlist_for_each_safe(node, tmp, head) {
102                         entry = hlist_entry(node, struct fidmap_entry, fm_hash);
103                         hlist_del_init(&entry->fm_hash);
104                         OBD_FREE(entry, sizeof(*entry));
105                 }
106         }
107         spin_unlock(&mds->mds_fidmap_lock);
108         OBD_FREE(mds->mds_fidmap_table, mds->mds_fidmap_size *
109                  sizeof(struct hlist_head));
110         RETURN(0);
111 }
112
113 static inline unsigned long
114 const hashfn(struct obd_device *obd, __u64 fid)
115 {
116         struct mds_obd *mds = &obd->u.mds;
117         return (unsigned long)(fid & (mds->mds_fidmap_size - 1));
118 }
119
120 static struct fidmap_entry *
121 __mds_fidmap_find(struct obd_device *obd, __u64 fid)
122 {
123         struct fidmap_entry *entry = NULL;
124         struct mds_obd *mds = &obd->u.mds;
125         struct hlist_node *node = NULL;
126         struct hlist_head *head;
127         ENTRY;
128
129         head = mds->mds_fidmap_table + hashfn(obd, fid);
130         hlist_for_each(node, head) {
131                 entry = hlist_entry(node, struct fidmap_entry, fm_hash);
132                 if (id_fid(&entry->fm_id) == fid)
133                         RETURN(entry);
134         }
135         RETURN(NULL);
136 }
137
138 struct fidmap_entry *
139 mds_fidmap_find(struct obd_device *obd, __u64 fid)
140 {
141         struct mds_obd *mds = &obd->u.mds;
142         struct fidmap_entry *entry;
143         ENTRY;
144
145         spin_lock(&mds->mds_fidmap_lock);
146         entry = __mds_fidmap_find(obd, fid);
147         spin_unlock(&mds->mds_fidmap_lock);
148         
149         RETURN(entry);
150 }
151
152 static void __mds_fidmap_insert(struct obd_device *obd,
153                                 struct fidmap_entry *entry)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         struct hlist_head *head;
157         unsigned long idx;
158         ENTRY;
159
160         idx = hashfn(obd, id_fid(&entry->fm_id));
161         head = mds->mds_fidmap_table + idx;
162         hlist_add_head(&entry->fm_hash, head);
163         
164         EXIT;
165 }
166
167 void mds_fidmap_insert(struct obd_device *obd,
168                        struct fidmap_entry *entry)
169 {
170         struct mds_obd *mds = &obd->u.mds;
171         ENTRY;
172         
173         spin_lock(&mds->mds_fidmap_lock);
174         __mds_fidmap_insert(obd, entry);
175         spin_unlock(&mds->mds_fidmap_lock);
176         
177         EXIT;
178 }
179
180 static void __mds_fidmap_remove(struct obd_device *obd,
181                                 struct fidmap_entry *entry)
182 {
183         ENTRY;
184         hlist_del_init(&entry->fm_hash);
185         EXIT;
186 }
187
188 void mds_fidmap_remove(struct obd_device *obd,
189                        struct fidmap_entry *entry)
190 {
191         struct mds_obd *mds = &obd->u.mds;
192         ENTRY;
193         
194         spin_lock(&mds->mds_fidmap_lock);
195         __mds_fidmap_remove(obd, entry);
196         spin_unlock(&mds->mds_fidmap_lock);
197
198         EXIT;
199 }
200
201 /* creates new mapping remote fid -> local inode store cookie. Both are saved in
202  * lustre_id for better usability, as all mds function use lustre_id as input
203  * params.*/
204 int mds_fidmap_add(struct obd_device *obd,
205                    struct lustre_id *id)
206 {
207         struct mds_obd *mds = &obd->u.mds;
208         struct fidmap_entry *entry;
209         ENTRY;
210
211         OBD_ALLOC(entry, sizeof(*entry));
212         if (!entry)
213                 RETURN(-ENOMEM);
214
215         entry->fm_id = *id;
216         
217         spin_lock(&mds->mds_fidmap_lock);
218         if (!__mds_fidmap_find(obd, id_fid(id))) {
219                 __mds_fidmap_insert(obd, entry);
220                 spin_unlock(&mds->mds_fidmap_lock);
221                 CDEBUG(D_INODE, "added mapping to "DLID4"\n",
222                        OLID4(id));
223                 RETURN(1);
224         }
225         spin_unlock(&mds->mds_fidmap_lock);
226         OBD_FREE(entry, sizeof(*entry));
227         
228         RETURN(0);
229 }
230
231 /* removes mapping using fid component from passed @id */
232 void mds_fidmap_del(struct obd_device *obd,
233                     struct lustre_id *id)
234 {
235         struct mds_obd *mds = &obd->u.mds;
236         struct fidmap_entry *entry;
237         ENTRY;
238
239         spin_lock(&mds->mds_fidmap_lock);
240         entry = __mds_fidmap_find(obd, id_fid(id));
241         if (entry) {
242                 __mds_fidmap_remove(obd, entry);
243                 spin_unlock(&mds->mds_fidmap_lock);
244                 OBD_FREE(entry, sizeof(*entry));
245                 CDEBUG(D_INODE, "removed mapping to "DLID4"\n",
246                        OLID4(id));
247                 goto out;
248         }
249         spin_unlock(&mds->mds_fidmap_lock);
250 out:
251         EXIT;
252 }
253
254 struct lustre_id *mds_fidmap_lookup(struct obd_device *obd,
255                                     struct lustre_id *id)
256 {
257         struct mds_obd *mds = &obd->u.mds;
258         struct fidmap_entry *entry;
259         ENTRY;
260
261         spin_lock(&mds->mds_fidmap_lock);
262         entry = __mds_fidmap_find(obd, id_fid(id));
263         spin_unlock(&mds->mds_fidmap_lock);
264
265         if (!entry)
266                 RETURN(NULL);
267         
268         RETURN(&entry->fm_id);
269 }
270
271 /* Add client data to the MDS.  We use a bitmap to locate a free space
272  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
273  * Otherwise, we have just read the data from the last_rcvd file and
274  * we know its offset.
275  */
276 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
277                    struct mds_export_data *med, int cl_idx)
278 {
279         unsigned long *bitmap = mds->mds_client_bitmap;
280         int new_client = (cl_idx == -1);
281         ENTRY;
282
283         LASSERT(bitmap != NULL);
284
285         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
286         if (!strcmp((char *)med->med_mcd->mcd_uuid, (char *)obd->obd_uuid.uuid))
287                 RETURN(0);
288
289         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
290          * there's no need for extra complication here
291          */
292         if (new_client) {
293                 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
294         repeat:
295                 if (cl_idx >= MDS_MAX_CLIENTS) {
296                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
297                         return -ENOMEM;
298                 }
299                 if (test_and_set_bit(cl_idx, bitmap)) {
300                         cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
301                                                     cl_idx);
302                         goto repeat;
303                 }
304         } else {
305                 if (test_and_set_bit(cl_idx, bitmap)) {
306                         CERROR("MDS client %d: bit already set in bitmap!!\n",
307                                cl_idx);
308                         LBUG();
309                 }
310         }
311
312         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
313                cl_idx, med->med_mcd->mcd_uuid);
314
315         med->med_idx = cl_idx;
316         med->med_off = le32_to_cpu(mds->mds_server_data->msd_client_start) +
317                 (cl_idx * le16_to_cpu(mds->mds_server_data->msd_client_size));
318
319         if (new_client) {
320                 struct file *file = mds->mds_rcvd_filp;
321                 struct lvfs_run_ctxt saved;
322                 loff_t off = med->med_off;
323                 int rc;
324
325                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
326                 rc = fsfilt_write_record(obd, file, med->med_mcd,
327                                          sizeof(*med->med_mcd), &off, 1);
328                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
329
330                 if (rc)
331                         return rc;
332                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
333                        med->med_idx, med->med_off,
334                        (unsigned int)sizeof(*med->med_mcd));
335         }
336         return 0;
337 }
338
339 int mds_client_free(struct obd_export *exp, int clear_client)
340 {
341         struct mds_export_data *med = &exp->exp_mds_data;
342         struct mds_obd *mds = &exp->exp_obd->u.mds;
343         unsigned long *bitmap = mds->mds_client_bitmap;
344         struct obd_device *obd = exp->exp_obd;
345         struct mds_client_data zero_mcd;
346         struct lvfs_run_ctxt saved;
347         int rc;
348
349         if (!med->med_mcd)
350                 RETURN(0);
351
352         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
353         if (!strcmp((char *)med->med_mcd->mcd_uuid, (char *)obd->obd_uuid.uuid))
354                 GOTO(free_and_out, 0);
355
356         CDEBUG(D_INFO, "freeing client at idx %u (%lld)with UUID '%s'\n",
357                med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
358
359         LASSERT(bitmap);
360
361         /* Clear the bit _after_ zeroing out the client so we don't
362            race with mds_client_add and zero out new clients.*/
363         if (!test_bit(med->med_idx, bitmap)) {
364                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
365                        med->med_idx);
366                 LBUG();
367         }
368
369         if (clear_client) {
370                 memset(&zero_mcd, 0, sizeof zero_mcd);
371                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
372                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
373                                          sizeof(zero_mcd), &med->med_off, 1);
374                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
375
376                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
377                        "zeroing out client %s idx %u in %s rc %d\n",
378                        med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD, rc);
379         }
380
381         if (!test_and_clear_bit(med->med_idx, bitmap)) {
382                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
383                        med->med_idx);
384                 LBUG();
385         }
386
387
388         /* Make sure the server's last_transno is up to date. Do this
389          * after the client is freed so we know all the client's
390          * transactions have been committed. */
391         mds_update_server_data(exp->exp_obd, 1);
392
393 free_and_out:
394         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
395         med->med_mcd = NULL;
396         return 0;
397 }
398
399 static int mds_server_free_data(struct mds_obd *mds)
400 {
401         OBD_FREE(mds->mds_client_bitmap, MDS_MAX_CLIENTS / 8);
402         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
403         mds->mds_server_data = NULL;
404
405         return 0;
406 }
407
408 static int mds_read_last_fid(struct obd_device *obd, struct file *file)
409 {
410         int rc = 0;
411         loff_t off = 0;
412         struct mds_obd *mds = &obd->u.mds;
413         unsigned long last_fid_size = file->f_dentry->d_inode->i_size;
414         ENTRY;
415
416         if (last_fid_size == 0) {
417                 CWARN("%s: initializing new %s\n", obd->obd_name,
418                       file->f_dentry->d_name.name);
419
420                 /* 
421                  * as fid is used for forming res_id for locking, it should not
422                  * be zero. This will keep us out of lots possible problems,
423                  * asserts, etc.
424                  */
425                 mds_set_last_fid(obd, 0);
426         } else {
427                 __u64 lastfid;
428                 
429                 rc = fsfilt_read_record(obd, file, &lastfid,
430                                         sizeof(lastfid), &off);
431                 if (rc) {
432                         CERROR("error reading MDS %s: rc = %d\n",
433                                file->f_dentry->d_name.name, rc);
434                         RETURN(rc);
435                 }
436
437                 /* 
438                  * make sure, that fid is up-to-date.
439                  */
440                 mds_set_last_fid(obd, lastfid);
441         }
442
443         CDEBUG(D_INODE, "%s: server last_fid: "LPU64"\n",
444                obd->obd_name, mds->mds_last_fid);
445
446         rc = mds_update_last_fid(obd, NULL, 1);
447         RETURN(rc);
448 }
449
450 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
451 {
452         unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
453         struct mds_obd *mds = &obd->u.mds;
454         struct mds_server_data *msd = NULL;
455         struct mds_client_data *mcd = NULL;
456         loff_t off = 0;
457         __u64 mount_count;
458         int cl_idx, rc = 0;
459         ENTRY;
460
461         /* ensure padding in the struct is the correct size */
462         LASSERT(offsetof(struct mds_server_data, msd_padding) +
463                 sizeof(msd->msd_padding) == MDS_LR_SERVER_SIZE);
464         LASSERT(offsetof(struct mds_client_data, mcd_padding) +
465                 sizeof(mcd->mcd_padding) == MDS_LR_CLIENT_SIZE);
466
467         OBD_ALLOC_WAIT(msd, sizeof(*msd));
468         if (!msd)
469                 RETURN(-ENOMEM);
470
471         OBD_ALLOC_WAIT(mds->mds_client_bitmap, MDS_MAX_CLIENTS / 8);
472         if (!mds->mds_client_bitmap) {
473                 OBD_FREE(msd, sizeof(*msd));
474                 RETURN(-ENOMEM);
475         }
476
477         mds->mds_server_data = msd;
478
479         if (last_rcvd_size == 0) {
480                 CWARN("%s: initializing new %s\n", obd->obd_name,
481                       file->f_dentry->d_name.name);
482
483                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
484                 msd->msd_last_transno = 0;
485                 mount_count = msd->msd_mount_count = 0;
486                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
487                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
488                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
489                 msd->msd_feature_rocompat = cpu_to_le32(MDS_ROCOMPAT_LOVOBJID);
490         } else {
491                 rc = fsfilt_read_record(obd, file, msd, sizeof(*msd), &off);
492                 if (rc) {
493                         CERROR("error reading MDS %s: rc = %d\n",
494                                file->f_dentry->d_name.name, rc);
495                         GOTO(err_msd, rc);
496                 }
497                 if (strcmp((char *)msd->msd_uuid, (char *)obd->obd_uuid.uuid)) {
498                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
499                                obd->obd_uuid.uuid, msd->msd_uuid);
500                         GOTO(err_msd, rc = -EINVAL);
501                 }
502                 mount_count = le64_to_cpu(msd->msd_mount_count);
503         }
504         if (msd->msd_feature_incompat & ~cpu_to_le32(MDS_INCOMPAT_SUPP)) {
505                 CERROR("unsupported incompat feature %x\n",
506                        le32_to_cpu(msd->msd_feature_incompat) &
507                        ~MDS_INCOMPAT_SUPP);
508                 GOTO(err_msd, rc = -EINVAL);
509         }
510         /* XXX updating existing b_devel fs only, can be removed in future */
511         msd->msd_feature_rocompat = cpu_to_le32(MDS_ROCOMPAT_LOVOBJID);
512         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDS_ROCOMPAT_SUPP)) {
513                 CERROR("unsupported read-only feature %x\n",
514                        le32_to_cpu(msd->msd_feature_rocompat) &
515                        ~MDS_ROCOMPAT_SUPP);
516                 /* Do something like remount filesystem read-only */
517                 GOTO(err_msd, rc = -EINVAL);
518         }
519
520         mds->mds_last_transno = le64_to_cpu(msd->msd_last_transno);
521
522         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
523                obd->obd_name, mds->mds_last_transno);
524         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
525                obd->obd_name, mount_count + 1);
526         CDEBUG(D_INODE, "%s: server data size: %u\n",
527                obd->obd_name, le32_to_cpu(msd->msd_server_size));
528         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
529                obd->obd_name, le32_to_cpu(msd->msd_client_start));
530         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
531                obd->obd_name, le32_to_cpu(msd->msd_client_size));
532         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
533                obd->obd_name, last_rcvd_size);
534         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
535                last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
536                (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
537                 le16_to_cpu(msd->msd_client_size));
538
539         /* When we do a clean MDS shutdown, we save the last_transno into
540          * the header.  If we find clients with higher last_transno values
541          * then those clients may need recovery done. */
542         for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
543              off < last_rcvd_size; cl_idx++) {
544                 __u64 last_transno;
545                 struct obd_export *exp;
546                 struct mds_export_data *med;
547
548                 if (!mcd) {
549                         OBD_ALLOC_WAIT(mcd, sizeof(*mcd));
550                         if (!mcd)
551                                 GOTO(err_client, rc = -ENOMEM);
552                 }
553
554                 /* Don't assume off is incremented properly by
555                  * fsfilt_read_record(), in case sizeof(*mcd)
556                  * isn't the same as msd->msd_client_size.  */
557                 off = le32_to_cpu(msd->msd_client_start) +
558                         cl_idx * le16_to_cpu(msd->msd_client_size);
559                 rc = fsfilt_read_record(obd, file, mcd, sizeof(*mcd), &off);
560                 if (rc) {
561                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
562                                file->f_dentry->d_name.name, cl_idx, off, rc);
563                         break; /* read error shouldn't cause startup to fail */
564                 }
565
566                 if (mcd->mcd_uuid[0] == '\0') {
567                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
568                                cl_idx);
569                         continue;
570                 }
571
572                 last_transno = le64_to_cpu(mcd->mcd_last_transno) >
573                                le64_to_cpu(mcd->mcd_last_close_transno) ?
574                                le64_to_cpu(mcd->mcd_last_transno) :
575                                le64_to_cpu(mcd->mcd_last_close_transno);
576
577                 /* These exports are cleaned up by mds_disconnect(), so they
578                  * need to be set up like real exports as mds_connect() does.
579                  */
580                 CDEBUG(D_HA|D_WARNING,"RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
581                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
582                        last_transno, le64_to_cpu(msd->msd_last_transno),
583                        mcd->mcd_last_xid);
584
585                 exp = class_new_export(obd);
586                 if (exp == NULL)
587                         GOTO(err_client, rc = -ENOMEM);
588
589                 memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
590                        sizeof exp->exp_client_uuid.uuid);
591                 med = &exp->exp_mds_data;
592                 med->med_mcd = mcd;
593                 mds_client_add(obd, mds, med, cl_idx);
594                 /* create helper if export init gets more complex */
595                 INIT_LIST_HEAD(&med->med_open_head);
596                 spin_lock_init(&med->med_open_lock);
597
598                 mcd = NULL;
599                 exp->exp_connected = 0;
600                 exp->exp_req_replay_needed = 1;
601                 obd->obd_recoverable_clients++;
602                 obd->obd_max_recoverable_clients++;
603
604                 /* track clients to separate req replay
605                  * from lock replay. bug 6063 */
606                 atomic_inc(&obd->obd_req_replay_clients);
607                 exp->exp_req_replay_needed = 1;
608                 atomic_inc(&obd->obd_lock_replay_clients);
609                 exp->exp_lock_replay_needed = 1;
610                 
611                 class_export_put(exp);
612
613                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
614                        cl_idx, last_transno);
615
616                 if (last_transno > mds->mds_last_transno)
617                        mds->mds_last_transno = last_transno;
618         }
619         if (mcd)
620                 OBD_FREE(mcd, sizeof(*mcd));
621         obd->obd_last_committed = mds->mds_last_transno;
622         if (obd->obd_recoverable_clients) {
623                 CWARN("RECOVERY: service %s, %d recoverable clients, "
624                       "last_transno "LPU64"\n", obd->obd_name,
625                       obd->obd_recoverable_clients, mds->mds_last_transno);
626                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
627                 target_start_recovery_thread(obd, mds_handle);
628                 obd->obd_recovery_start = LTIME_S(CURRENT_TIME);
629         }
630         
631         mds->mds_mount_count = mount_count + 1;
632         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
633
634         /* save it, so mount count and last_transno is current */
635         rc = mds_update_server_data(obd, 1);
636         if (rc)
637                 GOTO(err_client, rc);
638
639         RETURN(0);
640
641 err_client:
642         class_disconnect_exports(obd, 0);
643 err_msd:
644         mds_server_free_data(mds);
645         RETURN(rc);
646 }
647
648 /*
649  * sets up root inode lustre_id. It tries to read it first from root inode and
650  * if it is not there, new rootid is allocated and saved there.
651  */
652 int mds_fs_setup_rootid(struct obd_device *obd)
653 {
654         int rc = 0;
655         void *handle;
656         struct inode *inode;
657         struct dentry *dentry;
658         struct mds_obd *mds = &obd->u.mds;
659         ENTRY;
660
661         /* getting root directory and setup its fid. */
662         dentry = mds_id2dentry(obd, &mds->mds_rootid, NULL);
663         if (IS_ERR(dentry)) {
664                 CERROR("Can't find ROOT by "DLID4", err = %d\n",
665                        OLID4(&mds->mds_rootid), (int)PTR_ERR(dentry));
666                 RETURN(PTR_ERR(dentry));
667         }
668
669         inode = dentry->d_inode;
670         LASSERT(dentry->d_inode);
671
672         rc = mds_pack_inode2id(obd, &mds->mds_rootid, inode, 1);
673         if (rc && rc != -ENODATA)
674                 GOTO(out_dentry, rc);
675
676         if (rc) {
677                 if (rc != -ENODATA)
678                         GOTO(out_dentry, rc);
679         } else {
680                 /* rootid is filled by mds_read_inode_sid(), so we do not need
681                  * to allocate it and update. */
682                 LASSERT(id_group(&mds->mds_rootid) == mds->mds_num);
683                 mds_set_last_fid(obd, id_fid(&mds->mds_rootid));
684
685                 rc = mds_fidmap_add(obd, &mds->mds_rootid);
686                 if (rc > 0)
687                         rc = 0;
688                 
689                 GOTO(out_dentry, rc);
690         }
691
692         /* allocating new one, as it is not found in root inode. */
693         handle = fsfilt_start(obd, inode,
694                               FSFILT_OP_SETATTR, NULL);
695         
696         if (IS_ERR(handle)) {
697                 rc = PTR_ERR(handle);
698                 CERROR("fsfilt_start() failed, rc = %d\n", rc);
699                 GOTO(out_dentry, rc);
700         }
701         
702         mds_inode2id(obd, &mds->mds_rootid, inode, mds_alloc_fid(obd));
703         rc = mds_update_inode_ids(obd, inode, handle, &mds->mds_rootid, NULL);
704         if (rc) {
705                 CERROR("mds_update_inode_ids() failed, rc = %d\n", rc);
706                 GOTO(out_dentry, rc);
707         }
708
709         rc = mds_fidmap_add(obd, &mds->mds_rootid);
710         if (rc < 0)
711                 GOTO(out_dentry, rc);
712         else
713                 rc = 0;
714         
715         rc = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
716         if (rc)
717                 CERROR("fsfilt_commit() failed, rc = %d\n", rc);
718
719         EXIT;
720 out_dentry:
721         l_dput(dentry);
722         if (rc == 0)
723                 CWARN("%s: rootid: "DLID4"\n", obd->obd_name,
724                       OLID4(&mds->mds_rootid));
725         return rc;
726 }
727
728 static int mds_update_virtid_fid(struct obd_device *obd,
729                                  void *handle, int force_sync)
730 {
731         struct mds_obd *mds = &obd->u.mds;
732         struct file *filp = mds->mds_virtid_filp;
733         struct lvfs_run_ctxt saved;
734         loff_t off = 0;
735         int rc = 0;
736         ENTRY;
737
738         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
739         rc = fsfilt_write_record(obd, filp, &mds->mds_virtid_fid,
740                                  sizeof(mds->mds_virtid_fid),
741                                  &off, force_sync);
742         if (rc) {
743                 CERROR("error writing MDS virtid_fid #"LPU64
744                        ", err = %d\n", mds->mds_virtid_fid, rc);
745         }
746                 
747         CDEBUG(D_SUPER, "wrote virtid fid #"LPU64" at idx "
748                "%llu: err = %d\n", mds->mds_virtid_fid,
749                off, rc);
750         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
751
752         RETURN(rc);
753 }
754
755 static int mds_read_virtid_fid(struct obd_device *obd, struct file *file)
756 {
757         int rc = 0;
758         loff_t off = 0;
759         struct mds_obd *mds = &obd->u.mds;
760         unsigned long virtid_fid_size = file->f_dentry->d_inode->i_size;
761         ENTRY;
762
763         if (virtid_fid_size == 0) {
764                 mds->mds_virtid_fid = mds_alloc_fid(obd);
765         } else {
766                 rc = fsfilt_read_record(obd, file, &mds->mds_virtid_fid,
767                                         sizeof(mds->mds_virtid_fid), &off);
768                 if (rc) {
769                         CERROR("error reading MDS %s: rc = %d\n",
770                                file->f_dentry->d_name.name, rc);
771                         RETURN(rc);
772                 }
773         }
774         rc = mds_update_virtid_fid(obd, NULL, 1);
775
776         RETURN(rc);
777 }
778
779 /*
780  * initializes lustre_id for virtual id directory, it is needed sometimes, as it
781  * is possible that it will be the parent for object an operations is going to
782  * be performed on.
783  */
784 int mds_fs_setup_virtid(struct obd_device *obd)
785 {
786         int rc = 0;
787         void *handle;
788         struct lustre_id sid;
789         struct mds_obd *mds = &obd->u.mds;
790         struct inode *inode = mds->mds_id_dir->d_inode;
791         ENTRY;
792
793         handle = fsfilt_start(obd, inode,
794                               FSFILT_OP_SETATTR, NULL);
795         
796         if (IS_ERR(handle)) {
797                 rc = PTR_ERR(handle);
798                 CERROR("fsfilt_start() failed, rc = %d\n", rc);
799                 RETURN(rc);
800         }
801
802         id_group(&sid) = mds->mds_num;
803         id_fid(&sid) = mds->mds_virtid_fid;
804
805         id_ino(&sid) = inode->i_ino;
806         id_gen(&sid) = inode->i_generation;
807         id_type(&sid) = (S_IFMT & inode->i_mode);
808
809         rc = mds_update_inode_ids(obd, inode, handle, &sid, NULL);
810
811         if (rc) {
812                 CERROR("mds_update_inode_ids() failed, rc = %d\n", rc);
813                 RETURN(rc);
814         }
815
816         rc = mds_fidmap_add(obd, &sid);
817         if (rc < 0)
818                 RETURN(rc);
819         else
820                 rc = 0;
821         
822         rc = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
823         if (rc) {
824                 CERROR("fsfilt_commit() failed, rc = %d\n", rc);
825                 RETURN(rc);
826         }
827
828         RETURN(rc);
829 }
830
831 #define MDS_FIDMAP_SIZE (2*PAGE_SIZE)
832
833 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
834 {
835         struct mds_obd *mds = &obd->u.mds;
836         struct lvfs_run_ctxt saved;
837         struct dentry *dentry;
838         struct file *file;
839         int rc;
840         ENTRY;
841
842         rc = cleanup_group_info();
843         if (rc)
844                 RETURN(rc);
845
846         mds->mds_vfsmnt = mnt;
847         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
848
849         fsfilt_setup(obd, mds->mds_sb);
850
851         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
852         obd->obd_lvfs_ctxt.pwdmnt = mnt;
853         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
854         obd->obd_lvfs_ctxt.fs = get_ds();
855         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
856
857         /* setup the directory tree */
858         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
859         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0);
860         if (IS_ERR(dentry)) {
861                 rc = PTR_ERR(dentry);
862                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
863                 GOTO(err_pop, rc);
864         }
865
866         mdc_pack_id(&mds->mds_rootid, dentry->d_inode->i_ino,
867                     dentry->d_inode->i_generation, S_IFDIR, 0, 0);
868
869         dput(dentry);
870         
871         dentry = lookup_one_len("__iopen__", current->fs->pwd,
872                                 strlen("__iopen__"));
873         if (IS_ERR(dentry)) {
874                 rc = PTR_ERR(dentry);
875                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
876                 GOTO(err_pop, rc);
877         }
878         mds->mds_id_de = dentry;
879         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
880                 rc = -ENOENT;
881                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
882                 GOTO(err_id_de, rc);
883         }
884
885         dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777, 1);
886         if (IS_ERR(dentry)) {
887                 rc = PTR_ERR(dentry);
888                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
889                 GOTO(err_id_de, rc);
890         }
891         mds->mds_pending_dir = dentry;
892       
893         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1);
894         if (IS_ERR(dentry)) {
895                 rc = PTR_ERR(dentry);
896                 CERROR("cannot create LOGS directory: rc = %d\n", rc);
897                 GOTO(err_pending, rc);
898         }
899         mds->mds_logs_dir = dentry;
900
901         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
902         if (IS_ERR(dentry)) {
903                 rc = PTR_ERR(dentry);
904                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
905                 GOTO(err_logs, rc);
906         }
907         mds->mds_objects_dir = dentry;
908
909         dentry = simple_mkdir(current->fs->pwd, "FIDS", 0777, 1);
910         if (IS_ERR(dentry)) {
911                 rc = PTR_ERR(dentry);
912                 CERROR("cannot create FIDS directory: rc = %d\n", rc);
913                 GOTO(err_objects, rc);
914         }
915         mds->mds_id_dir = dentry;
916
917         dentry = simple_mkdir(current->fs->pwd, "UNNAMED", 0777, 1);
918         if (IS_ERR(dentry)) {
919                 rc = PTR_ERR(dentry);
920                 CERROR("cannot create UNNAMED directory: rc = %d\n", rc);
921                 GOTO(err_id_dir, rc);
922         }
923         mds->mds_unnamed_dir = dentry;
924
925         /* open and test the last rcvd file */
926         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
927         if (IS_ERR(file)) {
928                 rc = PTR_ERR(file);
929                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
930                 GOTO(err_unnamed, rc = PTR_ERR(file));
931         }
932         mds->mds_rcvd_filp = file;
933         
934         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
935                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
936                        file->f_dentry->d_inode->i_mode);
937                 GOTO(err_last_rcvd, rc = -ENOENT);
938         }
939
940         rc = mds_read_last_rcvd(obd, file);
941         if (rc) {
942                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
943                 GOTO(err_last_rcvd, rc);
944         }
945
946         /* open and test last fid file */
947         file = filp_open(LAST_FID, O_RDWR | O_CREAT, 0644);
948         if (IS_ERR(file)) {
949                 rc = PTR_ERR(file);
950                 CERROR("cannot open/create %s file: rc = %d\n",
951                        LAST_FID, rc);
952                 GOTO(err_client, rc = PTR_ERR(file));
953         }
954         mds->mds_fid_filp = file;
955         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
956                 CERROR("%s is not a regular file!: mode = %o\n",
957                        LAST_FID, file->f_dentry->d_inode->i_mode);
958                 GOTO(err_last_fid, rc = -ENOENT);
959         }
960
961         rc = mds_read_last_fid(obd, file);
962         if (rc) {
963                 CERROR("cannot read %s: rc = %d\n", LAST_FID, rc);
964                 GOTO(err_last_fid, rc);
965         }
966
967         /* open and test virtid fid file */
968         file = filp_open(VIRT_FID, O_RDWR | O_CREAT, 0644);
969         if (IS_ERR(file)) {
970                 rc = PTR_ERR(file);
971                 CERROR("cannot open/create %s file: rc = %d\n",
972                        VIRT_FID, rc);
973                 GOTO(err_last_fid, rc = PTR_ERR(file));
974         }
975         mds->mds_virtid_filp = file;
976         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
977                 CERROR("%s is not a regular file!: mode = %o\n",
978                        VIRT_FID, file->f_dentry->d_inode->i_mode);
979                 GOTO(err_virtid_fid, rc = -ENOENT);
980         }
981
982         rc = mds_read_virtid_fid(obd, file);
983         if (rc) {
984                 CERROR("cannot read %s: rc = %d\n", VIRT_FID, rc);
985                 GOTO(err_virtid_fid, rc);
986         }
987         
988         /* open and test the lov objid file */
989         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
990         if (IS_ERR(file)) {
991                 rc = PTR_ERR(file);
992                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
993                 GOTO(err_virtid_fid, rc = PTR_ERR(file));
994         }
995         mds->mds_dt_objid_filp = file;
996         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
997                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
998                        file->f_dentry->d_inode->i_mode);
999                 GOTO(err_lov_objid, rc = -ENOENT);
1000         }
1001
1002         /* open and test capa keyid file */
1003         file = filp_open(CAPA_KEYS, O_RDWR | O_CREAT, 0644);
1004         if (IS_ERR(file)) {
1005                 rc = PTR_ERR(file);
1006                 CERROR("cannot open/create %s file: rc = %d\n",
1007                        CAPA_KEYS, rc);
1008                 GOTO(err_lov_objid, rc = PTR_ERR(file));
1009         }
1010         mds->mds_capa_keys_filp = file;
1011         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
1012                 CERROR("%s is not a regular file!: mode = %o\n",
1013                        CAPA_KEYS, file->f_dentry->d_inode->i_mode);
1014                 GOTO(err_capa_keyid, rc = -ENOENT);
1015         }
1016
1017         rc = mds_read_capa_key(obd, file);
1018         if (rc) {
1019                 CERROR("cannot read %s: rc = %d\n", CAPA_KEYS, rc);
1020                 GOTO(err_capa_keyid, rc);
1021         }
1022
1023         /* reint fidext thumb by last fid after root and virt are initialized */
1024         mds->mds_fidext_thumb = mds->mds_last_fid;
1025                 
1026         rc = mds_fidmap_init(obd, MDS_FIDMAP_SIZE);
1027         if (rc) {
1028                 CERROR("cannot init fid mapping tables, err %d\n", rc);
1029                 GOTO(err_capa_keyid, rc);
1030         }
1031         
1032 err_pop:
1033         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1034         return rc;
1035
1036 err_capa_keyid:
1037         if (mds->mds_capa_keys_filp && filp_close(mds->mds_capa_keys_filp, 0))
1038                 CERROR("can't close %s after error\n", CAPA_KEYS);
1039 err_lov_objid:
1040         if (mds->mds_dt_objid_filp && filp_close(mds->mds_dt_objid_filp, 0))
1041                 CERROR("can't close %s after error\n", LOV_OBJID);
1042 err_virtid_fid:
1043         if (mds->mds_virtid_filp && filp_close(mds->mds_virtid_filp, 0))
1044                 CERROR("can't close %s after error\n", VIRT_FID);
1045 err_last_fid:
1046         if (mds->mds_fid_filp && filp_close(mds->mds_fid_filp, 0))
1047                 CERROR("can't close %s after error\n", LAST_FID);
1048 err_client:
1049         class_disconnect_exports(obd, 0);
1050 err_last_rcvd:
1051         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
1052                 CERROR("can't close %s after error\n", LAST_RCVD);
1053 err_unnamed:
1054         dput(mds->mds_unnamed_dir);
1055 err_id_dir:
1056         dput(mds->mds_id_dir);
1057 err_objects:
1058         dput(mds->mds_objects_dir);
1059 err_logs:
1060         dput(mds->mds_logs_dir);
1061 err_pending:
1062         dput(mds->mds_pending_dir);
1063 err_id_de:
1064         dput(mds->mds_id_de);
1065         goto err_pop;
1066 }
1067
/* Thin wrapper: passes @obd to fsfilt_post_cleanup() and returns its result. */
static int mds_fs_post_cleanup(struct obd_device *obd)
{
        int result = fsfilt_post_cleanup(obd);

        return result;
}
1074
1075 int mds_fs_cleanup(struct obd_device *obd, int flags)
1076 {
1077         struct mds_obd *mds = &obd->u.mds;
1078         struct lvfs_run_ctxt saved;
1079         int rc = 0;
1080
1081         if (flags & OBD_OPT_FAILOVER)
1082                 CERROR("%s: shutting down for failover; client state will"
1083                        " be preserved.\n", obd->obd_name);
1084
1085         class_disconnect_exports(obd, flags); /* cleans up client info too */
1086         target_cleanup_recovery(obd);
1087         mds_server_free_data(mds);
1088         mds_fidmap_cleanup(obd);
1089         
1090         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1091         if (mds->mds_virtid_filp) {
1092                 rc = filp_close(mds->mds_virtid_filp, 0);
1093                 mds->mds_virtid_filp = NULL;
1094                 if (rc)
1095                         CERROR("%s file won't close, rc = %d\n", VIRT_FID, rc);
1096         }
1097         if (mds->mds_fid_filp) {
1098                 rc = filp_close(mds->mds_fid_filp, 0);
1099                 mds->mds_fid_filp = NULL;
1100                 if (rc)
1101                         CERROR("%s file won't close, rc = %d\n", LAST_FID, rc);
1102         }
1103         if (mds->mds_rcvd_filp) {
1104                 rc = filp_close(mds->mds_rcvd_filp, 0);
1105                 mds->mds_rcvd_filp = NULL;
1106                 if (rc)
1107                         CERROR("%s file won't close, rc = %d\n", LAST_RCVD, rc);
1108         }
1109         if (mds->mds_dt_objid_filp) {
1110                 rc = filp_close(mds->mds_dt_objid_filp, 0);
1111                 mds->mds_dt_objid_filp = NULL;
1112                 if (rc)
1113                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
1114         }
1115         if (mds->mds_capa_keys_filp) {
1116                 rc = filp_close(mds->mds_capa_keys_filp, 0);
1117                 mds->mds_capa_keys_filp = NULL;
1118                 if (rc)
1119                         CERROR("%s file won't close, rc=%d\n", CAPA_KEYS, rc);
1120         }
1121         if (mds->mds_unnamed_dir != NULL) {
1122                 l_dput(mds->mds_unnamed_dir);
1123                 mds->mds_unnamed_dir = NULL;
1124         }
1125         if (mds->mds_id_dir != NULL) {
1126                 l_dput(mds->mds_id_dir);
1127                 mds->mds_id_dir = NULL;
1128         }
1129         if (mds->mds_objects_dir != NULL) {
1130                 l_dput(mds->mds_objects_dir);
1131                 mds->mds_objects_dir = NULL;
1132         }
1133         if (mds->mds_logs_dir) {
1134                 l_dput(mds->mds_logs_dir);
1135                 mds->mds_logs_dir = NULL;
1136         }
1137         if (mds->mds_pending_dir) {
1138                 l_dput(mds->mds_pending_dir);
1139                 mds->mds_pending_dir = NULL;
1140         }
1141         rc = mds_fs_post_cleanup(obd);
1142         
1143         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1144         shrink_dcache_parent(mds->mds_id_de);
1145         dput(mds->mds_id_de);
1146
1147         return rc;
1148 }
1149
1150 /* Creates an object with the same name as its id.  Because this is not at all
1151  * performance sensitive, it is accomplished by creating a file, checking the
1152  * id, and renaming it. */
1153 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
1154                    void *acl, int acl_size,
1155                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1156 {
1157         struct mds_obd *mds = &exp->exp_obd->u.mds;
1158         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1159         struct file *filp;
1160         struct dentry *dchild;
1161         struct lvfs_run_ctxt saved;
1162         char idname[LL_ID_NAMELEN];
1163         int rc = 0, err, idlen;
1164         void *handle;
1165         ENTRY;
1166
1167         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1168         down(&parent_inode->i_sem);
1169         if (oa->o_id) {
1170                 idlen = ll_id2str(idname, oa->o_id, oa->o_generation);
1171                 dchild = lookup_one_len(idname, mds->mds_objects_dir, idlen);
1172                 if (IS_ERR(dchild))
1173                         GOTO(out_pop, rc = PTR_ERR(dchild));
1174
1175                 if (dchild->d_inode == NULL) {
1176                         struct dentry_params dp;
1177                         struct inode *inode;
1178
1179                         CWARN("creating log with ID "LPU64"\n", oa->o_id);
1180                         
1181                         dchild->d_fsdata = (void *) &dp;
1182                         dp.p_ptr = NULL;
1183                         dp.p_inum = oa->o_id;
1184                         rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL);
1185                         if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id)
1186                                 dchild->d_fsdata = NULL;
1187                         if (rc) {
1188                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
1189                                 dput(dchild);
1190                                 GOTO(out_pop, rc);
1191                         }
1192                         inode = dchild->d_inode;
1193                         LASSERT(inode->i_ino == oa->o_id);
1194                         inode->i_generation = oa->o_generation;
1195                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
1196                                inode->i_ino, inode->i_generation);
1197                         mark_inode_dirty(inode);
1198                 } else {
1199                         CWARN("it should be here!\n");
1200                 }
1201                 GOTO(out_pop, rc);
1202         }
1203
1204         sprintf(idname, "OBJECTS/%u.%u", ll_insecure_random_int(), current->pid);
1205         filp = filp_open(idname, O_CREAT | O_EXCL, 0644);
1206         if (IS_ERR(filp)) {
1207                 rc = PTR_ERR(filp);
1208                 if (rc == -EEXIST) {
1209                         CERROR("impossible object name collision %s\n",
1210                                idname);
1211                         LBUG();
1212                 }
1213                 CERROR("error creating tmp object %s: rc %d\n", 
1214                        idname, rc);
1215                 GOTO(out_pop, rc);
1216         }
1217
1218         LASSERT(mds->mds_objects_dir == filp->f_dentry->d_parent);
1219
1220         oa->o_id = filp->f_dentry->d_inode->i_ino;
1221         oa->o_generation = filp->f_dentry->d_inode->i_generation;
1222         idlen = ll_id2str(idname, oa->o_id, oa->o_generation);
1223         
1224         CWARN("created log anonymous "LPU64"/%u\n",
1225               oa->o_id, oa->o_generation);
1226
1227         dchild = lookup_one_len(idname, mds->mds_objects_dir, idlen);
1228         if (IS_ERR(dchild)) {
1229                 CERROR("getting neg dentry for obj rename: %d\n", rc);
1230                 GOTO(out_close, rc = PTR_ERR(dchild));
1231         }
1232         if (dchild->d_inode != NULL) {
1233                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
1234                        oa->o_id, oa->o_generation);
1235                 LBUG();
1236         }
1237
1238         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
1239                               FSFILT_OP_RENAME, NULL);
1240         if (IS_ERR(handle))
1241                 GOTO(out_dput, rc = PTR_ERR(handle));
1242
1243         lock_kernel();
1244         rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
1245                         mds->mds_objects_dir->d_inode, dchild);
1246         unlock_kernel();
1247         if (rc)
1248                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1249                        oa->o_id, oa->o_generation, rc);
1250
1251         err = fsfilt_commit(exp->exp_obd, mds->mds_sb, 
1252                             mds->mds_objects_dir->d_inode, handle, 0);
1253         if (!err) {
1254                 oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
1255                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
1256         } else if (!rc)
1257                 rc = err;
1258 out_dput:
1259         dput(dchild);
1260 out_close:
1261         err = filp_close(filp, 0);
1262         if (err) {
1263                 CERROR("closing tmpfile %s: rc %d\n", idname, rc);
1264                 if (!rc)
1265                         rc = err;
1266         }
1267 out_pop:
1268         up(&parent_inode->i_sem);
1269         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1270         RETURN(rc);
1271 }
1272
1273 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1274                     struct lov_stripe_md *ea, struct obd_trans_info *oti)
1275 {
1276         struct mds_obd *mds = &exp->exp_obd->u.mds;
1277         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1278         struct obd_device *obd = exp->exp_obd;
1279         struct lvfs_run_ctxt saved;
1280         char idname[LL_ID_NAMELEN];
1281         struct dentry *de;
1282         void *handle;
1283         int err, idlen, rc = 0;
1284         ENTRY;
1285
1286         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1287
1288         idlen = ll_id2str(idname, oa->o_id, oa->o_generation);
1289
1290         down(&parent_inode->i_sem);
1291         de = lookup_one_len(idname, mds->mds_objects_dir, idlen);
1292         if (IS_ERR(de) || de->d_inode == NULL) {
1293                 rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT;
1294                 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1295                        oa->o_id, idname, rc);
1296                 GOTO(out_dput, rc);
1297         }
1298         /* Stripe count is 1 here since this is some MDS specific stuff
1299            that is unlinked, not spanned across multiple OSTs */
1300         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1301                                   FSFILT_OP_UNLINK, oti, 1);
1302
1303         if (IS_ERR(handle))
1304                 GOTO(out_dput, rc = PTR_ERR(handle));
1305         
1306         rc = vfs_unlink(mds->mds_objects_dir->d_inode, de);
1307         if (rc) 
1308                 CERROR("error destroying object "LPU64":%u: rc %d\n",
1309                        oa->o_id, oa->o_generation, rc);
1310         
1311         err = fsfilt_commit(obd, mds->mds_sb, mds->mds_objects_dir->d_inode, 
1312                             handle, exp->exp_sync);
1313         if (err && !rc)
1314                 rc = err;
1315 out_dput:
1316         if (de != NULL)
1317                 l_dput(de);
1318         up(&parent_inode->i_sem);
1319         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1320         RETURN(rc);
1321 }