Whamcloud - gitweb
ed0d7d8699d83352ce717a3fa533d7aa7551fc98
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_fs.c
37  *
38  * Lustre Metadata Server (MDS) filesystem interface code
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/kmod.h>
50 #include <linux/version.h>
51 #include <linux/sched.h>
52 #include <lustre_quota.h>
53 #include <linux/mount.h>
54 #include <lustre_mds.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_lib.h>
58 #include <lustre_fsfilt.h>
59 #include <lustre_disk.h>
60 #include <libcfs/list.h>
61
62 #include "mds_internal.h"
63
64
65 int mds_export_stats_init(struct obd_device *obd,
66                                  struct obd_export *exp,
67                                  int reconnect,
68                                  void *localdata)
69 {
70         lnet_nid_t *client_nid = localdata;
71         int rc, num_stats, newnid = 0;
72
73         rc = lprocfs_exp_setup(exp, client_nid, reconnect, &newnid);
74         if (rc) {
75                 /* Mask error for already created
76                  * /proc entries */
77                 if (rc == -EALREADY)
78                         rc = 0;
79                 return rc;
80         }
81
82         if (newnid) {
83                 struct nid_stat *tmp = exp->exp_nid_stats;
84                 LASSERT(tmp != NULL);
85
86                 num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
87                              LPROC_MDS_LAST - 1;
88                 tmp->nid_stats = lprocfs_alloc_stats(num_stats,
89                                             LPROCFS_STATS_FLAG_NOPERCPU);
90                 if (tmp->nid_stats == NULL)
91                         return -ENOMEM;
92
93                 lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats);
94                 mds_stats_counter_init(tmp->nid_stats);
95
96                 rc = lprocfs_nid_ldlm_stats_init(tmp);
97                 if (rc)
98                         return rc;
99         }
100
101         return 0;
102 }
103
104 /* VBR: to determine the delayed client the lcd should be updated for each new
105  * epoch */
106 int mds_update_client_epoch(struct obd_export *exp)
107 {
108         struct mds_export_data *med = &exp->exp_mds_data;
109         struct mds_obd *mds = &exp->exp_obd->u.mds;
110         struct lvfs_run_ctxt saved;
111         loff_t off = med->med_lr_off;
112         int rc = 0;
113
114         /* VBR: set client last_epoch to current epoch */
115         if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
116                         le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
117                 return rc;
118
119         med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
120         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
121         rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
122                                  med->med_lcd, sizeof(*med->med_lcd), &off,
123                                  exp->exp_delayed);
124         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
125
126         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
127                med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
128                le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
129
130         return rc;
131 }
132
133 /* Called after recovery is done on server */
134 void mds_update_last_epoch(struct obd_device *obd)
135 {
136         struct ptlrpc_request *req;
137         struct mds_obd *mds = &obd->u.mds;
138         __u32 start_epoch;
139
140         /* Increase server epoch after recovery */
141         spin_lock(&mds->mds_transno_lock);
142         start_epoch = lr_epoch(mds->mds_last_transno) + 1;
143         mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
144         mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
145         spin_unlock(&mds->mds_transno_lock);
146
147         /* go through delayed reply queue to find all exports participate in
148          * recovery and set new epoch for them */
149         list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
150                 LASSERT(!req->rq_export->exp_delayed);
151                 mds_update_client_epoch(req->rq_export);
152         }
153         mds_update_server_data(obd, 1);
154 }
155
156 /* Add client data to the MDS.  We use a bitmap to locate a free space
157  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
158  * Otherwise, we have just read the data from the last_rcvd file and
159  * we know its offset.
160  *
161  * It should not be possible to fail adding an existing client - otherwise
162  * mds_init_server_data() callsite needs to be fixed.
163  */
164 int mds_client_add(struct obd_device *obd, struct obd_export *exp,
165                    int cl_idx, void *localdata)
166 {
167         struct mds_obd *mds = &obd->u.mds;
168         struct mds_export_data *med = &exp->exp_mds_data;
169         unsigned long *bitmap = mds->mds_client_bitmap;
170         int new_client = (cl_idx == -1);
171         int rc = 0;
172         ENTRY;
173
174         LASSERT(bitmap != NULL);
175         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
176
177         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
178         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
179                 RETURN(0);
180
181         /* VBR: remove expired exports before searching for free slot */
182         if (new_client)
183                 class_disconnect_expired_exports(obd);
184
185         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
186          * there's no need for extra complication here
187          */
188         if (new_client) {
189                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
190         repeat:
191                 if (cl_idx >= LR_MAX_CLIENTS ||
192                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
193                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
194                                cl_idx);
195                         return -EOVERFLOW;
196                 }
197                 if (test_and_set_bit(cl_idx, bitmap)) {
198                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
199                                                     cl_idx);
200                         goto repeat;
201                 }
202         } else {
203                 if (test_and_set_bit(cl_idx, bitmap)) {
204                         CERROR("MDS client %d: bit already set in bitmap!!\n",
205                                cl_idx);
206                         LBUG();
207                 }
208         }
209
210         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
211                cl_idx, med->med_lcd->lcd_uuid);
212
213         med->med_lr_idx = cl_idx;
214         med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) +
215                 (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size));
216         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
217         mds_export_stats_init(obd, exp, 0, localdata);
218
219         if (new_client) {
220                 struct lvfs_run_ctxt *saved = NULL;
221                 loff_t off = med->med_lr_off;
222                 struct file *file = mds->mds_rcvd_filp;
223                 void *handle;
224
225                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
226                 if (saved == NULL) {
227                         CERROR("cannot allocate memory for run ctxt\n");
228                         RETURN(-ENOMEM);
229                 }
230
231                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
232                 handle = fsfilt_start(obd, file->f_dentry->d_inode,
233                                       FSFILT_OP_SETATTR, NULL);
234                 if (IS_ERR(handle)) {
235                         rc = PTR_ERR(handle);
236                         CERROR("unable to start transaction: rc %d\n", rc);
237                 } else {
238                         /* VBR: set client last_transno as mds_last_transno to
239                          * remember last epoch for this client */
240                         med->med_lcd->lcd_last_epoch =
241                                         mds->mds_server_data->lsd_start_epoch;
242                         exp->exp_last_request_time = cfs_time_current_sec();
243                         /* remember first epoch of client for orphan handling */
244                         med->med_lcd->lcd_first_epoch =
245                                   cpu_to_le32(lr_epoch(mds->mds_last_transno));
246                         rc = fsfilt_add_journal_cb(obd, 0, handle,
247                                                    target_client_add_cb, exp);
248                         if (rc == 0) {
249                                 spin_lock(&exp->exp_lock);
250                                 exp->exp_need_sync = 1;
251                                 spin_unlock(&exp->exp_lock);
252                         }
253                         rc = fsfilt_write_record(obd, file, med->med_lcd,
254                                                  sizeof(*med->med_lcd),
255                                                  &off, rc /* sync if no cb */);
256                         fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0);
257                 }
258
259                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
260                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
261
262                 if (rc)
263                         return rc;
264                 CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
265                        med->med_lr_idx, med->med_lr_off,
266                        (unsigned int)sizeof(*med->med_lcd));
267         }
268         return rc;
269 }
270
271 struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
272  
273 int mds_client_free(struct obd_export *exp)
274 {
275         struct mds_export_data *med = &exp->exp_mds_data;
276         struct mds_obd *mds = &exp->exp_obd->u.mds;
277         struct obd_device *obd = exp->exp_obd;
278         struct lvfs_run_ctxt *saved = NULL;
279         int rc;
280         loff_t off;
281         ENTRY;
282
283         if (!med->med_lcd)
284                 RETURN(0);
285
286         /* Do not erase record for recoverable client. */
287         if (obd->obd_fail && !exp->exp_failed)
288                 GOTO(free, 0);
289
290         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
291         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
292                 GOTO(free, 0);
293
294         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
295                med->med_lr_idx, med->med_lr_off, med->med_lcd->lcd_uuid);
296
297         LASSERT(mds->mds_client_bitmap != NULL);
298
299
300         off = med->med_lr_off;
301
302         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
303          * we leak a client slot that will be cleaned on the next recovery. */
304         if (off <= 0) {
305                 CERROR("%s: client idx %d has offset %lld\n",
306                         obd->obd_name, med->med_lr_idx, off);
307                 GOTO(free, rc = -EINVAL);
308         }
309
310         /* Clear the bit _after_ zeroing out the client so we don't
311            race with mds_client_add and zero out new clients.*/
312         if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
313                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
314                        med->med_lr_idx);
315                 LBUG();
316         }
317
318         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
319                 /* Don't force sync on each disconnect if aborting recovery,
320                  * or it does num_clients * num_osts syncs.  b=17194 */
321                 int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
322                                  !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
323                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
324                 if (saved == NULL) {
325                         CERROR("cannot allocate memory for run ctxt\n");
326                         GOTO(free, rc = -ENOMEM);
327                 }
328                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
329                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_lcd,
330                                          sizeof(zero_lcd), &off, 0);
331
332                 /* Make sure the server's last_transno is up to date. Do this
333                  * after the client is freed so we know all the client's
334                  * transactions have been committed. */
335                 if (rc == 0)
336                         mds_update_server_data(exp->exp_obd, need_sync);
337
338                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
339
340                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
341                        "zero out client %s at idx %u/%llu in %s %ssync rc %d\n",
342                        med->med_lcd->lcd_uuid, med->med_lr_idx, med->med_lr_off,
343                        LAST_RCVD, need_sync ? "" : "a", rc);
344         }
345
346         if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
347                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
348                        med->med_lr_idx);
349                 LBUG();
350         }
351
352         EXIT;
353 free:
354         if (saved)
355                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
356
357         OBD_FREE_PTR(med->med_lcd);
358         med->med_lcd = NULL;
359
360         return 0;
361 }
362
363 static int mds_server_free_data(struct mds_obd *mds)
364 {
365         OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
366         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
367         mds->mds_server_data = NULL;
368
369         return 0;
370 }
371
372 static void mds_add_fake_export(struct obd_device *obd, int num,
373                                 struct file *file)
374 {
375         struct obd_export *exp;
376         struct lvfs_run_ctxt saved;
377         struct obd_device_target *obt = &obd->u.obt;
378         struct lu_export_data *led;
379         unsigned long *bitmap = obt->obt_client_bitmap;
380         struct lsd_client_data *lcd = NULL;
381         unsigned int idx = 0;
382         loff_t off = 0;
383         int rc = 0;
384
385         while (num > 0) {
386                 num--;
387                 if (!lcd) {
388                         OBD_ALLOC_PTR(lcd);
389                         if (!lcd)
390                                 return;
391                 }
392                 idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, idx);
393                 if (idx >= LR_MAX_CLIENTS) {
394                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n", idx);
395                         OBD_FREE_PTR(lcd);
396                         break;
397                 }
398                 if (test_and_set_bit(idx, bitmap)) {
399                         CERROR("Bit %u is set already\n", idx);
400                         continue;
401                 }
402                 off = le32_to_cpu(obt->obt_lsd->lsd_client_start) +
403                       idx * le16_to_cpu(obt->obt_lsd->lsd_client_size);
404
405                 sprintf(lcd->lcd_uuid, "dead-%.16u", idx);
406                 CDEBUG(D_INFO, "Create fake export %s, index %u, offset %lu\n",
407                        lcd->lcd_uuid, idx, (unsigned long)off);
408
409                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
410                 if (IS_ERR(exp)) {
411                         if (PTR_ERR(exp) == -EALREADY) {
412                                 CERROR("Export %s already exists\n",
413                                        lcd->lcd_uuid);
414                         }
415                         CERROR("Failed to create export %lu\n", PTR_ERR(exp));
416                         OBD_FREE_PTR(lcd);
417                         break;
418                 }
419                 LASSERT(exp);
420                 led = &exp->exp_target_data;
421                 led->led_lr_idx = idx;
422                 led->led_lr_off = off;
423                 led->led_lcd = lcd;
424
425                 exp->exp_last_request_time = cfs_time_current_sec();
426                 exp->exp_replay_needed = 1;
427                 exp->exp_connecting = 0;
428                 exp->exp_in_recovery = 0;
429
430                 spin_lock_bh(&obd->obd_processing_task_lock);
431                 obd->obd_recoverable_clients++;
432                 obd->obd_max_recoverable_clients++;
433                 spin_unlock_bh(&obd->obd_processing_task_lock);
434
435                 class_set_export_delayed(exp);
436                 class_export_put(exp);
437
438                 lcd->lcd_last_epoch = cpu_to_le32(1);
439                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
440                 rc = fsfilt_write_record(obd, file, lcd, sizeof(*lcd), &off, 0);
441                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
442                 if (rc) {
443                         CERROR("Failed to create fake client record\n");
444                         OBD_FREE_PTR(lcd);
445                         break;
446                 }
447                 lcd = NULL;
448         }
449 }
450
451 static int mds_init_server_data(struct obd_device *obd, struct file *file)
452 {
453         struct mds_obd *mds = &obd->u.mds;
454         struct lr_server_data *lsd;
455         struct lsd_client_data *lcd = NULL;
456         struct lustre_mount_info *lmi;
457         loff_t off = 0;
458         unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
459         __u64 mount_count;
460         __u32 start_epoch;
461         int cl_idx, rc = 0;
462         ENTRY;
463
464         /* ensure padding in the struct is the correct size */
465         LASSERT(offsetof(struct lr_server_data, lsd_padding) +
466                 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
467         LASSERT(offsetof(struct lsd_client_data, lcd_padding) +
468                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
469
470         OBD_ALLOC_WAIT(lsd, sizeof(*lsd));
471         if (!lsd)
472                 RETURN(-ENOMEM);
473
474         OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
475         if (!mds->mds_client_bitmap) {
476                 OBD_FREE(lsd, sizeof(*lsd));
477                 RETURN(-ENOMEM);
478         }
479
480         mds->mds_server_data = lsd;
481
482         if (last_rcvd_size == 0) {
483                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
484
485                 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid));
486                 lsd->lsd_last_transno = 0;
487                 mount_count = lsd->lsd_mount_count = 0;
488                 lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
489                 lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
490                 lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
491                 lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
492                 lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
493                 lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT);
494         } else {
495                 rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off);
496                 if (rc) {
497                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
498                         GOTO(err_msd, rc);
499                 }
500                 if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
501                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
502                                            " the wrong disk %s. Were the /dev/ "
503                                            "assignments rearranged?\n",
504                                            obd->obd_uuid.uuid, lsd->lsd_uuid);
505                         GOTO(err_msd, rc = -EINVAL);
506                 }
507                 lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_MDT);
508                 /* COMPAT_146 */
509                 /* Assume old last_rcvd format unless I_C_LR is set */
510                 if (!(lsd->lsd_feature_incompat &
511                       cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
512                         lsd->lsd_mount_count = lsd->lsd_compat14;
513                 /* end COMPAT_146 */
514                 mount_count = le64_to_cpu(lsd->lsd_mount_count);
515         }
516
517         if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
518                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
519                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) &
520                        ~MDT_INCOMPAT_SUPP);
521                 GOTO(err_msd, rc = -EINVAL);
522         }
523         if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
524                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
525                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) &
526                        ~MDT_ROCOMPAT_SUPP);
527                 /* Do something like remount filesystem read-only */
528                 GOTO(err_msd, rc = -EINVAL);
529         }
530         /* evict all clients as it is first boot with 2.0 last_rcvd */
531         if (lsd->lsd_feature_compat & cpu_to_le32(OBD_COMPAT_20)) {
532                 LCONSOLE_WARN("Mounting %s at first time on 2.0 FS, remove all"
533                               " clients for interop needs\n", obd->obd_name);
534                 simple_truncate(mds->mds_vfsmnt->mnt_sb->s_root,
535                                 mds->mds_vfsmnt, LAST_RCVD,
536                                 lsd->lsd_client_start);
537                 last_rcvd_size = lsd->lsd_client_start;
538                 lsd->lsd_feature_compat &= ~cpu_to_le32(OBD_COMPAT_20);
539         }
540
541         target_trans_table_init(obd);
542         mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
543         start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
544
545         CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
546                obd->obd_name, start_epoch);
547         CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
548                obd->obd_name, mds->mds_last_transno);
549         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
550                obd->obd_name, mount_count + 1);
551         CDEBUG(D_INODE, "%s: server data size: %u\n",
552                obd->obd_name, le32_to_cpu(lsd->lsd_server_size));
553         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
554                obd->obd_name, le32_to_cpu(lsd->lsd_client_start));
555         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
556                obd->obd_name, le32_to_cpu(lsd->lsd_client_size));
557         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
558                obd->obd_name, last_rcvd_size);
559         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
560                last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 :
561                (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) /
562                 le16_to_cpu(lsd->lsd_client_size));
563
564         if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
565             !lsd->lsd_client_size) {
566                 CERROR("Bad last_rcvd contents!\n");
567                 GOTO(err_msd, rc = -EINVAL);
568         }
569
570         /* When we do a clean MDS shutdown, we save the last_transno into
571          * the header.  If we find clients with higher last_transno values
572          * then those clients may need recovery done. */
573         for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
574              off < last_rcvd_size; cl_idx++) {
575                 __u64 last_transno;
576                 __u32 last_epoch;
577                 struct obd_export *exp;
578                 struct mds_export_data *med;
579
580                 if (!lcd) {
581                         OBD_ALLOC_WAIT(lcd, sizeof(*lcd));
582                         if (!lcd)
583                                 GOTO(err_client, rc = -ENOMEM);
584                 }
585
586                 /* Don't assume off is incremented properly by
587                  * fsfilt_read_record(), in case sizeof(*lcd)
588                  * isn't the same as lsd->lsd_client_size.  */
589                 off = le32_to_cpu(lsd->lsd_client_start) +
590                         cl_idx * le16_to_cpu(lsd->lsd_client_size);
591                 rc = fsfilt_read_record(obd, file, lcd, sizeof(*lcd), &off);
592                 if (rc) {
593                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
594                                LAST_RCVD, cl_idx, off, rc);
595                         break; /* read error shouldn't cause startup to fail */
596                 }
597
598                 if (lcd->lcd_uuid[0] == '\0') {
599                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
600                                cl_idx);
601                         continue;
602                 }
603
604                 check_lcd(obd->obd_name, cl_idx, lcd);
605
606                 last_transno = lsd_last_transno(lcd);
607                 last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
608
609                 /* These exports are cleaned up by mds_disconnect(), so they
610                  * need to be set up like real exports as mds_connect() does.
611                  */
612                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
613                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
614                        last_transno, le64_to_cpu(lsd->lsd_last_transno),
615                        le64_to_cpu(lcd->lcd_last_xid));
616
617                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
618                 if (IS_ERR(exp)) {
619                         if (PTR_ERR(exp) == -EALREADY) {
620                                 /* export already exists, zero out this one */
621                                 lcd->lcd_uuid[0] = '\0';
622                         } else {
623                                 GOTO(err_client, rc = PTR_ERR(exp));
624                         }
625                 } else {
626                         med = &exp->exp_mds_data;
627                         med->med_lcd = lcd;
628                         rc = mds_client_add(obd, exp, cl_idx, NULL);
629                         /* can't fail for existing client */
630                         LASSERTF(rc == 0, "rc = %d\n", rc);
631
632                         /* VBR: set export last committed version */
633                         exp->exp_last_committed = last_transno;
634                         /* read last time from disk */
635                         exp->exp_last_request_time = target_trans_table_last_time(exp);
636                         lcd = NULL;
637
638                         spin_lock(&exp->exp_lock);
639                         exp->exp_replay_needed = 1;
640                         exp->exp_connecting = 0;
641                         exp->exp_in_recovery = 0;
642                         spin_unlock(&exp->exp_lock);
643
644                         spin_lock_bh(&obd->obd_processing_task_lock);
645                         obd->obd_recoverable_clients++;
646                         obd->obd_max_recoverable_clients++;
647                         spin_unlock_bh(&obd->obd_processing_task_lock);
648
649                         /* VBR: if epoch too old mark export as delayed,
650                          * if epoch is zero then client is pre-vbr one */
651                         if (start_epoch > last_epoch && last_epoch != 0)
652                                 class_set_export_delayed(exp);
653                         class_export_put(exp);
654                 }
655
656                 /* Need to check last_rcvd even for duplicated exports. */
657                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
658                        "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
659
660                 if (last_transno > mds->mds_last_transno)
661                         mds->mds_last_transno = last_transno;
662         }
663
664         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_FAKE_EXP))) {
665                 mds_add_fake_export(obd, obd_fail_val, file);
666         }
667
668         if (lcd)
669                 OBD_FREE_PTR(lcd);
670
671         obd->obd_last_committed = mds->mds_last_transno;
672
673         if (obd->obd_recoverable_clients) {
674                 CWARN("RECOVERY: service %s, %d recoverable clients, "
675                       "%d delayed clients, last_transno "LPU64"\n",
676                       obd->obd_name, obd->obd_recoverable_clients,
677                       obd->obd_delayed_clients, mds->mds_last_transno);
678                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
679                 obd->obd_recovering = 1;
680                 obd->obd_recovery_start = 0;
681                 obd->obd_recovery_end = 0;
682         } else {
683                 LASSERT(!obd->obd_recovering);
684                 /* VBR: update boot epoch after recovery */
685                 mds_update_last_epoch(obd);
686         }
687
688         obd->obd_recovery_timeout = OBD_RECOVERY_TIME_SOFT;
689         obd->obd_recovery_time_hard = OBD_RECOVERY_TIME_HARD;
690
691         lmi = server_find_mount_locked(obd->obd_name);
692         if (lmi) {
693                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
694
695                 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft)
696                         obd->obd_recovery_timeout =
697                                 lsi->lsi_lmd->lmd_recovery_time_soft;
698
699                 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard)
700                         obd->obd_recovery_time_hard =
701                                 lsi->lsi_lmd->lmd_recovery_time_hard;
702         }
703
704         mds->mds_mount_count = mount_count + 1;
705         lsd->lsd_mount_count = lsd->lsd_compat14 =
706                 cpu_to_le64(mds->mds_mount_count);
707
708         /* save it, so mount count and last_transno is current */
709         rc = mds_update_server_data(obd, 1);
710         if (rc)
711                 GOTO(err_client, rc);
712
713         RETURN(0);
714
715 err_client:
716         class_disconnect_exports(obd);
717 err_msd:
718         mds_server_free_data(mds);
719         RETURN(rc);
720 }
721
722 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
723 {
724         struct mds_obd *mds = &obd->u.mds;
725         struct lvfs_run_ctxt *saved = NULL;
726         struct dentry *dentry;
727         struct file *file;
728         int rc;
729         ENTRY;
730
731         OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
732
733         rc = cleanup_group_info();
734         if (rc)
735                 RETURN(rc);
736
737         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
738         if (saved == NULL) {
739                 CERROR("cannot allocate memory for run ctxt\n");
740                 RETURN(-ENOMEM);
741         }
742
743         mds->mds_vfsmnt = mnt;
744         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
745         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
746         obd->u.obt.obt_stale_export_age = STALE_EXPORT_MAXTIME_DEFAULT;
747         spin_lock_init(&obd->u.obt.obt_trans_table_lock);
748
749         rc = fsfilt_setup(obd, obd->u.obt.obt_sb);
750         if (rc)
751                 RETURN(rc);
752
753         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
754         obd->obd_lvfs_ctxt.pwdmnt = mnt;
755         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
756         obd->obd_lvfs_ctxt.fs = get_ds();
757         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
758
759         /* setup the directory tree */
760         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
761         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "ROOT", 0755, 0);
762         if (IS_ERR(dentry)) {
763                 rc = PTR_ERR(dentry);
764                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
765                 GOTO(err_pop, rc);
766         }
767
768         mds->mds_rootfid.id = dentry->d_inode->i_ino;
769         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
770         mds->mds_rootfid.f_type = S_IFDIR;
771
772         dput(dentry);
773
774         dentry = lookup_one_len("__iopen__", cfs_fs_pwd(current->fs),
775                                 strlen("__iopen__"));
776         if (IS_ERR(dentry)) {
777                 rc = PTR_ERR(dentry);
778                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
779                 GOTO(err_pop, rc);
780         }
781
782         mds->mds_fid_de = dentry;
783         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
784                 rc = -ENOENT;
785                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
786                 GOTO(err_fid, rc);
787         }
788
789         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "PENDING", 0777, 1);
790         if (IS_ERR(dentry)) {
791                 rc = PTR_ERR(dentry);
792                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
793                 GOTO(err_fid, rc);
794         }
795         mds->mds_pending_dir = dentry;
796
797         /* COMPAT_146 */
798         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, MDT_LOGS_DIR, 0777, 1);
799         if (IS_ERR(dentry)) {
800                 rc = PTR_ERR(dentry);
801                 CERROR("cannot create %s directory: rc = %d\n",
802                        MDT_LOGS_DIR, rc);
803                 GOTO(err_pending, rc);
804         }
805         mds->mds_logs_dir = dentry;
806         /* end COMPAT_146 */
807
808         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "OBJECTS", 0777, 1);
809         if (IS_ERR(dentry)) {
810                 rc = PTR_ERR(dentry);
811                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
812                 GOTO(err_logs, rc);
813         }
814         mds->mds_objects_dir = dentry;
815
816         /* open and test the last rcvd file */
817         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
818         if (IS_ERR(file)) {
819                 rc = PTR_ERR(file);
820                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
821                 GOTO(err_objects, rc = PTR_ERR(file));
822         }
823         mds->mds_rcvd_filp = file;
824         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
825                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
826                        file->f_dentry->d_inode->i_mode);
827                 GOTO(err_last_rcvd, rc = -ENOENT);
828         }
829
830         rc = mds_init_server_data(obd, file);
831         if (rc) {
832                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
833                 GOTO(err_last_rcvd, rc);
834         }
835
836         rc = mds_lov_init_objids(obd);
837         if (rc != 0) {
838                CERROR("cannot init lov objid rc = %d\n", rc);
839                GOTO(err_client, rc );
840         }
841
842         /* open and test the check io file junk */
843         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
844         if (IS_ERR(file)) {
845                 rc = PTR_ERR(file);
846                 CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
847                 GOTO(err_lov_objid, rc = PTR_ERR(file));
848         }
849         mds->mds_obt.obt_health_check_filp = file;
850         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
851                 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
852                        file->f_dentry->d_inode->i_mode);
853                 GOTO(err_health_check, rc = -ENOENT);
854         }
855         rc = lvfs_check_io_health(obd, file);
856         if (rc)
857                 GOTO(err_health_check, rc);
858 err_pop:
859         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
860         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
861         return rc;
862
863 err_health_check:
864         if (mds->mds_obt.obt_health_check_filp &&
865             filp_close(mds->mds_obt.obt_health_check_filp, 0))
866                 CERROR("can't close %s after error\n", HEALTH_CHECK);
867 err_lov_objid:
868          mds_lov_destroy_objids(obd);
869 err_client:
870         class_disconnect_exports(obd);
871 err_last_rcvd:
872         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
873                 CERROR("can't close %s after error\n", LAST_RCVD);
874 err_objects:
875         dput(mds->mds_objects_dir);
876 err_logs:
877         dput(mds->mds_logs_dir);
878 err_pending:
879         dput(mds->mds_pending_dir);
880 err_fid:
881         dput(mds->mds_fid_de);
882         goto err_pop;
883 }
884
885 int mds_fs_cleanup(struct obd_device *obd)
886 {
887         struct mds_obd *mds = &obd->u.mds;
888         struct lvfs_run_ctxt *saved = NULL;
889         int rc = 0;
890
891         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
892         if (saved == NULL) {
893                 CERROR("cannot allocate memory for run ctxt\n");
894                 RETURN(-ENOMEM);
895         }
896
897         if (obd->obd_fail)
898                 LCONSOLE_WARN("%s: shutting down for failover; client state "
899                               "will be preserved.\n", obd->obd_name);
900
901         class_disconnect_exports(obd); /* cleans up client info too */
902
903        /* some exports may still be in the zombie queue, so we make sure that
904         * all the exports have been processed, otherwise the last_rcvd slot
905         * may not be updated on time */
906         obd_zombie_barrier();
907
908         mds_server_free_data(mds);
909
910         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
911         if (mds->mds_rcvd_filp) {
912                 rc = filp_close(mds->mds_rcvd_filp, 0);
913                 mds->mds_rcvd_filp = NULL;
914                 if (rc)
915                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
916         }
917
918         mds_lov_destroy_objids(obd);
919
920         if (mds->mds_obt.obt_health_check_filp) {
921                 rc = filp_close(mds->mds_obt.obt_health_check_filp, 0);
922                 mds->mds_obt.obt_health_check_filp = NULL;
923                 if (rc)
924                         CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
925         }
926         if (mds->mds_objects_dir != NULL) {
927                 l_dput(mds->mds_objects_dir);
928                 mds->mds_objects_dir = NULL;
929         }
930         if (mds->mds_logs_dir) {
931                 l_dput(mds->mds_logs_dir);
932                 mds->mds_logs_dir = NULL;
933         }
934         if (mds->mds_pending_dir) {
935                 l_dput(mds->mds_pending_dir);
936                 mds->mds_pending_dir = NULL;
937         }
938
939         lquota_fs_cleanup(mds_quota_interface_ref, obd);
940
941         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
942         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
943         dput(mds->mds_fid_de);
944         LL_DQUOT_OFF(obd->u.obt.obt_sb, 0);
945         shrink_dcache_sb(mds->mds_obt.obt_sb);
946
947         return rc;
948 }
949
950 /* Creates an object with the same name as its fid.  Because this is not at all
951  * performance sensitive, it is accomplished by creating a file, checking the
952  * fid, and renaming it. */
953 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
954                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
955 {
956         struct mds_obd *mds = &exp->exp_obd->u.mds;
957         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
958         unsigned int tmpname = ll_rand();
959         struct dentry *dchild, *new_child;
960         struct lvfs_dentry_params dp = LVFS_DENTRY_PARAMS_INIT;
961         struct lvfs_run_ctxt *saved = NULL;
962         char fidname[LL_FID_NAMELEN];
963         void *handle;
964         struct lvfs_ucred ucred = { 0 };
965         int rc = 0, err, namelen;
966         ENTRY;
967
968         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
969         if (saved == NULL) {
970                 CERROR("cannot allocate memory for run ctxt\n");
971                 RETURN(-ENOMEM);
972         }
973
974         /* the owner of object file should always be root */
975         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
976
977         push_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
978
979         sprintf(fidname, "%u.%u", tmpname, current->pid);
980         dchild = lookup_one_len(fidname, mds->mds_objects_dir, strlen(fidname));
981         if (IS_ERR(dchild)) {
982                 CERROR("getting neg dentry for obj: %u\n", tmpname);
983                 GOTO(out_pop, rc = PTR_ERR(dchild));
984         }
985         if (dchild->d_inode != NULL) {
986                 CERROR("impossible non-negative obj dentry: %u\n", tmpname);
987                 LBUG();
988         }
989
990         dchild->d_fsdata = (void *)&dp;
991         dp.ldp_ptr   = (void *)DP_LASTGROUP_REVERSE;
992
993         LOCK_INODE_MUTEX(parent_inode);
994         rc = ll_vfs_create(parent_inode, dchild, S_IFREG | 0666, NULL);
995
996         oa->o_id = dchild->d_inode->i_ino;
997         oa->o_generation = dchild->d_inode->i_generation;
998         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
999
1000         new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1001
1002         if (IS_ERR(new_child)) {
1003                 CERROR("getting neg dentry for obj rename: %d\n", rc);
1004                 GOTO(out_dput, rc = PTR_ERR(new_child));
1005         }
1006         if (new_child->d_inode != NULL) {
1007                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
1008                        oa->o_id, oa->o_generation);
1009                 LBUG();
1010         }
1011
1012         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
1013                               FSFILT_OP_RENAME, NULL);
1014         if (IS_ERR(handle))
1015                 GOTO(out_dput2, rc = PTR_ERR(handle));
1016
1017         rc = ll_vfs_rename(parent_inode, dchild, mds->mds_vfsmnt,
1018                            parent_inode, new_child, mds->mds_vfsmnt);
1019         if (rc)
1020                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1021                        oa->o_id, oa->o_generation, rc);
1022
1023         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
1024                             handle, 0);
1025         if (!err)
1026                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1027         else if (!rc)
1028                 rc = err;
1029 out_dput2:
1030         dput(new_child);
1031 out_dput:
1032         dput(dchild);
1033         UNLOCK_INODE_MUTEX(parent_inode);
1034 out_pop:
1035         pop_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
1036         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1037         RETURN(rc);
1038 }
1039
1040 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1041                     struct lov_stripe_md *ea, struct obd_trans_info *oti,
1042                     struct obd_export *md_exp)
1043 {
1044         struct mds_obd *mds = &exp->exp_obd->u.mds;
1045         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1046         struct obd_device *obd = exp->exp_obd;
1047         struct lvfs_run_ctxt *saved = NULL;
1048         struct lvfs_ucred ucred = { 0 };
1049         char fidname[LL_FID_NAMELEN];
1050         struct inode *inode = NULL;
1051         struct dentry *de;
1052         void *handle;
1053         int err, namelen, rc = 0;
1054         ENTRY;
1055
1056         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
1057         if (saved == NULL) {
1058                 CERROR("cannot allocate memory for run ctxt\n");
1059                 RETURN(-ENOMEM);
1060         }
1061
1062         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
1063         push_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1064
1065         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1066
1067         LOCK_INODE_MUTEX(parent_inode);
1068         de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1069         if (IS_ERR(de)) {
1070                 rc = IS_ERR(de);
1071                 de = NULL;
1072                 CERROR("error looking up object "LPU64" %s: rc %d\n",
1073                        oa->o_id, fidname, rc);
1074                 GOTO(out_dput, rc);
1075         }
1076         if (de->d_inode == NULL) {
1077                 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1078                        oa->o_id, fidname, rc);
1079                 GOTO(out_dput, rc = -ENOENT);
1080         }
1081
1082         /* Stripe count is 1 here since this is some MDS specific stuff
1083            that is unlinked, not spanned across multiple OSTs */
1084         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1085                                   FSFILT_OP_UNLINK, oti, 1);
1086
1087         if (IS_ERR(handle))
1088                 GOTO(out_dput, rc = PTR_ERR(handle));
1089
1090         /* take a reference to protect inode from truncation within
1091            vfs_unlink() context. bug 10409 */
1092         inode = de->d_inode;
1093         atomic_inc(&inode->i_count);
1094         rc = ll_vfs_unlink(mds->mds_objects_dir->d_inode, de, mds->mds_vfsmnt);
1095         if (rc)
1096                 CERROR("error destroying object "LPU64":%u: rc %d\n",
1097                        oa->o_id, oa->o_generation, rc);
1098
1099         err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
1100         if (err && !rc)
1101                 rc = err;
1102 out_dput:
1103         if (de != NULL)
1104                 l_dput(de);
1105         UNLOCK_INODE_MUTEX(parent_inode);
1106
1107         if (inode)
1108                 iput(inode);
1109
1110         pop_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1111         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1112         RETURN(rc);
1113 }