1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Lustre Metadata Server (MDS) filesystem interface code
40 * Author: Andreas Dilger <adilger@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <linux/kmod.h>
50 #include <linux/version.h>
51 #include <linux/sched.h>
52 #include <lustre_quota.h>
53 #include <linux/mount.h>
54 #include <lustre_mds.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_lib.h>
58 #include <lustre_fsfilt.h>
59 #include <lustre_disk.h>
60 #include <libcfs/list.h>
62 #include "mds_internal.h"
/*
 * Set up per-NID statistics for an export.
 *
 * Treats localdata as a pointer to the client's lnet_nid_t and hands it to
 * lprocfs_exp_setup().  Then, on exp->exp_nid_stats, it allocates and
 * registers two counter sets under nid_proc: "stats" (sized in part from the
 * obd type's typ_ops table, initialised with lprocfs_init_ops_stats() and
 * mds_stats_counter_init()) and "ldlm_stats" (initialised with
 * lprocfs_init_ldlm_stats()).
 *
 * NOTE(review): this listing does not show the third parameter, the use of
 * newnid, or any of the error and return paths - confirm against the full
 * source before relying on the return value.
 */
65 int mds_export_stats_init(struct obd_device *obd,
66 struct obd_export *exp,
69 lnet_nid_t *client_nid = localdata;
70 int rc, num_stats, newnid = 0;
72 rc = lprocfs_exp_setup(exp, client_nid, &newnid);
74 /* Mask error for already created
82 struct nid_stat *tmp = exp->exp_nid_stats;
85 num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
87 tmp->nid_stats = lprocfs_alloc_stats(num_stats,
88 LPROCFS_STATS_FLAG_NOPERCPU);
89 if (tmp->nid_stats == NULL)
92 lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats);
93 rc = lprocfs_register_stats(tmp->nid_proc, "stats",
98 mds_stats_counter_init(tmp->nid_stats);
100 /* Always add in ldlm_stats */
101 tmp->nid_ldlm_stats = lprocfs_alloc_stats(LDLM_LAST_OPC -
104 if (tmp->nid_ldlm_stats == NULL)
107 lprocfs_init_ldlm_stats(tmp->nid_ldlm_stats);
109 rc = lprocfs_register_stats(tmp->nid_proc, "ldlm_stats",
110 tmp->nid_ldlm_stats);
/*
 * Bring a client's stored last_epoch up to the server's current start epoch.
 *
 * Compares med_lcd->lcd_last_epoch with mds_server_data->lsd_start_epoch.
 * When an update is needed it copies lsd_start_epoch into the lcd and writes
 * the lcd to mds_rcvd_filp at the export's med_lr_off, with the obd's lvfs
 * context pushed around the write.
 *
 * NOTE(review): the statement guarded by the ">=" test and the return
 * statements are not visible in this listing; presumably the function
 * returns early when the client epoch is already current - confirm.
 */
118 /* VBR: to determine the delayed client the lcd should be updated for each new
120 int mds_update_client_epoch(struct obd_export *exp)
122 struct mds_export_data *med = &exp->exp_mds_data;
123 struct mds_obd *mds = &exp->exp_obd->u.mds;
124 struct lvfs_run_ctxt saved;
125 loff_t off = med->med_lr_off;
128 /* VBR: set client last_epoch to current epoch */
129 if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
130 le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
133 med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
134 push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
135 rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
136 med->med_lcd, sizeof(*med->med_lcd), &off,
138 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
140 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
141 med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
142 le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
/*
 * Start a new server epoch.
 *
 * Under mds_transno_lock: takes lr_epoch() of the current mds_last_transno
 * plus one as the new epoch, resets mds_last_transno to that epoch shifted
 * left by LR_EPOCH_BITS, and stores the epoch (little-endian) in
 * lsd_start_epoch.  Afterwards it calls mds_update_client_epoch() for the
 * export of every request on obd_delayed_reply_queue, asserting that none of
 * those exports is marked exp_delayed, and finally calls
 * mds_update_server_data(obd, 1).
 */
147 /* Called after recovery is done on server */
148 void mds_update_last_epoch(struct obd_device *obd)
150 struct ptlrpc_request *req;
151 struct mds_obd *mds = &obd->u.mds;
154 /* Increase server epoch after recovery */
155 spin_lock(&mds->mds_transno_lock);
156 start_epoch = lr_epoch(mds->mds_last_transno) + 1;
157 mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
158 mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
159 spin_unlock(&mds->mds_transno_lock);
161 /* go through delayed reply queue to find all exports participate in
162 * recovery and set new epoch for them */
163 list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
164 LASSERT(!req->rq_export->exp_delayed);
165 mds_update_client_epoch(req->rq_export);
167 mds_update_server_data(obd, 1);
/*
 * Register an export's client record with the MDS.
 *
 * cl_idx == -1 marks a new client (new_client); any other value is asserted
 * to be greater than -2.  The visible code:
 *   - compares the client's lcd_uuid with the obd's own UUID;
 *   - calls class_disconnect_expired_exports() and searches
 *     mds_client_bitmap for a zero bit, logging an error when cl_idx reaches
 *     LR_MAX_CLIENTS or when OBD_FAIL_MDS_CLIENT_ADD fires;
 *   - claims the slot with test_and_set_bit() and records med_lr_idx and
 *     med_lr_off = lsd_client_start + cl_idx * lsd_client_size;
 *   - calls mds_export_stats_init(obd, exp, localdata);
 *   - inside a transaction (fsfilt_start to fsfilt_commit) stamps the lcd
 *     with lsd_start_epoch as lcd_last_epoch and the epoch of
 *     mds_last_transno as lcd_first_epoch, then writes the lcd at
 *     med_lr_off.
 *
 * NOTE(review): the conditions that select between these steps (presumably
 * tests on new_client) and every return statement are missing from this
 * listing - confirm against the full source.
 */
170 /* Add client data to the MDS. We use a bitmap to locate a free space
171 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
172 * Otherwise, we have just read the data from the last_rcvd file and
173 * we know its offset.
175 * It should not be possible to fail adding an existing client - otherwise
176 * mds_init_server_data() callsite needs to be fixed.
178 int mds_client_add(struct obd_device *obd, struct obd_export *exp,
179 int cl_idx, void *localdata)
181 struct mds_obd *mds = &obd->u.mds;
182 struct mds_export_data *med = &exp->exp_mds_data;
183 unsigned long *bitmap = mds->mds_client_bitmap;
184 int new_client = (cl_idx == -1);
188 LASSERT(bitmap != NULL);
189 LASSERTF(cl_idx > -2, "%d\n", cl_idx);
191 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
192 if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
195 /* VBR: remove expired exports before searching for free slot */
197 class_disconnect_expired_exports(obd);
199 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
200 * there's no need for extra complication here
203 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
205 if (cl_idx >= LR_MAX_CLIENTS ||
206 OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
207 CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
211 if (test_and_set_bit(cl_idx, bitmap)) {
212 cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
217 if (test_and_set_bit(cl_idx, bitmap)) {
218 CERROR("MDS client %d: bit already set in bitmap!!\n",
224 CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
225 cl_idx, med->med_lcd->lcd_uuid);
227 med->med_lr_idx = cl_idx;
228 med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) +
229 (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size));
230 LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
231 mds_export_stats_init(obd, exp, localdata);
234 struct lvfs_run_ctxt *saved = NULL;
235 loff_t off = med->med_lr_off;
236 struct file *file = mds->mds_rcvd_filp;
239 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
241 CERROR("cannot allocate memory for run ctxt\n");
245 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
246 handle = fsfilt_start(obd, file->f_dentry->d_inode,
247 FSFILT_OP_SETATTR, NULL);
248 if (IS_ERR(handle)) {
249 rc = PTR_ERR(handle);
250 CERROR("unable to start transaction: rc %d\n", rc);
252 /* VBR: set client last_transno as mds_last_transno to
253 * remember last epoch for this client */
254 med->med_lcd->lcd_last_epoch =
255 mds->mds_server_data->lsd_start_epoch;
256 exp->exp_last_request_time = cfs_time_current_sec();
257 /* remember first epoch of client for orphan handling */
258 med->med_lcd->lcd_first_epoch =
259 cpu_to_le32(lr_epoch(mds->mds_last_transno));
/* The result of registering the commit callback is reused below as the
 * last argument of fsfilt_write_record() ("sync if no cb"). */
260 rc = fsfilt_add_journal_cb(obd, 0, handle,
261 target_client_add_cb, exp);
263 spin_lock(&exp->exp_lock);
264 exp->exp_need_sync = 1;
265 spin_unlock(&exp->exp_lock);
267 rc = fsfilt_write_record(obd, file, med->med_lcd,
268 sizeof(*med->med_lcd),
269 &off, rc /* sync if no cb */);
270 fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0);
273 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
274 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
278 CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
279 med->med_lr_idx, med->med_lr_off,
280 (unsigned int)sizeof(*med->med_lcd));
/* All-zero client record; mds_client_free() writes it over a freed slot. */
285 struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
/*
 * Release an export's client slot.
 *
 * Visible steps: compare the client's lcd_uuid with the obd's own UUID, take
 * the slot offset from med_lr_off (an offset rejected by a check that is not
 * shown here is logged and ends in -EINVAL via the "free" label), and verify
 * that the slot's bit is set in mds_client_bitmap.  Unless the export
 * carries OBD_OPT_FAILOVER, the on-disk record at that offset is overwritten
 * with zero_lcd and mds_update_server_data() is called with need_sync.  The
 * bitmap bit is cleared with test_and_clear_bit(), and the run context and
 * med_lcd are freed.
 *
 * need_sync is true when the export is not a libclient or has exp_need_sync
 * set, and OBD_OPT_ABORT_RECOV is not set.
 *
 * NOTE(review): several conditions, the labels and the return statement are
 * missing from this listing - confirm the exact control flow against the
 * full source.
 */
287 int mds_client_free(struct obd_export *exp)
289 struct mds_export_data *med = &exp->exp_mds_data;
290 struct mds_obd *mds = &exp->exp_obd->u.mds;
291 struct obd_device *obd = exp->exp_obd;
292 struct lvfs_run_ctxt *saved = NULL;
300 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
301 if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
304 CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
305 med->med_lr_idx, med->med_lr_off, med->med_lcd->lcd_uuid);
307 LASSERT(mds->mds_client_bitmap != NULL);
310 off = med->med_lr_off;
312 /* Don't clear med_lr_idx here as it is likely also unset. At worst
313 * we leak a client slot that will be cleaned on the next recovery. */
315 CERROR("%s: client idx %d has offset %lld\n",
316 obd->obd_name, med->med_lr_idx, off);
317 GOTO(free, rc = -EINVAL);
320 /* Clear the bit _after_ zeroing out the client so we don't
321 race with mds_client_add and zero out new clients.*/
322 if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
323 CERROR("MDS client %u: bit already clear in bitmap!!\n",
328 if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
329 /* Don't force sync on each disconnect if aborting recovery,
330 * or it does num_clients * num_osts syncs. b=17194 */
331 int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
332 !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
333 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
335 CERROR("cannot allocate memory for run ctxt\n");
336 GOTO(free, rc = -ENOMEM);
338 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
339 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_lcd,
340 sizeof(zero_lcd), &off, 0);
342 /* Make sure the server's last_transno is up to date. Do this
343 * after the client is freed so we know all the client's
344 * transactions have been committed. */
346 mds_update_server_data(exp->exp_obd, need_sync);
348 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
/* The trailing "%ssync" prints "sync" when need_sync is set, "async"
 * otherwise. */
350 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
351 "zero out client %s at idx %u/%llu in %s %ssync rc %d\n",
352 med->med_lcd->lcd_uuid, med->med_lr_idx, med->med_lr_off,
353 LAST_RCVD, need_sync ? "" : "a", rc);
356 if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
357 CERROR("MDS client %u: bit already clear in bitmap!!\n",
365 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
367 OBD_FREE_PTR(med->med_lcd);
/*
 * Free what mds_init_server_data() allocated: the client bitmap
 * (LR_MAX_CLIENTS / 8 bytes) and the mds_server_data block, whose pointer is
 * then reset to NULL.
 *
 * NOTE(review): no matching reset of mds_client_bitmap is visible in this
 * listing - confirm it is cleared in the full source.
 */
373 static int mds_server_free_data(struct mds_obd *mds)
375 OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
376 OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
377 mds->mds_server_data = NULL;
/*
 * Fault-injection helper: fabricate client slots and exports.
 *
 * Called from mds_init_server_data() when OBD_FAIL_TGT_FAKE_EXP fires, with
 * obd_fail_val as num.  The visible code picks a free bit in
 * obt_client_bitmap, derives the record offset from lsd_client_start and
 * lsd_client_size, names the client "dead-" followed by the slot index
 * zero-padded to 16 digits, creates an export for it, marks the export as
 * needing replay and as delayed, bumps the recoverable-client counters and
 * writes the lcd (with lcd_last_epoch set to 1) at that offset.
 *
 * NOTE(review): the loop over num, the allocation of lcd and the error exits
 * are missing from this listing - presumably the body runs once per
 * requested fake client; confirm against the full source.
 */
382 static void mds_add_fake_export(struct obd_device *obd, int num,
385 struct obd_export *exp;
386 struct lvfs_run_ctxt saved;
387 struct obd_device_target *obt = &obd->u.obt;
388 struct lu_export_data *led;
389 unsigned long *bitmap = obt->obt_client_bitmap;
390 struct lsd_client_data *lcd = NULL;
391 unsigned int idx = 0;
402 idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, idx);
403 if (idx >= LR_MAX_CLIENTS) {
404 CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n", idx);
408 if (test_and_set_bit(idx, bitmap)) {
409 CERROR("Bit %u is set already\n", idx);
412 off = le32_to_cpu(obt->obt_lsd->lsd_client_start) +
413 idx * le16_to_cpu(obt->obt_lsd->lsd_client_size);
/* "dead-" plus 16 digits plus the terminator needs 22 bytes; assumes
 * lcd_uuid is at least that large - TODO confirm. */
415 sprintf(lcd->lcd_uuid, "dead-%.16u", idx);
416 CDEBUG(D_INFO, "Create fake export %s, index %u, offset %lu\n",
417 lcd->lcd_uuid, idx, (unsigned long)off);
419 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
421 if (PTR_ERR(exp) == -EALREADY) {
422 CERROR("Export %s already exists\n",
/* NOTE(review): PTR_ERR() yields a negative long for an error pointer, so
 * %lu would print it as a large unsigned value - confirm intent. */
425 CERROR("Failed to create export %lu\n", PTR_ERR(exp));
430 led = &exp->exp_target_data;
431 led->led_lr_idx = idx;
432 led->led_lr_off = off;
435 exp->exp_last_request_time = cfs_time_current_sec();
436 exp->exp_replay_needed = 1;
437 exp->exp_connecting = 0;
438 exp->exp_in_recovery = 0;
440 spin_lock_bh(&obd->obd_processing_task_lock);
441 obd->obd_recoverable_clients++;
442 obd->obd_max_recoverable_clients++;
443 spin_unlock_bh(&obd->obd_processing_task_lock);
445 class_set_export_delayed(exp);
446 class_export_put(exp);
448 lcd->lcd_last_epoch = cpu_to_le32(1);
449 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
450 rc = fsfilt_write_record(obd, file, lcd, sizeof(*lcd), &off, 0);
451 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
453 CERROR("Failed to create fake client record\n");
/*
 * Load (or initialise) the server data kept in the last_rcvd file and
 * rebuild the client state from it.
 *
 * - Allocates the lr_server_data header and the client bitmap.
 * - For an empty file, fills in a fresh header (UUID, sizes, feature flags);
 *   otherwise reads the header, rejects a UUID that does not match the obd,
 *   and takes the mount count from lsd_compat14 when OBD_INCOMPAT_COMMON_LR
 *   is not set.
 * - Rejects unsupported incompat and rocompat feature bits with -EINVAL.
 * - Walks the per-client records: a record whose lcd_uuid is empty is
 *   reported as zeroed; other records get an export via class_new_export()
 *   and mds_client_add(), are counted as recoverable, and are marked delayed
 *   when their last_epoch is non-zero and older than the server's
 *   start_epoch.  mds_last_transno is raised to the largest last_transno
 *   seen.
 * - Sets up the recovery fields when obd_recoverable_clients is non-zero;
 *   the mds_update_last_epoch() call that follows is apparently the
 *   alternative branch (it is preceded by LASSERT(!obd->obd_recovering)).
 * - Stores mount_count + 1 and writes the header back with
 *   mds_update_server_data(obd, 1).
 *
 * The last two statements (class_disconnect_exports() and
 * mds_server_free_data()) are the error unwind.
 *
 * NOTE(review): local declarations, branch structure, labels and returns
 * are partly missing from this listing - confirm against the full source.
 */
461 static int mds_init_server_data(struct obd_device *obd, struct file *file)
463 struct mds_obd *mds = &obd->u.mds;
464 struct lr_server_data *lsd;
465 struct lsd_client_data *lcd = NULL;
467 unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
473 /* ensure padding in the struct is the correct size */
474 LASSERT(offsetof(struct lr_server_data, lsd_padding) +
475 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
476 LASSERT(offsetof(struct lsd_client_data, lcd_padding) +
477 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
479 OBD_ALLOC_WAIT(lsd, sizeof(*lsd));
483 OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
484 if (!mds->mds_client_bitmap) {
485 OBD_FREE(lsd, sizeof(*lsd));
489 mds->mds_server_data = lsd;
491 if (last_rcvd_size == 0) {
492 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
494 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid));
495 lsd->lsd_last_transno = 0;
496 mount_count = lsd->lsd_mount_count = 0;
497 lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
498 lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
499 lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
500 lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
501 lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT);
503 rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off);
505 CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
508 if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
509 LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
510 " the wrong disk %s. Were the /dev/ "
511 "assignments rearranged?\n",
512 obd->obd_uuid.uuid, lsd->lsd_uuid);
513 GOTO(err_msd, rc = -EINVAL);
516 /* Assume old last_rcvd format unless I_C_LR is set */
517 if (!(lsd->lsd_feature_incompat &
518 cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
519 lsd->lsd_mount_count = lsd->lsd_compat14;
521 mount_count = le64_to_cpu(lsd->lsd_mount_count);
524 if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
525 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
526 obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) &
528 GOTO(err_msd, rc = -EINVAL);
530 if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
531 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
532 obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) &
534 /* Do something like remount filesystem read-only */
535 GOTO(err_msd, rc = -EINVAL);
538 lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
540 target_trans_table_init(obd);
541 mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
542 start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
544 CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
545 obd->obd_name, start_epoch);
546 CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
547 obd->obd_name, mds->mds_last_transno);
548 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
549 obd->obd_name, mount_count + 1);
550 CDEBUG(D_INODE, "%s: server data size: %u\n",
551 obd->obd_name, le32_to_cpu(lsd->lsd_server_size));
552 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
553 obd->obd_name, le32_to_cpu(lsd->lsd_client_start));
554 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
555 obd->obd_name, le32_to_cpu(lsd->lsd_client_size));
556 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
557 obd->obd_name, last_rcvd_size);
558 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
559 last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 :
560 (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) /
561 le16_to_cpu(lsd->lsd_client_size));
/* NOTE(review): lsd_client_size is only validated here, after the
 * "last_rcvd clients" debug message above has already used it as a
 * divisor - a zero size read from disk may divide by zero there. */
