Whamcloud - gitweb
b=15504
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_fs.c
37  *
38  * Lustre Metadata Server (MDS) filesystem interface code
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/kmod.h>
50 #include <linux/version.h>
51 #include <linux/sched.h>
52 #include <lustre_quota.h>
53 #include <linux/mount.h>
54 #include <lustre_mds.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_lib.h>
58 #include <lustre_fsfilt.h>
59 #include <lustre_disk.h>
60 #include <libcfs/list.h>
61
62 #include "mds_internal.h"
63
64
65 int mds_export_stats_init(struct obd_device *obd,
66                                  struct obd_export *exp,
67                                  int reconnect,
68                                  void *localdata)
69 {
70         lnet_nid_t *client_nid = localdata;
71         int rc, num_stats, newnid = 0;
72
73         rc = lprocfs_exp_setup(exp, client_nid, reconnect, &newnid);
74         if (rc) {
75                 /* Mask error for already created
76                  * /proc entries */
77                 if (rc == -EALREADY)
78                         rc = 0;
79                 return rc;
80         }
81
82         if (newnid) {
83                 struct nid_stat *tmp = exp->exp_nid_stats;
84                 LASSERT(tmp != NULL);
85
86                 num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
87                              LPROC_MDS_LAST - 1;
88                 tmp->nid_stats = lprocfs_alloc_stats(num_stats,
89                                             LPROCFS_STATS_FLAG_NOPERCPU);
90                 if (tmp->nid_stats == NULL)
91                         return -ENOMEM;
92
93                 lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats);
94                 rc = lprocfs_register_stats(tmp->nid_proc, "stats",
95                                             tmp->nid_stats);
96                 if (rc)
97                         return rc;
98
99                 mds_stats_counter_init(tmp->nid_stats);
100
101                 /* Always add in ldlm_stats */
102                 tmp->nid_ldlm_stats =
103                         lprocfs_alloc_stats(LDLM_LAST_OPC - LDLM_FIRST_OPC,
104                                             LPROCFS_STATS_FLAG_NOPERCPU);
105                 if (tmp->nid_ldlm_stats == NULL)
106                         return -ENOMEM;
107
108                 lprocfs_init_ldlm_stats(tmp->nid_ldlm_stats);
109
110                 rc = lprocfs_register_stats(tmp->nid_proc, "ldlm_stats",
111                                             tmp->nid_ldlm_stats);
112                 if (rc)
113                         return rc;
114         }
115
116         return 0;
117 }
118
119 /* VBR: to determine the delayed client the lcd should be updated for each new
120  * epoch */
121 int mds_update_client_epoch(struct obd_export *exp)
122 {
123         struct mds_export_data *med = &exp->exp_mds_data;
124         struct mds_obd *mds = &exp->exp_obd->u.mds;
125         struct lvfs_run_ctxt saved;
126         loff_t off = med->med_lr_off;
127         int rc = 0;
128
129         /* VBR: set client last_epoch to current epoch */
130         if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
131                         le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
132                 return rc;
133
134         med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
135         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
136         rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
137                                  med->med_lcd, sizeof(*med->med_lcd), &off,
138                                  exp->exp_delayed);
139         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
140
141         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
142                med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
143                le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
144
145         return rc;
146 }
147
148 /* Called after recovery is done on server */
149 void mds_update_last_epoch(struct obd_device *obd)
150 {
151         struct ptlrpc_request *req;
152         struct mds_obd *mds = &obd->u.mds;
153         __u32 start_epoch;
154
155         /* Increase server epoch after recovery */
156         spin_lock(&mds->mds_transno_lock);
157         start_epoch = lr_epoch(mds->mds_last_transno) + 1;
158         mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
159         mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
160         spin_unlock(&mds->mds_transno_lock);
161
162         /* go through delayed reply queue to find all exports participate in
163          * recovery and set new epoch for them */
164         list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
165                 LASSERT(!req->rq_export->exp_delayed);
166                 mds_update_client_epoch(req->rq_export);
167         }
168         mds_update_server_data(obd, 1);
169 }
170
171 /* Add client data to the MDS.  We use a bitmap to locate a free space
172  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
173  * Otherwise, we have just read the data from the last_rcvd file and
174  * we know its offset.
175  *
176  * It should not be possible to fail adding an existing client - otherwise
177  * mds_init_server_data() callsite needs to be fixed.
178  */
179 int mds_client_add(struct obd_device *obd, struct obd_export *exp,
180                    int cl_idx, void *localdata)
181 {
182         struct mds_obd *mds = &obd->u.mds;
183         struct mds_export_data *med = &exp->exp_mds_data;
184         unsigned long *bitmap = mds->mds_client_bitmap;
185         int new_client = (cl_idx == -1);
186         int rc = 0;
187         ENTRY;
188
189         LASSERT(bitmap != NULL);
190         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
191
192         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
193         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
194                 RETURN(0);
195
196         /* VBR: remove expired exports before searching for free slot */
197         if (new_client)
198                 class_disconnect_expired_exports(obd);
199
200         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
201          * there's no need for extra complication here
202          */
203         if (new_client) {
204                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
205         repeat:
206                 if (cl_idx >= LR_MAX_CLIENTS ||
207                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
208                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
209                                cl_idx);
210                         return -EOVERFLOW;
211                 }
212                 if (test_and_set_bit(cl_idx, bitmap)) {
213                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
214                                                     cl_idx);
215                         goto repeat;
216                 }
217         } else {
218                 if (test_and_set_bit(cl_idx, bitmap)) {
219                         CERROR("MDS client %d: bit already set in bitmap!!\n",
220                                cl_idx);
221                         LBUG();
222                 }
223         }
224
225         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
226                cl_idx, med->med_lcd->lcd_uuid);
227
228         med->med_lr_idx = cl_idx;
229         med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) +
230                 (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size));
231         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
232         mds_export_stats_init(obd, exp, 0, localdata);
233
234         if (new_client) {
235                 struct lvfs_run_ctxt *saved = NULL;
236                 loff_t off = med->med_lr_off;
237                 struct file *file = mds->mds_rcvd_filp;
238                 void *handle;
239
240                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
241                 if (saved == NULL) {
242                         CERROR("cannot allocate memory for run ctxt\n");
243                         RETURN(-ENOMEM);
244                 }
245
246                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
247                 handle = fsfilt_start(obd, file->f_dentry->d_inode,
248                                       FSFILT_OP_SETATTR, NULL);
249                 if (IS_ERR(handle)) {
250                         rc = PTR_ERR(handle);
251                         CERROR("unable to start transaction: rc %d\n", rc);
252                 } else {
253                         /* VBR: set client last_transno as mds_last_transno to
254                          * remember last epoch for this client */
255                         med->med_lcd->lcd_last_epoch =
256                                         mds->mds_server_data->lsd_start_epoch;
257                         exp->exp_last_request_time = cfs_time_current_sec();
258                         /* remember first epoch of client for orphan handling */
259                         med->med_lcd->lcd_first_epoch =
260                                   cpu_to_le32(lr_epoch(mds->mds_last_transno));
261                         rc = fsfilt_add_journal_cb(obd, 0, handle,
262                                                    target_client_add_cb, exp);
263                         if (rc == 0) {
264                                 spin_lock(&exp->exp_lock);
265                                 exp->exp_need_sync = 1;
266                                 spin_unlock(&exp->exp_lock);
267                         }
268                         rc = fsfilt_write_record(obd, file, med->med_lcd,
269                                                  sizeof(*med->med_lcd),
270                                                  &off, rc /* sync if no cb */);
271                         fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0);
272                 }
273
274                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
275                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
276
277                 if (rc)
278                         return rc;
279                 CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
280                        med->med_lr_idx, med->med_lr_off,
281                        (unsigned int)sizeof(*med->med_lcd));
282         }
283         return rc;
284 }
285
286 struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
287  
288 int mds_client_free(struct obd_export *exp)
289 {
290         struct mds_export_data *med = &exp->exp_mds_data;
291         struct mds_obd *mds = &exp->exp_obd->u.mds;
292         struct obd_device *obd = exp->exp_obd;
293         struct lvfs_run_ctxt *saved = NULL;
294         int rc;
295         loff_t off;
296         ENTRY;
297
298         if (!med->med_lcd)
299                 RETURN(0);
300
301         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
302         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
303                 GOTO(free, 0);
304
305         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
306                med->med_lr_idx, med->med_lr_off, med->med_lcd->lcd_uuid);
307
308         LASSERT(mds->mds_client_bitmap != NULL);
309
310
311         off = med->med_lr_off;
312
313         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
314          * we leak a client slot that will be cleaned on the next recovery. */
315         if (off <= 0) {
316                 CERROR("%s: client idx %d has offset %lld\n",
317                         obd->obd_name, med->med_lr_idx, off);
318                 GOTO(free, rc = -EINVAL);
319         }
320
321         /* Clear the bit _after_ zeroing out the client so we don't
322            race with mds_client_add and zero out new clients.*/
323         if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
324                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
325                        med->med_lr_idx);
326                 LBUG();
327         }
328
329         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
330                 /* Don't force sync on each disconnect if aborting recovery,
331                  * or it does num_clients * num_osts syncs.  b=17194 */
332                 int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
333                                  !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
334                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
335                 if (saved == NULL) {
336                         CERROR("cannot allocate memory for run ctxt\n");
337                         GOTO(free, rc = -ENOMEM);
338                 }
339                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
340                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_lcd,
341                                          sizeof(zero_lcd), &off, 0);
342
343                 /* Make sure the server's last_transno is up to date. Do this
344                  * after the client is freed so we know all the client's
345                  * transactions have been committed. */
346                 if (rc == 0)
347                         mds_update_server_data(exp->exp_obd, need_sync);
348
349                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
350
351                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
352                        "zero out client %s at idx %u/%llu in %s %ssync rc %d\n",
353                        med->med_lcd->lcd_uuid, med->med_lr_idx, med->med_lr_off,
354                        LAST_RCVD, need_sync ? "" : "a", rc);
355         }
356
357         if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
358                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
359                        med->med_lr_idx);
360                 LBUG();
361         }
362
363         EXIT;
364 free:
365         if (saved)
366                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
367
368         OBD_FREE_PTR(med->med_lcd);
369         med->med_lcd = NULL;
370
371         return 0;
372 }
373
374 static int mds_server_free_data(struct mds_obd *mds)
375 {
376         OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
377         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
378         mds->mds_server_data = NULL;
379
380         return 0;
381 }
382
383 static void mds_add_fake_export(struct obd_device *obd, int num,
384                                 struct file *file)
385 {
386         struct obd_export *exp;
387         struct lvfs_run_ctxt saved;
388         struct obd_device_target *obt = &obd->u.obt;
389         struct lu_export_data *led;
390         unsigned long *bitmap = obt->obt_client_bitmap;
391         struct lsd_client_data *lcd = NULL;
392         unsigned int idx = 0;
393         loff_t off = 0;
394         int rc = 0;
395
396         while (num > 0) {
397                 num--;
398                 if (!lcd) {
399                         OBD_ALLOC_PTR(lcd);
400                         if (!lcd)
401                                 return;
402                 }
403                 idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, idx);
404                 if (idx >= LR_MAX_CLIENTS) {
405                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n", idx);
406                         OBD_FREE_PTR(lcd);
407                         break;
408                 }
409                 if (test_and_set_bit(idx, bitmap)) {
410                         CERROR("Bit %u is set already\n", idx);
411                         continue;
412                 }
413                 off = le32_to_cpu(obt->obt_lsd->lsd_client_start) +
414                       idx * le16_to_cpu(obt->obt_lsd->lsd_client_size);
415
416                 sprintf(lcd->lcd_uuid, "dead-%.16u", idx);
417                 CDEBUG(D_INFO, "Create fake export %s, index %u, offset %lu\n",
418                        lcd->lcd_uuid, idx, (unsigned long)off);
419
420                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
421                 if (IS_ERR(exp)) {
422                         if (PTR_ERR(exp) == -EALREADY) {
423                                 CERROR("Export %s already exists\n",
424                                        lcd->lcd_uuid);
425                         }
426                         CERROR("Failed to create export %lu\n", PTR_ERR(exp));
427                         OBD_FREE_PTR(lcd);
428                         break;
429                 }
430                 LASSERT(exp);
431                 led = &exp->exp_target_data;
432                 led->led_lr_idx = idx;
433                 led->led_lr_off = off;
434                 led->led_lcd = lcd;
435
436                 exp->exp_last_request_time = cfs_time_current_sec();
437                 exp->exp_replay_needed = 1;
438                 exp->exp_connecting = 0;
439                 exp->exp_in_recovery = 0;
440
441                 spin_lock_bh(&obd->obd_processing_task_lock);
442                 obd->obd_recoverable_clients++;
443                 obd->obd_max_recoverable_clients++;
444                 spin_unlock_bh(&obd->obd_processing_task_lock);
445
446                 class_set_export_delayed(exp);
447                 class_export_put(exp);
448
449                 lcd->lcd_last_epoch = cpu_to_le32(1);
450                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
451                 rc = fsfilt_write_record(obd, file, lcd, sizeof(*lcd), &off, 0);
452                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
453                 if (rc) {
454                         CERROR("Failed to create fake client record\n");
455                         OBD_FREE_PTR(lcd);
456                         break;
457                 }
458                 lcd = NULL;
459         }
460 }
461
462 static int mds_init_server_data(struct obd_device *obd, struct file *file)
463 {
464         struct mds_obd *mds = &obd->u.mds;
465         struct lr_server_data *lsd;
466         struct lsd_client_data *lcd = NULL;
467         struct lustre_mount_info *lmi;
468         loff_t off = 0;
469         unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
470         __u64 mount_count;
471         __u32 start_epoch;
472         int cl_idx, rc = 0;
473         ENTRY;
474
475         /* ensure padding in the struct is the correct size */
476         LASSERT(offsetof(struct lr_server_data, lsd_padding) +
477                 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
478         LASSERT(offsetof(struct lsd_client_data, lcd_padding) +
479                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
480
481         OBD_ALLOC_WAIT(lsd, sizeof(*lsd));
482         if (!lsd)
483                 RETURN(-ENOMEM);
484
485         OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
486         if (!mds->mds_client_bitmap) {
487                 OBD_FREE(lsd, sizeof(*lsd));
488                 RETURN(-ENOMEM);
489         }
490
491         mds->mds_server_data = lsd;
492
493         if (last_rcvd_size == 0) {
494                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
495
496                 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid));
497                 lsd->lsd_last_transno = 0;
498                 mount_count = lsd->lsd_mount_count = 0;
499                 lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
500                 lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
501                 lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
502                 lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
503                 lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
504                 lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT);
505         } else {
506                 rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off);
507                 if (rc) {
508                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
509                         GOTO(err_msd, rc);
510                 }
511                 if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
512                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
513                                            " the wrong disk %s. Were the /dev/ "
514                                            "assignments rearranged?\n",
515                                            obd->obd_uuid.uuid, lsd->lsd_uuid);
516                         GOTO(err_msd, rc = -EINVAL);
517                 }
518                 lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_MDT);
519                 /* COMPAT_146 */
520                 /* Assume old last_rcvd format unless I_C_LR is set */
521                 if (!(lsd->lsd_feature_incompat &
522                       cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
523                         lsd->lsd_mount_count = lsd->lsd_compat14;
524                 /* end COMPAT_146 */
525                 mount_count = le64_to_cpu(lsd->lsd_mount_count);
526         }
527
528         if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
529                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
530                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) &
531                        ~MDT_INCOMPAT_SUPP);
532                 GOTO(err_msd, rc = -EINVAL);
533         }
534         if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
535                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
536                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) &
537                        ~MDT_ROCOMPAT_SUPP);
538                 /* Do something like remount filesystem read-only */
539                 GOTO(err_msd, rc = -EINVAL);
540         }
541         /* evict all clients as it is first boot with 2.0 last_rcvd */
542         if (lsd->lsd_feature_compat & cpu_to_le32(OBD_COMPAT_20)) {
543                 LCONSOLE_WARN("Mounting %s at first time on 2.0 FS, remove all"
544                               " clients for interop needs\n", obd->obd_name);
545                 simple_truncate(mds->mds_vfsmnt->mnt_sb->s_root,
546                                 mds->mds_vfsmnt, LAST_RCVD,
547                                 lsd->lsd_client_start);
548                 last_rcvd_size = lsd->lsd_client_start;
549                 lsd->lsd_feature_compat &= ~cpu_to_le32(OBD_COMPAT_20);
550         }
551
552         target_trans_table_init(obd);
553         mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
554         start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
555
556         CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
557                obd->obd_name, start_epoch);
558         CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
559                obd->obd_name, mds->mds_last_transno);
560         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
561                obd->obd_name, mount_count + 1);
562         CDEBUG(D_INODE, "%s: server data size: %u\n",
563                obd->obd_name, le32_to_cpu(lsd->lsd_server_size));
564         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
565                obd->obd_name, le32_to_cpu(lsd->lsd_client_start));
566         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
567                obd->obd_name, le32_to_cpu(lsd->lsd_client_size));
568         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
569                obd->obd_name, last_rcvd_size);
570         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
571                last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 :
572                (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) /
573                 le16_to_cpu(lsd->lsd_client_size));
574
575         if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
576             !lsd->lsd_client_size) {
577                 CERROR("Bad last_rcvd contents!\n");
578                 GOTO(err_msd, rc = -EINVAL);
579         }
580
581         /* When we do a clean MDS shutdown, we save the last_transno into
582          * the header.  If we find clients with higher last_transno values
583          * then those clients may need recovery done. */
584         for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
585              off < last_rcvd_size; cl_idx++) {
586                 __u64 last_transno;
587                 __u32 last_epoch;
588                 struct obd_export *exp;
589                 struct mds_export_data *med;
590
591                 if (!lcd) {
592                         OBD_ALLOC_WAIT(lcd, sizeof(*lcd));
593                         if (!lcd)
594                                 GOTO(err_client, rc = -ENOMEM);
595                 }
596
597                 /* Don't assume off is incremented properly by
598                  * fsfilt_read_record(), in case sizeof(*lcd)
599                  * isn't the same as lsd->lsd_client_size.  */
600                 off = le32_to_cpu(lsd->lsd_client_start) +
601                         cl_idx * le16_to_cpu(lsd->lsd_client_size);
602                 rc = fsfilt_read_record(obd, file, lcd, sizeof(*lcd), &off);
603                 if (rc) {
604                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
605                                LAST_RCVD, cl_idx, off, rc);
606                         break; /* read error shouldn't cause startup to fail */
607                 }
608
609                 if (lcd->lcd_uuid[0] == '\0') {
610                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
611                                cl_idx);
612                         continue;
613                 }
614
615                 last_transno = lsd_last_transno(lcd);
616                 last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
617
618                 /* These exports are cleaned up by mds_disconnect(), so they
619                  * need to be set up like real exports as mds_connect() does.
620                  */
621                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
622                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
623                        last_transno, le64_to_cpu(lsd->lsd_last_transno),
624                        le64_to_cpu(lcd->lcd_last_xid));
625
626                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
627                 if (IS_ERR(exp)) {
628                         if (PTR_ERR(exp) == -EALREADY) {
629                                 /* export already exists, zero out this one */
630                                 lcd->lcd_uuid[0] = '\0';
631                         } else {
632                                 GOTO(err_client, rc = PTR_ERR(exp));
633                         }
634                 } else {
635                         med = &exp->exp_mds_data;
636                         med->med_lcd = lcd;
637                         rc = mds_client_add(obd, exp, cl_idx, NULL);
638                         /* can't fail for existing client */
639                         LASSERTF(rc == 0, "rc = %d\n", rc);
640
641                         /* VBR: set export last committed version */
642                         exp->exp_last_committed = last_transno;
643                         /* read last time from disk */
644                         exp->exp_last_request_time = target_trans_table_last_time(exp);
645                         lcd = NULL;
646
647                         spin_lock(&exp->exp_lock);
648                         exp->exp_replay_needed = 1;
649                         exp->exp_connecting = 0;
650                         exp->exp_in_recovery = 0;
651                         spin_unlock(&exp->exp_lock);
652
653                         spin_lock_bh(&obd->obd_processing_task_lock);
654                         obd->obd_recoverable_clients++;
655                         obd->obd_max_recoverable_clients++;
656                         spin_unlock_bh(&obd->obd_processing_task_lock);
657
658                         /* VBR: if epoch too old mark export as delayed,
659                          * if epoch is zero then client is pre-vbr one */
660                         if (start_epoch > last_epoch && last_epoch != 0)
661                                 class_set_export_delayed(exp);
662                         class_export_put(exp);
663                 }
664
665                 /* Need to check last_rcvd even for duplicated exports. */
666                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
667                        "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
668
669                 if (last_transno > mds->mds_last_transno)
670                         mds->mds_last_transno = last_transno;
671         }
672
673         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_FAKE_EXP))) {
674                 mds_add_fake_export(obd, obd_fail_val, file);
675         }
676
677         if (lcd)
678                 OBD_FREE_PTR(lcd);
679
680         obd->obd_last_committed = mds->mds_last_transno;
681
682         if (obd->obd_recoverable_clients) {
683                 CWARN("RECOVERY: service %s, %d recoverable clients, "
684                       "%d delayed clients, last_transno "LPU64"\n",
685                       obd->obd_name, obd->obd_recoverable_clients,
686                       obd->obd_delayed_clients, mds->mds_last_transno);
687                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
688                 obd->obd_recovering = 1;
689                 obd->obd_recovery_start = 0;
690                 obd->obd_recovery_end = 0;
691         } else {
692                 LASSERT(!obd->obd_recovering);
693                 /* VBR: update boot epoch after recovery */
694                 mds_update_last_epoch(obd);
695         }
696
697         obd->obd_recovery_timeout = OBD_RECOVERY_TIME_SOFT;
698         obd->obd_recovery_time_hard = OBD_RECOVERY_TIME_HARD;
699
700         lmi = server_find_mount_locked(obd->obd_name);
701         if (lmi) {
702                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
703
704                 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft)
705                         obd->obd_recovery_timeout =
706                                 lsi->lsi_lmd->lmd_recovery_time_soft;
707
708                 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard)
709                         obd->obd_recovery_time_hard =
710                                 lsi->lsi_lmd->lmd_recovery_time_hard;
711         }
712
713         mds->mds_mount_count = mount_count + 1;
714         lsd->lsd_mount_count = lsd->lsd_compat14 =
715                 cpu_to_le64(mds->mds_mount_count);
716
717         /* save it, so mount count and last_transno is current */
718         rc = mds_update_server_data(obd, 1);
719         if (rc)
720                 GOTO(err_client, rc);
721
722         RETURN(0);
723
724 err_client:
725         class_disconnect_exports(obd);
726 err_msd:
727         mds_server_free_data(mds);
728         RETURN(rc);
729 }
730
731 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
732 {
733         struct mds_obd *mds = &obd->u.mds;
734         struct lvfs_run_ctxt *saved = NULL;
735         struct dentry *dentry;
736         struct file *file;
737         int rc;
738         ENTRY;
739
740         OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
741
742         rc = cleanup_group_info();
743         if (rc)
744                 RETURN(rc);
745
746         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
747         if (saved == NULL) {
748                 CERROR("cannot allocate memory for run ctxt\n");
749                 RETURN(-ENOMEM);
750         }
751
752         mds->mds_vfsmnt = mnt;
753         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
754         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
755         obd->u.obt.obt_stale_export_age = STALE_EXPORT_MAXTIME_DEFAULT;
756         spin_lock_init(&obd->u.obt.obt_trans_table_lock);
757
758         rc = fsfilt_setup(obd, obd->u.obt.obt_sb);
759         if (rc)
760                 RETURN(rc);
761
762         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
763         obd->obd_lvfs_ctxt.pwdmnt = mnt;
764         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
765         obd->obd_lvfs_ctxt.fs = get_ds();
766         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
767
768         /* setup the directory tree */
769         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
770         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "ROOT", 0755, 0);
771         if (IS_ERR(dentry)) {
772                 rc = PTR_ERR(dentry);
773                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
774                 GOTO(err_pop, rc);
775         }
776
777         mds->mds_rootfid.id = dentry->d_inode->i_ino;
778         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
779         mds->mds_rootfid.f_type = S_IFDIR;
780
781         dput(dentry);
782
783         dentry = lookup_one_len("__iopen__", cfs_fs_pwd(current->fs),
784                                 strlen("__iopen__"));
785         if (IS_ERR(dentry)) {
786                 rc = PTR_ERR(dentry);
787                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
788                 GOTO(err_pop, rc);
789         }
790
791         mds->mds_fid_de = dentry;
792         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
793                 rc = -ENOENT;
794                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
795                 GOTO(err_fid, rc);
796         }
797
798         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "PENDING", 0777, 1);
799         if (IS_ERR(dentry)) {
800                 rc = PTR_ERR(dentry);
801                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
802                 GOTO(err_fid, rc);
803         }
804         mds->mds_pending_dir = dentry;
805
806         /* COMPAT_146 */
807         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, MDT_LOGS_DIR, 0777, 1);
808         if (IS_ERR(dentry)) {
809                 rc = PTR_ERR(dentry);
810                 CERROR("cannot create %s directory: rc = %d\n",
811                        MDT_LOGS_DIR, rc);
812                 GOTO(err_pending, rc);
813         }
814         mds->mds_logs_dir = dentry;
815         /* end COMPAT_146 */
816
817         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "OBJECTS", 0777, 1);
818         if (IS_ERR(dentry)) {
819                 rc = PTR_ERR(dentry);
820                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
821                 GOTO(err_logs, rc);
822         }
823         mds->mds_objects_dir = dentry;
824
825         /* open and test the last rcvd file */
826         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
827         if (IS_ERR(file)) {
828                 rc = PTR_ERR(file);
829                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
830                 GOTO(err_objects, rc = PTR_ERR(file));
831         }
832         mds->mds_rcvd_filp = file;
833         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
834                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
835                        file->f_dentry->d_inode->i_mode);
836                 GOTO(err_last_rcvd, rc = -ENOENT);
837         }
838
839         rc = mds_init_server_data(obd, file);
840         if (rc) {
841                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
842                 GOTO(err_last_rcvd, rc);
843         }
844
845         rc = mds_lov_init_objids(obd);
846         if (rc != 0) {
847                CERROR("cannot init lov objid rc = %d\n", rc);
848                GOTO(err_client, rc );
849         }
850
851         /* open and test the check io file junk */
852         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
853         if (IS_ERR(file)) {
854                 rc = PTR_ERR(file);
855                 CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
856                 GOTO(err_lov_objid, rc = PTR_ERR(file));
857         }
858         mds->mds_obt.obt_health_check_filp = file;
859         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
860                 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
861                        file->f_dentry->d_inode->i_mode);
862                 GOTO(err_health_check, rc = -ENOENT);
863         }
864         rc = lvfs_check_io_health(obd, file);
865         if (rc)
866                 GOTO(err_health_check, rc);
867 err_pop:
868         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
869         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
870         return rc;
871
872 err_health_check:
873         if (mds->mds_obt.obt_health_check_filp &&
874             filp_close(mds->mds_obt.obt_health_check_filp, 0))
875                 CERROR("can't close %s after error\n", HEALTH_CHECK);
876 err_lov_objid:
877          mds_lov_destroy_objids(obd);
878 err_client:
879         class_disconnect_exports(obd);
880 err_last_rcvd:
881         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
882                 CERROR("can't close %s after error\n", LAST_RCVD);
883 err_objects:
884         dput(mds->mds_objects_dir);
885 err_logs:
886         dput(mds->mds_logs_dir);
887 err_pending:
888         dput(mds->mds_pending_dir);
889 err_fid:
890         dput(mds->mds_fid_de);
891         goto err_pop;
892 }
893
894 int mds_fs_cleanup(struct obd_device *obd)
895 {
896         struct mds_obd *mds = &obd->u.mds;
897         struct lvfs_run_ctxt *saved = NULL;
898         int rc = 0;
899
900         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
901         if (saved == NULL) {
902                 CERROR("cannot allocate memory for run ctxt\n");
903                 RETURN(-ENOMEM);
904         }
905
906         if (obd->obd_fail)
907                 LCONSOLE_WARN("%s: shutting down for failover; client state "
908                               "will be preserved.\n", obd->obd_name);
909
910         class_disconnect_exports(obd); /* cleans up client info too */
911         mds_server_free_data(mds);
912
913         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
914         if (mds->mds_rcvd_filp) {
915                 rc = filp_close(mds->mds_rcvd_filp, 0);
916                 mds->mds_rcvd_filp = NULL;
917                 if (rc)
918                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
919         }
920
921         mds_lov_destroy_objids(obd);
922
923         if (mds->mds_obt.obt_health_check_filp) {
924                 rc = filp_close(mds->mds_obt.obt_health_check_filp, 0);
925                 mds->mds_obt.obt_health_check_filp = NULL;
926                 if (rc)
927                         CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
928         }
929         if (mds->mds_objects_dir != NULL) {
930                 l_dput(mds->mds_objects_dir);
931                 mds->mds_objects_dir = NULL;
932         }
933         if (mds->mds_logs_dir) {
934                 l_dput(mds->mds_logs_dir);
935                 mds->mds_logs_dir = NULL;
936         }
937         if (mds->mds_pending_dir) {
938                 l_dput(mds->mds_pending_dir);
939                 mds->mds_pending_dir = NULL;
940         }
941
942         lquota_fs_cleanup(mds_quota_interface_ref, obd);
943
944         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
945         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
946         shrink_dcache_parent(mds->mds_fid_de);
947         dput(mds->mds_fid_de);
948         LL_DQUOT_OFF(obd->u.obt.obt_sb, 0);
949
950         return rc;
951 }
952
953 /* Creates an object with the same name as its fid.  Because this is not at all
954  * performance sensitive, it is accomplished by creating a file, checking the
955  * fid, and renaming it. */
956 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
957                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
958 {
959         struct mds_obd *mds = &exp->exp_obd->u.mds;
960         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
961         unsigned int tmpname = ll_rand();
962         struct dentry *dchild, *new_child;
963         struct lvfs_dentry_params dp = LVFS_DENTRY_PARAMS_INIT;
964         struct lvfs_run_ctxt *saved = NULL;
965         char fidname[LL_FID_NAMELEN];
966         void *handle;
967         struct lvfs_ucred ucred = { 0 };
968         int rc = 0, err, namelen;
969         ENTRY;
970
971         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
972         if (saved == NULL) {
973                 CERROR("cannot allocate memory for run ctxt\n");
974                 RETURN(-ENOMEM);
975         }
976
977         /* the owner of object file should always be root */
978         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
979
980         push_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
981
982         sprintf(fidname, "%u.%u", tmpname, current->pid);
983         dchild = lookup_one_len(fidname, mds->mds_objects_dir, strlen(fidname));
984         if (IS_ERR(dchild)) {
985                 CERROR("getting neg dentry for obj: %u\n", tmpname);
986                 GOTO(out_pop, rc = PTR_ERR(dchild));
987         }
988         if (dchild->d_inode != NULL) {
989                 CERROR("impossible non-negative obj dentry: %u\n", tmpname);
990                 LBUG();
991         }
992
993         dchild->d_fsdata = (void *)&dp;
994         dp.ldp_ptr   = (void *)DP_LASTGROUP_REVERSE;
995
996         LOCK_INODE_MUTEX(parent_inode);
997         rc = ll_vfs_create(parent_inode, dchild, S_IFREG | 0666, NULL);
998
999         oa->o_id = dchild->d_inode->i_ino;
1000         oa->o_generation = dchild->d_inode->i_generation;
1001         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1002
1003         new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1004
1005         if (IS_ERR(new_child)) {
1006                 CERROR("getting neg dentry for obj rename: %d\n", rc);
1007                 GOTO(out_dput, rc = PTR_ERR(new_child));
1008         }
1009         if (new_child->d_inode != NULL) {
1010                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
1011                        oa->o_id, oa->o_generation);
1012                 LBUG();
1013         }
1014
1015         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
1016                               FSFILT_OP_RENAME, NULL);
1017         if (IS_ERR(handle))
1018                 GOTO(out_dput2, rc = PTR_ERR(handle));
1019
1020         lock_kernel();
1021         rc = ll_vfs_rename(parent_inode, dchild, mds->mds_vfsmnt,
1022                            parent_inode, new_child, mds->mds_vfsmnt);
1023         unlock_kernel();
1024         if (rc)
1025                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1026                        oa->o_id, oa->o_generation, rc);
1027
1028         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
1029                             handle, 0);
1030         if (!err)
1031                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1032         else if (!rc)
1033                 rc = err;
1034 out_dput2:
1035         dput(new_child);
1036 out_dput:
1037         dput(dchild);
1038         UNLOCK_INODE_MUTEX(parent_inode);
1039 out_pop:
1040         pop_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
1041         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1042         RETURN(rc);
1043 }
1044
1045 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1046                     struct lov_stripe_md *ea, struct obd_trans_info *oti,
1047                     struct obd_export *md_exp)
1048 {
1049         struct mds_obd *mds = &exp->exp_obd->u.mds;
1050         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1051         struct obd_device *obd = exp->exp_obd;
1052         struct lvfs_run_ctxt *saved = NULL;
1053         struct lvfs_ucred ucred = { 0 };
1054         char fidname[LL_FID_NAMELEN];
1055         struct inode *inode = NULL;
1056         struct dentry *de;
1057         void *handle;
1058         int err, namelen, rc = 0;
1059         ENTRY;
1060
1061         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
1062         if (saved == NULL) {
1063                 CERROR("cannot allocate memory for run ctxt\n");
1064                 RETURN(-ENOMEM);
1065         }
1066
1067         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
1068         push_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1069
1070         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1071
1072         LOCK_INODE_MUTEX(parent_inode);
1073         de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1074         if (IS_ERR(de)) {
1075                 rc = IS_ERR(de);
1076                 de = NULL;
1077                 CERROR("error looking up object "LPU64" %s: rc %d\n",
1078                        oa->o_id, fidname, rc);
1079                 GOTO(out_dput, rc);
1080         }
1081         if (de->d_inode == NULL) {
1082                 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1083                        oa->o_id, fidname, rc);
1084                 GOTO(out_dput, rc = -ENOENT);
1085         }
1086
1087         /* Stripe count is 1 here since this is some MDS specific stuff
1088            that is unlinked, not spanned across multiple OSTs */
1089         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1090                                   FSFILT_OP_UNLINK, oti, 1);
1091
1092         if (IS_ERR(handle))
1093                 GOTO(out_dput, rc = PTR_ERR(handle));
1094
1095         /* take a reference to protect inode from truncation within
1096            vfs_unlink() context. bug 10409 */
1097         inode = de->d_inode;
1098         atomic_inc(&inode->i_count);
1099         rc = ll_vfs_unlink(mds->mds_objects_dir->d_inode, de, mds->mds_vfsmnt);
1100         if (rc)
1101                 CERROR("error destroying object "LPU64":%u: rc %d\n",
1102                        oa->o_id, oa->o_generation, rc);
1103
1104         err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
1105         if (err && !rc)
1106                 rc = err;
1107 out_dput:
1108         if (de != NULL)
1109                 l_dput(de);
1110         UNLOCK_INODE_MUTEX(parent_inode);
1111
1112         if (inode)
1113                 iput(inode);
1114
1115         pop_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1116         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1117         RETURN(rc);
1118 }