1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <lustre_mds.h>
50 #include <lustre/lustre_idl.h>
51 #include <obd_class.h>
53 #include <lustre_lib.h>
54 #include <lustre_fsfilt.h>
56 #include "mds_internal.h"
// Forward declaration: mds_allow_cli is defined further down in this file
// but is already called from mds_iocontrol before that definition.
58 static void mds_allow_cli(struct obd_device *obd, unsigned long flag);
// Debug helper: logs the words of the dirty-page bitmap and the object ids
// held in the cached objid pages at D_INFO level, tagged with the label
// supplied by the caller.
// NOTE(review): this listing omits some lines of the function (see the gaps
// in the embedded line numbers); comments cover only the visible statements.
60 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
62 struct mds_obd *mds = &obd->u.mds;
65 CDEBUG(D_INFO, "dump from %s\n", label);
// A missing bitmap is reported as an error instead of being dereferenced.
66 if (mds->mds_lov_page_dirty == NULL) {
67 CERROR("NULL bitmap!\n");
// One log line per word of the dirty bitmap.
// NOTE(review): the bound is size/BITS_PER_LONG + 1 -- confirm the bitmap
// storage holds that many words when size is a multiple of BITS_PER_LONG.
71 for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
72 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
// Same guard for the page pointer array.
74 if (mds->mds_lov_page_array == NULL) {
75 CERROR("not init page array!\n");
// Walk every page slot and log the object ids stored in each page.
79 for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
80 obd_id *data = mds->mds_lov_page_array[i];
85 for(j=0; j < OBJID_PER_PAGE(); j++) {
88 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
// Sets up the objid bookkeeping of the MDS: allocates the dirty-page bitmap
// and the page pointer array, then opens (creating it if needed) the
// LOV_OBJID file and stores the handle in mds_lov_objid_filp.
// Visible failure codes: -ENOMEM when the page array cannot be allocated,
// PTR_ERR(file) when the open fails, -ENOENT when LOV_OBJID is not a regular
// file. The visible tail closes the file, frees the page array and frees the
// bitmap.
// NOTE(review): this listing omits some lines (labels, returns, checks).
96 int mds_lov_init_objids(struct obd_device *obd)
98 struct mds_obd *mds = &obd->u.mds;
// Size in bytes of the array of page pointers.
99 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
// Asserts that MDS_LOV_ALLOC_SIZE is a whole multiple of sizeof(obd_id).
104 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
107 if (mds->mds_lov_page_dirty == NULL)
111 OBD_ALLOC(mds->mds_lov_page_array, size);
112 if (mds->mds_lov_page_array == NULL)
113 GOTO(err_free_bitmap, rc = -ENOMEM);
115 /* open and test the lov objid file */
116 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
119 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
120 GOTO(err_free, rc = PTR_ERR(file));
// Only a regular file is accepted as the objid store.
122 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
123 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
124 file->f_dentry->d_inode->i_mode);
125 GOTO(err_open, rc = -ENOENT);
127 mds->mds_lov_objid_filp = file;
// Cleanup statements, in reverse order of acquisition (labels not visible).
131 if (filp_close((struct file *)file, 0))
132 CERROR("can't close %s after error\n", LOV_OBJID);
134 OBD_FREE(mds->mds_lov_page_array, size);
136 FREE_BITMAP(mds->mds_lov_page_dirty);
140 EXPORT_SYMBOL(mds_lov_init_objids);
// Releases what mds_lov_init_objids and later page allocations set up: the
// objid pages, the page pointer array, the open LOV_OBJID file and the
// dirty-page bitmap.
// NOTE(review): this listing omits some lines of the function.
142 void mds_lov_destroy_objids(struct obd_device *obd)
144 struct mds_obd *mds = &obd->u.mds;
// Free the pages found in the array, then the array itself.
148 if (mds->mds_lov_page_array != NULL) {
149 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150 obd_id *data = mds->mds_lov_page_array[i];
152 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
154 OBD_FREE(mds->mds_lov_page_array,
155 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
// Close the objid file; the handle is cleared before the close result is
// reported, so it is not reused whatever filp_close returned.
158 if (mds->mds_lov_objid_filp) {
159 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160 mds->mds_lov_objid_filp = NULL;
162 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165 FREE_BITMAP(mds->mds_lov_page_dirty);
168 EXPORT_SYMBOL(mds_lov_destroy_objids);
171 * There are currently two ways to learn the ost count and the max ost index:
172 * first - after an ost is connected to the mds and the sync process has finished;
173 * second - from the lmm during recovery, when the mds does not have configs
174 * and the ost is not registered in the mgs.
176 * \param mds pointer to mds structure
177 * \param index maximum ost index
179 * \retval -ENOMEM not enough memory for a new page
180 * \retval 0 update succeeded
// Records that ost index `index` exists: makes sure the objid page holding
// that index is allocated, advances the last page / last index / max index
// markers when `index` is the largest seen so far, and, for a slot that is
// still zero, bumps the objid count and recomputes mds_max_mdsize and
// mds_max_cookiesize for the new stripe count.
// Callers visible in this file hold obd_dev_sem around the call.
// NOTE(review): this listing omits some lines of the function (NULL check,
// returns, the store into data[off]); only visible statements are described.
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
// Split the ost index into a page number and an offset inside that page.
184 __u32 page = index / OBJID_PER_PAGE();
185 __u32 off = index % OBJID_PER_PAGE();
186 obd_id *data = mds->mds_lov_page_array[page];
189 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
193 mds->mds_lov_page_array[page] = data;
// Track the highest index seen; mds_lov_write_objids uses lastpage/lastidx
// to size the write of the final, partly filled page.
196 if (index > mds->mds_lov_objid_max_index) {
197 mds->mds_lov_objid_lastpage = page;
198 mds->mds_lov_objid_lastidx = off;
199 mds->mds_lov_objid_max_index = index;
202 /* workaround - New target not in objids file; increase mdsize */
203 /* ld_tgt_count is used as the max index everywhere, despite its name. */
204 if (data[off] == 0) {
208 mds->mds_lov_objid_count++;
209 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
210 mds->mds_lov_objid_count);
212 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
213 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
// Argument order follows the format string: stripes, then mdsize/cookiesize
// (the same order mds_lov_update_from_read uses for this message).
214 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d "
215 "stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216 mds->mds_max_cookiesize);
// Registers every ost index referenced by the stripes of `lmm` through
// mds_lov_update_max_ost, holding obd_dev_sem while doing so.
// NOTE(review): this listing omits some lines (the lmm NULL check, the error
// handling inside the loop and the return).
223 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
227 struct lov_ost_data_v1 *lmm_objects;
229 /* if we create file without objects - lmm is NULL */
233 mutex_down(&obd->obd_dev_sem);
// The object array sits at a different place in the V3 layout than in the
// layout described by struct lov_mds_md.
234 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
235 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
237 lmm_objects = lmm->lmm_objects;
// lmm fields are converted with le32_to_cpu before use.
239 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
240 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
241 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
246 mutex_up(&obd->obd_dev_sem);
250 EXPORT_SYMBOL(mds_lov_prepare_objids);
253 * Write an llog orphan record about a lost ost object.
254 * A special lsm is allocated with a single stripe; the caller should deallocate it.
// Logs an orphan record for `count` lost objects starting at object id `id`
// on ost `idx`. The record is built in *lsmp, which is set up through
// obd_alloc_memmd and forced to a single stripe; the original stripe count
// is saved in *stripes so the caller can restore it.
// NOTE(review): this listing omits some lines (the condition guarding the
// allocation, error checks, the store back into *lsmp and the return).
257 static int mds_log_lost_precreated(struct obd_device *obd,
258 struct lov_stripe_md **lsmp, int *stripes,
259 obd_id id, obd_count count, int idx)
261 struct lov_stripe_md *lsm = *lsmp;
266 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
269 /* need only one stripe, save old value */
270 *stripes = lsm->lsm_stripe_count;
271 lsm->lsm_stripe_count = 1;
// Describe the first lost object in stripe 0.
275 lsm->lsm_oinfo[0]->loi_id = id;
276 lsm->lsm_oinfo[0]->loi_gr = 0; /* needed in 2.0 */
277 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
279 rc = mds_log_op_orphan(obd, lsm, count);
// For every stripe in `lmm`, compares the stripe object id with the last id
// cached for that ost and, when the new id is larger, marks the page dirty.
// Without HAVE_DELAYED_RECOVERY, a gap between the ids seen during recovery
// is also logged as lost precreated objects via mds_log_lost_precreated.
// NOTE(review): this listing omits some lines (locking, the store of the new
// id, #endif lines, the condition around the final lsm cleanup).
283 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
285 struct mds_obd *mds = &obd->u.mds;
287 struct lov_ost_data_v1 *lmm_objects;
288 #ifndef HAVE_DELAYED_RECOVERY
289 struct lov_stripe_md *lsm = NULL;
294 /* if we create file without objects - lmm is NULL */
298 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
299 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
301 lmm_objects = lmm->lmm_objects;
303 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
304 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
305 obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
// Locate the cached last id for ost i: page number and slot in that page.
306 __u32 page = i / OBJID_PER_PAGE();
307 __u32 idx = i % OBJID_PER_PAGE();
310 data = mds->mds_lov_page_array[page];
312 CDEBUG(D_INODE,"update last object for ost %u"
313 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
314 if (id > data[idx]) {
315 #ifndef HAVE_DELAYED_RECOVERY
// NOTE(review): id and data[idx] are obd_id values (read via le64_to_cpu)
// while lost is an int -- confirm the gap can never exceed INT_MAX.
316 int lost = id - data[idx] - 1;
317 /* we might have lost precreated objects due to VBR */
318 if (lost > 0 && obd->obd_recovering) {
// NOTE(review): lost is an int but is printed with %u; harmless here since
// lost > 0 was just checked, yet %d would match the type.
319 CDEBUG(D_HA, "GAP in objids is %u\n", lost);
320 if (!obd->obd_version_recov)
321 CWARN("Unexpected gap in objids\n");
322 /* lsm is allocated if NULL */
323 mds_log_lost_precreated(obd, &lsm, &stripes,
324 data[idx] + 1, lost, i);
328 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
331 #ifndef HAVE_DELAYED_RECOVERY
333 /* restore stripes number */
334 lsm->lsm_stripe_count = stripes;
335 obd_free_memmd(mds->mds_osc_exp, &lsm);
341 EXPORT_SYMBOL(mds_lov_update_objids);
// Accounts for `count` object ids just read from the objid file: the visible
// loop body increments mds_lov_objid_count, after which the stripe count is
// capped at LOV_MAX_STRIPE_COUNT and mds_max_mdsize / mds_max_cookiesize are
// recomputed from it.
// NOTE(review): this listing omits some lines (rest of the parameter list,
// the condition inside the loop, the return).
343 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
349 for(i = 0; i < count; i++) {
353 mds->mds_lov_objid_count++;
354 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
355 mds->mds_lov_objid_count);
357 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
358 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
359 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
360 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
// Loads the whole LOV_OBJID file into freshly allocated objid pages, one
// page worth of ids per read, feeding each chunk to
// mds_lov_update_from_read, and finally records the last page and last
// index that were read.
// Visible failure codes: -ENOMEM when a page cannot be allocated and -EIO
// when mds_lov_update_from_read reports failure.
// NOTE(review): this listing omits some lines (declarations of size/off/data,
// early exits, the out label and the return).
366 static int mds_lov_read_objids(struct obd_device *obd)
368 struct mds_obd *mds = &obd->u.mds;
370 int i, rc = 0, count = 0, page = 0;
374 /* Read everything in the file, even if our current lov desc
375 has fewer targets. Old targets not in the lov descriptor
376 during mds setup may still have valid objids. */
377 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
// Number of pages needed for the file, rounded up by always adding one.
// NOTE(review): no bound against MDS_LOV_OBJID_PAGES_COUNT is visible in
// this listing -- confirm an oversized file cannot index past the array.
381 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
382 CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
383 for (i = 0; i < page; i++) {
// Remember the offset before the read so the amount read can be derived.
385 loff_t off_old = off;
// Each page slot is expected to be unused before this initial load.
387 LASSERT(mds->mds_lov_page_array[i] == NULL);
388 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
389 if (mds->mds_lov_page_array[i] == NULL)
390 GOTO(out, rc = -ENOMEM);
392 data = mds->mds_lov_page_array[i];
394 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
395 OBJID_PER_PAGE()*sizeof(obd_id), &off);
397 CERROR("Error reading objids %d\n", rc);
// Ids read in this chunk = bytes the offset advanced / size of one id.
401 count = (off - off_old) / sizeof(obd_id);
402 if (mds_lov_update_from_read(mds, data, count)) {
403 CERROR("Can't update mds data\n");
404 GOTO(out, rc = -EIO);
410 mds->mds_lov_objid_lastpage = i;
411 mds->mds_lov_objid_lastidx = count;
413 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
414 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
416 mds_lov_dump_objids("read",obd);
// Writes every objid page whose dirty bit is set back to the LOV_OBJID file
// at the offset matching its page number. A page whose write path reaches
// the visible cfs_bitmap_set call is marked dirty again.
// NOTE(review): this listing omits some lines (early return, the remaining
// fsfilt_write_record arguments, the failure check and the return).
421 int mds_lov_write_objids(struct obd_device *obd)
423 struct mds_obd *mds = &obd->u.mds;
// Checks whether any page is dirty at all before doing further work.
427 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
430 mds_lov_dump_objids("write", obd);
432 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
433 obd_id *data = mds->mds_lov_page_array[i];
// Default write size is a full page of ids; file offset is page * size.
434 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
435 loff_t off = i * size;
437 LASSERT(data != NULL);
// The dirty bit is tested and cleared in one call before the write.
439 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
442 /* check for partially filled last page */
443 if (i == mds->mds_lov_objid_lastpage)
444 size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
446 CDEBUG(D_INFO,"write %lld - %u\n", off, size);
447 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
450 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
459 EXPORT_SYMBOL(mds_lov_write_objids);
// Looks up the cached last object id for ost index idx. Under the visible
// condition (the export was connected without OBD_CONNECT_SKIP_ORPHAN, plus
// a first clause not shown here) the id is fetched from the osc with
// obd_get_info(KEY_LAST_ID) directly into the cache slot. The page is then
// marked dirty.
// NOTE(review): this listing omits some lines (the idx parameter, local
// declarations, the first half of the condition, error handling, return).
461 static int mds_lov_get_objid(struct obd_device * obd,
464 struct mds_obd *mds = &obd->u.mds;
465 struct obd_export *osc_exp = mds->mds_osc_exp;
472 LASSERT(osc_exp != NULL);
// Page number and slot of idx inside the objid cache.
474 page = idx / OBJID_PER_PAGE();
475 off = idx % OBJID_PER_PAGE();
477 data = mds->mds_lov_page_array[page];
479 !(osc_exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN)) {
480 /* We never read this lastid; ask the osc */
481 struct obd_id_info lastid;
482 __u32 size = sizeof(lastid);
// The reply is written straight into the cache slot for this ost.
485 lastid.data = &data[off];
486 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
487 KEY_LAST_ID, &size, &lastid, NULL);
491 /* workaround for clean filter */
495 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
497 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
498 idx, data, page, off, data[off]);
// Issues an obd_create request flagged OBD_FL_DELORPHAN through the osc
// export. When ost_uuid is not NULL it is passed along in the transaction
// info (oti_ost_uuid).
// NOTE(review): this listing omits some lines (rc declaration, handling of
// the obd_create result, return).
503 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
506 struct obdo oa = { 0 };
507 struct obd_trans_info oti = {0};
508 struct lov_stripe_md *empty_ea = NULL;
511 LASSERT(mds->mds_lov_page_array != NULL);
513 /* This create will in fact either create or destroy: If the OST is
514 * missing objects below this ID, they will be created. If it finds
515 * objects above this ID, they will be removed. */
// oa was already zero-initialised at its declaration; it is cleared again.
516 memset(&oa, 0, sizeof(oa));
517 oa.o_flags = OBD_FL_DELORPHAN;
518 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
519 if (ost_uuid != NULL)
520 oti.oti_ost_uuid = ost_uuid;
522 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
// Sends a KEY_NEXT_ID set_info request through the osc export using an
// obd_id_info record. Asserts that the device is not in recovery.
// NOTE(review): this listing omits the lines that fill `info` from idx and
// id, the failure check, the CERROR arguments and the return.
528 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
530 struct mds_obd *mds = &obd->u.mds;
532 struct obd_id_info info;
535 LASSERT(!obd->obd_recovering);
539 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
540 KEY_NEXT_ID, sizeof(info), &info, NULL);
542 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
// Fetches the current lov descriptor from the osc export (KEY_LOVDESC) into
// a temporary buffer, copies it into mds_lov_desc, registers ost `index`
// with mds_lov_update_max_ost under obd_dev_sem, and re-runs obd_llog_init
// for that index. The temporary buffer is freed before leaving.
// NOTE(review): this listing omits some lines (declarations, error checks,
// the end of one comment, the out label and the return).
548 /* Update the lov desc for a new size lov. */
549 static int mds_lov_update_desc(struct obd_device *obd, __u32 index,
550 struct obd_uuid *uuid)
552 struct mds_obd *mds = &obd->u.mds;
554 __u32 valsize = sizeof(mds->mds_lov_desc);
// Temporary descriptor buffer; released by the OBD_FREE at the end.
558 OBD_ALLOC(ld, sizeof(*ld));
562 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
567 /* Don't change the mds_lov_desc until the objids size matches the
569 mds->mds_lov_desc = *ld;
570 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
571 mds->mds_lov_desc.ld_tgt_count, index, uuid->uuid);
573 mutex_down(&obd->obd_dev_sem);
574 rc = mds_lov_update_max_ost(mds, index);
575 mutex_up(&obd->obd_dev_sem);
577 GOTO(out, rc = -ENOMEM);
579 /* If we added a target we have to reconnect the llogs */
580 /* We only _need_ to do this at first add (idx), or the first time
581 after recovery. However, it should now be safe to call anytime. */
582 rc = obd_llog_init(obd, obd, (void *)&index);
585 OBD_FREE(ld, sizeof(*ld));
// Brings the MDS view of target idx up to date: refreshes the lov descriptor
// via mds_lov_update_desc, validates idx against ld_tgt_count (-EINVAL when
// out of range), obtains the last object id via mds_lov_get_objid and pushes
// it to the osc via mds_lov_set_one_nextid.
// NOTE(review): this listing omits some lines (the idx parameter,
// declarations, result checks, the statement taken while recovering, the
// out label and the return).
590 /* Inform MDS about new/updated target */
591 static int mds_lov_update_mds(struct obd_device *obd,
592 struct obd_device *watched,
595 struct mds_obd *mds = &obd->u.mds;
604 /* Don't let anyone else mess with mds_lov_objids now */
// Target count before the update, kept only for the debug message below.
605 old_count = mds->mds_lov_desc.ld_tgt_count;
606 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
610 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
611 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
612 mds->mds_lov_desc.ld_tgt_count);
614 /* idx is set as data from lov_notify. */
615 if (obd->obd_recovering)
618 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
619 CERROR("index %d > count %d!\n", idx,
620 mds->mds_lov_desc.ld_tgt_count);
621 GOTO(out, rc = -EINVAL);
624 rc = mds_lov_get_objid(obd, idx);
626 CERROR("Failed to get objid - %d\n", rc);
// Locate the cache slot of idx so its address can be handed to the osc.
630 page = idx / OBJID_PER_PAGE();
631 off = idx % OBJID_PER_PAGE();
632 data = mds->mds_lov_page_array[page];
633 /* We have read this lastid from disk; tell the osc.
634 Don't call this during recovery. */
635 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
637 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
638 /* Don't abort the rest of the sync */
642 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
// Connects the MDS to the LOV named lov_name: resolves the LOV device, reads
// the objid file under obd_dev_sem, blocks new client connections
// (obd_no_conn = 1), registers the MDS as observer of the LOV, initialises
// the llogs and finally calls obd_connect with the MDS connect flags.
// A previous failure is remembered: mds_osc_obd holds ERR_PTR(rc) after an
// error and the function then returns that error again on the next call.
// The visible tail clears mds_osc_exp and stores ERR_PTR(rc) in mds_osc_obd.
// NOTE(review): this listing omits some lines (rc declaration, result
// checks, CERROR argument lines, #endif, the end of the last comment, the
// error_exit label and the returns); only visible statements are described.
648 /* update the LOV-OSC knowledge of the last used object id's */
649 int mds_lov_connect(struct obd_device *obd, char * lov_name)
651 struct mds_obd *mds = &obd->u.mds;
652 struct lustre_handle conn = {0,};
653 struct obd_connect_data *data;
657 if (IS_ERR(mds->mds_osc_obd))
658 RETURN(PTR_ERR(mds->mds_osc_obd));
660 if (mds->mds_osc_obd)
663 mds->mds_osc_obd = class_name2obd(lov_name);
664 if (!mds->mds_osc_obd) {
665 CERROR("MDS cannot locate LOV %s\n", lov_name);
666 GOTO(error_exit, rc = -ENOTCONN);
669 mutex_down(&obd->obd_dev_sem);
670 rc = mds_lov_read_objids(obd);
671 mutex_up(&obd->obd_dev_sem);
673 CERROR("cannot read lov_objids: rc = %d\n", rc);
674 GOTO(error_exit, rc);
677 /* Deny new client connections until we are sure we have some OSTs */
678 obd->obd_no_conn = 1;
680 rc = obd_register_observer(mds->mds_osc_obd, obd);
682 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
684 GOTO(error_exit, rc);
687 rc = obd_llog_init(obd, obd, NULL);
// This message names the llog init step; it used to repeat the observer
// registration message and so pointed at the wrong failing call.
689 CERROR("MDS cannot initialize llogs for LOV %s (%d)\n",
691 GOTO(error_exit, rc);
// Connect data is only needed for the obd_connect call and freed right after.
694 OBD_ALLOC(data, sizeof(*data));
697 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
698 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT |
699 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_MDS |
700 OBD_CONNECT_SKIP_ORPHAN;
701 #ifdef HAVE_LRU_RESIZE_SUPPORT
702 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
704 data->ocd_version = LUSTRE_VERSION_CODE;
705 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
706 rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, &mds->mds_osc_exp);
707 OBD_FREE(data, sizeof(*data));
709 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
710 GOTO(error_exit, rc);
712 /* we not want postrecov in case clean fs, in other cases postrecov will
713 * be called from ldlm. otherwise we can call postrecov twice - in case
719 mds->mds_osc_exp = NULL;
720 mds->mds_osc_obd = ERR_PTR(rc);
// When a LOV is connected (mds_osc_obd is not an error pointer and
// mds_osc_exp is set), unregisters the MDS as its observer and sets
// obd_force on the LOV device.
// NOTE(review): this listing omits some lines (rc declaration, return).
724 int mds_lov_disconnect(struct obd_device *obd)
726 struct mds_obd *mds = &obd->u.mds;
730 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
// Passing NULL removes the observer installed in mds_lov_connect.
731 obd_register_observer(mds->mds_osc_obd, NULL);
733 /* The actual disconnect of the mds_lov will be called from
734 * class_disconnect_exports from mds_lov_clean. So we have to
735 * ensure that class_cleanup doesn't fail due to the extra ref
736 * we're holding now. The mechanism to do that already exists -
737 * the obd_force flag. We'll drop the final ref to the
738 * mds_osc_exp in mds_cleanup. */
739 mds->mds_osc_obd->obd_force = 1;
// ioctl dispatcher for the MDS device. karg is interpreted as a
// struct obd_ioctl_data and cmd selects one of the cases below. Cases that
// touch the backing filesystem or llogs bracket their work with
// push_ctxt / pop_ctxt.
// NOTE(review): this listing omits many lines (the switch statement itself,
// declarations, result checks, RETURN statements, closing braces and a few
// case labels); comments describe only the visible statements.
745 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
746 void *karg, void *uarg)
748 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
749 struct obd_device *obd = exp->exp_obd;
750 struct mds_obd *mds = &obd->u.mds;
751 struct obd_ioctl_data *data = karg;
752 struct lvfs_run_ctxt saved;
756 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
// RECORD: create the config llog named in ioc_inlbuf1 and keep its handle
// in mds_cfg_llh; the handle is tested first (action not visible here).
759 case OBD_IOC_RECORD: {
760 char *name = data->ioc_inlbuf1;
761 struct llog_ctxt *ctxt;
763 if (mds->mds_cfg_llh)
766 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
767 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
768 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
771 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
774 mds->mds_cfg_llh = NULL;
775 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
// ENDRECORD: close the llog opened by RECORD and forget the handle.
780 case OBD_IOC_ENDRECORD: {
781 if (!mds->mds_cfg_llh)
784 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
785 rc = llog_close(mds->mds_cfg_llh);
786 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
788 mds->mds_cfg_llh = NULL;
// CLEAR_LOG: open the named llog, destroy it, free the handle.
792 case OBD_IOC_CLEAR_LOG: {
793 char *name = data->ioc_inlbuf1;
794 struct llog_ctxt *ctxt;
795 if (mds->mds_cfg_llh)
798 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
799 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
800 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
803 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
806 rc = llog_destroy(mds->mds_cfg_llh);
807 llog_free_handle(mds->mds_cfg_llh);
809 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
811 mds->mds_cfg_llh = NULL;
// DORECORD: copy ioc_plen1 bytes from the user buffer ioc_pbuf1 into a
// temporary kernel buffer and append them as one record to the open llog.
815 case OBD_IOC_DORECORD: {
817 struct llog_rec_hdr rec;
818 if (!mds->mds_cfg_llh)
821 rec.lrh_len = llog_data_len(data->ioc_plen1);
// Only LUSTRE_CFG_TYPE records are accepted.
823 if (data->ioc_type == LUSTRE_CFG_TYPE) {
824 rec.lrh_type = OBD_CFG_REC;
826 CERROR("unknown cfg record type:%d \n", data->ioc_type);
830 OBD_ALLOC(cfg_buf, data->ioc_plen1);
833 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
// The buffer is freed both here and after the write (two separate paths).
835 OBD_FREE(cfg_buf, data->ioc_plen1);
839 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
840 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
842 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
844 OBD_FREE(cfg_buf, data->ioc_plen1);
// PARSE: run class_config_parse_llog on the llog named in ioc_inlbuf1.
848 case OBD_IOC_PARSE: {
849 struct llog_ctxt *ctxt =
850 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
851 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
852 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
853 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
// DUMP_LOG: run class_config_dump_llog on the llog named in ioc_inlbuf1.
861 case OBD_IOC_DUMP_LOG: {
862 struct llog_ctxt *ctxt =
863 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
864 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
865 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
866 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
// Sync of the backing filesystem (the case label is not visible here).
875 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
876 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
// SET_READONLY: start and commit a transaction on the root inode, sync the
// filesystem, then call lvfs_set_rdonly on the superblock.
880 case OBD_IOC_SET_READONLY: {
882 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
883 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
884 obd->obd_name, obd->u.obt.obt_sb->s_id);
886 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
888 rc = fsfilt_commit(obd, inode, handle, 1);
890 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
891 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
893 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
// CATLOGLIST: list llog catalogs, sized by the current target count.
897 case OBD_IOC_CATLOGLIST: {
898 int count = mds->mds_lov_desc.ld_tgt_count;
899 rc = llog_catalog_list(obd, count, data);
// LLOG_CHECK / CANCEL / REMOVE: call obd_llog_finish, run the llog ioctl in
// the context of the exporting device, call obd_llog_init again, then send
// KEY_MDS_CONN to the osc export.
903 case OBD_IOC_LLOG_CHECK:
904 case OBD_IOC_LLOG_CANCEL:
905 case OBD_IOC_LLOG_REMOVE: {
906 struct llog_ctxt *ctxt =
907 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
910 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
911 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
912 rc = llog_ioctl(ctxt, cmd, data);
913 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
915 rc = obd_llog_init(obd, obd, NULL);
917 rc2 = obd_set_info_async(mds->mds_osc_exp,
918 sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
// LLOG_INFO / PRINT: hand the command to llog_ioctl.
924 case OBD_IOC_LLOG_INFO:
925 case OBD_IOC_LLOG_PRINT: {
926 struct llog_ctxt *ctxt =
927 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
929 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
930 rc = llog_ioctl(ctxt, cmd, data);
931 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
// ABORT_RECOVERY: abort recovery, then re-evaluate whether clients may
// connect (flag 0 sets no new precondition, it only re-runs the check).
937 case OBD_IOC_ABORT_RECOVERY:
938 LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
939 target_abort_recovery(obd);
940 /* obd_recovering has been changed */
941 mds_allow_cli(obd, 0);
// GET_OBJ_VERSION: lock the dentry for the fid in ioc_inlbuf1, read the
// inode version, drop the lock reference and store the version into
// ioc_inlbuf2.
944 case OBD_IOC_GET_OBJ_VERSION: {
945 struct ll_fid *fid = (struct ll_fid *) data->ioc_inlbuf1;
946 struct dentry *dentry;
947 struct lustre_handle lockh;
951 dentry = mds_fid2locked_dentry(obd, fid, NULL, lockm, &lockh,
952 NULL, 0, MDS_INODELOCK_UPDATE);
953 if (IS_ERR(dentry)) {
954 rc = PTR_ERR(dentry);
957 version = fsfilt_get_version(obd, dentry->d_inode);
958 ldlm_lock_decref(&lockh, lockm);
963 *(__u64 *) data->ioc_inlbuf2 = version;
// Commands not handled above are only logged here.
968 CDEBUG(D_INFO, "unknown command %x\n", cmd);
// Records which preconditions named in `flag` (CONFIG_LOG, CONFIG_SYNC,
// CONFIG_TARGET) are now satisfied and clears obd_no_conn once the config
// log and a target are both present and the device is either not recovering
// or already synced. Calling it with flag 0 only re-evaluates the condition.
975 /* Collect the preconditions we need to allow client connects */
976 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
978 if (flag & CONFIG_LOG)
979 obd->u.mds.mds_fl_cfglog = 1;
980 if (flag & CONFIG_SYNC)
981 obd->u.mds.mds_fl_synced = 1;
982 if (flag & CONFIG_TARGET)
983 obd->u.mds.mds_fl_target = 1;
984 if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_target &&
985 (!obd->obd_recovering || obd->u.mds.mds_fl_synced))
986 /* Open for clients */
987 obd->obd_no_conn = 0;
// Argument bundle for one target synchronisation. It is allocated and
// filled in mds_lov_start_synchronize and freed in __mds_lov_synchronize.
990 struct mds_lov_sync_info {
991 struct obd_device *mlsi_obd; /* the lov device to sync */
992 struct obd_device *mlsi_watched; /* target osc */
993 __u32 mlsi_index; /* index of target */
// Synchronises the MDS with one target. `data` is a struct
// mds_lov_sync_info, which is unpacked and freed at the start. Visible
// steps: mds_lov_update_mds, a KEY_MDS_CONN set_info to the osc export,
// llog_connect on the LLOG_MDS_OST_ORIG_CTXT context, then
// mds_lov_clear_orphans unless the device is stopping (-ENODEV). The tail
// either notifies the LOV with OBD_NOTIFY_INACTIVE for the target or calls
// mds_allow_cli(CONFIG_SYNC).
// NOTE(review): this listing omits some lines (rc declaration, result
// checks, the out label, the condition that selects between the two tail
// branches, reference drops and the return).
996 /* We only sync one osc at a time, so that we don't have to hold
997 any kind of lock on the whole mds_lov_desc, which may change
998 (grow) as a result of mds_lov_add_ost. This also avoids any
999 kind of mismatch between the lov_desc and the mds_lov_desc,
1000 which are not in lock-step during lov_add_obd */
1001 static int __mds_lov_synchronize(void *data)
1003 struct mds_lov_sync_info *mlsi = data;
1004 struct obd_device *obd = mlsi->mlsi_obd;
1005 struct obd_device *watched = mlsi->mlsi_watched;
1006 struct mds_obd *mds = &obd->u.mds;
1007 struct obd_uuid *uuid;
1008 __u32 idx = mlsi->mlsi_index;
1009 struct llog_ctxt *ctxt;
// All fields were copied into locals above, so the bundle can go now.
1013 OBD_FREE(mlsi, sizeof(*mlsi));
1017 uuid = &watched->u.cli.cl_target_uuid;
1020 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
1022 rc = mds_lov_update_mds(obd, watched, idx);
1024 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
1028 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
1029 KEY_MDS_CONN, 0, uuid, NULL);
1033 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1037 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
// The context reference is dropped right after the connect call.
1039 rc = llog_connect(ctxt, NULL, NULL, uuid);
1040 llog_ctxt_put(ctxt);
1043 CERROR("%s failed at llog_origin_connect: %d\n",
1044 obd_uuid2str(uuid), rc);
1048 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
1049 obd->obd_name, obd_uuid2str(uuid));
// Orphan cleanup is skipped when the MDS is already shutting down.
1051 if (obd->obd_stopping)
1052 GOTO(out, rc = -ENODEV);
1054 rc = mds_lov_clear_orphans(mds, uuid);
1056 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
1057 obd_uuid2str(uuid), rc);
1064 /* Deactivate it for safety */
1065 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
// Notify only while neither the MDS, the LOV nor the target is stopping.
1067 if (!obd->obd_stopping && mds->mds_osc_obd &&
1068 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
1069 obd_notify(mds->mds_osc_obd, watched,
1070 OBD_NOTIFY_INACTIVE, NULL);
1072 /* We've successfully synced at least 1 OST and are ready
1073 to handle client requests */
1074 mds_allow_cli(obd, CONFIG_SYNC);
// Thread entry point for a background sync: names the thread
// ll_sync_<index> (two-digit index), daemonizes it, then runs
// __mds_lov_synchronize on the same argument.
// NOTE(review): the declaration of `name` is not visible in this listing.
1081 int mds_lov_synchronize(void *data)
1083 struct mds_lov_sync_info *mlsi = data;
1086 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1087 cfs_daemonize_ctxt(name);
1089 RETURN(__mds_lov_synchronize(data));
// Packs obd, watched and the target index (read from *data as a __u32) into
// a mds_lov_sync_info and runs the synchronisation: either in a new kernel
// thread started with mds_lov_synchronize, or inline through
// __mds_lov_synchronize. The selecting condition is not visible here;
// presumably it is the `nonblock` argument -- TODO confirm.
// NOTE(review): this listing omits some lines (rc declaration, allocation
// check, the reference taken after the long comment, the branch condition,
// error cleanup and the return).
1092 int mds_lov_start_synchronize(struct obd_device *obd,
1093 struct obd_device *watched,
1094 void *data, int nonblock)
1096 struct mds_lov_sync_info *mlsi;
1098 struct obd_uuid *uuid;
1102 uuid = &watched->u.cli.cl_target_uuid;
// Freed by __mds_lov_synchronize once it has copied the fields out.
1104 OBD_ALLOC(mlsi, sizeof(*mlsi));
1109 mlsi->mlsi_obd = obd;
1110 mlsi->mlsi_watched = watched;
1111 mlsi->mlsi_index = *(__u32 *)data;
1113 /* Although class_export_get(obd->obd_self_export) would lock
1114 the MDS in place, since it's only a self-export
1115 it doesn't lock the LOV in place. The LOV can be disconnected
1116 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1117 Simply taking an export ref on the LOV doesn't help, because it's
1118 still disconnected. Taking an obd reference insures that we don't
1119 disconnect the LOV. This of course means a cleanup won't
1120 finish for as long as the sync is blocking. */
1124 /* Synchronize in the background */
1125 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1126 CLONE_VM | CLONE_FILES);
1128 CERROR("%s: error starting mds_lov_synchronize: %d\n",
// On this path rc is printed as the thread id.
1132 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1133 "thread=%d\n", obd->obd_name,
1134 mlsi->mlsi_index, rc);
// Inline variant: run the sync in the calling context.
1138 rc = __mds_lov_synchronize((void *)mlsi);
// Observer callback of the MDS. ACTIVE records that a target is available.
// CONFIG passes `data` (cast to flags) to mds_allow_cli, disconnects expired
// exports and marks the quota master ready. For the events that continue
// past the switch the watched device must be of type LUSTRE_OSC_NAME; while
// recovering only the lov descriptor is refreshed, otherwise a
// synchronisation is started (blocking only for OBD_NOTIFY_SYNC) and, when
// the device is not stopping, lquota_recovery is invoked.
// NOTE(review): this listing omits some lines (rc declaration, the switch
// statement, break/return statements, the default case, closing braces and
// the end of one comment).
1144 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1145 enum obd_notify_event ev, void *data)
1150 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1153 /* We only handle these: */
// No line is missing between ACTIVE and the SYNC labels in this listing, so
// ACTIVE appears to fall through into the SYNC handling -- TODO confirm.
1154 case OBD_NOTIFY_ACTIVE:
1155 /* lov want one or more _active_ targets for work */
1156 mds_allow_cli(obd, CONFIG_TARGET);
1157 /* activate event should be pass lov idx as argument */
1158 case OBD_NOTIFY_SYNC:
1159 case OBD_NOTIFY_SYNC_NONBLOCK:
1160 /* sync event should be pass lov idx as argument */
1162 case OBD_NOTIFY_CONFIG:
1163 mds_allow_cli(obd, (unsigned long)data);
1164 /* call this only when config is processed and stale_export_age
1165 * value is configured */
1166 class_disconnect_expired_exports(obd);
1167 /* quota_type has been processed, we can now handle
1168 * incoming quota requests */
1169 QUOTA_MASTER_READY(&obd->u.obt.obt_qctxt);
// Only OSC devices are expected as notification source from here on.
1174 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1175 CERROR("unexpected notification of %s %s!\n",
1176 watched->obd_type->typ_name, watched->obd_name);
1180 if (obd->obd_recovering) {
1181 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1183 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1184 /* We still have to fix the lov descriptor for ost's added
1185 after the mdt in the config log. They didn't make it into
1188 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1189 &watched->u.cli.cl_target_uuid);
1191 mds_allow_cli(obd, CONFIG_SYNC);
// nonblock is 0 only for OBD_NOTIFY_SYNC.
1195 rc = mds_lov_start_synchronize(obd, watched, data,
1196 !(ev == OBD_NOTIFY_SYNC));
1198 if (likely(obd->obd_stopping == 0))
1199 lquota_recovery(mds_quota_interface_ref, obd);
// Fills `lmm` with the default striping taken from the MDS lov descriptor
// (pattern, default stripe size, default stripe count) using LOV_MAGIC_V1,
// stores sizeof(struct lov_mds_md) in *size and returns the same value.
// NOTE(review): this listing omits some lines (the size parameter, checks
// before the stores).
1204 int mds_get_default_md(struct obd_device *obd, struct lov_mds_md *lmm,
1207 struct lov_desc *ldesc;
// NOTE(review): ldesc is the address of a member of *obd, so the assertion
// below cannot fail for a valid obd.
1210 ldesc = &obd->u.mds.mds_lov_desc;
1211 LASSERT(ldesc != NULL);
// NOTE(review): other functions in this file read lmm fields through
// le32_to_cpu; these stores use no cpu_to_le32 -- confirm the byte order
// callers expect from this function.
1216 lmm->lmm_magic = LOV_MAGIC_V1;
1217 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1218 lmm->lmm_pattern = ldesc->ld_pattern;
1219 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
1220 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
1221 *size = sizeof(struct lov_mds_md);
1223 RETURN(sizeof(struct lov_mds_md));
1226 /* Convert the on-disk LOV EA structure.
1227 * We always try to convert from an old LOV EA format to the common in-memory
1228 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1229 * then convert back to the new on-disk format and save it back to disk
1230 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1231 * to convert it each time this inode is accessed.
1233 * This function is a bit interesting in the error handling. We can safely
1234 * ship the old lmm to the client in case of failure, since it uses the same
1235 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1236 * reason. We will not delete the old lmm data until we have written the
1237 * new format lmm data in fsfilt_set_md(). */
// Converts the LOV EA held in `lmm` where needed. Two visible paths:
// 1) the client lacks OBD_CONNECT_LOV_V3 and the EA is V3: the EA is
//    rewritten in place as V1 (objects moved down with memmove) and the
//    new, smaller size is returned;
// 2) the magic is not V1, V3 or JOIN: the EA is unpacked to an lsm, packed
//    again and written back with fsfilt_set_md inside a transaction; the
//    lsm is freed at the end.
// NOTE(review): this listing omits some lines (declarations, the action
// taken for already-current magics, result checks, the conv_free label and
// the final return); the function may also continue past the last line
// shown.
1238 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1239 struct lov_mds_md *lmm, int lmm_size,
1240 __u64 connect_flags)
1242 struct lov_stripe_md *lsm = NULL;
1247 if (((connect_flags & OBD_CONNECT_LOV_V3) == 0) &&
1248 (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)) {
1249 /* client does not support LOV_MAGIC_V3, so we have to convert
1251 * we convert the lmm from v3 to v1
1252 * and return the new size (which is smaller)
1253 * the caller supports this way to return the new size */
1256 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1257 /* lmm_stripe_count for non reg files is not used or -1 */
1258 if (!S_ISREG(inode->i_mode)) {
1259 new_lmm_size = lov_mds_md_size(0, LOV_MAGIC_V1);
1261 __u32 lmm_stripe_count;
1263 lmm_stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
1264 new_lmm_size = lov_mds_md_size(lmm_stripe_count,
1266 /* move the objects to the new place */
// memmove is used for this copy within the same lmm buffer.
1267 memmove(lmm->lmm_objects,
1268 ((struct lov_mds_md_v3 *)lmm)->lmm_objects,
1269 lmm_stripe_count * sizeof(struct lov_ost_data_v1));
1271 /* even if new size is smaller than old one,
1272 * this should not generate memory leak */
1273 RETURN(new_lmm_size);
// Magics that are already in a current format (action line not visible).
1276 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1277 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3 ||
1278 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1281 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1282 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
// Round trip through the in-memory lsm format.
1285 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1289 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1291 GOTO(conv_free, rc);
// Write the repacked EA back inside a SETATTR transaction.
1294 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1295 if (IS_ERR(handle)) {
1296 rc = PTR_ERR(handle);
1297 GOTO(conv_free, rc);
1300 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1302 err = fsfilt_commit(obd, inode, handle, 0);
// A commit error wins; otherwise the EA size is the result.
1304 rc = err ? err : lmm_size;
1305 GOTO(conv_free, rc);
1307 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);