563 if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
564 !lsd->lsd_client_size) {
565 CERROR("Bad last_rcvd contents!\n");
566 GOTO(err_msd, rc = -EINVAL);
569 /* When we do a clean MDS shutdown, we save the last_transno into
570 * the header. If we find clients with higher last_transno values
571 * then those clients may need recovery done. */
572 for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
573 off < last_rcvd_size; cl_idx++) {
576 struct obd_export *exp;
577 struct mds_export_data *med;
580 OBD_ALLOC_WAIT(lcd, sizeof(*lcd));
582 GOTO(err_client, rc = -ENOMEM);
585 /* Don't assume off is incremented properly by
586 * fsfilt_read_record(), in case sizeof(*lcd)
587 * isn't the same as lsd->lsd_client_size. */
588 off = le32_to_cpu(lsd->lsd_client_start) +
589 cl_idx * le16_to_cpu(lsd->lsd_client_size);
590 rc = fsfilt_read_record(obd, file, lcd, sizeof(*lcd), &off);
592 CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
593 LAST_RCVD, cl_idx, off, rc);
594 break; /* read error shouldn't cause startup to fail */
597 if (lcd->lcd_uuid[0] == '\0') {
598 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
603 last_transno = lsd_last_transno(lcd);
604 last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
606 /* These exports are cleaned up by mds_disconnect(), so they
607 * need to be set up like real exports as mds_connect() does.
609 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
610 " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
611 last_transno, le64_to_cpu(lsd->lsd_last_transno),
612 le64_to_cpu(lcd->lcd_last_xid));
614 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
616 if (PTR_ERR(exp) == -EALREADY) {
617 /* export already exists, zero out this one */
618 lcd->lcd_uuid[0] = '\0';
620 GOTO(err_client, rc = PTR_ERR(exp));
623 med = &exp->exp_mds_data;
625 rc = mds_client_add(obd, exp, cl_idx, NULL);
626 /* can't fail for existing client */
627 LASSERTF(rc == 0, "rc = %d\n", rc);
629 /* VBR: set export last committed version */
630 exp->exp_last_committed = last_transno;
631 /* read last time from disk */
632 exp->exp_last_request_time = target_trans_table_last_time(exp);
635 spin_lock(&exp->exp_lock);
636 exp->exp_replay_needed = 1;
637 exp->exp_connecting = 0;
638 exp->exp_in_recovery = 0;
639 spin_unlock(&exp->exp_lock);
641 spin_lock_bh(&obd->obd_processing_task_lock);
642 obd->obd_recoverable_clients++;
643 obd->obd_max_recoverable_clients++;
644 spin_unlock_bh(&obd->obd_processing_task_lock);
646 /* VBR: if epoch too old mark export as delayed,
647 * if epoch is zero then client is pre-vbr one */
648 if (start_epoch > last_epoch && last_epoch != 0)
649 class_set_export_delayed(exp);
650 class_export_put(exp);
653 /* Need to check last_rcvd even for duplicated exports. */
654 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
655 "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
657 if (last_transno > mds->mds_last_transno)
658 mds->mds_last_transno = last_transno;
661 if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_FAKE_EXP))) {
662 mds_add_fake_export(obd, obd_fail_val, file);
668 obd->obd_last_committed = mds->mds_last_transno;
670 if (obd->obd_recoverable_clients) {
671 CWARN("RECOVERY: service %s, %d recoverable clients, "
672 "%d delayed clients, last_transno "LPU64"\n",
673 obd->obd_name, obd->obd_recoverable_clients,
674 obd->obd_delayed_clients, mds->mds_last_transno);
675 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
676 obd->obd_recovering = 1;
677 obd->obd_recovery_start = 0;
678 obd->obd_recovery_end = 0;
679 obd->obd_recovery_timeout = OBD_RECOVERY_FACTOR * obd_timeout;
681 /* bz13079: this won't be changed for mds */
682 obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
685 LASSERT(!obd->obd_recovering);
686 /* VBR: update boot epoch after recovery */
687 mds_update_last_epoch(obd);
689 mds->mds_mount_count = mount_count + 1;
690 lsd->lsd_mount_count = lsd->lsd_compat14 =
691 cpu_to_le64(mds->mds_mount_count);
693 /* save it, so mount count and last_transno is current */
694 rc = mds_update_server_data(obd, 1);
696 GOTO(err_client, rc);
701 class_disconnect_exports(obd);
703 mds_server_free_data(mds);
/*
 * Prepare the on-disk MDS layout on a mounted backing filesystem.
 *
 * In order: cleanup_group_info(), allocate a run context, record the mount
 * and its superblock, fsfilt_setup(), initialise obd_lvfs_ctxt, then - with
 * that context pushed - create ROOT (its inode number and generation become
 * mds_rootfid), look up __iopen__ (kept as mds_fid_de), create PENDING,
 * MDT_LOGS_DIR and OBJECTS, open LAST_RCVD and load it with
 * mds_init_server_data(), call mds_lov_init_objids(), and open HEALTH_CHECK
 * and probe it with lvfs_check_io_health().  On success the context is
 * popped and the run context freed.
 *
 * The tail of the function is the error unwind; it releases in reverse
 * order of acquisition: health-check file, lov objids, exports, last_rcvd
 * file, then the OBJECTS, logs, PENDING and __iopen__ dentries.
 *
 * NOTE(review): the labels, several conditions and the return statements
 * are missing from this listing, so the entry point of each unwind step
 * cannot be confirmed here.
 */
707 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
709 struct mds_obd *mds = &obd->u.mds;
710 struct lvfs_run_ctxt *saved = NULL;
711 struct dentry *dentry;
716 OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
718 rc = cleanup_group_info();
722 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
724 CERROR("cannot allocate memory for run ctxt\n");
728 mds->mds_vfsmnt = mnt;
729 /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
730 obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
731 obd->u.obt.obt_stale_export_age = STALE_EXPORT_MAXTIME_DEFAULT;
732 spin_lock_init(&obd->u.obt.obt_trans_table_lock);
734 rc = fsfilt_setup(obd, obd->u.obt.obt_sb);
738 OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
739 obd->obd_lvfs_ctxt.pwdmnt = mnt;
740 obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
741 obd->obd_lvfs_ctxt.fs = get_ds();
742 obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
744 /* setup the directory tree */
745 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
746 dentry = simple_mkdir(current->fs->pwd, mnt, "ROOT", 0755, 0);
747 if (IS_ERR(dentry)) {
748 rc = PTR_ERR(dentry);
749 CERROR("cannot create ROOT directory: rc = %d\n", rc);
753 mds->mds_rootfid.id = dentry->d_inode->i_ino;
754 mds->mds_rootfid.generation = dentry->d_inode->i_generation;
755 mds->mds_rootfid.f_type = S_IFDIR;
759 dentry = lookup_one_len("__iopen__", current->fs->pwd,
760 strlen("__iopen__"));
761 if (IS_ERR(dentry)) {
762 rc = PTR_ERR(dentry);
763 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
767 mds->mds_fid_de = dentry;
768 if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
770 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
774 dentry = simple_mkdir(current->fs->pwd, mnt, "PENDING", 0777, 1);
775 if (IS_ERR(dentry)) {
776 rc = PTR_ERR(dentry);
777 CERROR("cannot create PENDING directory: rc = %d\n", rc);
780 mds->mds_pending_dir = dentry;
783 dentry = simple_mkdir(current->fs->pwd, mnt, MDT_LOGS_DIR, 0777, 1);
784 if (IS_ERR(dentry)) {
785 rc = PTR_ERR(dentry);
786 CERROR("cannot create %s directory: rc = %d\n",
788 GOTO(err_pending, rc);
790 mds->mds_logs_dir = dentry;
793 dentry = simple_mkdir(current->fs->pwd, mnt, "OBJECTS", 0777, 1);
794 if (IS_ERR(dentry)) {
795 rc = PTR_ERR(dentry);
796 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
799 mds->mds_objects_dir = dentry;
801 /* open and test the last rcvd file */
802 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
805 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
806 GOTO(err_objects, rc = PTR_ERR(file));
808 mds->mds_rcvd_filp = file;
809 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
810 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
811 file->f_dentry->d_inode->i_mode);
812 GOTO(err_last_rcvd, rc = -ENOENT);
815 rc = mds_init_server_data(obd, file);
817 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
818 GOTO(err_last_rcvd, rc);
821 rc = mds_lov_init_objids(obd);
823 CERROR("cannot init lov objid rc = %d\n", rc);
824 GOTO(err_client, rc );
827 /* open and test the check io file junk */
828 file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
831 CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
832 GOTO(err_lov_objid, rc = PTR_ERR(file));
834 mds->mds_health_check_filp = file;
835 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
836 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
837 file->f_dentry->d_inode->i_mode);
838 GOTO(err_health_check, rc = -ENOENT);
840 rc = lvfs_check_io_health(obd, file);
842 GOTO(err_health_check, rc);
844 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
845 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
849 if (mds->mds_health_check_filp &&
850 filp_close(mds->mds_health_check_filp, 0))
851 CERROR("can't close %s after error\n", HEALTH_CHECK);
853 mds_lov_destroy_objids(obd);
855 class_disconnect_exports(obd);
857 if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
858 CERROR("can't close %s after error\n", LAST_RCVD);
860 dput(mds->mds_objects_dir);
862 dput(mds->mds_logs_dir);
864 dput(mds->mds_pending_dir);
866 dput(mds->mds_fid_de);
/*
 * Undo mds_fs_setup(): disconnect all exports, free the server data, close
 * the last_rcvd and health-check files, destroy the lov objids, drop the
 * OBJECTS, logs and PENDING dentries, call lquota_fs_cleanup(), then shrink
 * and drop the __iopen__ dentry (mds_fid_de) and call LL_DQUOT_OFF() on the
 * superblock.  The file and dentry work runs with the obd's lvfs context
 * pushed.
 *
 * NOTE(review): unlike the other dentries, mds_fid_de is passed to
 * shrink_dcache_parent() and dput() with no NULL check visible in this
 * listing - confirm it can never be NULL here.  The condition guarding the
 * failover console message and the return statement are also not shown.
 */
870 int mds_fs_cleanup(struct obd_device *obd)
872 struct mds_obd *mds = &obd->u.mds;
873 struct lvfs_run_ctxt *saved = NULL;
876 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
878 CERROR("cannot allocate memory for run ctxt\n");
883 LCONSOLE_WARN("%s: shutting down for failover; client state "
884 "will be preserved.\n", obd->obd_name);
886 class_disconnect_exports(obd); /* cleans up client info too */
887 mds_server_free_data(mds);
889 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
890 if (mds->mds_rcvd_filp) {
891 rc = filp_close(mds->mds_rcvd_filp, 0);
892 mds->mds_rcvd_filp = NULL;
894 CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
897 mds_lov_destroy_objids(obd);
899 if (mds->mds_health_check_filp) {
900 rc = filp_close(mds->mds_health_check_filp, 0);
901 mds->mds_health_check_filp = NULL;
903 CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
905 if (mds->mds_objects_dir != NULL) {
906 l_dput(mds->mds_objects_dir);
907 mds->mds_objects_dir = NULL;
909 if (mds->mds_logs_dir) {
910 l_dput(mds->mds_logs_dir);
911 mds->mds_logs_dir = NULL;
913 if (mds->mds_pending_dir) {
914 l_dput(mds->mds_pending_dir);
915 mds->mds_pending_dir = NULL;
918 lquota_fs_cleanup(mds_quota_interface_ref, obd);
920 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
921 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
922 shrink_dcache_parent(mds->mds_fid_de);
923 dput(mds->mds_fid_de);
924 LL_DQUOT_OFF(obd->u.obt.obt_sb);
/*
 * Create an MDS object file under OBJECTS.
 *
 * With CAP_SYS_RESOURCE raised in the pushed credentials, a temporary file
 * named from a random number and the current pid is created with
 * O_CREAT | O_EXCL.  Its inode number and generation are stored in o_id and
 * o_generation of oa, and the file is renamed - under the parent inode mutex
 * and inside an FSFILT_OP_RENAME transaction - to the name produced by
 * ll_fid2str().  o_valid gains OBD_MD_FLID | OBD_MD_FLGENER (the guarding
 * condition is not visible here).  The temporary file handle is closed and
 * the run context popped and freed on the way out.
 *
 * The ea and oti parameters are not used by any line visible in this
 * listing.
 *
 * NOTE(review): several conditions, the labels and the return statement are
 * missing from this listing - confirm against the full source.
 */
929 /* Creates an object with the same name as its fid. Because this is not at all
930 * performance sensitive, it is accomplished by creating a file, checking the
931 * fid, and renaming it. */
932 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
933 struct lov_stripe_md **ea, struct obd_trans_info *oti)
935 struct mds_obd *mds = &exp->exp_obd->u.mds;
936 struct inode *parent_inode = mds->mds_objects_dir->d_inode;
937 unsigned int tmpname = ll_rand();
939 struct dentry *new_child;
940 struct lvfs_run_ctxt *saved = NULL;
941 char fidname[LL_FID_NAMELEN];
943 struct lvfs_ucred ucred = { 0 };
944 int rc = 0, err, namelen;
947 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
949 CERROR("cannot allocate memory for run ctxt\n");
953 /* the owner of object file should always be root */
954 cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
956 push_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
/* NOTE(review): worst case "OBJECTS/" + 10 digits + "." + 10 digits plus
 * the terminator is 30 bytes - confirm LL_FID_NAMELEN is at least that. */
958 sprintf(fidname, "OBJECTS/%u.%u", tmpname, current->pid);
959 filp = filp_open(fidname, O_CREAT | O_EXCL, 0666);
963 CERROR("impossible object name collision %u\n",
967 CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
971 LASSERT(mds->mds_objects_dir == filp->f_dentry->d_parent);
973 oa->o_id = filp->f_dentry->d_inode->i_ino;
974 oa->o_generation = filp->f_dentry->d_inode->i_generation;
975 namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
977 LOCK_INODE_MUTEX(parent_inode);
978 new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
980 if (IS_ERR(new_child)) {
/* NOTE(review): rc appears to be printed here before it is set from
 * PTR_ERR(new_child) on the following line. */
981 CERROR("getting neg dentry for obj rename: %d\n", rc);
982 GOTO(out_close, rc = PTR_ERR(new_child));
984 if (new_child->d_inode != NULL) {
985 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
986 oa->o_id, oa->o_generation);
990 handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
991 FSFILT_OP_RENAME, NULL);
993 GOTO(out_dput, rc = PTR_ERR(handle));
996 rc = ll_vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
997 filp->f_vfsmnt, mds->mds_objects_dir->d_inode,
998 new_child, filp->f_vfsmnt);
1001 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1002 oa->o_id, oa->o_generation, rc);
1004 err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
1007 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1013 UNLOCK_INODE_MUTEX(parent_inode);
1014 err = filp_close(filp, 0);
/* NOTE(review): the message prints rc although the close result is held
 * in err - confirm which was intended. */
1016 CERROR("closing tmpfile %u: rc %d\n", tmpname, rc);
1021 pop_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
1022 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
/*
 * Unlink the MDS object identified by o_id and o_generation of oa from the
 * OBJECTS directory.
 *
 * With CAP_SYS_RESOURCE raised in the pushed credentials, the object name is
 * built with ll_fid2str() and looked up under the parent inode mutex.  A
 * negative dentry ends in -ENOENT.  Otherwise a transaction is started with
 * fsfilt_start_log(FSFILT_OP_UNLINK), an extra reference is taken on the
 * inode, the entry is removed with ll_vfs_unlink(), and the transaction is
 * committed.  The mutex is released and the run context popped and freed on
 * the way out.
 *
 * The ea and md_exp parameters are not used by any line visible in this
 * listing.
 *
 * NOTE(review): the release of the extra inode reference, the labels and
 * the return statement are missing from this listing - confirm against the
 * full source.
 */
1026 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1027 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1028 struct obd_export *md_exp)
1030 struct mds_obd *mds = &exp->exp_obd->u.mds;
1031 struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1032 struct obd_device *obd = exp->exp_obd;
1033 struct lvfs_run_ctxt *saved = NULL;
1034 struct lvfs_ucred ucred = { 0 };
1035 char fidname[LL_FID_NAMELEN];
1036 struct inode *inode = NULL;
1039 int err, namelen, rc = 0;
1042 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
1043 if (saved == NULL) {
1044 CERROR("cannot allocate memory for run ctxt\n");
1048 cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
1049 push_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1051 namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1053 LOCK_INODE_MUTEX(parent_inode);
1054 de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1058 CERROR("error looking up object "LPU64" %s: rc %d\n",
1059 oa->o_id, fidname, rc);
1062 if (de->d_inode == NULL) {
/* NOTE(review): rc appears to be printed here before it is set to
 * -ENOENT on the following GOTO line. */
1063 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1064 oa->o_id, fidname, rc);
1065 GOTO(out_dput, rc = -ENOENT);
1068 /* Stripe count is 1 here since this is some MDS specific stuff
1069 that is unlinked, not spanned across multiple OSTs */
1070 handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1071 FSFILT_OP_UNLINK, oti, 1);
1074 GOTO(out_dput, rc = PTR_ERR(handle));
1076 /* take a reference to protect inode from truncation within
1077 vfs_unlink() context. bug 10409 */
1078 inode = de->d_inode;
1079 atomic_inc(&inode->i_count);
1080 rc = ll_vfs_unlink(mds->mds_objects_dir->d_inode, de, mds->mds_vfsmnt);
1082 CERROR("error destroying object "LPU64":%u: rc %d\n",
1083 oa->o_id, oa->o_generation, rc);
1085 err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
1091 UNLOCK_INODE_MUTEX(parent_inode);
1096 pop_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1097 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);