Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef EXPORT_SYMTAB
27 # define EXPORT_SYMTAB
28 #endif
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/module.h>
32 #include <linux/kmod.h>
33 #include <linux/version.h>
34 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
35 #include <linux/mount.h>
36 #endif
37 #include <linux/lustre_mds.h>
38 #include <linux/obd_class.h>
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_fsfilt.h>
42 #include <portals/list.h>
43
44 #include "mds_internal.h"
45
/* The client bitmap holds one bit per client slot.  The limit is arbitrary,
 * but for now the bitmap is sized to fit in exactly one page, i.e.
 * PAGE_SIZE * 8 slots (32k clients with 4kB pages). */
#define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold MDS_MAX_CLIENTS bits.  Note that
 * MDS_MAX_CLIENTS counts bits, so it must be divided by the number of bits
 * (not bytes) in an unsigned long. */
#define MDS_MAX_CLIENT_WORDS \
        (MDS_MAX_CLIENTS / (8 * sizeof(unsigned long)))

/* Names of the MDS state files opened by mds_fs_setup(). */
#define LAST_RCVD "last_rcvd"
#define LOV_OBJID "lov_objid"
52
53 /* Add client data to the MDS.  We use a bitmap to locate a free space
54  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
55  * Otherwise, we have just read the data from the last_rcvd file and
56  * we know its offset.
57  */
58 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
59                    struct mds_export_data *med, int cl_idx)
60 {
61         unsigned long *bitmap = mds->mds_client_bitmap;
62         int new_client = (cl_idx == -1);
63         ENTRY;
64
65         LASSERT(bitmap != NULL);
66
67         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
68         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
69                 RETURN(0);
70
71         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
72          * there's no need for extra complication here
73          */
74         if (new_client) {
75                 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
76         repeat:
77                 if (cl_idx >= MDS_MAX_CLIENTS) {
78                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
79                         return -ENOMEM;
80                 }
81                 if (test_and_set_bit(cl_idx, bitmap)) {
82                         cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
83                                                     cl_idx);
84                         goto repeat;
85                 }
86         } else {
87                 if (test_and_set_bit(cl_idx, bitmap)) {
88                         CERROR("MDS client %d: bit already set in bitmap!!\n",
89                                cl_idx);
90                         LBUG();
91                 }
92         }
93
94         CDEBUG(D_INFO, "client at index %d with UUID '%s' added\n",
95                cl_idx, med->med_mcd->mcd_uuid);
96
97         med->med_idx = cl_idx;
98         med->med_off = MDS_LR_CLIENT_START + (cl_idx * MDS_LR_CLIENT_SIZE);
99
100         if (new_client) {
101                 struct obd_run_ctxt saved;
102                 loff_t off = med->med_off;
103                 struct file *file = mds->mds_rcvd_filp;
104                 void *handle;
105                 int rc, err;
106
107                 push_ctxt(&saved, &obd->obd_ctxt, NULL);
108                 /* We need to start a transaction here first, to avoid a
109                  * possible ordering deadlock on last_rcvd->i_sem and the
110                  * journal lock. In most places we start the journal handle
111                  * first (because we do compound transactions), and then
112                  * later do the write into last_rcvd, which gets i_sem.
113                  *
114                  * Without this transaction, clients connecting at the same
115                  * time other MDS operations are ongoing get last_rcvd->i_sem
116                  * first (in generic_file_write()) and start the journal
117                  * transaction afterwards, and can deadlock with other ops.
118                  *
119                  * We use FSFILT_OP_SETATTR because it is smallest, but all
120                  * ops include enough space for the last_rcvd update so we
121                  * could use any of them, or maybe an FSFILT_OP_NONE is best?
122                  */
123                 handle = fsfilt_start(obd, file->f_dentry->d_inode,
124                                       FSFILT_OP_SETATTR, NULL);
125                 if (IS_ERR(handle)) {
126                         rc = PTR_ERR(handle);
127                         CERROR("unable to start transaction: rc %d\n", rc);
128                 } else {
129                         rc = fsfilt_write_record(obd, file, med->med_mcd,
130                                                  sizeof(*med->med_mcd),
131                                                  &off, 1);
132                         err = fsfilt_commit(obd, file->f_dentry->d_inode,
133                                             handle, 1);
134                         if (rc == 0)
135                                 rc = err;
136                 }
137                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
138
139                 if (rc)
140                         return rc;
141                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
142                        med->med_idx, med->med_off,
143                        (unsigned int)sizeof(*med->med_mcd));
144         }
145         return 0;
146 }
147
148 int mds_client_free(struct obd_export *exp, int clear_client)
149 {
150         struct mds_export_data *med = &exp->exp_mds_data;
151         struct mds_obd *mds = &exp->exp_obd->u.mds;
152         struct obd_device *obd = exp->exp_obd;
153         struct mds_client_data zero_mcd;
154         struct obd_run_ctxt saved;
155         int rc;
156         unsigned long *bitmap = mds->mds_client_bitmap;
157
158         LASSERT(bitmap);
159         if (!med->med_mcd)
160                 RETURN(0);
161
162         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
163         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
164                 GOTO(free_and_out, 0);
165
166         CDEBUG(D_INFO, "freeing client at index %u (%lld)with UUID '%s'\n",
167                med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
168
169         if (!test_and_clear_bit(med->med_idx, bitmap)) {
170                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
171                        med->med_idx);
172                 LBUG();
173         }
174
175         if (clear_client) {
176                 memset(&zero_mcd, 0, sizeof zero_mcd);
177                 push_ctxt(&saved, &obd->obd_ctxt, NULL);
178                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp,
179                                               &zero_mcd, sizeof(zero_mcd),
180                                               &med->med_off, 1);
181                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
182
183                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
184                        "zeroing out client %s off %u in %s rc %d\n",
185                        med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD, rc);
186         }
187
188  free_and_out:
189         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
190
191         return 0;
192 }
193
194 static int mds_server_free_data(struct mds_obd *mds)
195 {
196         OBD_FREE(mds->mds_client_bitmap,
197                  MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
198         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
199         mds->mds_server_data = NULL;
200
201         return 0;
202 }
203
204 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
205 {
206         struct mds_obd *mds = &obd->u.mds;
207         struct mds_server_data *msd;
208         struct mds_client_data *mcd = NULL;
209         loff_t off = 0;
210         unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
211         __u64 last_transno = 0;
212         __u64 mount_count;
213         int cl_idx, rc = 0;
214         ENTRY;
215
216         LASSERT(sizeof(struct mds_client_data) == MDS_LR_CLIENT_SIZE);
217         LASSERT(sizeof(struct mds_server_data) <= MDS_LR_SERVER_SIZE);
218
219         OBD_ALLOC_WAIT(msd, sizeof(*msd));
220         if (!msd)
221                 RETURN(-ENOMEM);
222
223         OBD_ALLOC_WAIT(mds->mds_client_bitmap,
224                   MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
225         if (!mds->mds_client_bitmap) {
226                 OBD_FREE(msd, sizeof(*msd));
227                 RETURN(-ENOMEM);
228         }
229
230         mds->mds_server_data = msd;
231
232         if (last_rcvd_size == 0) {
233                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
234                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
235                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
236                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
237                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
238                 msd->msd_feature_rocompat = cpu_to_le32(MDS_ROCOMPAT_LOVOBJID);
239                 rc = fsfilt_write_record(obd, file, msd, sizeof(*msd), &off, 1);
240
241                 if (rc == 0)
242                         RETURN(0);
243
244                 CERROR("%s: error writing new MSD: %d\n", obd->obd_name, rc);
245                 GOTO(err_msd, rc);
246         }
247
248         rc = fsfilt_read_record(obd, file, msd, sizeof(*msd), &off);
249         if (rc) {
250                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
251                 GOTO(err_msd, rc);
252         }
253         if (!msd->msd_server_size)
254                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
255         if (!msd->msd_client_start)
256                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
257         if (!msd->msd_client_size)
258                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
259
260         if (msd->msd_feature_incompat & ~cpu_to_le32(MDS_INCOMPAT_SUPP)) {
261                 CERROR("unsupported incompat feature %x\n",
262                        le32_to_cpu(msd->msd_feature_incompat) &
263                        ~MDS_INCOMPAT_SUPP);
264                 GOTO(err_msd, rc = -EINVAL);
265         }
266         /* XXX updating existing b_devel fs only, can be removed in future */
267         msd->msd_feature_rocompat = cpu_to_le32(MDS_ROCOMPAT_LOVOBJID);
268         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDS_ROCOMPAT_SUPP)) {
269                 CERROR("unsupported read-only feature %x\n",
270                        le32_to_cpu(msd->msd_feature_rocompat) &
271                        ~MDS_ROCOMPAT_SUPP);
272                 /* Do something like remount filesystem read-only */
273                 GOTO(err_msd, rc = -EINVAL);
274         }
275
276         last_transno = le64_to_cpu(msd->msd_last_transno);
277         mds->mds_last_transno = last_transno;
278
279         mount_count = le64_to_cpu(msd->msd_mount_count);
280         mds->mds_mount_count = mount_count;
281
282         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
283                obd->obd_name, last_transno);
284         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
285                obd->obd_name, mount_count);
286         CDEBUG(D_INODE, "%s: server data size: %u\n",
287                obd->obd_name, le32_to_cpu(msd->msd_server_size));
288         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
289                obd->obd_name, le32_to_cpu(msd->msd_client_start));
290         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
291                obd->obd_name, le32_to_cpu(msd->msd_client_size));
292         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
293                obd->obd_name, last_rcvd_size);
294         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
295                last_rcvd_size <= MDS_LR_CLIENT_START ? 0 :
296                (last_rcvd_size - MDS_LR_CLIENT_START) / MDS_LR_CLIENT_SIZE);
297
298         /* When we do a clean MDS shutdown, we save the last_transno into
299          * the header.  If we find clients with higher last_transno values
300          * then those clients may need recovery done. */
301         for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
302              off < last_rcvd_size; cl_idx++) {
303                 __u64 last_transno;
304                 int mount_age;
305
306                 if (!mcd) {
307                         OBD_ALLOC_WAIT(mcd, sizeof(*mcd));
308                         if (!mcd)
309                                 GOTO(err_client, rc = -ENOMEM);
310                 }
311
312                 /* Don't assume off is incremented properly by
313                  * fsfilt_read_record(), in case sizeof(*mcd)
314                  * isn't the same as msd->msd_client_size.  */
315                 off = le32_to_cpu(msd->msd_client_start) +
316                         cl_idx * le16_to_cpu(msd->msd_client_size);
317                 rc = fsfilt_read_record(obd, file, mcd, sizeof(*mcd), &off);
318                 if (rc) {
319                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
320                                LAST_RCVD, cl_idx, off, rc);
321                         break; /* read error shouldn't cause startup to fail */
322                 }
323
324                 if (mcd->mcd_uuid[0] == '\0') {
325                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
326                                cl_idx);
327                         continue;
328                 }
329
330                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
331
332                 /* These exports are cleaned up by mds_disconnect(), so they
333                  * need to be set up like real exports as mds_connect() does.
334                  */
335                 mount_age = mount_count - le64_to_cpu(mcd->mcd_mount_count);
336                 if (mount_age < MDS_MOUNT_RECOV) {
337                         struct obd_export *exp = class_new_export(obd);
338                         struct mds_export_data *med;
339                         CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
340                                " srv lr: "LPU64" mnt: "LPU64" last mount: "
341                                LPU64"\n", mcd->mcd_uuid, cl_idx,
342                                last_transno, le64_to_cpu(msd->msd_last_transno),
343                                le64_to_cpu(mcd->mcd_mount_count), mount_count);
344                         if (exp == NULL)
345                                 GOTO(err_client, rc = -ENOMEM);
346
347                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
348                                sizeof exp->exp_client_uuid.uuid);
349                         med = &exp->exp_mds_data;
350                         med->med_mcd = mcd;
351                         mds_client_add(obd, mds, med, cl_idx);
352                         /* create helper if export init gets more complex */
353                         INIT_LIST_HEAD(&med->med_open_head);
354                         spin_lock_init(&med->med_open_lock);
355
356                         mcd = NULL;
357                         obd->obd_recoverable_clients++;
358                         obd->obd_max_recoverable_clients++;
359                         class_export_put(exp);
360                 } else {
361                         CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
362                                LPU64"\n", cl_idx, mcd->mcd_uuid,
363                                le64_to_cpu(mcd->mcd_mount_count));
364                 }
365
366                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
367                        cl_idx, last_transno);
368
369                 if (last_transno > mds->mds_last_transno)
370                         mds->mds_last_transno = last_transno;
371         }
372
373         obd->obd_last_committed = mds->mds_last_transno;
374         if (obd->obd_recoverable_clients) {
375                 CWARN("RECOVERY: %d recoverable clients, last_transno "
376                        LPU64"\n", obd->obd_recoverable_clients,
377                        mds->mds_last_transno);
378                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
379                 obd->obd_recovering = 1;
380         }
381
382         if (mcd)
383                 OBD_FREE(mcd, sizeof(*mcd));
384
385         return 0;
386
387 err_client:
388         class_disconnect_exports(obd, 0);
389 err_msd:
390         mds_server_free_data(mds);
391         return rc;
392 }
393
394 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
395 {
396         struct mds_obd *mds = &obd->u.mds;
397         struct obd_run_ctxt saved;
398         struct dentry *dentry;
399         struct file *file;
400         int rc;
401         ENTRY;
402
403
404         /* Get rid of unneeded supplementary groups */
405         current->ngroups = 0;
406         memset(current->groups, 0, sizeof(current->groups));
407
408         mds->mds_vfsmnt = mnt;
409         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
410
411         fsfilt_setup(obd, mds->mds_sb);
412
413         OBD_SET_CTXT_MAGIC(&obd->obd_ctxt);
414         obd->obd_ctxt.pwdmnt = mnt;
415         obd->obd_ctxt.pwd = mnt->mnt_root;
416         obd->obd_ctxt.fs = get_ds();
417         obd->obd_ctxt.cb_ops = mds_lvfs_ops;
418
419         /* setup the directory tree */
420         push_ctxt(&saved, &obd->obd_ctxt, NULL);
421         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
422         if (IS_ERR(dentry)) {
423                 rc = PTR_ERR(dentry);
424                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
425                 GOTO(err_pop, rc);
426         }
427
428         mds->mds_rootfid.id = dentry->d_inode->i_ino;
429         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
430         mds->mds_rootfid.f_type = S_IFDIR;
431
432         dput(dentry);
433
434         dentry = lookup_one_len("__iopen__", current->fs->pwd,
435                                 strlen("__iopen__"));
436         if (IS_ERR(dentry) || !dentry->d_inode) {
437                 rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
438                 CERROR("cannot open iopen FH directory: rc = %d\n", rc);
439                 GOTO(err_pop, rc);
440         }
441         mds->mds_fid_de = dentry;
442
443         dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
444         if (IS_ERR(dentry)) {
445                 rc = PTR_ERR(dentry);
446                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
447                 GOTO(err_fid, rc);
448         }
449         mds->mds_pending_dir = dentry;
450
451         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777);
452         if (IS_ERR(dentry)) {
453                 rc = PTR_ERR(dentry);
454                 CERROR("cannot create LOGS directory: rc = %d\n", rc);
455                 GOTO(err_pending, rc);
456         }
457         mds->mds_logs_dir = dentry;
458
459         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777);
460         if (IS_ERR(dentry)) {
461                 rc = PTR_ERR(dentry);
462                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
463                 GOTO(err_logs, rc);
464         }
465         mds->mds_objects_dir = dentry;
466
467         /* open and test the last rcvd file */
468         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
469         if (IS_ERR(file)) {
470                 rc = PTR_ERR(file);
471                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
472                 GOTO(err_objects, rc = PTR_ERR(file));
473         }
474         mds->mds_rcvd_filp = file;
475         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
476                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
477                        file->f_dentry->d_inode->i_mode);
478                 GOTO(err_last_rcvd, rc = -ENOENT);
479         }
480
481         rc = mds_read_last_rcvd(obd, file);
482         if (rc) {
483                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
484                 GOTO(err_last_rcvd, rc);
485         }
486
487         /* open and test the lov objd file */
488         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
489         if (IS_ERR(file)) {
490                 rc = PTR_ERR(file);
491                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
492                 GOTO(err_client, rc = PTR_ERR(file));
493         }
494         mds->mds_lov_objid_filp = file;
495         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
496                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
497                        file->f_dentry->d_inode->i_mode);
498                 GOTO(err_lov_objid, rc = -ENOENT);
499         }
500 err_pop:
501         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
502
503         return rc;
504
505 err_lov_objid:
506         if (mds->mds_lov_objid_filp && filp_close(mds->mds_lov_objid_filp, 0))
507                 CERROR("can't close %s after error\n", LOV_OBJID);
508 err_client:
509         class_disconnect_exports(obd, 0);
510 err_last_rcvd:
511         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
512                 CERROR("can't close %s after error\n", LAST_RCVD);
513 err_objects:
514         dput(mds->mds_objects_dir);
515 err_logs:
516         dput(mds->mds_logs_dir);
517 err_pending:
518         dput(mds->mds_pending_dir);
519 err_fid:
520         dput(mds->mds_fid_de);
521         goto err_pop;
522 }
523
524
525 int mds_fs_cleanup(struct obd_device *obd, int flags)
526 {
527         struct mds_obd *mds = &obd->u.mds;
528         struct obd_run_ctxt saved;
529         int rc = 0;
530
531         if (flags & OBD_OPT_FAILOVER)
532                 CERROR("%s: shutting down for failover; client state will"
533                        " be preserved.\n", obd->obd_name);
534
535         class_disconnect_exports(obd, flags); /* cleans up client info too */
536         mds_server_free_data(mds);
537
538         push_ctxt(&saved, &obd->obd_ctxt, NULL);
539         if (mds->mds_rcvd_filp) {
540                 rc = filp_close(mds->mds_rcvd_filp, 0);
541                 mds->mds_rcvd_filp = NULL;
542                 if (rc)
543                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
544         }
545         if (mds->mds_lov_objid_filp) {
546                 rc = filp_close(mds->mds_lov_objid_filp, 0);
547                 mds->mds_lov_objid_filp = NULL;
548                 if (rc)
549                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
550         }
551         if (mds->mds_objects_dir != NULL) {
552                 l_dput(mds->mds_objects_dir);
553                 mds->mds_objects_dir = NULL;
554         }
555         if (mds->mds_logs_dir) {
556                 l_dput(mds->mds_logs_dir);
557                 mds->mds_logs_dir = NULL;
558         }
559         if (mds->mds_pending_dir) {
560                 l_dput(mds->mds_pending_dir);
561                 mds->mds_pending_dir = NULL;
562         }
563         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
564         shrink_dcache_parent(mds->mds_fid_de);
565         dput(mds->mds_fid_de);
566
567         return rc;
568 }
569
570 /* Creates an object with the same name as its fid.  Because this is not at all
571  * performance sensitive, it is accomplished by creating a file, checking the
572  * fid, and renaming it. */
573 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
574                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
575 {
576         struct mds_obd *mds = &exp->exp_obd->u.mds;
577         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
578         unsigned int tmpname = ll_insecure_random_int();
579         struct file *filp;
580         struct dentry *new_child;
581         struct obd_run_ctxt saved;
582         char fidname[LL_FID_NAMELEN];
583         void *handle;
584         int rc = 0, err, namelen;
585         ENTRY;
586
587         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
588         
589         sprintf(fidname, "OBJECTS/%u", tmpname);
590         filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
591         if (IS_ERR(filp)) {
592                 rc = PTR_ERR(filp);
593                 if (rc == -EEXIST) {
594                         CERROR("impossible object name collision %u\n",
595                                tmpname);
596                         LBUG();
597                 }
598                 CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
599                 GOTO(out_pop, rc);
600         }
601
602         LASSERT(mds->mds_objects_dir == filp->f_dentry->d_parent);
603
604         oa->o_id = filp->f_dentry->d_inode->i_ino;
605         oa->o_generation = filp->f_dentry->d_inode->i_generation;
606         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
607
608         down(&parent_inode->i_sem);
609         new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
610
611         if (IS_ERR(new_child)) {
612                 CERROR("getting neg dentry for obj rename: %d\n", rc);
613                 GOTO(out_close, rc = PTR_ERR(new_child));
614         }
615         if (new_child->d_inode != NULL) {
616                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
617                        oa->o_id, oa->o_generation);
618                 LBUG();
619         }
620
621         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
622                               FSFILT_OP_RENAME, NULL);
623         if (IS_ERR(handle)) 
624                 GOTO(out_dput, rc = PTR_ERR(handle));
625         
626         lock_kernel();
627         rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
628                         mds->mds_objects_dir->d_inode, new_child);
629         unlock_kernel();
630         if (rc)
631                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
632                        oa->o_id, oa->o_generation, rc);
633
634         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
635                             handle, 0);
636         if (!err)
637                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
638         else if (!rc)
639                 rc = err;
640 out_dput:
641         dput(new_child);
642 out_close:
643         up(&parent_inode->i_sem);
644         err = filp_close(filp, 0);
645         if (err) {
646                 CERROR("closing tmpfile %u: rc %d\n", tmpname, rc);
647                 if (!rc)
648                         rc = err;
649         }
650 out_pop:
651         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
652         RETURN(rc);
653 }
654
655 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
656                     struct lov_stripe_md *ea, struct obd_trans_info *oti)
657 {
658         struct mds_obd *mds = &exp->exp_obd->u.mds;
659         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
660         struct obd_device *obd = exp->exp_obd;
661         struct obd_run_ctxt saved;
662         char fidname[LL_FID_NAMELEN];
663         struct dentry *de;
664         void *handle;
665         int err, namelen, rc = 0;
666         ENTRY;
667
668         push_ctxt(&saved, &obd->obd_ctxt, NULL);
669
670         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
671
672         down(&parent_inode->i_sem);
673         de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
674         if (de == NULL || de->d_inode == NULL) {
675                 CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
676                 GOTO(out, rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT);
677         }
678
679         handle = fsfilt_start(obd, mds->mds_objects_dir->d_inode,
680                               FSFILT_OP_UNLINK_LOG, oti);
681         if (IS_ERR(handle)) {
682                 GOTO(out_dput, rc = PTR_ERR(handle));
683         }
684         
685         rc = vfs_unlink(mds->mds_objects_dir->d_inode, de);
686         if (rc) 
687                 CERROR("error destroying object "LPU64":%u: rc %d\n",
688                        oa->o_id, oa->o_generation, rc);
689         
690         err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
691         if (err && !rc)
692                 rc = err;
693 out_dput:
694         l_dput(de);
695 out:
696         up(&parent_inode->i_sem);
697         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
698         RETURN(rc);
699 }