1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Lustre Metadata Server (MDS) filesystem interface code
40 * Author: Andreas Dilger <adilger@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <linux/kmod.h>
50 #include <linux/version.h>
51 #include <linux/sched.h>
52 #include <lustre_quota.h>
53 #include <linux/mount.h>
54 #include <lustre_mds.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_lib.h>
58 #include <lustre_fsfilt.h>
59 #include <lustre_disk.h>
60 #include <libcfs/list.h>
62 #include "mds_internal.h"
// Set up the per-NID procfs statistics for an export.
// Visible steps: lprocfs_exp_setup() is called with the export and the client
// NID taken from localdata; then a stats set is allocated with
// LPROCFS_STATS_FLAG_NOPERCPU, lprocfs_init_ops_stats() and
// mds_stats_counter_init() are run on it, and lprocfs_nid_ldlm_stats_init()
// is called on the export's nid_stat.
// NOTE(review): the remaining parameters, the braces, the conditions guarding
// each step and the return statements are not visible in this view -- confirm
// against the complete source before relying on this summary.
65 int mds_export_stats_init(struct obd_device *obd,
66 struct obd_export *exp,
70 lnet_nid_t *client_nid = localdata;
71 int rc, num_stats, newnid = 0;
73 rc = lprocfs_exp_setup(exp, client_nid, reconnect, &newnid);
75 /* Mask error for already created
83 struct nid_stat *tmp = exp->exp_nid_stats;
// The counter count starts from the number of pointer-sized slots in the obd
// type's ops table; the rest of the expression is not visible here.
86 num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
88 tmp->nid_stats = lprocfs_alloc_stats(num_stats,
89 LPROCFS_STATS_FLAG_NOPERCPU);
// NOTE(review): the statement taken on allocation failure is not visible.
90 if (tmp->nid_stats == NULL)
93 lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats);
94 mds_stats_counter_init(tmp->nid_stats);
96 rc = lprocfs_nid_ldlm_stats_init(tmp);
// Update one client's record in the last_rcvd file so that its
// lcd_last_epoch equals the server's lsd_start_epoch, then log the result.
// NOTE(review): the declaration of rc, the braces and the return statements
// are not visible in this view, and the original comment below is truncated.
104 /* VBR: to determine the delayed client the lcd should be updated for each new
106 int mds_update_client_epoch(struct obd_export *exp)
108 struct mds_export_data *med = &exp->exp_mds_data;
109 struct mds_obd *mds = &exp->exp_obd->u.mds;
110 struct lvfs_run_ctxt saved;
111 loff_t off = med->med_lr_off;
114 /* VBR: set client last_epoch to current epoch */
115 if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
116 le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
/* NOTE(review): the statement executed when the client epoch is already
 * current is not visible here -- presumably an early return; confirm. */
119 med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
/* Rewrite the client record at its slot offset (med_lr_off) in
 * mds_rcvd_filp while running inside the obd's lvfs context. */
120 push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
121 rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
122 med->med_lcd, sizeof(*med->med_lcd), &off,
124 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
126 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
127 med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
128 le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
133 /* Called after recovery is done on server */
/*
 * Advance the server epoch and propagate it.
 *
 * Under mds_transno_lock: start_epoch becomes the epoch of mds_last_transno
 * plus one, mds_last_transno is set to start_epoch shifted left by
 * LR_EPOCH_BITS, and lsd_start_epoch is stored in little-endian form.
 * Afterwards mds_update_client_epoch() is called for the export of every
 * request on obd_delayed_reply_queue, and the server data is written with
 * mds_update_server_data(obd, 1).
 *
 * NOTE(review): the declaration of start_epoch and the braces are not
 * visible in this view.
 */
134 void mds_update_last_epoch(struct obd_device *obd)
136 struct ptlrpc_request *req;
137 struct mds_obd *mds = &obd->u.mds;
140 /* Increase server epoch after recovery */
141 spin_lock(&mds->mds_transno_lock);
142 start_epoch = lr_epoch(mds->mds_last_transno) + 1;
143 mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
144 mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
145 spin_unlock(&mds->mds_transno_lock);
147 /* go through delayed reply queue to find all exports participate in
148 * recovery and set new epoch for them */
149 list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
/* Exports found on this queue are asserted not to be marked delayed. */
150 LASSERT(!req->rq_export->exp_delayed);
151 mds_update_client_epoch(req->rq_export);
153 mds_update_server_data(obd, 1);
156 /* Add client data to the MDS. We use a bitmap to locate a free space
157 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
158 * Otherwise, we have just read the data from the last_rcvd file and
159 * we know its offset.
161 * It should not be possible to fail adding an existing client - otherwise
162 * mds_init_server_data() callsite needs to be fixed.
 *
 * ("cl_off" above corresponds to the cl_idx parameter: new_client is
 * computed as cl_idx == -1.)
 *
 * Visible behaviour: the slot index is recorded in med_lr_idx and the record
 * offset in med_lr_off (lsd_client_start + cl_idx * lsd_client_size);
 * mds_export_stats_init() is called with localdata; the client record
 * med_lcd is then written to mds_rcvd_filp inside an fsfilt transaction,
 * with lcd_last_epoch and lcd_first_epoch filled in first.
 *
 * NOTE(review): declarations of rc and handle, the braces, the conditions
 * selecting the new-client path and the return statements are not visible
 * in this view.
164 int mds_client_add(struct obd_device *obd, struct obd_export *exp,
165 int cl_idx, void *localdata)
167 struct mds_obd *mds = &obd->u.mds;
168 struct mds_export_data *med = &exp->exp_mds_data;
169 unsigned long *bitmap = mds->mds_client_bitmap;
170 int new_client = (cl_idx == -1);
174 LASSERT(bitmap != NULL);
175 LASSERTF(cl_idx > -2, "%d\n", cl_idx);
177 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
178 if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
/* NOTE(review): the statement taken when the client uuid equals the obd's
 * own uuid is not visible here. */
181 /* VBR: remove expired exports before searching for free slot */
183 class_disconnect_expired_exports(obd);
185 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
186 * there's no need for extra complication here
189 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
191 if (cl_idx >= LR_MAX_CLIENTS ||
192 OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
193 CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
// Claim the chosen slot; if the bit was already set, search for the next
// zero bit (the remaining arguments and the retry jump are not visible).
197 if (test_and_set_bit(cl_idx, bitmap)) {
198 cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
203 if (test_and_set_bit(cl_idx, bitmap)) {
204 CERROR("MDS client %d: bit already set in bitmap!!\n",
210 CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
211 cl_idx, med->med_lcd->lcd_uuid);
// Remember the slot and derive the byte offset of its record.
213 med->med_lr_idx = cl_idx;
214 med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) +
215 (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size));
216 LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
217 mds_export_stats_init(obd, exp, 0, localdata);
220 struct lvfs_run_ctxt *saved = NULL;
221 loff_t off = med->med_lr_off;
222 struct file *file = mds->mds_rcvd_filp;
// The run context handed to push_ctxt/pop_ctxt below is slab-allocated here
// and freed after pop_ctxt.
225 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
227 CERROR("cannot allocate memory for run ctxt\n");
231 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
232 handle = fsfilt_start(obd, file->f_dentry->d_inode,
233 FSFILT_OP_SETATTR, NULL);
234 if (IS_ERR(handle)) {
235 rc = PTR_ERR(handle);
236 CERROR("unable to start transaction: rc %d\n", rc);
238 /* VBR: set client last_transno as mds_last_transno to
239 * remember last epoch for this client */
240 med->med_lcd->lcd_last_epoch =
241 mds->mds_server_data->lsd_start_epoch;
242 exp->exp_last_request_time = cfs_time_current_sec();
243 /* remember first epoch of client for orphan handling */
244 med->med_lcd->lcd_first_epoch =
245 cpu_to_le32(lr_epoch(mds->mds_last_transno));
/* Register target_client_add_cb, with exp as its argument, on this
 * transaction handle. */
246 rc = fsfilt_add_journal_cb(obd, 0, handle,
247 target_client_add_cb, exp);
/* NOTE(review): the condition guarding the next three lines is not visible;
 * they set exp_need_sync while holding exp_lock. */
249 spin_lock(&exp->exp_lock);
250 exp->exp_need_sync = 1;
251 spin_unlock(&exp->exp_lock);
/* Write the client record; rc from the callback registration is passed as
 * the last argument so the write is synchronous when no callback was added. */
253 rc = fsfilt_write_record(obd, file, med->med_lcd,
254 sizeof(*med->med_lcd),
255 &off, rc /* sync if no cb */);
256 fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0);
259 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
260 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
264 CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
265 med->med_lr_idx, med->med_lr_off,
266 (unsigned int)sizeof(*med->med_lcd));
/* All-zero client record; mds_client_free() writes it over the slot of a
 * client that is being released. */
271 struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
/*
 * Release the last_rcvd slot held by an export.
 *
 * Visible behaviour: unless OBD_OPT_FAILOVER is set in exp_flags, the
 * client's record at med_lr_off is overwritten with zero_lcd and
 * mds_update_server_data() is called with need_sync; the client's bit in
 * mds_client_bitmap is then cleared with test_and_clear_bit() and med_lcd is
 * freed.  A non-matching offset is reported and fails with -EINVAL; failure
 * to allocate the run context fails with -ENOMEM.
 *
 * NOTE(review): declarations of rc and off, the braces, several conditions,
 * the "free" label and the return are not visible in this view.
 */
273 int mds_client_free(struct obd_export *exp)
275 struct mds_export_data *med = &exp->exp_mds_data;
276 struct mds_obd *mds = &exp->exp_obd->u.mds;
277 struct obd_device *obd = exp->exp_obd;
278 struct lvfs_run_ctxt *saved = NULL;
286 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
287 if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
/* NOTE(review): the statement taken when the client uuid equals the obd's
 * own uuid is not visible here. */
290 CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
291 med->med_lr_idx, med->med_lr_off, med->med_lcd->lcd_uuid);
293 LASSERT(mds->mds_client_bitmap != NULL);
296 off = med->med_lr_off;
298 /* Don't clear med_lr_idx here as it is likely also unset. At worst
299 * we leak a client slot that will be cleaned on the next recovery. */
301 CERROR("%s: client idx %d has offset %lld\n",
302 obd->obd_name, med->med_lr_idx, off);
303 GOTO(free, rc = -EINVAL);
306 /* Clear the bit _after_ zeroing out the client so we don't
307 race with mds_client_add and zero out new clients.*/
308 if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
309 CERROR("MDS client %u: bit already clear in bitmap!!\n",
314 if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
315 /* Don't force sync on each disconnect if aborting recovery,
316 * or it does num_clients * num_osts syncs. b=17194 */
317 int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
318 !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
319 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
321 CERROR("cannot allocate memory for run ctxt\n");
322 GOTO(free, rc = -ENOMEM);
/* Overwrite the client's record with zeroes (last argument 0). */
324 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
325 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_lcd,
326 sizeof(zero_lcd), &off, 0);
328 /* Make sure the server's last_transno is up to date. Do this
329 * after the client is freed so we know all the client's
330 * transactions have been committed. */
332 mds_update_server_data(exp->exp_obd, need_sync);
334 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
/* Logged at D_INFO on success and at D_ERROR otherwise. */
336 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
337 "zero out client %s at idx %u/%llu in %s %ssync rc %d\n",
338 med->med_lcd->lcd_uuid, med->med_lr_idx, med->med_lr_off,
339 LAST_RCVD, need_sync ? "" : "a", rc);
342 if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
343 CERROR("MDS client %u: bit already clear in bitmap!!\n",
351 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
353 OBD_FREE_PTR(med->med_lcd);
/*
 * Free the in-memory last_rcvd state: the client bitmap (LR_MAX_CLIENTS / 8
 * bytes) and the server data record, then clear mds_server_data.
 * NOTE(review): the braces, the return statement and any reset of
 * mds_client_bitmap are not visible in this view.
 */
359 static int mds_server_free_data(struct mds_obd *mds)
361 OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
362 OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
363 mds->mds_server_data = NULL;
/*
 * Create fake "dead-<idx>" client exports; the only visible caller invokes
 * this under the OBD_FAIL_TGT_FAKE_EXP fail check.
 *
 * Visible behaviour for one export: pick the next zero bit in the client
 * bitmap and set it, compute the record offset from lsd_client_start and
 * lsd_client_size, name the client "dead-%.16u", create an export for it,
 * store index/offset in the export's target data, mark it as needing
 * replay, bump the recoverable-client counters under
 * obd_processing_task_lock, mark the export delayed, and write a client
 * record with lcd_last_epoch = 1 to the file.
 *
 * NOTE(review): the third parameter, the loop over num, the allocation of
 * lcd, declarations of rc/off, the braces and the error handling are not
 * visible in this view.
 */
368 static void mds_add_fake_export(struct obd_device *obd, int num,
371 struct obd_export *exp;
372 struct lvfs_run_ctxt saved;
373 struct obd_device_target *obt = &obd->u.obt;
374 struct lu_export_data *led;
375 unsigned long *bitmap = obt->obt_client_bitmap;
376 struct lsd_client_data *lcd = NULL;
377 unsigned int idx = 0;
388 idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, idx);
389 if (idx >= LR_MAX_CLIENTS) {
390 CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n", idx);
394 if (test_and_set_bit(idx, bitmap)) {
395 CERROR("Bit %u is set already\n", idx);
/* Byte offset of slot idx within the file. */
398 off = le32_to_cpu(obt->obt_lsd->lsd_client_start) +
399 idx * le16_to_cpu(obt->obt_lsd->lsd_client_size);
401 sprintf(lcd->lcd_uuid, "dead-%.16u", idx);
402 CDEBUG(D_INFO, "Create fake export %s, index %u, offset %lu\n",
403 lcd->lcd_uuid, idx, (unsigned long)off);
405 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
407 if (PTR_ERR(exp) == -EALREADY) {
408 CERROR("Export %s already exists\n",
411 CERROR("Failed to create export %lu\n", PTR_ERR(exp));
416 led = &exp->exp_target_data;
417 led->led_lr_idx = idx;
418 led->led_lr_off = off;
421 exp->exp_last_request_time = cfs_time_current_sec();
422 exp->exp_replay_needed = 1;
423 exp->exp_connecting = 0;
424 exp->exp_in_recovery = 0;
426 spin_lock_bh(&obd->obd_processing_task_lock);
427 obd->obd_recoverable_clients++;
428 obd->obd_max_recoverable_clients++;
429 spin_unlock_bh(&obd->obd_processing_task_lock);
431 class_set_export_delayed(exp);
432 class_export_put(exp);
/* Write the fake client record at its slot offset with last_epoch 1. */
434 lcd->lcd_last_epoch = cpu_to_le32(1);
435 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
436 rc = fsfilt_write_record(obd, file, lcd, sizeof(*lcd), &off, 0);
437 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
439 CERROR("Failed to create fake client record\n");
/*
 * Load (or initialise) the server header of the last_rcvd file and rebuild
 * one export per client record found in it.
 *
 * Visible behaviour:
 *  - allocates the lr_server_data buffer and the client bitmap and stores
 *    them in mds->mds_server_data / mds->mds_client_bitmap;
 *  - when the file size is 0, fills in a fresh header (uuid, sizes, feature
 *    flags); the header read, the uuid check against the obd uuid and the
 *    rejection of unsupported incompat/rocompat feature bits are also
 *    visible (failures there jump to err_msd with -EINVAL);
 *  - walks the client slots, skipping records whose uuid is empty, creating
 *    an export for the others via class_new_export() + mds_client_add(),
 *    and raises mds->mds_last_transno to the largest last_transno seen;
 *  - sets obd_recovering when obd_recoverable_clients is non-zero; the
 *    LASSERT(!obd->obd_recovering) / mds_update_last_epoch() lines that
 *    follow look like the no-clients branch;
 *  - picks recovery timeouts (defaults, overridden by mount data when set),
 *    increments the mount count and writes the header back with
 *    mds_update_server_data(obd, 1).
 *
 * NOTE(review): declarations of rc/off/cl_idx/mount_count/start_epoch/
 * last_transno/last_epoch, the braces, several conditions, the error labels
 * and the returns are not visible in this view; the summary covers only what
 * can be read here.
 */
447 static int mds_init_server_data(struct obd_device *obd, struct file *file)
449 struct mds_obd *mds = &obd->u.mds;
450 struct lr_server_data *lsd;
451 struct lsd_client_data *lcd = NULL;
452 struct lustre_mount_info *lmi;
454 unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
460 /* ensure padding in the struct is the correct size */
461 LASSERT(offsetof(struct lr_server_data, lsd_padding) +
462 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
463 LASSERT(offsetof(struct lsd_client_data, lcd_padding) +
464 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
466 OBD_ALLOC_WAIT(lsd, sizeof(*lsd));
470 OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
471 if (!mds->mds_client_bitmap) {
472 OBD_FREE(lsd, sizeof(*lsd));
476 mds->mds_server_data = lsd;
478 if (last_rcvd_size == 0) {
479 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
/* Fresh header: multi-byte fields are stored little-endian. */
481 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid));
482 lsd->lsd_last_transno = 0;
483 mount_count = lsd->lsd_mount_count = 0;
484 lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
485 lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
486 lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
487 lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
488 lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
489 lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT);
491 rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off);
493 CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
496 if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
497 LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
498 " the wrong disk %s. Were the /dev/ "
499 "assignments rearranged?\n",
500 obd->obd_uuid.uuid, lsd->lsd_uuid);
501 GOTO(err_msd, rc = -EINVAL);
503 lsd->lsd_feature_compat |= cpu_to_le32(OBD_COMPAT_MDT);
505 /* Assume old last_rcvd format unless I_C_LR is set */
506 if (!(lsd->lsd_feature_incompat &
507 cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
508 lsd->lsd_mount_count = lsd->lsd_compat14;
510 mount_count = le64_to_cpu(lsd->lsd_mount_count);
513 if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
514 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
515 obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) &
517 GOTO(err_msd, rc = -EINVAL);
519 if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
520 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
521 obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) &
523 /* Do something like remount filesystem read-only */
524 GOTO(err_msd, rc = -EINVAL);
526 /* evict all clients as it is first boot with 2.0 last_rcvd */
527 if (lsd->lsd_feature_compat & cpu_to_le32(OBD_COMPAT_20)) {
528 LCONSOLE_WARN("Mounting %s at first time on 2.0 FS, remove all"
529 " clients for interop needs\n", obd->obd_name);
/* NOTE(review): lsd_client_start is kept little-endian (written with
 * cpu_to_le32() and read with le32_to_cpu() elsewhere in this function) but
 * is passed to simple_truncate() and assigned to last_rcvd_size below
 * without conversion -- looks wrong on big-endian hosts; confirm. */
530 simple_truncate(mds->mds_vfsmnt->mnt_sb->s_root,
531 mds->mds_vfsmnt, LAST_RCVD,
532 lsd->lsd_client_start);
533 last_rcvd_size = lsd->lsd_client_start;
534 lsd->lsd_feature_compat &= ~cpu_to_le32(OBD_COMPAT_20);
537 target_trans_table_init(obd);
538 mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
539 start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
541 CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
542 obd->obd_name, start_epoch);
543 CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
544 obd->obd_name, mds->mds_last_transno);
545 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
546 obd->obd_name, mount_count + 1);
547 CDEBUG(D_INODE, "%s: server data size: %u\n",
548 obd->obd_name, le32_to_cpu(lsd->lsd_server_size));
549 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
550 obd->obd_name, le32_to_cpu(lsd->lsd_client_start));
551 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
552 obd->obd_name, le32_to_cpu(lsd->lsd_client_size));
553 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
554 obd->obd_name, last_rcvd_size);
555 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
556 last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 :
557 (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) /
558 le16_to_cpu(lsd->lsd_client_size));
/* A zero in any of the three layout fields means the header is unusable. */
560 if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
561 !lsd->lsd_client_size) {
562 CERROR("Bad last_rcvd contents!\n");
563 GOTO(err_msd, rc = -EINVAL);
566 /* When we do a clean MDS shutdown, we save the last_transno into
567 * the header. If we find clients with higher last_transno values
568 * then those clients may need recovery done. */
569 for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
570 off < last_rcvd_size; cl_idx++) {
573 struct obd_export *exp;
574 struct mds_export_data *med;
577 OBD_ALLOC_WAIT(lcd, sizeof(*lcd));
579 GOTO(err_client, rc = -ENOMEM);
582 /* Don't assume off is incremented properly by
583 * fsfilt_read_record(), in case sizeof(*lcd)
584 * isn't the same as lsd->lsd_client_size. */
585 off = le32_to_cpu(lsd->lsd_client_start) +
586 cl_idx * le16_to_cpu(lsd->lsd_client_size);
587 rc = fsfilt_read_record(obd, file, lcd, sizeof(*lcd), &off);
589 CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
590 LAST_RCVD, cl_idx, off, rc);
591 break; /* read error shouldn't cause startup to fail */
/* An empty uuid marks a slot that holds no client. */
594 if (lcd->lcd_uuid[0] == '\0') {
595 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
600 check_lcd(obd->obd_name, cl_idx, lcd);
602 last_transno = lsd_last_transno(lcd);
603 last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
605 /* These exports are cleaned up by mds_disconnect(), so they
606 * need to be set up like real exports as mds_connect() does.
608 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
609 " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
610 last_transno, le64_to_cpu(lsd->lsd_last_transno),
611 le64_to_cpu(lcd->lcd_last_xid));
613 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
615 if (PTR_ERR(exp) == -EALREADY) {
616 /* export already exists, zero out this one */
617 lcd->lcd_uuid[0] = '\0';
619 GOTO(err_client, rc = PTR_ERR(exp));
622 med = &exp->exp_mds_data;
624 rc = mds_client_add(obd, exp, cl_idx, NULL);
625 /* can't fail for existing client */
626 LASSERTF(rc == 0, "rc = %d\n", rc);
628 /* VBR: set export last committed version */
629 exp->exp_last_committed = last_transno;
630 /* read last time from disk */
631 exp->exp_last_request_time = target_trans_table_last_time(exp);
634 spin_lock(&exp->exp_lock);
635 exp->exp_replay_needed = 1;
636 exp->exp_connecting = 0;
637 exp->exp_in_recovery = 0;
638 spin_unlock(&exp->exp_lock);
640 spin_lock_bh(&obd->obd_processing_task_lock);
641 obd->obd_recoverable_clients++;
642 obd->obd_max_recoverable_clients++;
643 spin_unlock_bh(&obd->obd_processing_task_lock);
645 /* VBR: if epoch too old mark export as delayed,
646 * if epoch is zero then client is pre-vbr one */
647 if (start_epoch > last_epoch && last_epoch != 0)
648 class_set_export_delayed(exp);
649 class_export_put(exp);
652 /* Need to check last_rcvd even for duplicated exports. */
653 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
654 "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
656 if (last_transno > mds->mds_last_transno)
657 mds->mds_last_transno = last_transno;
/* Fault-injection hook: add obd_fail_val fake exports when the
 * OBD_FAIL_TGT_FAKE_EXP check fires. */
660 if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_FAKE_EXP))) {
661 mds_add_fake_export(obd, obd_fail_val, file);
667 obd->obd_last_committed = mds->mds_last_transno;
669 if (obd->obd_recoverable_clients) {
670 CWARN("RECOVERY: service %s, %d recoverable clients, "
671 "%d delayed clients, last_transno "LPU64"\n",
672 obd->obd_name, obd->obd_recoverable_clients,
673 obd->obd_delayed_clients, mds->mds_last_transno);
674 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
675 obd->obd_recovering = 1;
676 obd->obd_recovery_start = 0;
677 obd->obd_recovery_end = 0;
679 LASSERT(!obd->obd_recovering);
680 /* VBR: update boot epoch after recovery */
681 mds_update_last_epoch(obd);
/* Default recovery timeouts; non-zero values from the mount data found via
 * server_find_mount_locked() replace them below. */
684 obd->obd_recovery_timeout = OBD_RECOVERY_TIME_SOFT;
685 obd->obd_recovery_time_hard = OBD_RECOVERY_TIME_HARD;
687 lmi = server_find_mount_locked(obd->obd_name);
689 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
691 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft)
692 obd->obd_recovery_timeout =
693 lsi->lsi_lmd->lmd_recovery_time_soft;
695 if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard)
696 obd->obd_recovery_time_hard =
697 lsi->lsi_lmd->lmd_recovery_time_hard;
700 mds->mds_mount_count = mount_count + 1;
701 lsd->lsd_mount_count = lsd->lsd_compat14 =
702 cpu_to_le64(mds->mds_mount_count);
704 /* save it, so mount count and last_transno is current */
705 rc = mds_update_server_data(obd, 1);
707 GOTO(err_client, rc);
/* Error unwinding as far as visible: disconnect the exports created above,
 * then free the server data and bitmap (labels not visible in this view). */
