Whamcloud - gitweb
- landed b_hd_mdref (mostly WB cache fixes)
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/module.h>
32 #include <linux/kmod.h>
33 #include <linux/version.h>
34 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
35 #include <linux/mount.h>
36 #endif
37 #include <linux/lustre_mds.h>
38 #include <linux/obd_class.h>
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_fsfilt.h>
42 #include <libcfs/list.h>
43
44 #include <linux/lustre_smfs.h>
45 #include "mds_internal.h"
46
47 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
48 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
49
50 #define LAST_RCVD "last_rcvd"
51 #define LOV_OBJID "lov_objid"
52 #define LAST_FID  "last_fid"
53 #define VIRT_FID  "virt_fid"
54
/* One entry of the fid mapping hash table (mds_fidmap_table).  Entries are
 * hashed and compared by id_fid(&fm_id). */
struct fidmap_entry {
        struct hlist_node fm_hash;      /* linkage in the hash bucket chain */
        struct lustre_id  fm_id;        /* full id stored for this fid */
};
59
60 int mds_fidmap_init(struct obd_device *obd, int size)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         struct hlist_head *head;
64         int i = 0;
65         ENTRY;
66
67         LASSERT(size > sizeof(sizeof(struct hlist_head)));
68         mds->mds_fidmap_size = size / sizeof(struct hlist_head);
69
70         CWARN("allocating %lu fid mapping entries\n",
71               (unsigned long)mds->mds_fidmap_size);
72
73         OBD_ALLOC(mds->mds_fidmap_table, size);
74         if (!mds->mds_fidmap_table)
75                 RETURN(-ENOMEM);
76
77         i = mds->mds_fidmap_size;
78         head = mds->mds_fidmap_table;
79         do {
80                 INIT_HLIST_HEAD(head);
81                 head++;
82                 i--;
83         } while(i);
84
85         RETURN(0);
86 }
87
88 int mds_fidmap_cleanup(struct obd_device *obd)
89 {
90         struct hlist_node *node = NULL, *tmp = NULL;
91         struct mds_obd *mds = &obd->u.mds;
92         struct fidmap_entry *entry;
93         struct hlist_head *head;
94         int i = 0;
95         ENTRY;
96
97         spin_lock(&mds->mds_fidmap_lock);
98         for (i = 0, head = mds->mds_fidmap_table;
99              i < mds->mds_fidmap_size; i++, head++) {
100                 hlist_for_each_safe(node, tmp, head) {
101                         entry = hlist_entry(node, struct fidmap_entry, fm_hash);
102                         hlist_del_init(&entry->fm_hash);
103                         OBD_FREE(entry, sizeof(*entry));
104                 }
105         }
106         spin_unlock(&mds->mds_fidmap_lock);
107         OBD_FREE(mds->mds_fidmap_table, mds->mds_fidmap_size *
108                  sizeof(struct hlist_head));
109         RETURN(0);
110 }
111
112 static inline unsigned long
113 const hashfn(struct obd_device *obd, __u64 fid)
114 {
115         struct mds_obd *mds = &obd->u.mds;
116         return (unsigned long)(fid & (mds->mds_fidmap_size - 1));
117 }
118
119 static struct fidmap_entry *
120 __mds_fidmap_find(struct obd_device *obd, __u64 fid)
121 {
122         struct fidmap_entry *entry = NULL;
123         struct mds_obd *mds = &obd->u.mds;
124         struct hlist_node *node = NULL;
125         struct hlist_head *head;
126         ENTRY;
127
128         head = mds->mds_fidmap_table + hashfn(obd, fid);
129         hlist_for_each(node, head) {
130                 entry = hlist_entry(node, struct fidmap_entry, fm_hash);
131                 if (id_fid(&entry->fm_id) == fid)
132                         RETURN(entry);
133         }
134         RETURN(NULL);
135 }
136
137 struct fidmap_entry *
138 mds_fidmap_find(struct obd_device *obd, __u64 fid)
139 {
140         struct mds_obd *mds = &obd->u.mds;
141         struct fidmap_entry *entry;
142         ENTRY;
143
144         spin_lock(&mds->mds_fidmap_lock);
145         entry = __mds_fidmap_find(obd, fid);
146         spin_unlock(&mds->mds_fidmap_lock);
147         
148         RETURN(entry);
149 }
150
151 static void __mds_fidmap_insert(struct obd_device *obd,
152                                 struct fidmap_entry *entry)
153 {
154         struct mds_obd *mds = &obd->u.mds;
155         struct hlist_head *head;
156         unsigned long idx;
157         ENTRY;
158
159         idx = hashfn(obd, id_fid(&entry->fm_id));
160         head = mds->mds_fidmap_table + idx;
161         hlist_add_head(&entry->fm_hash, head);
162         
163         EXIT;
164 }
165
166 void mds_fidmap_insert(struct obd_device *obd,
167                        struct fidmap_entry *entry)
168 {
169         struct mds_obd *mds = &obd->u.mds;
170         ENTRY;
171         
172         spin_lock(&mds->mds_fidmap_lock);
173         __mds_fidmap_insert(obd, entry);
174         spin_unlock(&mds->mds_fidmap_lock);
175         
176         EXIT;
177 }
178
179 static void __mds_fidmap_remove(struct obd_device *obd,
180                                 struct fidmap_entry *entry)
181 {
182         ENTRY;
183         hlist_del_init(&entry->fm_hash);
184         EXIT;
185 }
186
187 void mds_fidmap_remove(struct obd_device *obd,
188                        struct fidmap_entry *entry)
189 {
190         struct mds_obd *mds = &obd->u.mds;
191         ENTRY;
192         
193         spin_lock(&mds->mds_fidmap_lock);
194         __mds_fidmap_remove(obd, entry);
195         spin_unlock(&mds->mds_fidmap_lock);
196
197         EXIT;
198 }
199
200 /* creates new mapping remote fid -> local inode store cookie. Both are saved in
201  * lustre_id for better usability, as all mds function use lustre_id as input
202  * params.*/
203 int mds_fidmap_add(struct obd_device *obd,
204                    struct lustre_id *id)
205 {
206         struct mds_obd *mds = &obd->u.mds;
207         struct fidmap_entry *entry;
208         ENTRY;
209
210         OBD_ALLOC(entry, sizeof(*entry));
211         if (!entry)
212                 RETURN(-ENOMEM);
213
214         entry->fm_id = *id;
215         
216         spin_lock(&mds->mds_fidmap_lock);
217         if (!__mds_fidmap_find(obd, id_fid(id))) {
218                 __mds_fidmap_insert(obd, entry);
219                 spin_unlock(&mds->mds_fidmap_lock);
220                 CDEBUG(D_INODE, "added mapping to "DLID4"\n",
221                        OLID4(id));
222                 RETURN(1);
223         }
224         spin_unlock(&mds->mds_fidmap_lock);
225         OBD_FREE(entry, sizeof(*entry));
226         
227         RETURN(0);
228 }
229
230 /* removes mapping using fid component from passed @id */
231 void mds_fidmap_del(struct obd_device *obd,
232                     struct lustre_id *id)
233 {
234         struct mds_obd *mds = &obd->u.mds;
235         struct fidmap_entry *entry;
236         ENTRY;
237
238         spin_lock(&mds->mds_fidmap_lock);
239         entry = __mds_fidmap_find(obd, id_fid(id));
240         if (entry) {
241                 __mds_fidmap_remove(obd, entry);
242                 spin_unlock(&mds->mds_fidmap_lock);
243                 OBD_FREE(entry, sizeof(*entry));
244                 CDEBUG(D_INODE, "removed mapping to "DLID4"\n",
245                        OLID4(id));
246                 goto out;
247         }
248         spin_unlock(&mds->mds_fidmap_lock);
249 out:
250         EXIT;
251 }
252
253 struct lustre_id *mds_fidmap_lookup(struct obd_device *obd,
254                                     struct lustre_id *id)
255 {
256         struct mds_obd *mds = &obd->u.mds;
257         struct fidmap_entry *entry;
258         ENTRY;
259
260         spin_lock(&mds->mds_fidmap_lock);
261         entry = __mds_fidmap_find(obd, id_fid(id));
262         spin_unlock(&mds->mds_fidmap_lock);
263
264         if (!entry)
265                 RETURN(NULL);
266         
267         RETURN(&entry->fm_id);
268 }
269
270 /* Add client data to the MDS.  We use a bitmap to locate a free space
271  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
272  * Otherwise, we have just read the data from the last_rcvd file and
273  * we know its offset.
274  */
275 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
276                    struct mds_export_data *med, int cl_idx)
277 {
278         unsigned long *bitmap = mds->mds_client_bitmap;
279         int new_client = (cl_idx == -1);
280         ENTRY;
281
282         LASSERT(bitmap != NULL);
283
284         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
285         if (!strcmp((char *)med->med_mcd->mcd_uuid, (char *)obd->obd_uuid.uuid))
286                 RETURN(0);
287
288         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
289          * there's no need for extra complication here
290          */
291         if (new_client) {
292                 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
293         repeat:
294                 if (cl_idx >= MDS_MAX_CLIENTS) {
295                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
296                         return -ENOMEM;
297                 }
298                 if (test_and_set_bit(cl_idx, bitmap)) {
299                         cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
300                                                     cl_idx);
301                         goto repeat;
302                 }
303         } else {
304                 if (test_and_set_bit(cl_idx, bitmap)) {
305                         CERROR("MDS client %d: bit already set in bitmap!!\n",
306                                cl_idx);
307                         LBUG();
308                 }
309         }
310
311         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
312                cl_idx, med->med_mcd->mcd_uuid);
313
314         med->med_idx = cl_idx;
315         med->med_off = le32_to_cpu(mds->mds_server_data->msd_client_start) +
316                 (cl_idx * le16_to_cpu(mds->mds_server_data->msd_client_size));
317
318         if (new_client) {
319                 struct file *file = mds->mds_rcvd_filp;
320                 struct lvfs_run_ctxt saved;
321                 loff_t off = med->med_off;
322                 int rc;
323
324                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
325                 rc = fsfilt_write_record(obd, file, med->med_mcd,
326                                          sizeof(*med->med_mcd), &off, 1);
327                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
328
329                 if (rc)
330                         return rc;
331                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
332                        med->med_idx, med->med_off,
333                        (unsigned int)sizeof(*med->med_mcd));
334         }
335         return 0;
336 }
337
338 int mds_client_free(struct obd_export *exp, int clear_client)
339 {
340         struct mds_export_data *med = &exp->exp_mds_data;
341         struct mds_obd *mds = &exp->exp_obd->u.mds;
342         unsigned long *bitmap = mds->mds_client_bitmap;
343         struct obd_device *obd = exp->exp_obd;
344         struct mds_client_data zero_mcd;
345         struct lvfs_run_ctxt saved;
346         int rc;
347
348         if (!med->med_mcd)
349                 RETURN(0);
350
351         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
352         if (!strcmp((char *)med->med_mcd->mcd_uuid, (char *)obd->obd_uuid.uuid))
353                 GOTO(free_and_out, 0);
354
355         CDEBUG(D_INFO, "freeing client at idx %u (%lld)with UUID '%s'\n",
356                med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
357
358         LASSERT(bitmap);
359
360         /* Clear the bit _after_ zeroing out the client so we don't
361            race with mds_client_add and zero out new clients.*/
362         if (!test_bit(med->med_idx, bitmap)) {
363                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
364                        med->med_idx);
365                 LBUG();
366         }
367
368         if (clear_client) {
369                 memset(&zero_mcd, 0, sizeof zero_mcd);
370                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
371                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
372                                          sizeof(zero_mcd), &med->med_off, 1);
373                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
374
375                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
376                        "zeroing out client %s idx %u in %s rc %d\n",
377                        med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD, rc);
378         }
379
380         if (!test_and_clear_bit(med->med_idx, bitmap)) {
381                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
382                        med->med_idx);
383                 LBUG();
384         }
385
386
387         /* Make sure the server's last_transno is up to date. Do this
388          * after the client is freed so we know all the client's
389          * transactions have been committed. */
390         mds_update_server_data(exp->exp_obd, 1);
391
392 free_and_out:
393         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
394         med->med_mcd = NULL;
395         return 0;
396 }
397
398 static int mds_server_free_data(struct mds_obd *mds)
399 {
400         OBD_FREE(mds->mds_client_bitmap, MDS_MAX_CLIENTS / 8);
401         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
402         mds->mds_server_data = NULL;
403
404         return 0;
405 }
406
407 static int mds_read_last_fid(struct obd_device *obd, struct file *file)
408 {
409         int rc = 0;
410         loff_t off = 0;
411         struct mds_obd *mds = &obd->u.mds;
412         unsigned long last_fid_size = file->f_dentry->d_inode->i_size;
413         ENTRY;
414
415         if (last_fid_size == 0) {
416                 CWARN("%s: initializing new %s\n", obd->obd_name,
417                       file->f_dentry->d_name.name);
418
419                 /* 
420                  * as fid is used for forming res_id for locking, it should not
421                  * be zero. This will keep us out of lots possible problems,
422                  * asserts, etc.
423                  */
424                 mds_set_last_fid(obd, 0);
425         } else {
426                 __u64 lastfid;
427                 
428                 rc = fsfilt_read_record(obd, file, &lastfid,
429                                         sizeof(lastfid), &off);
430                 if (rc) {
431                         CERROR("error reading MDS %s: rc = %d\n",
432                                file->f_dentry->d_name.name, rc);
433                         RETURN(rc);
434                 }
435
436                 /* 
437                  * make sure, that fid is up-to-date.
438                  */
439                 mds_set_last_fid(obd, lastfid);
440         }
441
442         CDEBUG(D_INODE, "%s: server last_fid: "LPU64"\n",
443                obd->obd_name, mds->mds_last_fid);
444
445         rc = mds_update_last_fid(obd, NULL, 1);
446         RETURN(rc);
447 }
448
449 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
450 {
451         unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
452         struct mds_obd *mds = &obd->u.mds;
453         struct mds_server_data *msd = NULL;
454         struct mds_client_data *mcd = NULL;
455         loff_t off = 0;
456         __u64 mount_count;
457         int cl_idx, rc = 0;
458         ENTRY;
459
460         /* ensure padding in the struct is the correct size */
461         LASSERT(offsetof(struct mds_server_data, msd_padding) +
462                 sizeof(msd->msd_padding) == MDS_LR_SERVER_SIZE);
463         LASSERT(offsetof(struct mds_client_data, mcd_padding) +
464                 sizeof(mcd->mcd_padding) == MDS_LR_CLIENT_SIZE);
465
466         OBD_ALLOC_WAIT(msd, sizeof(*msd));
467         if (!msd)
468                 RETURN(-ENOMEM);
469
470         OBD_ALLOC_WAIT(mds->mds_client_bitmap, MDS_MAX_CLIENTS / 8);
471         if (!mds->mds_client_bitmap) {
472                 OBD_FREE(msd, sizeof(*msd));
473                 RETURN(-ENOMEM);
474         }
475
476         mds->mds_server_data = msd;
477
478         if (last_rcvd_size == 0) {
479                 CWARN("%s: initializing new %s\n", obd->obd_name,
480                       file->f_dentry->d_name.name);
481
482                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
483                 msd->msd_last_transno = 0;
484                 mount_count = msd->msd_mount_count = 0;
485                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
486                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
487                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
488                 msd->msd_feature_rocompat = cpu_to_le32(MDS_ROCOMPAT_LOVOBJID);
489         } else {
490                 rc = fsfilt_read_record(obd, file, msd, sizeof(*msd), &off);
491                 if (rc) {
492                         CERROR("error reading MDS %s: rc = %d\n",
493                                file->f_dentry->d_name.name, rc);
494                         GOTO(err_msd, rc);
495                 }
496                 if (strcmp((char *)msd->msd_uuid, (char *)obd->obd_uuid.uuid)) {
497                         CERROR("OBD UUID %s does not match last_rcvd UUID %s\n",
498                                obd->obd_uuid.uuid, msd->msd_uuid);
499                         GOTO(err_msd, rc = -EINVAL);
500                 }
501                 mount_count = le64_to_cpu(msd->msd_mount_count);
502         }
503         if (msd->msd_feature_incompat & ~cpu_to_le32(MDS_INCOMPAT_SUPP)) {
504                 CERROR("unsupported incompat feature %x\n",
505                        le32_to_cpu(msd->msd_feature_incompat) &
506                        ~MDS_INCOMPAT_SUPP);
507                 GOTO(err_msd, rc = -EINVAL);
508         }
509         /* XXX updating existing b_devel fs only, can be removed in future */
510         msd->msd_feature_rocompat = cpu_to_le32(MDS_ROCOMPAT_LOVOBJID);
511         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDS_ROCOMPAT_SUPP)) {
512                 CERROR("unsupported read-only feature %x\n",
513                        le32_to_cpu(msd->msd_feature_rocompat) &
514                        ~MDS_ROCOMPAT_SUPP);
515                 /* Do something like remount filesystem read-only */
516                 GOTO(err_msd, rc = -EINVAL);
517         }
518
519         mds->mds_last_transno = le64_to_cpu(msd->msd_last_transno);
520
521         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
522                obd->obd_name, mds->mds_last_transno);
523         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
524                obd->obd_name, mount_count + 1);
525         CDEBUG(D_INODE, "%s: server data size: %u\n",
526                obd->obd_name, le32_to_cpu(msd->msd_server_size));
527         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
528                obd->obd_name, le32_to_cpu(msd->msd_client_start));
529         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
530                obd->obd_name, le32_to_cpu(msd->msd_client_size));
531         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
532                obd->obd_name, last_rcvd_size);
533         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
534                last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
535                (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
536                 le16_to_cpu(msd->msd_client_size));
537
538         /* When we do a clean MDS shutdown, we save the last_transno into
539          * the header.  If we find clients with higher last_transno values
540          * then those clients may need recovery done. */
541         for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
542              off < last_rcvd_size; cl_idx++) {
543                 __u64 last_transno;
544                 struct obd_export *exp;
545                 struct mds_export_data *med;
546
547                 if (!mcd) {
548                         OBD_ALLOC_WAIT(mcd, sizeof(*mcd));
549                         if (!mcd)
550                                 GOTO(err_client, rc = -ENOMEM);
551                 }
552
553                 /* Don't assume off is incremented properly by
554                  * fsfilt_read_record(), in case sizeof(*mcd)
555                  * isn't the same as msd->msd_client_size.  */
556                 off = le32_to_cpu(msd->msd_client_start) +
557                         cl_idx * le16_to_cpu(msd->msd_client_size);
558                 rc = fsfilt_read_record(obd, file, mcd, sizeof(*mcd), &off);
559                 if (rc) {
560                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
561                                file->f_dentry->d_name.name, cl_idx, off, rc);
562                         break; /* read error shouldn't cause startup to fail */
563                 }
564
565                 if (mcd->mcd_uuid[0] == '\0') {
566                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
567                                cl_idx);
568                         continue;
569                 }
570
571                 last_transno = le64_to_cpu(mcd->mcd_last_transno) >
572                                le64_to_cpu(mcd->mcd_last_close_transno) ?
573                                le64_to_cpu(mcd->mcd_last_transno) :
574                                le64_to_cpu(mcd->mcd_last_close_transno);
575
576                 /* These exports are cleaned up by mds_disconnect(), so they
577                  * need to be set up like real exports as mds_connect() does.
578                  */
579                 CDEBUG(D_HA|D_WARNING,"RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
580                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
581                        last_transno, le64_to_cpu(msd->msd_last_transno),
582                        mcd->mcd_last_xid);
583
584                 exp = class_new_export(obd);
585                 if (exp == NULL)
586                         GOTO(err_client, rc = -ENOMEM);
587
588                 memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
589                        sizeof exp->exp_client_uuid.uuid);
590                 med = &exp->exp_mds_data;
591                 med->med_mcd = mcd;
592                 mds_client_add(obd, mds, med, cl_idx);
593                 /* create helper if export init gets more complex */
594                 INIT_LIST_HEAD(&med->med_open_head);
595                 spin_lock_init(&med->med_open_lock);
596
597                 mcd = NULL;
598                 exp->exp_connected = 0;
599                 exp->exp_req_replay_needed = 1;
600                 obd->obd_recoverable_clients++;
601                 obd->obd_max_recoverable_clients++;
602
603                 /* track clients to separate req replay
604                  * from lock replay. bug 6063 */
605                 atomic_inc(&obd->obd_req_replay_clients);
606                 exp->exp_req_replay_needed = 1;
607                 atomic_inc(&obd->obd_lock_replay_clients);
608                 exp->exp_lock_replay_needed = 1;
609                 
610                 class_export_put(exp);
611
612                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
613                        cl_idx, last_transno);
614
615                 if (last_transno > mds->mds_last_transno)
616                        mds->mds_last_transno = last_transno;
617         }
618         if (mcd)
619                 OBD_FREE(mcd, sizeof(*mcd));
620         obd->obd_last_committed = mds->mds_last_transno;
621         if (obd->obd_recoverable_clients) {
622                 CWARN("RECOVERY: service %s, %d recoverable clients, "
623                       "last_transno "LPU64"\n", obd->obd_name,
624                       obd->obd_recoverable_clients, mds->mds_last_transno);
625                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
626                 target_start_recovery_thread(obd, mds_handle);
627                 obd->obd_recovery_start = LTIME_S(CURRENT_TIME);
628         }
629         
630         mds->mds_mount_count = mount_count + 1;
631         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
632
633         /* save it, so mount count and last_transno is current */
634         rc = mds_update_server_data(obd, 1);
635         if (rc)
636                 GOTO(err_client, rc);
637
638         RETURN(0);
639
640 err_client:
641         class_disconnect_exports(obd, 0);
642 err_msd:
643         mds_server_free_data(mds);
644         RETURN(rc);
645 }
646
647 /*
648  * sets up root inode lustre_id. It tries to read it first from root inode and
649  * if it is not there, new rootid is allocated and saved there.
650  */
651 int mds_fs_setup_rootid(struct obd_device *obd)
652 {
653         int rc = 0;
654         void *handle;
655         struct inode *inode;
656         struct dentry *dentry;
657         struct mds_obd *mds = &obd->u.mds;
658         ENTRY;
659
660         /* getting root directory and setup its fid. */
661         dentry = mds_id2dentry(obd, &mds->mds_rootid, NULL);
662         if (IS_ERR(dentry)) {
663                 CERROR("Can't find ROOT by "DLID4", err = %d\n",
664                        OLID4(&mds->mds_rootid), (int)PTR_ERR(dentry));
665                 RETURN(PTR_ERR(dentry));
666         }
667
668         inode = dentry->d_inode;
669         LASSERT(dentry->d_inode);
670
671         rc = mds_pack_inode2id(obd, &mds->mds_rootid, inode, 1);
672         if (rc && rc != -ENODATA)
673                 GOTO(out_dentry, rc);
674
675         if (rc) {
676                 if (rc != -ENODATA)
677                         GOTO(out_dentry, rc);
678         } else {
679                 /* rootid is filled by mds_read_inode_sid(), so we do not need
680                  * to allocate it and update. */
681                 LASSERT(id_group(&mds->mds_rootid) == mds->mds_num);
682                 mds_set_last_fid(obd, id_fid(&mds->mds_rootid));
683
684                 rc = mds_fidmap_add(obd, &mds->mds_rootid);
685                 if (rc > 0)
686                         rc = 0;
687                 
688                 GOTO(out_dentry, rc);
689         }
690
691         /* allocating new one, as it is not found in root inode. */
692         handle = fsfilt_start(obd, inode,
693                               FSFILT_OP_SETATTR, NULL);
694         
695         if (IS_ERR(handle)) {
696                 rc = PTR_ERR(handle);
697                 CERROR("fsfilt_start() failed, rc = %d\n", rc);
698                 GOTO(out_dentry, rc);
699         }
700         
701         mds_inode2id(obd, &mds->mds_rootid, inode, mds_alloc_fid(obd));
702         rc = mds_update_inode_ids(obd, inode, handle, &mds->mds_rootid, NULL);
703         if (rc) {
704                 CERROR("mds_update_inode_ids() failed, rc = %d\n", rc);
705                 GOTO(out_dentry, rc);
706         }
707
708         rc = mds_fidmap_add(obd, &mds->mds_rootid);
709         if (rc < 0)
710                 GOTO(out_dentry, rc);
711         else
712                 rc = 0;
713         
714         rc = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
715         if (rc)
716                 CERROR("fsfilt_commit() failed, rc = %d\n", rc);
717
718         EXIT;
719 out_dentry:
720         l_dput(dentry);
721         if (rc == 0)
722                 CWARN("%s: rootid: "DLID4"\n", obd->obd_name,
723                       OLID4(&mds->mds_rootid));
724         return rc;
725 }
726
727 static int mds_update_virtid_fid(struct obd_device *obd,
728                                  void *handle, int force_sync)
729 {
730         struct mds_obd *mds = &obd->u.mds;
731         struct file *filp = mds->mds_virtid_filp;
732         struct lvfs_run_ctxt saved;
733         loff_t off = 0;
734         int rc = 0;
735         ENTRY;
736
737         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
738         rc = fsfilt_write_record(obd, filp, &mds->mds_virtid_fid,
739                                  sizeof(mds->mds_virtid_fid),
740                                  &off, force_sync);
741         if (rc) {
742                 CERROR("error writing MDS virtid_fid #"LPU64
743                        ", err = %d\n", mds->mds_virtid_fid, rc);
744         }
745                 
746         CDEBUG(D_SUPER, "wrote virtid fid #"LPU64" at idx "
747                "%llu: err = %d\n", mds->mds_virtid_fid,
748                off, rc);
749         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
750
751         RETURN(rc);
752 }
753
754 static int mds_read_virtid_fid(struct obd_device *obd,
755                                struct file *file)
756 {
757         int rc = 0;
758         loff_t off = 0;
759         struct mds_obd *mds = &obd->u.mds;
760         unsigned long virtid_fid_size = file->f_dentry->d_inode->i_size;
761         ENTRY;
762
763         if (virtid_fid_size == 0) {
764                 mds->mds_virtid_fid = mds_alloc_fid(obd);
765         } else {
766                 rc = fsfilt_read_record(obd, file, &mds->mds_virtid_fid,
767                                         sizeof(mds->mds_virtid_fid), &off);
768                 if (rc) {
769                         CERROR("error reading MDS %s: rc = %d\n",
770                                file->f_dentry->d_name.name, rc);
771                         RETURN(rc);
772                 }
773         }
774         rc = mds_update_virtid_fid(obd, NULL, 1);
775
776         RETURN(rc);
777 }
778
779 /*
780  * initializes lustre_id for virtual id directory, it is needed sometimes, as it
781  * is possible that it will be the parent for object an operations is going to
782  * be performed on.
783  */
784 int mds_fs_setup_virtid(struct obd_device *obd)
785 {
786         int rc = 0;
787         void *handle;
788         struct lustre_id sid;
789         struct mds_obd *mds = &obd->u.mds;
790         struct inode *inode = mds->mds_id_dir->d_inode;
791         ENTRY;
792
793         handle = fsfilt_start(obd, inode,
794                               FSFILT_OP_SETATTR, NULL);
795         
796         if (IS_ERR(handle)) {
797                 rc = PTR_ERR(handle);
798                 CERROR("fsfilt_start() failed, rc = %d\n", rc);
799                 RETURN(rc);
800         }
801
802         id_group(&sid) = mds->mds_num;
803         id_fid(&sid) = mds->mds_virtid_fid;
804
805         id_ino(&sid) = inode->i_ino;
806         id_gen(&sid) = inode->i_generation;
807         id_type(&sid) = (S_IFMT & inode->i_mode);
808
809         rc = mds_update_inode_ids(obd, inode, handle, &sid, NULL);
810
811         if (rc) {
812                 CERROR("mds_update_inode_ids() failed, rc = %d\n", rc);
813                 RETURN(rc);
814         }
815
816         rc = mds_fidmap_add(obd, &sid);
817         if (rc < 0)
818                 RETURN(rc);
819         else
820                 rc = 0;
821         
822         rc = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
823         if (rc) {
824                 CERROR("fsfilt_commit() failed, rc = %d\n", rc);
825                 RETURN(rc);
826         }
827
828         RETURN(rc);
829 }
830
831 #define MDS_FIDMAP_SIZE (2*PAGE_SIZE)
832
833 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
834 {
835         struct mds_obd *mds = &obd->u.mds;
836         struct lvfs_run_ctxt saved;
837         struct dentry *dentry;
838         struct file *file;
839         int rc;
840         ENTRY;
841
842         rc = cleanup_group_info();
843         if (rc)
844                 RETURN(rc);
845
846         mds->mds_vfsmnt = mnt;
847         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
848
849         fsfilt_setup(obd, mds->mds_sb);
850
851         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
852         obd->obd_lvfs_ctxt.pwdmnt = mnt;
853         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
854         obd->obd_lvfs_ctxt.fs = get_ds();
855         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
856
857         /* setup the directory tree */
858         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
859         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0);
860         if (IS_ERR(dentry)) {
861                 rc = PTR_ERR(dentry);
862                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
863                 GOTO(err_pop, rc);
864         }
865
866         mdc_pack_id(&mds->mds_rootid, dentry->d_inode->i_ino,
867                     dentry->d_inode->i_generation, S_IFDIR, 0, 0);
868
869         dput(dentry);
870         
871         dentry = lookup_one_len("__iopen__", current->fs->pwd,
872                                 strlen("__iopen__"));
873         if (IS_ERR(dentry)) {
874                 rc = PTR_ERR(dentry);
875                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
876                 GOTO(err_pop, rc);
877         }
878         mds->mds_id_de = dentry;
879         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
880                 rc = -ENOENT;
881                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
882                 GOTO(err_id_de, rc);
883         }
884
885         dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777, 1);
886         if (IS_ERR(dentry)) {
887                 rc = PTR_ERR(dentry);
888                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
889                 GOTO(err_id_de, rc);
890         }
891         mds->mds_pending_dir = dentry;
892       
893         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1);
894         if (IS_ERR(dentry)) {
895                 rc = PTR_ERR(dentry);
896                 CERROR("cannot create LOGS directory: rc = %d\n", rc);
897                 GOTO(err_pending, rc);
898         }
899         mds->mds_logs_dir = dentry;
900
901         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
902         if (IS_ERR(dentry)) {
903                 rc = PTR_ERR(dentry);
904                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
905                 GOTO(err_logs, rc);
906         }
907         mds->mds_objects_dir = dentry;
908
909         dentry = simple_mkdir(current->fs->pwd, "FIDS", 0777, 1);
910         if (IS_ERR(dentry)) {
911                 rc = PTR_ERR(dentry);
912                 CERROR("cannot create FIDS directory: rc = %d\n", rc);
913                 GOTO(err_objects, rc);
914         }
915         mds->mds_id_dir = dentry;
916
917         dentry = simple_mkdir(current->fs->pwd, "UNNAMED", 0777, 1);
918         if (IS_ERR(dentry)) {
919                 rc = PTR_ERR(dentry);
920                 CERROR("cannot create UNNAMED directory: rc = %d\n", rc);
921                 GOTO(err_id_dir, rc);
922         }
923         mds->mds_unnamed_dir = dentry;
924
925         /* open and test the last rcvd file */
926         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
927         if (IS_ERR(file)) {
928                 rc = PTR_ERR(file);
929                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
930                 GOTO(err_unnamed, rc = PTR_ERR(file));
931         }
932         mds->mds_rcvd_filp = file;
933         
934         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
935                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
936                        file->f_dentry->d_inode->i_mode);
937                 GOTO(err_last_rcvd, rc = -ENOENT);
938         }
939
940         rc = mds_read_last_rcvd(obd, file);
941         if (rc) {
942                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
943                 GOTO(err_last_rcvd, rc);
944         }
945
946         /* open and test last fid file */
947         file = filp_open(LAST_FID, O_RDWR | O_CREAT, 0644);
948         if (IS_ERR(file)) {
949                 rc = PTR_ERR(file);
950                 CERROR("cannot open/create %s file: rc = %d\n",
951                        LAST_FID, rc);
952                 GOTO(err_client, rc = PTR_ERR(file));
953         }
954         mds->mds_fid_filp = file;
955         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
956                 CERROR("%s is not a regular file!: mode = %o\n",
957                        LAST_FID, file->f_dentry->d_inode->i_mode);
958                 GOTO(err_last_fid, rc = -ENOENT);
959         }
960
961         rc = mds_read_last_fid(obd, file);
962         if (rc) {
963                 CERROR("cannot read %s: rc = %d\n", LAST_FID, rc);
964                 GOTO(err_last_fid, rc);
965         }
966
967         /* open and test virtid fid file */
968         file = filp_open(VIRT_FID, O_RDWR | O_CREAT, 0644);
969         if (IS_ERR(file)) {
970                 rc = PTR_ERR(file);
971                 CERROR("cannot open/create %s file: rc = %d\n",
972                        VIRT_FID, rc);
973                 GOTO(err_last_fid, rc = PTR_ERR(file));
974         }
975         mds->mds_virtid_filp = file;
976         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
977                 CERROR("%s is not a regular file!: mode = %o\n",
978                        VIRT_FID, file->f_dentry->d_inode->i_mode);
979                 GOTO(err_virtid_fid, rc = -ENOENT);
980         }
981
982         rc = mds_read_virtid_fid(obd, file);
983         if (rc) {
984                 CERROR("cannot read %s: rc = %d\n", VIRT_FID, rc);
985                 GOTO(err_virtid_fid, rc);
986         }
987         
988         /* open and test the lov objid file */
989         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
990         if (IS_ERR(file)) {
991                 rc = PTR_ERR(file);
992                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
993                 GOTO(err_last_fid, rc = PTR_ERR(file));
994         }
995         mds->mds_dt_objid_filp = file;
996         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
997                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
998                        file->f_dentry->d_inode->i_mode);
999                 GOTO(err_lov_objid, rc = -ENOENT);
1000         }
1001
1002         /* reint fidext thumb by last fid after root and virt are initialized */
1003         mds->mds_fidext_thumb = mds->mds_last_fid;
1004                 
1005         rc = mds_fidmap_init(obd, MDS_FIDMAP_SIZE);
1006         if (rc) {
1007                 CERROR("cannot init fid mapping tables, err %d\n", rc);
1008                 GOTO(err_lov_objid, rc);
1009         }
1010         
1011 err_pop:
1012         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1013         return rc;
1014
1015 err_lov_objid:
1016         if (mds->mds_dt_objid_filp && filp_close(mds->mds_dt_objid_filp, 0))
1017                 CERROR("can't close %s after error\n", LOV_OBJID);
1018 err_virtid_fid:
1019         if (mds->mds_virtid_filp && filp_close(mds->mds_virtid_filp, 0))
1020                 CERROR("can't close %s after error\n", VIRT_FID);
1021 err_last_fid:
1022         if (mds->mds_fid_filp && filp_close(mds->mds_fid_filp, 0))
1023                 CERROR("can't close %s after error\n", LAST_FID);
1024 err_client:
1025         class_disconnect_exports(obd, 0);
1026 err_last_rcvd:
1027         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
1028                 CERROR("can't close %s after error\n", LAST_RCVD);
1029 err_unnamed:
1030         dput(mds->mds_unnamed_dir);
1031 err_id_dir:
1032         dput(mds->mds_id_dir);
1033 err_objects:
1034         dput(mds->mds_objects_dir);
1035 err_logs:
1036         dput(mds->mds_logs_dir);
1037 err_pending:
1038         dput(mds->mds_pending_dir);
1039 err_id_de:
1040         dput(mds->mds_id_de);
1041         goto err_pop;
1042 }
1043
/*
 * Thin wrapper: hands the post-cleanup step to the fsfilt layer and passes
 * its return code straight back to the caller.
 */
