Whamcloud - gitweb
b=16909 Use INFO/WARN instead of WARN/ERROR for the slow messages.
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_fs.c
37  *
38  * Lustre Metadata Server (MDS) filesystem interface code
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/kmod.h>
50 #include <linux/version.h>
51 #include <linux/sched.h>
52 #include <lustre_quota.h>
53 #include <linux/mount.h>
54 #include <lustre_mds.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_lib.h>
58 #include <lustre_fsfilt.h>
59 #include <lustre_disk.h>
60 #include <libcfs/list.h>
61
62 #include "mds_internal.h"
63
64
65 int mds_export_stats_init(struct obd_device *obd,
66                                  struct obd_export *exp,
67                                  int reconnect,
68                                  void *localdata)
69 {
70         lnet_nid_t *client_nid = localdata;
71         int rc, num_stats, newnid = 0;
72
73         rc = lprocfs_exp_setup(exp, client_nid, reconnect, &newnid);
74         if (rc) {
75                 /* Mask error for already created
76                  * /proc entries */
77                 if (rc == -EALREADY)
78                         rc = 0;
79                 return rc;
80         }
81
82         if (newnid) {
83                 struct nid_stat *tmp = exp->exp_nid_stats;
84                 LASSERT(tmp != NULL);
85
86                 num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
87                              LPROC_MDS_LAST - 1;
88                 tmp->nid_stats = lprocfs_alloc_stats(num_stats,
89                                             LPROCFS_STATS_FLAG_NOPERCPU);
90                 if (tmp->nid_stats == NULL)
91                         return -ENOMEM;
92
93                 lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats);
94                 mds_stats_counter_init(tmp->nid_stats);
95
96                 rc = lprocfs_nid_ldlm_stats_init(tmp);
97                 if (rc)
98                         return rc;
99         }
100
101         return 0;
102 }
103
104 /* VBR: to determine the delayed client the lcd should be updated for each new
105  * epoch */
106 int mds_update_client_epoch(struct obd_export *exp)
107 {
108         struct mds_export_data *med = &exp->exp_mds_data;
109         struct mds_obd *mds = &exp->exp_obd->u.mds;
110         struct lvfs_run_ctxt saved;
111         loff_t off = med->med_lr_off;
112         int rc = 0;
113
114         /* VBR: set client last_epoch to current epoch */
115         if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
116                         le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
117                 return rc;
118
119         med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
120         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
121         rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
122                                  med->med_lcd, sizeof(*med->med_lcd), &off,
123                                  exp->exp_delayed);
124         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
125
126         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
127                med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
128                le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
129
130         return rc;
131 }
132
133 /* Called after recovery is done on server */
134 void mds_update_last_epoch(struct obd_device *obd)
135 {
136         struct ptlrpc_request *req;
137         struct mds_obd *mds = &obd->u.mds;
138         __u32 start_epoch;
139
140         /* Increase server epoch after recovery */
141         spin_lock(&mds->mds_transno_lock);
142         start_epoch = lr_epoch(mds->mds_last_transno) + 1;
143         mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
144         mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
145         spin_unlock(&mds->mds_transno_lock);
146
147         /* go through delayed reply queue to find all exports participate in
148          * recovery and set new epoch for them */
149         list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
150                 LASSERT(!req->rq_export->exp_delayed);
151                 mds_update_client_epoch(req->rq_export);
152         }
153         mds_update_server_data(obd, 1);
154 }
155
156 /* Add client data to the MDS.  We use a bitmap to locate a free space
157  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
158  * Otherwise, we have just read the data from the last_rcvd file and
159  * we know its offset.
160  *
161  * It should not be possible to fail adding an existing client - otherwise
162  * mds_init_server_data() callsite needs to be fixed.
163  */
164 int mds_client_add(struct obd_device *obd, struct obd_export *exp,
165                    int cl_idx, void *localdata)
166 {
167         struct mds_obd *mds = &obd->u.mds;
168         struct mds_export_data *med = &exp->exp_mds_data;
169         unsigned long *bitmap = mds->mds_client_bitmap;
170         int new_client = (cl_idx == -1);
171         int rc = 0;
172         ENTRY;
173
174         LASSERT(bitmap != NULL);
175         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
176
177         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
178         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
179                 RETURN(0);
180
181         /* VBR: remove expired exports before searching for free slot */
182         if (new_client)
183                 class_disconnect_expired_exports(obd);
184
185         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
186          * there's no need for extra complication here
187          */
188         if (new_client) {
189                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
190         repeat:
191                 if (cl_idx >= LR_MAX_CLIENTS ||
192                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
193                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
194                                cl_idx);
195                         return -EOVERFLOW;
196                 }
197                 if (test_and_set_bit(cl_idx, bitmap)) {
198                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
199                                                     cl_idx);
200                         goto repeat;
201                 }
202         } else {
203                 if (test_and_set_bit(cl_idx, bitmap)) {
204                         CERROR("MDS client %d: bit already set in bitmap!!\n",
205                                cl_idx);
206                         LBUG();
207                 }
208         }
209
210         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
211                cl_idx, med->med_lcd->lcd_uuid);
212
213         med->med_lr_idx = cl_idx;
214         med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) +
215                 (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size));
216         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
217         mds_export_stats_init(obd, exp, 0, localdata);
218
219         if (new_client) {
220                 struct lvfs_run_ctxt *saved = NULL;
221                 loff_t off = med->med_lr_off;
222                 struct file *file = mds->mds_rcvd_filp;
223                 void *handle;
224
225                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
226                 if (saved == NULL) {
227                         CERROR("cannot allocate memory for run ctxt\n");
228                         RETURN(-ENOMEM);
229                 }
230
231                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
232                 handle = fsfilt_start(obd, file->f_dentry->d_inode,
233                                       FSFILT_OP_SETATTR, NULL);
234                 if (IS_ERR(handle)) {
235                         rc = PTR_ERR(handle);
236                         CERROR("unable to start transaction: rc %d\n", rc);
237                 } else {
238                         /* VBR: set client last_transno as mds_last_transno to
239                          * remember last epoch for this client */
240                         med->med_lcd->lcd_last_epoch =
241                                         mds->mds_server_data->lsd_start_epoch;
242                         exp->exp_last_request_time = cfs_time_current_sec();
243                         /* remember first epoch of client for orphan handling */
244                         med->med_lcd->lcd_first_epoch =
245                                   cpu_to_le32(lr_epoch(mds->mds_last_transno));
246                         rc = fsfilt_add_journal_cb(obd, 0, handle,
247                                                    target_client_add_cb, exp);
248                         if (rc == 0) {
249                                 spin_lock(&exp->exp_lock);
250                                 exp->exp_need_sync = 1;
251                                 spin_unlock(&exp->exp_lock);
252                         }
253                         rc = fsfilt_write_record(obd, file, med->med_lcd,
254                                                  sizeof(*med->med_lcd),
255                                                  &off, rc /* sync if no cb */);
256                         fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0);
257                 }
258
259                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
260                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
261
262                 if (rc)
263                         return rc;
264                 CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
265                        med->med_lr_idx, med->med_lr_off,
266                        (unsigned int)sizeof(*med->med_lcd));
267         }
268         return rc;
269 }
270
271 struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
272  
273 int mds_client_free(struct obd_export *exp)
274 {
275         struct mds_export_data *med = &exp->exp_mds_data;
276         struct mds_obd *mds = &exp->exp_obd->u.mds;
277         struct obd_device *obd = exp->exp_obd;
278         struct lvfs_run_ctxt *saved = NULL;
279         int rc;
280         loff_t off;
281         ENTRY;
282
283         if (!med->med_lcd)
284                 RETURN(0);
285
286         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
287         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
288                 GOTO(free, 0);
289
290         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
291                med->med_lr_idx, med->med_lr_off, med->med_lcd->lcd_uuid);
292
293         LASSERT(mds->mds_client_bitmap != NULL);
294
295
296         off = med->med_lr_off;
297
298         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
299          * we leak a client slot that will be cleaned on the next recovery. */
300         if (off <= 0) {
301                 CERROR("%s: client idx %d has offset %lld\n",
302                         obd->obd_name, med->med_lr_idx, off);
303                 GOTO(free, rc = -EINVAL);
304         }
305
306         /* Clear the bit _after_ zeroing out the client so we don't
307            race with mds_client_add and zero out new clients.*/
308         if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
309                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
310                        med->med_lr_idx);
311                 LBUG();
312         }
313
314         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
315                 /* Don't force sync on each disconnect if aborting recovery,
316                  * or it does num_clients * num_osts syncs.  b=17194 */
317                 int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
318                                  !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
319                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
320                 if (saved == NULL) {
321                         CERROR("cannot allocate memory for run ctxt\n");
322                         GOTO(free, rc = -ENOMEM);
323                 }
324                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
325                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_lcd,
326                                          sizeof(zero_lcd), &off, 0);
327
328                 /* Make sure the server's last_transno is up to date. Do this
329                  * after the client is freed so we know all the client's
330                  * transactions have been committed. */
331                 if (rc == 0)
332                         mds_update_server_data(exp->exp_obd, need_sync);
333
334                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
335
336                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
337                        "zero out client %s at idx %u/%llu in %s %ssync rc %d\n",
338                        med->med_lcd->lcd_uuid, med->med_lr_idx, med->med_lr_off,
339                        LAST_RCVD, need_sync ? "" : "a", rc);
340         }
341
342         if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
343                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
344                        med->med_lr_idx);
345                 LBUG();
346         }
347
348         EXIT;
349 free:
350         if (saved)
351                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
352
353         OBD_FREE_PTR(med->med_lcd);
354         med->med_lcd = NULL;
355
356         return 0;
357 }
358
359 static int mds_server_free_data(struct mds_obd *mds)
360 {
361         OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
362         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
363         mds->mds_server_data = NULL;
364
365         return 0;
366 }
367
368 static void mds_add_fake_export(struct obd_device *obd, int num,
369                                 struct file *file)
370 {
371         struct obd_export *exp;
372         struct lvfs_run_ctxt saved;
373         struct obd_device_target *obt = &obd->u.obt;
374         struct lu_export_data *led;
375         unsigned long *bitmap = obt->obt_client_bitmap;
376         struct lsd_client_data *lcd = NULL;
377         unsigned int idx = 0;
378         loff_t off = 0;
379         int rc = 0;
380
381         while (num > 0) {
382                 num--;
383                 if (!lcd) {
384                         OBD_ALLOC_PTR(lcd);
385                         if (!lcd)
386                                 return;
387                 }
388                 idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, idx);
389                 if (idx >= LR_MAX_CLIENTS) {
390                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n", idx);
391                         OBD_FREE_PTR(lcd);
392                         break;
393                 }
394                 if (test_and_set_bit(idx, bitmap)) {
395                         CERROR("Bit %u is set already\n", idx);
396                         continue;
397                 }
398                 off = le32_to_cpu(obt->obt_lsd->lsd_client_start) +
399                       idx * le16_to_cpu(obt->obt_lsd->lsd_client_size);
400
401                 sprintf(lcd->lcd_uuid, "dead-%.16u", idx);
402                 CDEBUG(D_INFO, "Create fake export %s, index %u, offset %lu\n",
403                        lcd->lcd_uuid, idx, (unsigned long)off);
404
405                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
406                 if (IS_ERR(exp)) {
407                         if (PTR_ERR(exp) == -EALREADY) {
408                                 CERROR("Export %s already exists\n",
409                                        lcd->lcd_uuid);
410                         }
411                         CERROR("Failed to create export %lu\n", PTR_ERR(exp));
412                         OBD_FREE_PTR(lcd);
413                         break;
414                 }
415                 LASSERT(exp);
416                 led = &exp->exp_target_data;
417                 led->led_lr_idx = idx;
418                 led->led_lr_off = off;
419                 led->led_lcd = lcd;
420
421                 exp->exp_last_request_time = cfs_time_current_sec();
422                 exp->exp_replay_needed = 1;
423                 exp->exp_connecting = 0;
424                 exp->exp_in_recovery = 0;
425
426                 spin_lock_bh(&obd->obd_processing_task_lock);
427                 obd->obd_recoverable_clients++;
428                 obd->obd_max_recoverable_clients++;
429                 spin_unlock_bh(&obd->obd_processing_task_lock);
430
431                 class_set_export_delayed(exp);
432                 class_export_put(exp);
433
434                 lcd->lcd_last_epoch = cpu_to_le32(1);
435                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
436                 rc = fsfilt_write_record(obd, file, lcd, sizeof(*lcd), &off, 0);
437                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
438                 if (rc) {
439                         CERROR("Failed to create fake client record\n");
440                         OBD_FREE_PTR(lcd);
441                         break;
442                 }
443                 lcd = NULL;
444         }
445 }
446
447 static int mds_init_server_data(struct obd_device *obd, struct file *file)
448 {
449         struct mds_obd *mds = &obd->u.mds;
450         struct lr_server_data *lsd;
451         struct lsd_client_data *lcd = NULL;
452         struct lustre_mount_info *lmi;
453         loff_t off = 0;
454         unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
455         __u64 mount_count;
456         __u32 start_epoch;
457         int cl_idx, rc = 0;
458         ENTRY;
459
460         /* ensure padding in the struct is the correct size */
461         LASSERT(offsetof(struct lr_server_data, lsd_padding) +
462                 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
463         LASSERT(offsetof(struct lsd_client_data, lcd_padding) +
464                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
465
466         OBD_ALLOC_WAIT(lsd, sizeof(*lsd));
467         if (!lsd)
468                 RETURN(-ENOMEM);
469
470         OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
471         if (!mds->mds_client_bitmap) {
472                 OBD_FREE(lsd, sizeof(*lsd));
473                 RETURN(-ENOMEM);
474         }
475
476         mds->mds_server_data = lsd;
477
478         if (last_rcvd_size == 0) {
479                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
480
481                 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid));
482                 lsd->lsd_last_transno = 0;
483                 mount_count = lsd->lsd_mount_count = 0;
484                 lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
485                 lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
486                 lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
487                 lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
488                 lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
489                 lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT);
490         } else {
491                 rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off);
492                 if (rc) {
493                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
494                         GOTO(err_msd, rc);
495                 }
496                 if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
497                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
498                                            " the wrong disk %s. Were the /dev/ "
499                                            "assignments rearranged?\n",
500                                            obd->obd_uuid.uuid, lsd->lsd_uuid);
501                         GOTO(err_msd, rc = -EINVAL);
502                 }
503                 lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_MDT);
504                 /* COMPAT_146 */
505                 /* Assume old last_rcvd format unless I_C_LR is set */
506                 if (!(lsd->lsd_feature_incompat &
507                       cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
508                         lsd->lsd_mount_count = lsd->lsd_compat14;
509                 /* end COMPAT_146 */
510                 mount_count = le64_to_cpu(lsd->lsd_mount_count);
511         }
512
513         if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
514                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
515                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) &
516                        ~MDT_INCOMPAT_SUPP);
517                 GOTO(err_msd, rc = -EINVAL);
518         }
519         if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
520                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
521                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) &
522                        ~MDT_ROCOMPAT_SUPP);
523                 /* Do something like remount filesystem read-only */
524                 GOTO(err_msd, rc = -EINVAL);
525         }
526         /* evict all clients as it is first boot with 2.0 last_rcvd */
527         if (lsd->lsd_feature_compat & cpu_to_le32(OBD_COMPAT_20)) {
528                 LCONSOLE_WARN("Mounting %s at first time on 2.0 FS, remove all"
529                               " clients for interop needs\n", obd->obd_name);
530                 simple_truncate(mds->mds_vfsmnt->mnt_sb->s_root,
531                                 mds->mds_vfsmnt, LAST_RCVD,
532                                 lsd->lsd_client_start);
533                 last_rcvd_size = lsd->lsd_client_start;
534                 lsd->lsd_feature_compat &= ~cpu_to_le32(OBD_COMPAT_20);
535         }
536
537         target_trans_table_init(obd);
538         mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
539         start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
540
541         CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
542                obd->obd_name, start_epoch);
543         CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
544                obd->obd_name, mds->mds_last_transno);
545         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
546                obd->obd_name, mount_count + 1);
547         CDEBUG(D_INODE, "%s: server data size: %u\n",
548                obd->obd_name, le32_to_cpu(lsd->lsd_server_size));
549         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
550                obd->obd_name, le32_to_cpu(lsd->lsd_client_start));
551         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
552                obd->obd_name, le32_to_cpu(lsd->lsd_client_size));
553         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
554                obd->obd_name, last_rcvd_size);
555         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
556                last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 :
557                (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) /
558                 le16_to_cpu(lsd->lsd_client_size));
559
560         if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
561             !lsd->lsd_client_size) {
562                 CERROR("Bad last_rcvd contents!\n");
563                 GOTO(err_msd, rc = -EINVAL);
564         }
565
566         /* When we do a clean MDS shutdown, we save the last_transno into
567          * the header.  If we find clients with higher last_transno values
568          * then those clients may need recovery done. */
569         for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
570              off < last_rcvd_size; cl_idx++) {
571                 __u64 last_transno;
572                 __u32 last_epoch;
573                 struct obd_export *exp;
574                 struct mds_export_data *med;
575
576                 if (!lcd) {
577                         OBD_ALLOC_WAIT(lcd, sizeof(*lcd));
578                         if (!lcd)
579                                 GOTO(err_client, rc = -ENOMEM);
580                 }
581
582                 /* Don't assume off is incremented properly by
583                  * fsfilt_read_record(), in case sizeof(*lcd)
584                  * isn't the same as lsd->lsd_client_size.  */
585                 off = le32_to_cpu(lsd->lsd_client_start) +
586                         cl_idx * le16_to_cpu(lsd->lsd_client_size);
587                 rc = fsfilt_read_record(obd, file, lcd, sizeof(*lcd), &off);
588                 if (rc) {
589                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
590                                LAST_RCVD, cl_idx, off, rc);
591                         break; /* read error shouldn't cause startup to fail */
592                 }
593
594                 if (lcd->lcd_uuid[0] == '\0') {
595                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
596                                cl_idx);
597                         continue;
598                 }
599
600                 check_lcd(obd->obd_name, cl_idx, lcd);
601
602                 last_transno = lsd_last_transno(lcd);
603                 last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
604
605                 /* These exports are cleaned up by mds_disconnect(), so they
606                  * need to be set up like real exports as mds_connect() does.
607                  */
608                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
609                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
610                        last_transno, le64_to_cpu(lsd->lsd_last_transno),
611                        le64_to_cpu(lcd->lcd_last_xid));
612
613                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
614                 if (IS_ERR(exp)) {
615                         if (PTR_ERR(exp) == -EALREADY) {
616                                 /* export already exists, zero out this one */
617                                 lcd->lcd_uuid[0] = '\0';
618                         } else {
619                                 GOTO(err_client, rc = PTR_ERR(exp));
620                         }
621                 } else {
622                         med = &exp->exp_mds_data;
623                         med->med_lcd = lcd;
624                         rc = mds_client_add(obd, exp, cl_idx, NULL);
625                         /* can't fail for existing client */
626                         LASSERTF(rc == 0, "rc = %d\n", rc);
627
628                         /* VBR: set export last committed version */
629                         exp->exp_last_committed = last_transno;
630                         /* read last time from disk */
631                         exp->exp_last_request_time = target_trans_table_last_time(exp);
632                         lcd = NULL;
633
634                         spin_lock(&exp->exp_lock);
635                         exp->exp_replay_needed = 1;
636                         exp->exp_connecting = 0;
637                         exp->exp_in_recovery = 0;
638                         spin_unlock(&exp->exp_lock);
639
640                         spin_lock_bh(&obd->obd_processing_task_lock);
641                         obd->obd_recoverable_clients++;
642                         obd->obd_max_recoverable_clients++;
643                         spin_unlock_bh(&obd->obd_processing_task_lock);
644
645                         /* VBR: if epoch too old mark export as delayed,
646                          * if epoch is zero then client is pre-vbr one */
647                         if (start_epoch > last_epoch && last_epoch != 0)
648                                 class_set_export_delayed(exp);
649                         class_export_put(exp);
650                 }
651
652                 /* Need to check last_rcvd even for duplicated exports. */
653                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
654                        "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
655
656                 if (last_transno > mds->mds_last_transno)
657                         mds->mds_last_transno = last_transno;
658         }
659
660         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_FAKE_EXP))) {
661                 mds_add_fake_export(obd, obd_fail_val, file);
662         }
663
664         if (lcd)
665                 OBD_FREE_PTR(lcd);
666
667         obd->obd_last_committed = mds->mds_last_transno;
668
669         if (obd->obd_recoverable_clients) {
670                 CWARN("RECOVERY: service %s, %d recoverable clients, "
671                       "%d delayed clients, last_transno "LPU64"\n",
672                       obd->obd_name, obd->obd_recoverable_clients,
673                       obd->obd_delayed_clients, mds->mds_last_transno);
674                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
675                 obd->obd_recovering = 1;
676                 obd->obd_recovery_start = 0;
677                 obd->obd_recovery_end = 0;
678         } else {
679                 LASSERT(!obd->obd_recovering);
680                 /* VBR: update boot epoch after recovery */
681                 mds_update_last_epoch(obd);
682         }
683
684         obd->obd_recovery_timeout = OBD_RECOVERY_TIME_SOFT;
685         obd->obd_recovery_time_hard = OBD_RECOVERY_TIME_HARD;
686
687         lmi = server_find_mount_locked(obd->obd_name);
688         if (lmi) {
689                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
690
691                 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft)
692                         obd->obd_recovery_timeout =
693                                 lsi->lsi_lmd->lmd_recovery_time_soft;
694
695                 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard)
696                         obd->obd_recovery_time_hard =
697                                 lsi->lsi_lmd->lmd_recovery_time_hard;
698         }
699
700         mds->mds_mount_count = mount_count + 1;
701         lsd->lsd_mount_count = lsd->lsd_compat14 =
702                 cpu_to_le64(mds->mds_mount_count);
703
704         /* save it, so mount count and last_transno is current */
705         rc = mds_update_server_data(obd, 1);
706         if (rc)
707                 GOTO(err_client, rc);
708
709         RETURN(0);
710
711 err_client:
712         class_disconnect_exports(obd);
713 err_msd:
714         mds_server_free_data(mds);
715         RETURN(rc);
716 }
717
718 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
719 {
720         struct mds_obd *mds = &obd->u.mds;
721         struct lvfs_run_ctxt *saved = NULL;
722         struct dentry *dentry;
723         struct file *file;
724         int rc;
725         ENTRY;
726
727         OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
728
729         rc = cleanup_group_info();
730         if (rc)
731                 RETURN(rc);
732
733         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
734         if (saved == NULL) {
735                 CERROR("cannot allocate memory for run ctxt\n");
736                 RETURN(-ENOMEM);
737         }
738
739         mds->mds_vfsmnt = mnt;
740         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
741         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
742         obd->u.obt.obt_stale_export_age = STALE_EXPORT_MAXTIME_DEFAULT;
743         spin_lock_init(&obd->u.obt.obt_trans_table_lock);
744
745         rc = fsfilt_setup(obd, obd->u.obt.obt_sb);
746         if (rc)
747                 RETURN(rc);
748
749         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
750         obd->obd_lvfs_ctxt.pwdmnt = mnt;
751         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
752         obd->obd_lvfs_ctxt.fs = get_ds();
753         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
754
755         /* setup the directory tree */
756         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
757         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "ROOT", 0755, 0);
758         if (IS_ERR(dentry)) {
759                 rc = PTR_ERR(dentry);
760                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
761                 GOTO(err_pop, rc);
762         }
763
764         mds->mds_rootfid.id = dentry->d_inode->i_ino;
765         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
766         mds->mds_rootfid.f_type = S_IFDIR;
767
768         dput(dentry);
769
770         dentry = lookup_one_len("__iopen__", cfs_fs_pwd(current->fs),
771                                 strlen("__iopen__"));
772         if (IS_ERR(dentry)) {
773                 rc = PTR_ERR(dentry);
774                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
775                 GOTO(err_pop, rc);
776         }
777
778         mds->mds_fid_de = dentry;
779         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
780                 rc = -ENOENT;
781                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
782                 GOTO(err_fid, rc);
783         }
784
785         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "PENDING", 0777, 1);
786         if (IS_ERR(dentry)) {
787                 rc = PTR_ERR(dentry);
788                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
789                 GOTO(err_fid, rc);
790         }
791         mds->mds_pending_dir = dentry;
792
793         /* COMPAT_146 */
794         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, MDT_LOGS_DIR, 0777, 1);
795         if (IS_ERR(dentry)) {
796                 rc = PTR_ERR(dentry);
797                 CERROR("cannot create %s directory: rc = %d\n",
798                        MDT_LOGS_DIR, rc);
799                 GOTO(err_pending, rc);
800         }
801         mds->mds_logs_dir = dentry;
802         /* end COMPAT_146 */
803
804         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "OBJECTS", 0777, 1);
805         if (IS_ERR(dentry)) {
806                 rc = PTR_ERR(dentry);
807                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
808                 GOTO(err_logs, rc);
809         }
810         mds->mds_objects_dir = dentry;
811
812         /* open and test the last rcvd file */
813         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
814         if (IS_ERR(file)) {
815                 rc = PTR_ERR(file);
816                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
817                 GOTO(err_objects, rc = PTR_ERR(file));
818         }
819         mds->mds_rcvd_filp = file;
820         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
821                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
822                        file->f_dentry->d_inode->i_mode);
823                 GOTO(err_last_rcvd, rc = -ENOENT);
824         }
825
826         rc = mds_init_server_data(obd, file);
827         if (rc) {
828                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
829                 GOTO(err_last_rcvd, rc);
830         }
831
832         rc = mds_lov_init_objids(obd);
833         if (rc != 0) {
834                CERROR("cannot init lov objid rc = %d\n", rc);
835                GOTO(err_client, rc );
836         }
837
838         /* open and test the check io file junk */
839         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
840         if (IS_ERR(file)) {
841                 rc = PTR_ERR(file);
842                 CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
843                 GOTO(err_lov_objid, rc = PTR_ERR(file));
844         }
845         mds->mds_obt.obt_health_check_filp = file;
846         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
847                 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
848                        file->f_dentry->d_inode->i_mode);
849                 GOTO(err_health_check, rc = -ENOENT);
850         }
851         rc = lvfs_check_io_health(obd, file);
852         if (rc)
853                 GOTO(err_health_check, rc);
854 err_pop:
855         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
856         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
857         return rc;
858
859 err_health_check:
860         if (mds->mds_obt.obt_health_check_filp &&
861             filp_close(mds->mds_obt.obt_health_check_filp, 0))
862                 CERROR("can't close %s after error\n", HEALTH_CHECK);
863 err_lov_objid:
864          mds_lov_destroy_objids(obd);
865 err_client:
866         class_disconnect_exports(obd);
867 err_last_rcvd:
868         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
869                 CERROR("can't close %s after error\n", LAST_RCVD);
870 err_objects:
871         dput(mds->mds_objects_dir);
872 err_logs:
873         dput(mds->mds_logs_dir);
874 err_pending:
875         dput(mds->mds_pending_dir);
876 err_fid:
877         dput(mds->mds_fid_de);
878         goto err_pop;
879 }
880
881 int mds_fs_cleanup(struct obd_device *obd)
882 {
883         struct mds_obd *mds = &obd->u.mds;
884         struct lvfs_run_ctxt *saved = NULL;
885         int rc = 0;
886
887         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
888         if (saved == NULL) {
889                 CERROR("cannot allocate memory for run ctxt\n");
890                 RETURN(-ENOMEM);
891         }
892
893         if (obd->obd_fail)
894                 LCONSOLE_WARN("%s: shutting down for failover; client state "
895                               "will be preserved.\n", obd->obd_name);
896
897         class_disconnect_exports(obd); /* cleans up client info too */
898
899        /* some exports may still be in the zombie queue, so we make sure that
900         * all the exports have been processed, otherwise the last_rcvd slot
901         * may not be updated on time */
902         obd_zombie_barrier();
903
904         mds_server_free_data(mds);
905
906         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
907         if (mds->mds_rcvd_filp) {
908                 rc = filp_close(mds->mds_rcvd_filp, 0);
909                 mds->mds_rcvd_filp = NULL;
910                 if (rc)
911                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
912         }
913
914         mds_lov_destroy_objids(obd);
915
916         if (mds->mds_obt.obt_health_check_filp) {
917                 rc = filp_close(mds->mds_obt.obt_health_check_filp, 0);
918                 mds->mds_obt.obt_health_check_filp = NULL;
919                 if (rc)
920                         CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
921         }
922         if (mds->mds_objects_dir != NULL) {
923                 l_dput(mds->mds_objects_dir);
924                 mds->mds_objects_dir = NULL;
925         }
926         if (mds->mds_logs_dir) {
927                 l_dput(mds->mds_logs_dir);
928                 mds->mds_logs_dir = NULL;
929         }
930         if (mds->mds_pending_dir) {
931                 l_dput(mds->mds_pending_dir);
932                 mds->mds_pending_dir = NULL;
933         }
934
935         lquota_fs_cleanup(mds_quota_interface_ref, obd);
936
937         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
938         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
939         dput(mds->mds_fid_de);
940         LL_DQUOT_OFF(obd->u.obt.obt_sb, 0);
941         shrink_dcache_sb(mds->mds_obt.obt_sb);
942
943         return rc;
944 }
945
946 /* Creates an object with the same name as its fid.  Because this is not at all
947  * performance sensitive, it is accomplished by creating a file, checking the
948  * fid, and renaming it. */
949 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
950                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
951 {
952         struct mds_obd *mds = &exp->exp_obd->u.mds;
953         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
954         unsigned int tmpname = ll_rand();
955         struct dentry *dchild, *new_child;
956         struct lvfs_dentry_params dp = LVFS_DENTRY_PARAMS_INIT;
957         struct lvfs_run_ctxt *saved = NULL;
958         char fidname[LL_FID_NAMELEN];
959         void *handle;
960         struct lvfs_ucred ucred = { 0 };
961         int rc = 0, err, namelen;
962         ENTRY;
963
964         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
965         if (saved == NULL) {
966                 CERROR("cannot allocate memory for run ctxt\n");
967                 RETURN(-ENOMEM);
968         }
969
970         /* the owner of object file should always be root */
971         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
972
973         push_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
974
975         sprintf(fidname, "%u.%u", tmpname, current->pid);
976         dchild = lookup_one_len(fidname, mds->mds_objects_dir, strlen(fidname));
977         if (IS_ERR(dchild)) {
978                 CERROR("getting neg dentry for obj: %u\n", tmpname);
979                 GOTO(out_pop, rc = PTR_ERR(dchild));
980         }
981         if (dchild->d_inode != NULL) {
982                 CERROR("impossible non-negative obj dentry: %u\n", tmpname);
983                 LBUG();
984         }
985
986         dchild->d_fsdata = (void *)&dp;
987         dp.ldp_ptr   = (void *)DP_LASTGROUP_REVERSE;
988
989         LOCK_INODE_MUTEX(parent_inode);
990         rc = ll_vfs_create(parent_inode, dchild, S_IFREG | 0666, NULL);
991
992         oa->o_id = dchild->d_inode->i_ino;
993         oa->o_generation = dchild->d_inode->i_generation;
994         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
995
996         new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
997
998         if (IS_ERR(new_child)) {
999                 CERROR("getting neg dentry for obj rename: %d\n", rc);
1000                 GOTO(out_dput, rc = PTR_ERR(new_child));
1001         }
1002         if (new_child->d_inode != NULL) {
1003                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
1004                        oa->o_id, oa->o_generation);
1005                 LBUG();
1006         }
1007
1008         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
1009                               FSFILT_OP_RENAME, NULL);
1010         if (IS_ERR(handle))
1011                 GOTO(out_dput2, rc = PTR_ERR(handle));
1012
1013         rc = ll_vfs_rename(parent_inode, dchild, mds->mds_vfsmnt,
1014                            parent_inode, new_child, mds->mds_vfsmnt);
1015         if (rc)
1016                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1017                        oa->o_id, oa->o_generation, rc);
1018
1019         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
1020                             handle, 0);
1021         if (!err)
1022                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1023         else if (!rc)
1024                 rc = err;
1025 out_dput2:
1026         dput(new_child);
1027 out_dput:
1028         dput(dchild);
1029         UNLOCK_INODE_MUTEX(parent_inode);
1030 out_pop:
1031         pop_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
1032         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1033         RETURN(rc);
1034 }
1035
1036 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1037                     struct lov_stripe_md *ea, struct obd_trans_info *oti,
1038                     struct obd_export *md_exp)
1039 {
1040         struct mds_obd *mds = &exp->exp_obd->u.mds;
1041         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1042         struct obd_device *obd = exp->exp_obd;
1043         struct lvfs_run_ctxt *saved = NULL;
1044         struct lvfs_ucred ucred = { 0 };
1045         char fidname[LL_FID_NAMELEN];
1046         struct inode *inode = NULL;
1047         struct dentry *de;
1048         void *handle;
1049         int err, namelen, rc = 0;
1050         ENTRY;
1051
1052         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
1053         if (saved == NULL) {
1054                 CERROR("cannot allocate memory for run ctxt\n");
1055                 RETURN(-ENOMEM);
1056         }
1057
1058         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
1059         push_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1060
1061         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1062
1063         LOCK_INODE_MUTEX(parent_inode);
1064         de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1065         if (IS_ERR(de)) {
1066                 rc = IS_ERR(de);
1067                 de = NULL;
1068                 CERROR("error looking up object "LPU64" %s: rc %d\n",
1069                        oa->o_id, fidname, rc);
1070                 GOTO(out_dput, rc);
1071         }
1072         if (de->d_inode == NULL) {
1073                 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1074                        oa->o_id, fidname, rc);
1075                 GOTO(out_dput, rc = -ENOENT);
1076         }
1077
1078         /* Stripe count is 1 here since this is some MDS specific stuff
1079            that is unlinked, not spanned across multiple OSTs */
1080         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1081                                   FSFILT_OP_UNLINK, oti, 1);
1082
1083         if (IS_ERR(handle))
1084                 GOTO(out_dput, rc = PTR_ERR(handle));
1085
1086         /* take a reference to protect inode from truncation within
1087            vfs_unlink() context. bug 10409 */
1088         inode = de->d_inode;
1089         atomic_inc(&inode->i_count);
1090         rc = ll_vfs_unlink(mds->mds_objects_dir->d_inode, de, mds->mds_vfsmnt);
1091         if (rc)
1092                 CERROR("error destroying object "LPU64":%u: rc %d\n",
1093                        oa->o_id, oa->o_generation, rc);
1094
1095         err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
1096         if (err && !rc)
1097                 rc = err;
1098 out_dput:
1099         if (de != NULL)
1100                 l_dput(de);
1101         UNLOCK_INODE_MUTEX(parent_inode);
1102
1103         if (inode)
1104                 iput(inode);
1105
1106         pop_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1107         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1108         RETURN(rc);
1109 }