712 class_disconnect_exports(obd);
714 mds_server_free_data(mds);
/*
 * Prepare the backing filesystem mounted at mnt for use by the MDS.
 *
 * Visible steps, in order: cleanup_group_info(); allocate a run context;
 * record the mount and its superblock; fsfilt_setup(); fill in
 * obd_lvfs_ctxt; then, inside that context, create "ROOT" and record its
 * inode number/generation as the root fid, look up "__iopen__", create
 * "PENDING", MDT_LOGS_DIR and "OBJECTS", open LAST_RCVD and load it with
 * mds_init_server_data(), call mds_lov_init_objids(), and open HEALTH_CHECK
 * and run lvfs_check_io_health() on it.
 *
 * The trailing lines undo these steps in reverse order (close the health
 * check file, destroy the lov objids, disconnect exports, close LAST_RCVD,
 * dput the directories).
 *
 * NOTE(review): declarations of rc and file, the braces, most conditions,
 * the error labels and the returns are not visible in this view.
 */
718 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
720 struct mds_obd *mds = &obd->u.mds;
721 struct lvfs_run_ctxt *saved = NULL;
722 struct dentry *dentry;
727 OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
729 rc = cleanup_group_info();
733 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
735 CERROR("cannot allocate memory for run ctxt\n");
739 mds->mds_vfsmnt = mnt;
740 /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
741 obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
742 obd->u.obt.obt_stale_export_age = STALE_EXPORT_MAXTIME_DEFAULT;
743 spin_lock_init(&obd->u.obt.obt_trans_table_lock);
745 rc = fsfilt_setup(obd, obd->u.obt.obt_sb);
/* The lvfs context points at the root of the mount. */
749 OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
750 obd->obd_lvfs_ctxt.pwdmnt = mnt;
751 obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
752 obd->obd_lvfs_ctxt.fs = get_ds();
753 obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
755 /* setup the directory tree */
756 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
757 dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "ROOT", 0755, 0);
758 if (IS_ERR(dentry)) {
759 rc = PTR_ERR(dentry);
760 CERROR("cannot create ROOT directory: rc = %d\n", rc);
/* The root fid is taken from the ROOT directory's inode. */
764 mds->mds_rootfid.id = dentry->d_inode->i_ino;
765 mds->mds_rootfid.generation = dentry->d_inode->i_generation;
766 mds->mds_rootfid.f_type = S_IFDIR;
770 dentry = lookup_one_len("__iopen__", cfs_fs_pwd(current->fs),
771 strlen("__iopen__"));
772 if (IS_ERR(dentry)) {
773 rc = PTR_ERR(dentry);
774 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
778 mds->mds_fid_de = dentry;
779 if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
781 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
785 dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "PENDING", 0777, 1);
786 if (IS_ERR(dentry)) {
787 rc = PTR_ERR(dentry);
788 CERROR("cannot create PENDING directory: rc = %d\n", rc);
791 mds->mds_pending_dir = dentry;
794 dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, MDT_LOGS_DIR, 0777, 1);
795 if (IS_ERR(dentry)) {
796 rc = PTR_ERR(dentry);
797 CERROR("cannot create %s directory: rc = %d\n",
799 GOTO(err_pending, rc);
801 mds->mds_logs_dir = dentry;
804 dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "OBJECTS", 0777, 1);
805 if (IS_ERR(dentry)) {
806 rc = PTR_ERR(dentry);
807 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
810 mds->mds_objects_dir = dentry;
812 /* open and test the last rcvd file */
813 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed before the GOTO assigns PTR_ERR(file); the
 * message is only meaningful if a line not visible here sets rc first. */
816 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
817 GOTO(err_objects, rc = PTR_ERR(file));
819 mds->mds_rcvd_filp = file;
820 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
821 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
822 file->f_dentry->d_inode->i_mode);
823 GOTO(err_last_rcvd, rc = -ENOENT);
826 rc = mds_init_server_data(obd, file);
828 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
829 GOTO(err_last_rcvd, rc);
832 rc = mds_lov_init_objids(obd);
834 CERROR("cannot init lov objid rc = %d\n", rc);
835 GOTO(err_client, rc );
838 /* open and test the check io file junk */
839 file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
842 CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
843 GOTO(err_lov_objid, rc = PTR_ERR(file));
845 mds->mds_obt.obt_health_check_filp = file;
846 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
847 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
848 file->f_dentry->d_inode->i_mode);
849 GOTO(err_health_check, rc = -ENOENT);
851 rc = lvfs_check_io_health(obd, file);
853 GOTO(err_health_check, rc);
/* Leave the lvfs context and release the run context. */
855 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
856 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
/* Error unwinding, newest resource first (labels not visible in this view). */
860 if (mds->mds_obt.obt_health_check_filp &&
861 filp_close(mds->mds_obt.obt_health_check_filp, 0))
862 CERROR("can't close %s after error\n", HEALTH_CHECK);
864 mds_lov_destroy_objids(obd);
866 class_disconnect_exports(obd);
868 if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
869 CERROR("can't close %s after error\n", LAST_RCVD);
871 dput(mds->mds_objects_dir);
873 dput(mds->mds_logs_dir);
875 dput(mds->mds_pending_dir);
877 dput(mds->mds_fid_de);
/*
 * Tear down the MDS's use of the backing filesystem.
 *
 * Visible steps: disconnect all exports and wait on obd_zombie_barrier(),
 * free the server data, then inside the obd's lvfs context close LAST_RCVD,
 * destroy the lov objids, close HEALTH_CHECK, drop the OBJECTS / logs /
 * PENDING directory references and run lquota_fs_cleanup(); after leaving
 * the context, drop the __iopen__ dentry, call LL_DQUOT_OFF() on the
 * superblock and shrink its dcache.
 *
 * NOTE(review): the declaration of rc, the braces, the conditions around the
 * failover message and the error checks, and the return are not visible in
 * this view.
 */
881 int mds_fs_cleanup(struct obd_device *obd)
883 struct mds_obd *mds = &obd->u.mds;
884 struct lvfs_run_ctxt *saved = NULL;
887 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
889 CERROR("cannot allocate memory for run ctxt\n");
894 LCONSOLE_WARN("%s: shutting down for failover; client state "
895 "will be preserved.\n", obd->obd_name);
897 class_disconnect_exports(obd); /* cleans up client info too */
899 /* some exports may still be in the zombie queue, so we make sure that
900 * all the exports have been processed, otherwise the last_rcvd slot
901 * may not be updated on time */
902 obd_zombie_barrier();
904 mds_server_free_data(mds);
906 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
/* Each file pointer is cleared right after filp_close(), whatever it
 * returned. */
907 if (mds->mds_rcvd_filp) {
908 rc = filp_close(mds->mds_rcvd_filp, 0);
909 mds->mds_rcvd_filp = NULL;
911 CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
914 mds_lov_destroy_objids(obd);
916 if (mds->mds_obt.obt_health_check_filp) {
917 rc = filp_close(mds->mds_obt.obt_health_check_filp, 0);
918 mds->mds_obt.obt_health_check_filp = NULL;
920 CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
922 if (mds->mds_objects_dir != NULL) {
923 l_dput(mds->mds_objects_dir);
924 mds->mds_objects_dir = NULL;
926 if (mds->mds_logs_dir) {
927 l_dput(mds->mds_logs_dir);
928 mds->mds_logs_dir = NULL;
930 if (mds->mds_pending_dir) {
931 l_dput(mds->mds_pending_dir);
932 mds->mds_pending_dir = NULL;
935 lquota_fs_cleanup(mds_quota_interface_ref, obd);
937 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
938 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
939 dput(mds->mds_fid_de);
940 LL_DQUOT_OFF(obd->u.obt.obt_sb, 0);
941 shrink_dcache_sb(mds->mds_obt.obt_sb);
946 /* Creates an object with the same name as its fid. Because this is not at all
947 * performance sensitive, it is accomplished by creating a file, checking the
948 * fid, and renaming it. */
/*
 * Visible steps: a temporary name "<random>.<pid>" is looked up in the
 * OBJECTS directory and must be negative; the file is created with mode
 * S_IFREG | 0666 while the parent inode mutex is held; oa->o_id and
 * oa->o_generation are taken from the new inode and turned into the fid
 * name with ll_fid2str(); that name is looked up (and must be negative too)
 * and the file is renamed to it inside an FSFILT_OP_RENAME transaction.  On
 * the visible success path OBD_MD_FLID | OBD_MD_FLGENER are set in
 * oa->o_valid.
 *
 * The work runs in the obd's lvfs context with CAP_SYS_RESOURCE raised in
 * the credentials passed to push_ctxt().
 *
 * NOTE(review): the declaration of handle, the braces, several conditions,
 * the out_* labels, the dput calls and the return are not visible in this
 * view.
 */
949 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
950 struct lov_stripe_md **ea, struct obd_trans_info *oti)
952 struct mds_obd *mds = &exp->exp_obd->u.mds;
953 struct inode *parent_inode = mds->mds_objects_dir->d_inode;
954 unsigned int tmpname = ll_rand();
955 struct dentry *dchild, *new_child;
956 struct lvfs_dentry_params dp = LVFS_DENTRY_PARAMS_INIT;
957 struct lvfs_run_ctxt *saved = NULL;
958 char fidname[LL_FID_NAMELEN];
960 struct lvfs_ucred ucred = { 0 };
961 int rc = 0, err, namelen;
964 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
966 CERROR("cannot allocate memory for run ctxt\n");
970 /* the owner of object file should always be root */
971 cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
973 push_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
/* fidname first holds the temporary name, later the fid-derived name. */
975 sprintf(fidname, "%u.%u", tmpname, current->pid);
976 dchild = lookup_one_len(fidname, mds->mds_objects_dir, strlen(fidname));
977 if (IS_ERR(dchild)) {
978 CERROR("getting neg dentry for obj: %u\n", tmpname);
979 GOTO(out_pop, rc = PTR_ERR(dchild));
981 if (dchild->d_inode != NULL) {
982 CERROR("impossible non-negative obj dentry: %u\n", tmpname);
/* dp is attached to the dentry via d_fsdata before the create call. */
986 dchild->d_fsdata = (void *)&dp;
987 dp.ldp_ptr = (void *)DP_LASTGROUP_REVERSE;
989 LOCK_INODE_MUTEX(parent_inode);
990 rc = ll_vfs_create(parent_inode, dchild, S_IFREG | 0666, NULL);
992 oa->o_id = dchild->d_inode->i_ino;
993 oa->o_generation = dchild->d_inode->i_generation;
994 namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
996 new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
998 if (IS_ERR(new_child)) {
999 CERROR("getting neg dentry for obj rename: %d\n", rc);
1000 GOTO(out_dput, rc = PTR_ERR(new_child));
1002 if (new_child->d_inode != NULL) {
1003 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
1004 oa->o_id, oa->o_generation);
1008 handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
1009 FSFILT_OP_RENAME, NULL);
1011 GOTO(out_dput2, rc = PTR_ERR(handle));
1013 rc = ll_vfs_rename(parent_inode, dchild, mds->mds_vfsmnt,
1014 parent_inode, new_child, mds->mds_vfsmnt);
1016 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1017 oa->o_id, oa->o_generation, rc);
1019 err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
1022 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1029 UNLOCK_INODE_MUTEX(parent_inode);
1031 pop_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
1032 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
/*
 * Remove the object named after oa's id/generation from the OBJECTS
 * directory.
 *
 * Visible steps: build the fid name with ll_fid2str(), look it up under the
 * parent inode mutex, fail with -ENOENT when the dentry is negative, start
 * an FSFILT_OP_UNLINK transaction with fsfilt_start_log(), take an extra
 * inode reference, unlink the entry and commit the transaction.  The work
 * runs in the obd's lvfs context with CAP_SYS_RESOURCE raised in the
 * credentials passed to push_ctxt().
 *
 * NOTE(review): declarations of de and handle, the braces, several
 * conditions, the out_* labels, the release of the extra inode reference
 * and the return are not visible in this view.
 */
1036 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1037 struct lov_stripe_md *ea, struct obd_trans_info *oti,
1038 struct obd_export *md_exp)
1040 struct mds_obd *mds = &exp->exp_obd->u.mds;
1041 struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1042 struct obd_device *obd = exp->exp_obd;
1043 struct lvfs_run_ctxt *saved = NULL;
1044 struct lvfs_ucred ucred = { 0 };
1045 char fidname[LL_FID_NAMELEN];
1046 struct inode *inode = NULL;
1049 int err, namelen, rc = 0;
1052 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
1053 if (saved == NULL) {
1054 CERROR("cannot allocate memory for run ctxt\n");
1058 cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
1059 push_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1061 namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1063 LOCK_INODE_MUTEX(parent_inode);
1064 de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1068 CERROR("error looking up object "LPU64" %s: rc %d\n",
1069 oa->o_id, fidname, rc);
1072 if (de->d_inode == NULL) {
/* rc has not been set to -ENOENT yet when this message is printed. */
1073 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1074 oa->o_id, fidname, rc);
1075 GOTO(out_dput, rc = -ENOENT);
1078 /* Stripe count is 1 here since this is some MDS specific stuff
1079 that is unlinked, not spanned across multiple OSTs */
1080 handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1081 FSFILT_OP_UNLINK, oti, 1);
1084 GOTO(out_dput, rc = PTR_ERR(handle));
1086 /* take a reference to protect inode from truncation within
1087 vfs_unlink() context. bug 10409 */
1088 inode = de->d_inode;
1089 atomic_inc(&inode->i_count);
1090 rc = ll_vfs_unlink(mds->mds_objects_dir->d_inode, de, mds->mds_vfsmnt);
1092 CERROR("error destroying object "LPU64":%u: rc %d\n",
1093 oa->o_id, oa->o_generation, rc);
1095 err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
1101 UNLOCK_INODE_MUTEX(parent_inode);
1106 pop_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1107 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);