static int mds_fs_post_cleanup(struct obd_device *obd)
{
        return fsfilt_post_cleanup(obd);
}
1050
1051 int mds_fs_cleanup(struct obd_device *obd, int flags)
1052 {
1053         struct mds_obd *mds = &obd->u.mds;
1054         struct lvfs_run_ctxt saved;
1055         int rc = 0;
1056
1057         if (flags & OBD_OPT_FAILOVER)
1058                 CERROR("%s: shutting down for failover; client state will"
1059                        " be preserved.\n", obd->obd_name);
1060
1061         class_disconnect_exports(obd, flags); /* cleans up client info too */
1062         target_cleanup_recovery(obd);
1063         mds_server_free_data(mds);
1064         mds_fidmap_cleanup(obd);
1065         
1066         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1067         if (mds->mds_virtid_filp) {
1068                 rc = filp_close(mds->mds_virtid_filp, 0);
1069                 mds->mds_virtid_filp = NULL;
1070                 if (rc)
1071                         CERROR("%s file won't close, rc = %d\n", VIRT_FID, rc);
1072         }
1073         if (mds->mds_fid_filp) {
1074                 rc = filp_close(mds->mds_fid_filp, 0);
1075                 mds->mds_fid_filp = NULL;
1076                 if (rc)
1077                         CERROR("%s file won't close, rc = %d\n", LAST_FID, rc);
1078         }
1079         if (mds->mds_rcvd_filp) {
1080                 rc = filp_close(mds->mds_rcvd_filp, 0);
1081                 mds->mds_rcvd_filp = NULL;
1082                 if (rc)
1083                         CERROR("%s file won't close, rc = %d\n", LAST_RCVD, rc);
1084         }
1085         if (mds->mds_dt_objid_filp) {
1086                 rc = filp_close(mds->mds_dt_objid_filp, 0);
1087                 mds->mds_dt_objid_filp = NULL;
1088                 if (rc)
1089                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
1090         }
1091         if (mds->mds_unnamed_dir != NULL) {
1092                 l_dput(mds->mds_unnamed_dir);
1093                 mds->mds_unnamed_dir = NULL;
1094         }
1095         if (mds->mds_id_dir != NULL) {
1096                 l_dput(mds->mds_id_dir);
1097                 mds->mds_id_dir = NULL;
1098         }
1099         if (mds->mds_objects_dir != NULL) {
1100                 l_dput(mds->mds_objects_dir);
1101                 mds->mds_objects_dir = NULL;
1102         }
1103         if (mds->mds_logs_dir) {
1104                 l_dput(mds->mds_logs_dir);
1105                 mds->mds_logs_dir = NULL;
1106         }
1107         if (mds->mds_pending_dir) {
1108                 l_dput(mds->mds_pending_dir);
1109                 mds->mds_pending_dir = NULL;
1110         }
1111         rc = mds_fs_post_cleanup(obd);
1112         
1113         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1114         shrink_dcache_parent(mds->mds_id_de);
1115         dput(mds->mds_id_de);
1116
1117         return rc;
1118 }
1119
1120 /* Creates an object with the same name as its id.  Because this is not at all
1121  * performance sensitive, it is accomplished by creating a file, checking the
1122  * id, and renaming it. */
1123 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
1124                    void *acl, int acl_size,
1125                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1126 {
1127         struct mds_obd *mds = &exp->exp_obd->u.mds;
1128         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1129         struct file *filp;
1130         struct dentry *dchild;
1131         struct lvfs_run_ctxt saved;
1132         char idname[LL_ID_NAMELEN];
1133         int rc = 0, err, idlen;
1134         void *handle;
1135         ENTRY;
1136
1137         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1138         down(&parent_inode->i_sem);
1139         if (oa->o_id) {
1140                 idlen = ll_id2str(idname, oa->o_id, oa->o_generation);
1141                 dchild = lookup_one_len(idname, mds->mds_objects_dir, idlen);
1142                 if (IS_ERR(dchild))
1143                         GOTO(out_pop, rc = PTR_ERR(dchild));
1144
1145                 if (dchild->d_inode == NULL) {
1146                         struct dentry_params dp;
1147                         struct inode *inode;
1148
1149                         CWARN("creating log with ID "LPU64"\n", oa->o_id);
1150                         
1151                         dchild->d_fsdata = (void *) &dp;
1152                         dp.p_ptr = NULL;
1153                         dp.p_inum = oa->o_id;
1154                         rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL);
1155                         if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id)
1156                                 dchild->d_fsdata = NULL;
1157                         if (rc) {
1158                                 CDEBUG(D_INODE, "err during create: %d\n", rc);
1159                                 dput(dchild);
1160                                 GOTO(out_pop, rc);
1161                         }
1162                         inode = dchild->d_inode;
1163                         LASSERT(inode->i_ino == oa->o_id);
1164                         inode->i_generation = oa->o_generation;
1165                         CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
1166                                inode->i_ino, inode->i_generation);
1167                         mark_inode_dirty(inode);
1168                 } else {
1169                         CWARN("it should be here!\n");
1170                 }
1171                 GOTO(out_pop, rc);
1172         }
1173
1174         sprintf(idname, "OBJECTS/%u.%u", ll_insecure_random_int(), current->pid);
1175         filp = filp_open(idname, O_CREAT | O_EXCL, 0644);
1176         if (IS_ERR(filp)) {
1177                 rc = PTR_ERR(filp);
1178                 if (rc == -EEXIST) {
1179                         CERROR("impossible object name collision %s\n",
1180                                idname);
1181                         LBUG();
1182                 }
1183                 CERROR("error creating tmp object %s: rc %d\n", 
1184                        idname, rc);
1185                 GOTO(out_pop, rc);
1186         }
1187
1188         LASSERT(mds->mds_objects_dir == filp->f_dentry->d_parent);
1189
1190         oa->o_id = filp->f_dentry->d_inode->i_ino;
1191         oa->o_generation = filp->f_dentry->d_inode->i_generation;
1192         idlen = ll_id2str(idname, oa->o_id, oa->o_generation);
1193         
1194         CWARN("created log anonymous "LPU64"/%u\n",
1195               oa->o_id, oa->o_generation);
1196
1197         dchild = lookup_one_len(idname, mds->mds_objects_dir, idlen);
1198         if (IS_ERR(dchild)) {
1199                 CERROR("getting neg dentry for obj rename: %d\n", rc);
1200                 GOTO(out_close, rc = PTR_ERR(dchild));
1201         }
1202         if (dchild->d_inode != NULL) {
1203                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
1204                        oa->o_id, oa->o_generation);
1205                 LBUG();
1206         }
1207
1208         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
1209                               FSFILT_OP_RENAME, NULL);
1210         if (IS_ERR(handle))
1211                 GOTO(out_dput, rc = PTR_ERR(handle));
1212
1213         lock_kernel();
1214         rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
1215                         mds->mds_objects_dir->d_inode, dchild);
1216         unlock_kernel();
1217         if (rc)
1218                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1219                        oa->o_id, oa->o_generation, rc);
1220
1221         err = fsfilt_commit(exp->exp_obd, mds->mds_sb, 
1222                             mds->mds_objects_dir->d_inode, handle, 0);
1223         if (!err) {
1224                 oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
1225                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
1226         } else if (!rc)
1227                 rc = err;
1228 out_dput:
1229         dput(dchild);
1230 out_close:
1231         err = filp_close(filp, 0);
1232         if (err) {
1233                 CERROR("closing tmpfile %s: rc %d\n", idname, rc);
1234                 if (!rc)
1235                         rc = err;
1236         }
1237 out_pop:
1238         up(&parent_inode->i_sem);
1239         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
1240         RETURN(rc);
1241 }
1242
1243 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1244                     struct lov_stripe_md *ea, struct obd_trans_info *oti)
1245 {
1246         struct mds_obd *mds = &exp->exp_obd->u.mds;
1247         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1248         struct obd_device *obd = exp->exp_obd;
1249         struct lvfs_run_ctxt saved;
1250         char idname[LL_ID_NAMELEN];
1251         struct dentry *de;
1252         void *handle;
1253         int err, idlen, rc = 0;
1254         ENTRY;
1255
1256         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1257
1258         idlen = ll_id2str(idname, oa->o_id, oa->o_generation);
1259
1260         down(&parent_inode->i_sem);
1261         de = lookup_one_len(idname, mds->mds_objects_dir, idlen);
1262         if (IS_ERR(de) || de->d_inode == NULL) {
1263                 rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT;
1264                 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1265                        oa->o_id, idname, rc);
1266                 GOTO(out_dput, rc);
1267         }
1268         /* Stripe count is 1 here since this is some MDS specific stuff
1269            that is unlinked, not spanned across multiple OSTs */
1270         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1271                                   FSFILT_OP_UNLINK, oti, 1);
1272
1273         if (IS_ERR(handle))
1274                 GOTO(out_dput, rc = PTR_ERR(handle));
1275         
1276         rc = vfs_unlink(mds->mds_objects_dir->d_inode, de);
1277         if (rc) 
1278                 CERROR("error destroying object "LPU64":%u: rc %d\n",
1279                        oa->o_id, oa->o_generation, rc);
1280         
1281         err = fsfilt_commit(obd, mds->mds_sb, mds->mds_objects_dir->d_inode, 
1282                             handle, exp->exp_sync);
1283         if (err && !rc)
1284                 rc = err;
1285 out_dput:
1286         if (de != NULL)
1287                 l_dput(de);
1288         up(&parent_inode->i_sem);
1289         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1290         RETURN(rc);
1291 }