1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <lustre_mds.h>
50 #include <lustre/lustre_idl.h>
51 #include <obd_class.h>
53 #include <lustre_lib.h>
54 #include <lustre_fsfilt.h>
56 #include "mds_internal.h"
/* Forward declaration: mds_allow_cli() is defined further down in this file
 * but is already called from mds_iocontrol(). */
58 static void mds_allow_cli(struct obd_device *obd, unsigned long flag);
/*
 * Debug helper: write the dirty-page bitmap and the object id table of the
 * MDS to the debug log at D_INFO level.
 *
 * label - tag printed first ("dump from <label>"); callers in this file
 *         pass "read" and "write".
 * obd   - MDS device; the state dumped lives in obd->u.mds.
 *
 * An error is logged when the dirty bitmap or the page array is NULL.
 */
60 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
62 struct mds_obd *mds = &obd->u.mds;
65 CDEBUG(D_INFO, "dump from %s\n", label);
66 if (mds->mds_lov_page_dirty == NULL) {
67 CERROR("NULL bitmap!\n");
/* dump the bitmap one long at a time: size/BITS_PER_LONG + 1 words */
71 for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
72 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
74 if (mds->mds_lov_page_array == NULL) {
75 CERROR("not init page array!\n");
/* walk every page slot, then every object id slot inside the page */
79 for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
80 obd_id *data = mds->mds_lov_page_array[i];
85 for(j=0; j < OBJID_PER_PAGE(); j++) {
88 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Set up the state used to track per-OST object ids.
 *
 * Allocates the dirty-page bitmap (MDS_LOV_OBJID_PAGES_COUNT bits) and the
 * array of page pointers (MDS_LOV_OBJID_PAGES_COUNT * sizeof(void *) bytes),
 * then opens the LOV_OBJID file with O_RDWR | O_CREAT, mode 0644, and stores
 * it in mds->mds_lov_objid_filp.
 *
 * Failure codes visible here: -ENOMEM when the page array cannot be
 * allocated, PTR_ERR(file) when the open fails, and -ENOENT when LOV_OBJID
 * is not a regular file.  The cleanup statements at the end close the file,
 * free the page array and free the bitmap, in that order.
 */
96 int mds_lov_init_objids(struct obd_device *obd)
98 struct mds_obd *mds = &obd->u.mds;
99 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* MDS_LOV_ALLOC_SIZE must be a whole multiple of sizeof(obd_id) */
104 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
107 if (mds->mds_lov_page_dirty == NULL)
111 OBD_ALLOC(mds->mds_lov_page_array, size);
112 if (mds->mds_lov_page_array == NULL)
113 GOTO(err_free_bitmap, rc = -ENOMEM);
115 /* open and test the lov objd file */
116 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
119 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
120 GOTO(err_free, rc = PTR_ERR(file));
122 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
123 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
124 file->f_dentry->d_inode->i_mode);
125 GOTO(err_open, rc = -ENOENT);
127 mds->mds_lov_objid_filp = file;
/* error unwinding: undo the steps above in reverse order */
131 if (filp_close((struct file *)file, 0))
132 CERROR("can't close %s after error\n", LOV_OBJID);
134 OBD_FREE(mds->mds_lov_page_array, size);
136 FREE_BITMAP(mds->mds_lov_page_dirty);
140 EXPORT_SYMBOL(mds_lov_init_objids);
/*
 * Tear down the object id tracking state of the MDS.
 *
 * When the page array exists, the object id pages referenced from it are
 * freed (MDS_LOV_ALLOC_SIZE bytes each) and then the array itself.  If the
 * LOV_OBJID file is open it is closed; the stored pointer is cleared before
 * the close result is reported.  Finally the dirty-page bitmap is freed.
 */
142 void mds_lov_destroy_objids(struct obd_device *obd)
144 struct mds_obd *mds = &obd->u.mds;
148 if (mds->mds_lov_page_array != NULL) {
149 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150 obd_id *data = mds->mds_lov_page_array[i];
152 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
154 OBD_FREE(mds->mds_lov_page_array,
155 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
158 if (mds->mds_lov_objid_filp) {
159 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160 mds->mds_lov_objid_filp = NULL;
162 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165 FREE_BITMAP(mds->mds_lov_page_dirty);
168 EXPORT_SYMBOL(mds_lov_destroy_objids);
171 * There are currently two ways to learn the OST count and the maximum OST index:
172 * first - after an OST is connected to the MDS and the sync process has finished;
173 * second - from the lmm during recovery, in the case where the MDS has no configs
174 * and the OST isn't registered in the MGS.
176 * \param mds pointer to mds structure
177 * \param index maximum ost index
179 * \retval -ENOMEM there is not enough memory for a new page
180 * \retval 0 the update succeeded
/*
 * Make room in the object id table for OST index "index" and account for a
 * target that has not been recorded yet.
 *
 * - The table page covering "index" is allocated with OBD_ALLOC
 *   (MDS_LOV_ALLOC_SIZE bytes) and stored in mds_lov_page_array.
 *   NOTE(review): the guard around this allocation is not visible here;
 *   presumably it only runs while the slot is still NULL - confirm.
 * - When "index" exceeds mds_lov_objid_max_index, the last page, the last
 *   index within that page and the max index are updated.
 * - When the slot for "index" still holds 0, mds_lov_objid_count is
 *   incremented and mds_max_mdsize / mds_max_cookiesize are recomputed for
 *   min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 *
 * The debug message passes its arguments in the order the format string
 * expects (stripes, max_mdsize, max_cookiesize), matching the equivalent
 * message in mds_lov_update_from_read().
 */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
184 __u32 page = index / OBJID_PER_PAGE();
185 __u32 off = index % OBJID_PER_PAGE();
186 obd_id *data = mds->mds_lov_page_array[page];
189 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
193 mds->mds_lov_page_array[page] = data;
196 if (index > mds->mds_lov_objid_max_index) {
197 mds->mds_lov_objid_lastpage = page;
198 mds->mds_lov_objid_lastidx = off;
199 mds->mds_lov_objid_max_index = index;
202 /* workaround - New target not in objids file; increase mdsize */
203 /* ld_tgt_count is used as the max index everywhere, despite its name. */
204 if (data[off] == 0) {
208 mds->mds_lov_objid_count++;
209 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
210 mds->mds_lov_objid_count);
212 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
213 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
214 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d "
215 "stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216 mds->mds_max_cookiesize);
/*
 * Report whether the object id slot for OST "index" has been initialised.
 *
 * Returns non-zero when the stored value is greater than 0, otherwise 0.
 * The page pointer is dereferenced without a NULL check in the visible
 * code, so the page covering "index" has to exist already.
 */
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
225 __u32 page = index / OBJID_PER_PAGE();
226 __u32 off = index % OBJID_PER_PAGE();
227 obd_id *data = mds->mds_lov_page_array[page];
229 return (data[off] > 0);
/*
 * Make sure the object id table covers every OST referenced by a striping
 * descriptor.
 *
 * While holding obd->obd_dev_sem, walks the lmm_stripe_count entries of
 * "lmm" (the object array location depends on whether lmm_magic is
 * LOV_MAGIC_V3) and calls mds_lov_update_max_ost() with each entry's OST
 * index; a non-zero result from that call is treated as a failure.
 * All on-disk fields are converted from little-endian before use.
 */
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
236 struct lov_ost_data_v1 *lmm_objects;
238 /* if we create file without objects - lmm is NULL */
242 mutex_down(&obd->obd_dev_sem);
243 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
244 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
246 lmm_objects = lmm->lmm_objects;
248 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
249 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
250 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
255 mutex_up(&obd->obd_dev_sem);
259 EXPORT_SYMBOL(mds_lov_prepare_objids);
262 * Write an llog orphan record about a lost OST object.
263 * A special lsm is allocated with a single stripe; the caller should deallocate it.
/*
 * Log an orphan record for "count" lost objects on OST "idx", starting at
 * object id "id".
 *
 * lsmp    - in/out striping descriptor; obd_alloc_memmd() is used to
 *           allocate one through the LOV export.
 * stripes - out: the descriptor's original lsm_stripe_count, saved before
 *           the count is forced to 1 so the caller can restore it.
 *
 * Only stripe 0 is filled in (loi_id, loi_gr = 0, loi_ost_idx) before the
 * descriptor is handed to mds_log_op_orphan().
 */
266 static int mds_log_lost_precreated(struct obd_device *obd,
267 struct lov_stripe_md **lsmp, int *stripes,
268 obd_id id, obd_count count, int idx)
270 struct lov_stripe_md *lsm = *lsmp;
275 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
278 /* need only one stripe, save old value */
279 *stripes = lsm->lsm_stripe_count;
280 lsm->lsm_stripe_count = 1;
284 lsm->lsm_oinfo[0]->loi_id = id;
285 lsm->lsm_oinfo[0]->loi_gr = 0; /* needed in 2.0 */
286 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
288 rc = mds_log_op_orphan(obd, lsm, count);
/*
 * Record the object ids used by a striping descriptor as the last known
 * ids of their OSTs.
 *
 * For each of the lmm_stripe_count entries of "lmm", the OST index and
 * object id are read (little-endian on disk) and compared with the stored
 * value for that OST.  When the new id is larger, the page holding the slot
 * is marked dirty in mds_lov_page_dirty.
 *
 * Without HAVE_DELAYED_RECOVERY, a gap of more than one id seen while
 * obd->obd_recovering is set is reported (with a warning unless
 * obd_version_recov is set) and logged via mds_log_lost_precreated().
 * The lsm used for that is restored to its saved stripe count and freed
 * before returning.
 */
292 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
294 struct mds_obd *mds = &obd->u.mds;
296 struct lov_ost_data_v1 *lmm_objects;
297 #ifndef HAVE_DELAYED_RECOVERY
298 struct lov_stripe_md *lsm = NULL;
303 /* if we create file without objects - lmm is NULL */
307 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
308 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
310 lmm_objects = lmm->lmm_objects;
312 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
313 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
314 obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
315 __u32 page = i / OBJID_PER_PAGE();
316 __u32 idx = i % OBJID_PER_PAGE();
319 data = mds->mds_lov_page_array[page];
321 CDEBUG(D_INODE,"update last object for ost %u"
322 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
323 if (id > data[idx]) {
324 #ifndef HAVE_DELAYED_RECOVERY
/* number of ids skipped between the stored id and the new one */
325 int lost = id - data[idx] - 1;
326 /* we might have lost precreated objects due to VBR */
327 if (lost > 0 && obd->obd_recovering) {
328 CDEBUG(D_HA, "GAP in objids is %u\n", lost);
329 if (!obd->obd_version_recov)
330 CWARN("Unexpected gap in objids\n");
331 /* lsm is allocated if NULL */
332 mds_log_lost_precreated(obd, &lsm, &stripes,
333 data[idx] + 1, lost, i);
337 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
340 #ifndef HAVE_DELAYED_RECOVERY
342 /* restore stripes number */
343 lsm->lsm_stripe_count = stripes;
344 obd_free_memmd(mds->mds_lov_exp, &lsm);
350 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Account for object ids that were just read from the LOV_OBJID file.
 *
 * Iterates over "count" entries of "data"; mds_lov_objid_count is
 * incremented and mds_max_mdsize / mds_max_cookiesize are recomputed for
 * min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 * NOTE(review): the condition selecting which entries are counted is not
 * visible in this view - confirm against the full source.
 */
352 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
358 for(i = 0; i < count; i++) {
362 mds->mds_lov_objid_count++;
363 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
364 mds->mds_lov_objid_count);
366 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
367 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
368 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
369 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/*
 * Load the whole LOV_OBJID file into the in-memory object id table.
 *
 * The file size is rounded up to whole MDS_LOV_ALLOC_SIZE pages.  For each
 * page a buffer is allocated (the slot must still be NULL), filled with
 * fsfilt_read_record() and passed to mds_lov_update_from_read() together
 * with the number of obd_id entries read.  If the read did not advance the
 * offset the page is treated as a hole and counted as a full page.
 *
 * Afterwards mds_lov_objid_lastpage / mds_lov_objid_lastidx describe the
 * last page and the last entry read from it.
 *
 * Error codes visible here: -ENOMEM when a page cannot be allocated and
 * -EIO when mds_lov_update_from_read() fails.
 */
375 static int mds_lov_read_objids(struct obd_device *obd)
377 struct mds_obd *mds = &obd->u.mds;
379 int i, rc = 0, count = 0, page = 0;
383 /* Read everything in the file, even if our current lov desc
384 has fewer targets. Old targets not in the lov descriptor
385 during mds setup may still have valid objids. */
386 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* number of pages needed to hold "size" bytes, rounded up */
390 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
391 CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
392 for (i = 0; i < page; i++) {
394 loff_t off_old = off;
396 LASSERT(mds->mds_lov_page_array[i] == NULL);
397 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
398 if (mds->mds_lov_page_array[i] == NULL)
399 GOTO(out, rc = -ENOMEM);
401 data = mds->mds_lov_page_array[i];
403 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
404 MDS_LOV_ALLOC_SIZE, &off);
406 CERROR("Error reading objids %d\n", rc);
410 if (off == off_old) /* hole is read */
411 off += MDS_LOV_ALLOC_SIZE;
/* entries covered by this page = bytes advanced / entry size */
412 count = (off - off_old) / sizeof(obd_id);
413 if (mds_lov_update_from_read(mds, data, count)) {
414 CERROR("Can't update mds data\n");
415 GOTO(out, rc = -EIO);
418 mds->mds_lov_objid_lastpage = page - 1;
419 mds->mds_lov_objid_lastidx = count - 1;
421 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
422 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
424 mds_lov_dump_objids("read",obd);
/*
 * Write every dirty page of the object id table back to the LOV_OBJID
 * file.
 *
 * The dirty bitmap is checked for emptiness first.  For each set bit the
 * bit is test-and-cleared, and the page is written with
 * fsfilt_write_record() at file offset i * MDS_LOV_ALLOC_SIZE.  Only the
 * used part of the last page, (mds_lov_objid_lastidx + 1) entries, is
 * written.  The dirty bit is set again on the path following the write
 * (NOTE(review): presumably only when the write failed - the guarding
 * condition is not visible here).
 */
429 int mds_lov_write_objids(struct obd_device *obd)
431 struct mds_obd *mds = &obd->u.mds;
435 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
438 mds_lov_dump_objids("write", obd);
440 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
441 obd_id *data = mds->mds_lov_page_array[i];
442 unsigned int size = MDS_LOV_ALLOC_SIZE;
443 loff_t off = i * size;
445 LASSERT(data != NULL);
447 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
450 /* check for partially filled last page */
451 if (i == mds->mds_lov_objid_lastpage)
452 size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
454 CDEBUG(D_INFO,"write %lld - %u\n", off, size);
455 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
458 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
467 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Obtain the last object id of one OST when it is not known locally.
 *
 * The slot for "idx" is located in the object id table.  For an id that
 * was never read, the LOV export is queried with obd_get_info(KEY_LAST_ID);
 * the answer is written straight into the table slot through lastid.data,
 * and the page is marked dirty.  The LOV export must be non-NULL.
 */
469 static int mds_lov_get_objid(struct obd_device * obd,
472 struct mds_obd *mds = &obd->u.mds;
473 struct obd_export *lov_exp = mds->mds_lov_exp;
481 LASSERT(lov_exp != NULL);
483 page = idx / OBJID_PER_PAGE();
484 off = idx % OBJID_PER_PAGE();
486 data = mds->mds_lov_page_array[page];
489 /* We never read this lastid; ask the osc */
490 struct obd_id_info lastid;
492 size = sizeof(lastid);
494 lastid.data = &data[off];
495 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID),
496 KEY_LAST_ID, &size, &lastid, NULL);
500 /* workaround for clean filter */
504 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
506 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
507 idx, data, page, off, data[off]);
/*
 * Ask the OSTs to reconcile their objects with the ids known to the MDS.
 *
 * Issues obd_create() through the LOV export with an obdo flagged
 * OBD_FL_DELORPHAN (valid bits OBD_MD_FLFLAGS | OBD_MD_FLGROUP).
 *
 * ost_uuid - when non-NULL it is passed on in oti.oti_ost_uuid.
 *
 * The object id page array must already be allocated.
 */
512 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
515 struct obdo oa = { 0 };
516 struct obd_trans_info oti = {0};
517 struct lov_stripe_md *empty_ea = NULL;
520 LASSERT(mds->mds_lov_page_array != NULL);
522 /* This create will in fact either create or destroy: If the OST is
523 * missing objects below this ID, they will be created. If it finds
524 * objects above this ID, they will be removed. */
525 memset(&oa, 0, sizeof(oa));
526 oa.o_flags = OBD_FL_DELORPHAN;
527 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
528 if (ost_uuid != NULL)
529 oti.oti_ost_uuid = ost_uuid;
531 rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
/*
 * Send an object id for OST "idx" to the LOV layer via
 * obd_set_info_async(KEY_NEXT_ID), passing a struct obd_id_info.
 *
 * Must not be called while the device is recovering (asserted).
 * A failure is logged.
 */
537 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
539 struct mds_obd *mds = &obd->u.mds;
541 struct obd_id_info info;
544 LASSERT(!obd->obd_recovering);
548 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
549 KEY_NEXT_ID, sizeof(info), &info, NULL);
551 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * mds_lov_update_desc() - refresh the cached LOV descriptor after a target
 * was added.
 *
 * A temporary struct lov_desc is allocated, filled via
 * obd_get_info(KEY_LOVDESC) on the LOV export and copied into
 * mds->mds_lov_desc.  mds_lov_update_max_ost() is then called for "index"
 * under obd->obd_dev_sem (-ENOMEM is returned on the failure path shown),
 * and obd_llog_init() is invoked with a pointer to "index".  The temporary
 * descriptor is freed before returning.
 */
557 /* Update the lov desc for a new size lov. */
558 static int mds_lov_update_desc(struct obd_device *obd, __u32 index,
559 struct obd_uuid *uuid)
561 struct mds_obd *mds = &obd->u.mds;
563 __u32 valsize = sizeof(mds->mds_lov_desc);
567 OBD_ALLOC(ld, sizeof(*ld));
571 rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
576 /* Don't change the mds_lov_desc until the objids size matches the
578 mds->mds_lov_desc = *ld;
579 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
580 mds->mds_lov_desc.ld_tgt_count, index, uuid->uuid);
582 mutex_down(&obd->obd_dev_sem);
583 rc = mds_lov_update_max_ost(mds, index);
584 mutex_up(&obd->obd_dev_sem);
586 GOTO(out, rc = -ENOMEM);
588 /* If we added a target we have to reconnect the llogs */
589 /* We only _need_ to do this at first add (idx), or the first time
590 after recovery. However, it should now be safe to call anytime. */
591 rc = obd_llog_init(obd, obd, (void *)&index);
594 OBD_FREE(ld, sizeof(*ld));
/*
 * mds_lov_update_mds() - bring the MDS view of one target up to date.
 *
 * The object id slot for "idx" must already be initialised (asserted via
 * mds_lov_objinit()).  "idx" must be below mds_lov_desc.ld_tgt_count,
 * otherwise -EINVAL is returned.  The last object id is obtained with
 * mds_lov_get_objid() and pushed to the OSC side with
 * mds_lov_set_one_nextid(); failures of either call are logged.
 */
599 /* Inform MDS about new/updated target */
600 static int mds_lov_update_mds(struct obd_device *obd,
601 struct obd_device *watched,
604 struct mds_obd *mds = &obd->u.mds;
613 /* Don't let anyone else mess with mds_lov_objids now */
614 old_count = mds->mds_lov_desc.ld_tgt_count;
615 LASSERT(mds_lov_objinit(mds, idx));
617 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
618 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
619 mds->mds_lov_desc.ld_tgt_count);
621 /* idx is set as data from lov_notify. */
622 if (obd->obd_recovering)
625 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
626 CERROR("index %d > count %d!\n", idx,
627 mds->mds_lov_desc.ld_tgt_count);
628 GOTO(out, rc = -EINVAL);
631 rc = mds_lov_get_objid(obd, idx);
633 CERROR("Failed to get objid - %d\n", rc);
637 page = idx / OBJID_PER_PAGE();
638 off = idx % OBJID_PER_PAGE();
639 data = mds->mds_lov_page_array[page];
640 /* We have read this lastid from disk; tell the osc.
641 Don't call this during recovery. */
642 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
644 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
645 /* Don't abort the rest of the sync */
649 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/*
 * mds_lov_connect() - connect the MDS to the LOV device named "lov_name".
 *
 * If mds->mds_lov_obd already holds an error pointer, that error is
 * returned.  Otherwise the LOV is looked up with class_name2obd()
 * (-ENOTCONN when it is not found), the object id file is read under
 * obd->obd_dev_sem, new client connections are denied (obd_no_conn = 1),
 * the MDS registers itself as observer of the LOV, and obd_connect() is
 * called with a temporary obd_connect_data carrying the connect flags and
 * LUSTRE_VERSION_CODE.  The temporary connect data is freed right after
 * the connect call.
 *
 * On the error exit path mds_lov_exp is cleared and mds_lov_obd is set to
 * ERR_PTR(rc), which makes later calls fail with the same code.
 */
655 /* update the LOV-OSC knowledge of the last used object id's */
656 int mds_lov_connect(struct obd_device *obd, char * lov_name)
658 struct mds_obd *mds = &obd->u.mds;
659 struct lustre_handle conn = {0,};
660 struct obd_connect_data *data;
664 if (IS_ERR(mds->mds_lov_obd))
665 RETURN(PTR_ERR(mds->mds_lov_obd));
667 if (mds->mds_lov_obd)
670 mds->mds_lov_obd = class_name2obd(lov_name);
671 if (!mds->mds_lov_obd) {
672 CERROR("MDS cannot locate LOV %s\n", lov_name);
673 GOTO(error_exit, rc = -ENOTCONN);
676 mutex_down(&obd->obd_dev_sem);
677 rc = mds_lov_read_objids(obd);
678 mutex_up(&obd->obd_dev_sem);
680 CERROR("cannot read lov_objids: rc = %d\n", rc);
681 GOTO(error_exit, rc);
684 /* Deny new client connections until we are sure we have some OSTs */
685 obd->obd_no_conn = 1;
687 rc = obd_register_observer(mds->mds_lov_obd, obd);
689 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
691 GOTO(error_exit, rc);
694 OBD_ALLOC(data, sizeof(*data));
697 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
698 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT |
699 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_MDS |
700 OBD_CONNECT_SKIP_ORPHAN;
701 #ifdef HAVE_LRU_RESIZE_SUPPORT
702 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
704 data->ocd_version = LUSTRE_VERSION_CODE;
705 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
706 rc = obd_connect(&conn, mds->mds_lov_obd, &obd->obd_uuid, data, &mds->mds_lov_exp);
707 OBD_FREE(data, sizeof(*data));
709 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
710 GOTO(error_exit, rc);
712 /* we not want postrecov in case clean fs, in other cases postrecov will
713 * be called from ldlm. otherwise we can call postrecov twice - in case
719 mds->mds_lov_exp = NULL;
720 mds->mds_lov_obd = ERR_PTR(rc);
/*
 * Begin disconnecting the MDS from its LOV.
 *
 * When mds_lov_obd is a valid pointer and an export exists, the MDS
 * unregisters itself as observer of the LOV (observer set to NULL) and sets
 * obd_force on the LOV device; the existing comment below explains why the
 * force flag is needed.
 */
724 int mds_lov_disconnect(struct obd_device *obd)
726 struct mds_obd *mds = &obd->u.mds;
730 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
731 obd_register_observer(mds->mds_lov_obd, NULL);
733 /* The actual disconnect of the mds_lov will be called from
734 * class_disconnect_exports from mds_lov_clean. So we have to
735 * ensure that class_cleanup doesn't fail due to the extra ref
736 * we're holding now. The mechanism to do that already exists -
737 * the obd_force flag. We'll drop the final ref to the
738 * mds_lov_exp in mds_cleanup. */
739 mds->mds_lov_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device behind "exp".
 *
 * karg is interpreted as a struct obd_ioctl_data.  Commands visible here:
 *
 *  OBD_IOC_RECORD      - create the config llog named by ioc_inlbuf1 and
 *                        keep its handle in mds->mds_cfg_llh (only when no
 *                        handle is currently held).
 *  OBD_IOC_ENDRECORD   - close the held config llog and clear the handle.
 *  OBD_IOC_CLEAR_LOG   - open the named config llog, destroy it and free
 *                        the handle.
 *  OBD_IOC_DORECORD    - copy ioc_plen1 bytes from the user buffer
 *                        ioc_pbuf1 and append them as an OBD_CFG_REC record
 *                        to the held config llog; other record types are
 *                        reported as unknown.
 *  OBD_IOC_PARSE       - class_config_parse_llog() on the named log.
 *  OBD_IOC_DUMP_LOG    - class_config_dump_llog() on the named log.
 *  (sync)              - fsfilt_sync() of the backing superblock; the case
 *                        label is not visible in this view.
 *  OBD_IOC_SET_READONLY- start/commit a transaction, sync, then
 *                        lvfs_set_rdonly().
 *  OBD_IOC_CATLOGLIST  - llog_catalog_list() for ld_tgt_count targets.
 *  OBD_IOC_LLOG_CHECK / _CANCEL / _REMOVE
 *                      - obd_llog_finish(), llog_ioctl(), obd_llog_init(),
 *                        then obd_set_info_async(KEY_MDS_CONN).
 *  OBD_IOC_LLOG_INFO / _PRINT
 *                      - llog_ioctl() on the config llog context.
 *  OBD_IOC_ABORT_RECOVERY
 *                      - target_abort_recovery(), then mds_allow_cli(obd, 0)
 *                        to re-evaluate whether clients may connect.
 *  OBD_IOC_GET_OBJ_VERSION
 *                      - look up the fid in ioc_inlbuf1 under an
 *                        MDS_INODELOCK_UPDATE lock and store the inode
 *                        version into ioc_inlbuf2.
 *
 * Unknown commands are logged at D_INFO.  Filesystem operations on llogs
 * are bracketed by push_ctxt()/pop_ctxt().
 */
745 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
746 void *karg, void *uarg)
748 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
749 struct obd_device *obd = exp->exp_obd;
750 struct mds_obd *mds = &obd->u.mds;
751 struct obd_ioctl_data *data = karg;
752 struct lvfs_run_ctxt saved;
756 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
759 case OBD_IOC_RECORD: {
760 char *name = data->ioc_inlbuf1;
761 struct llog_ctxt *ctxt;
763 if (mds->mds_cfg_llh)
766 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
767 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
768 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
771 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
774 mds->mds_cfg_llh = NULL;
775 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
780 case OBD_IOC_ENDRECORD: {
781 if (!mds->mds_cfg_llh)
784 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
785 rc = llog_close(mds->mds_cfg_llh);
786 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
788 mds->mds_cfg_llh = NULL;
792 case OBD_IOC_CLEAR_LOG: {
793 char *name = data->ioc_inlbuf1;
794 struct llog_ctxt *ctxt;
795 if (mds->mds_cfg_llh)
798 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
799 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
800 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
803 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
806 rc = llog_destroy(mds->mds_cfg_llh);
807 llog_free_handle(mds->mds_cfg_llh);
809 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
811 mds->mds_cfg_llh = NULL;
815 case OBD_IOC_DORECORD: {
817 struct llog_rec_hdr rec;
818 if (!mds->mds_cfg_llh)
821 rec.lrh_len = llog_data_len(data->ioc_plen1);
823 if (data->ioc_type == LUSTRE_CFG_TYPE) {
824 rec.lrh_type = OBD_CFG_REC;
826 CERROR("unknown cfg record type:%d \n", data->ioc_type);
/* kernel-side copy of the user-supplied record payload */
830 OBD_ALLOC(cfg_buf, data->ioc_plen1);
833 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
835 OBD_FREE(cfg_buf, data->ioc_plen1);
839 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
840 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
842 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
844 OBD_FREE(cfg_buf, data->ioc_plen1);
848 case OBD_IOC_PARSE: {
849 struct llog_ctxt *ctxt =
850 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
851 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
852 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
853 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
861 case OBD_IOC_DUMP_LOG: {
862 struct llog_ctxt *ctxt =
863 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
864 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
865 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
866 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
875 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
876 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
880 case OBD_IOC_SET_READONLY: {
882 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
883 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
884 obd->obd_name, obd->u.obt.obt_sb->s_id);
886 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
888 rc = fsfilt_commit(obd, inode, handle, 1);
890 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
891 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
893 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
897 case OBD_IOC_CATLOGLIST: {
898 int count = mds->mds_lov_desc.ld_tgt_count;
899 rc = llog_catalog_list(obd, count, data);
903 case OBD_IOC_LLOG_CHECK:
904 case OBD_IOC_LLOG_CANCEL:
905 case OBD_IOC_LLOG_REMOVE: {
906 struct llog_ctxt *ctxt =
907 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
910 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
911 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
912 rc = llog_ioctl(ctxt, cmd, data);
913 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
915 rc = obd_llog_init(obd, obd, NULL);
917 rc2 = obd_set_info_async(mds->mds_lov_exp,
918 sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
924 case OBD_IOC_LLOG_INFO:
925 case OBD_IOC_LLOG_PRINT: {
926 struct llog_ctxt *ctxt =
927 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
929 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
930 rc = llog_ioctl(ctxt, cmd, data);
931 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
937 case OBD_IOC_ABORT_RECOVERY:
938 LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
939 target_abort_recovery(obd);
940 /* obd_recovering has been changed */
941 mds_allow_cli(obd, 0);
944 case OBD_IOC_GET_OBJ_VERSION: {
945 struct ll_fid *fid = (struct ll_fid *) data->ioc_inlbuf1;
946 struct dentry *dentry;
947 struct lustre_handle lockh;
951 dentry = mds_fid2locked_dentry(obd, fid, NULL, lockm, &lockh,
952 NULL, 0, MDS_INODELOCK_UPDATE);
953 if (IS_ERR(dentry)) {
954 rc = PTR_ERR(dentry);
957 version = fsfilt_get_version(obd, dentry->d_inode);
958 ldlm_lock_decref(&lockh, lockm);
963 *(__u64 *) data->ioc_inlbuf2 = version;
968 CDEBUG(D_INFO, "unknown command %x\n", cmd);
/*
 * mds_allow_cli() - record which start-up preconditions have been met and
 * open the MDS for client connections once enough of them hold.
 *
 * flag - bit mask of CONFIG_LOG, CONFIG_SYNC and CONFIG_TARGET; each set
 *        bit latches the matching mds_fl_cfglog / mds_fl_synced /
 *        mds_fl_target field to 1.  Passing 0 only re-evaluates the
 *        condition.
 *
 * obd_no_conn is cleared when the config log and a target are both present
 * and either the device is not recovering or a sync has completed.
 * This function never sets obd_no_conn back to 1.
 */
975 /* Collect the preconditions we need to allow client connects */
976 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
978 if (flag & CONFIG_LOG)
979 obd->u.mds.mds_fl_cfglog = 1;
980 if (flag & CONFIG_SYNC)
981 obd->u.mds.mds_fl_synced = 1;
982 if (flag & CONFIG_TARGET)
983 obd->u.mds.mds_fl_target = 1;
984 if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_target &&
985 (!obd->obd_recovering || obd->u.mds.mds_fl_synced))
986 /* Open for clients */
987 obd->obd_no_conn = 0;
/*
 * Argument bundle for one synchronization run.  It is allocated and filled
 * by mds_lov_start_synchronize() and freed by __mds_lov_synchronize() after
 * its fields have been copied to locals.
 */
990 struct mds_lov_sync_info {
991 struct obd_device *mlsi_obd; /* the lov device to sync */
992 struct obd_device *mlsi_watched; /* target osc */
993 __u32 mlsi_index; /* index of target */
/*
 * __mds_lov_synchronize() - synchronize the MDS with one target OSC.
 *
 * data - struct mds_lov_sync_info; its fields are copied to locals and the
 *        structure is freed here, so the caller must not touch it again.
 *
 * Steps visible in order:
 *  1. mds_lov_update_mds() for the target index.
 *  2. obd_set_info_async(KEY_MDS_CONN) with the target uuid.
 *  3. llog_connect() on the LLOG_MDS_OST_ORIG_CTXT context, which is
 *     released with llog_ctxt_put() afterwards.
 *  4. mds_lov_clear_orphans() for the target, unless the MDS is stopping
 *     (in which case -ENODEV is used).
 *
 * Failures are logged.  On a failed sync the target is reported to the LOV
 * with OBD_NOTIFY_INACTIVE, provided neither the MDS, the LOV nor the
 * target is stopping.  mds_allow_cli(obd, CONFIG_SYNC) is called once a
 * target has been synced successfully, per the comment preceding it.
 */
996 /* We only sync one osc at a time, so that we don't have to hold
997 any kind of lock on the whole mds_lov_desc, which may change
998 (grow) as a result of mds_lov_add_ost. This also avoids any
999 kind of mismatch between the lov_desc and the mds_lov_desc,
1000 which are not in lock-step during lov_add_obd */
1001 static int __mds_lov_synchronize(void *data)
1003 struct mds_lov_sync_info *mlsi = data;
1004 struct obd_device *obd = mlsi->mlsi_obd;
1005 struct obd_device *watched = mlsi->mlsi_watched;
1006 struct mds_obd *mds = &obd->u.mds;
1007 struct obd_uuid *uuid;
1008 __u32 idx = mlsi->mlsi_index;
1009 struct llog_ctxt *ctxt;
1013 OBD_FREE(mlsi, sizeof(*mlsi));
1017 uuid = &watched->u.cli.cl_target_uuid;
1020 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
1022 rc = mds_lov_update_mds(obd, watched, idx);
1024 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
1028 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
1029 KEY_MDS_CONN, 0, uuid, NULL);
1033 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1037 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
1039 rc = llog_connect(ctxt, NULL, NULL, uuid);
1040 llog_ctxt_put(ctxt);
1043 CERROR("%s failed at llog_origin_connect: %d\n",
1044 obd_uuid2str(uuid), rc);
1048 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
1049 obd->obd_name, obd_uuid2str(uuid));
1051 if (obd->obd_stopping)
1052 GOTO(out, rc = -ENODEV);
1054 rc = mds_lov_clear_orphans(mds, uuid);
1056 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
1057 obd_uuid2str(uuid), rc);
1064 /* Deactivate it for safety */
1065 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
1067 if (!obd->obd_stopping && mds->mds_lov_obd &&
1068 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
1069 obd_notify(mds->mds_lov_obd, watched,
1070 OBD_NOTIFY_INACTIVE, NULL);
1072 /* We've successfully synced at least 1 OST and are ready
1073 to handle client requests */
1074 mds_allow_cli(obd, CONFIG_SYNC);
/*
 * Thread entry point for a background synchronization.
 *
 * Names the thread "ll_sync_<index>" (two-digit, zero-padded target index),
 * detaches it with cfs_daemonize_ctxt() and then runs
 * __mds_lov_synchronize() on the same argument, returning its result.
 */
1081 int mds_lov_synchronize(void *data)
1083 struct mds_lov_sync_info *mlsi = data;
1086 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1087 cfs_daemonize_ctxt(name);
1089 RETURN(__mds_lov_synchronize(data));
/*
 * Start synchronizing the MDS with the target OSC "watched".
 *
 * data     - pointer to the __u32 index of the target; it is dereferenced
 *            unconditionally in the visible code.
 * nonblock - selects between the two execution modes below.
 *            NOTE(review): the branch condition is not visible here;
 *            presumably non-zero means "run in the background" - confirm.
 *
 * A struct mds_lov_sync_info is allocated and filled in.  The work is then
 * either handed to a new kernel thread running mds_lov_synchronize()
 * (a failure to start the thread is logged) or performed synchronously by
 * calling __mds_lov_synchronize() directly.
 */
1092 int mds_lov_start_synchronize(struct obd_device *obd,
1093 struct obd_device *watched,
1094 void *data, int nonblock)
1096 struct mds_lov_sync_info *mlsi;
1098 struct obd_uuid *uuid;
1102 uuid = &watched->u.cli.cl_target_uuid;
1104 OBD_ALLOC(mlsi, sizeof(*mlsi));
1109 mlsi->mlsi_obd = obd;
1110 mlsi->mlsi_watched = watched;
1111 mlsi->mlsi_index = *(__u32 *)data;
1113 /* Although class_export_get(obd->obd_self_export) would lock
1114 the MDS in place, since it's only a self-export
1115 it doesn't lock the LOV in place. The LOV can be disconnected
1116 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1117 Simply taking an export ref on the LOV doesn't help, because it's
1118 still disconnected. Taking an obd reference insures that we don't
1119 disconnect the LOV. This of course means a cleanup won't
1120 finish for as long as the sync is blocking. */
1124 /* Synchronize in the background */
1125 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1126 CLONE_VM | CLONE_FILES);
1128 CERROR("%s: error starting mds_lov_synchronize: %d\n",
1132 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1133 "thread=%d\n", obd->obd_name,
1134 mlsi->mlsi_index, rc);
1138 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback: the MDS is told about an event on "watched".
 *
 * Events visible here:
 *  OBD_NOTIFY_CREATE  - a target was added; the LOV descriptor is refreshed
 *                       with mds_lov_update_desc(), "data" pointing to the
 *                       target index.
 *  OBD_NOTIFY_ACTIVE  - mds_allow_cli(CONFIG_TARGET).
 *  OBD_NOTIFY_SYNC / OBD_NOTIFY_SYNC_NONBLOCK
 *                     - "data" carries the LOV index (see inline comments).
 *  OBD_NOTIFY_CONFIG  - "data" is a flag mask passed to mds_allow_cli();
 *                       expired exports are disconnected and the quota
 *                       master is marked ready.
 *
 * After the switch, the watched device has to be of type LUSTRE_OSC_NAME,
 * otherwise an error is logged.  While the MDS is recovering, orphans are
 * not reset and only mds_allow_cli(CONFIG_SYNC) is called.  Otherwise
 * mds_lov_start_synchronize() is invoked; its nonblock argument is false
 * only for OBD_NOTIFY_SYNC.  lquota_recovery() is run when the MDS is not
 * stopping.
 */
1144 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1145 enum obd_notify_event ev, void *data)
1150 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1153 case OBD_NOTIFY_CREATE:
1154 CWARN("MDS %s: add target %s\n",obd->obd_name,
1155 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1156 /* We still have to fix the lov descriptor for ost's */
1158 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1159 &watched->u.cli.cl_target_uuid);
1161 /* We only handle these: */
1162 case OBD_NOTIFY_ACTIVE:
1163 /* lov want one or more _active_ targets for work */
1164 mds_allow_cli(obd, CONFIG_TARGET);
1165 /* activate event should be pass lov idx as argument */
1166 case OBD_NOTIFY_SYNC:
1167 case OBD_NOTIFY_SYNC_NONBLOCK:
1168 /* sync event should be pass lov idx as argument */
1170 case OBD_NOTIFY_CONFIG:
1171 mds_allow_cli(obd, (unsigned long)data);
1172 /* call this only when config is processed and stale_export_age
1173 * value is configured */
1174 class_disconnect_expired_exports(obd);
1175 /* quota_type has been processed, we can now handle
1176 * incoming quota requests */
1177 QUOTA_MASTER_READY(&obd->u.obt.obt_qctxt);
1182 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1183 CERROR("unexpected notification of %s %s!\n",
1184 watched->obd_type->typ_name, watched->obd_name);
1188 if (obd->obd_recovering) {
1189 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1191 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1192 mds_allow_cli(obd, CONFIG_SYNC);
1196 rc = mds_lov_start_synchronize(obd, watched, data,
1197 !(ev == OBD_NOTIFY_SYNC));
1199 if (likely(obd->obd_stopping == 0))
1200 lquota_recovery(mds_quota_interface_ref, obd);
/*
 * Fill "lum" with the default striping of the filesystem.
 *
 * The cached descriptor obd->u.mds.mds_lov_desc is refreshed in place via
 * obd_get_info(KEY_LOVDESC) on the LOV export, then its pattern and default
 * stripe size, count and offset are copied into "lum" with magic
 * LOV_MAGIC_V1 and group LOV_OBJECT_GROUP_DEFAULT.
 *
 * Returns sizeof(*lum) on the path shown.
 */
1205 int mds_get_default_md(struct obd_device *obd, struct lov_user_md *lum)
1207 struct lov_desc *ldesc;
1208 int rc, size = sizeof(*ldesc);
1214 ldesc = &obd->u.mds.mds_lov_desc;
1215 LASSERT(ldesc != NULL);
1217 rc = obd_get_info(obd->u.mds.mds_lov_exp, sizeof(KEY_LOVDESC),
1218 KEY_LOVDESC, &size, ldesc, NULL);
1222 lum->lmm_magic = LOV_MAGIC_V1;
1223 lum->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1224 lum->lmm_pattern = ldesc->ld_pattern;
1225 lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
1226 lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
1227 lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
1229 RETURN(sizeof(*lum));
/*
 * mds_convert_lov_ea() - summary of the two visible paths:
 *
 * 1. The client lacks OBD_CONNECT_LOV_V3 and the EA is LOV_MAGIC_V3: the
 *    buffer is rewritten in place as LOV_MAGIC_V1 (the object array is
 *    moved down with memmove(); for non-regular files a zero-stripe size is
 *    used) and the new, smaller size is returned.  Nothing is written to
 *    disk on this path.
 * 2. The magic is checked against LOV_MAGIC_V1, LOV_MAGIC_V3 and
 *    LOV_MAGIC_JOIN (NOTE(review): the statement taken on a match is not
 *    visible here; presumably no conversion is needed - confirm).
 *    Otherwise the EA is unpacked to an lsm, repacked, and stored with
 *    fsfilt_set_md() inside a FSFILT_OP_SETATTR transaction; the in-memory
 *    lsm is freed at the end.
 */
1232 /* Convert the on-disk LOV EA structure.
1233 * We always try to convert from an old LOV EA format to the common in-memory
1234 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1235 * then convert back to the new on-disk format and save it back to disk
1236 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1237 * to convert it each time this inode is accessed.
1239 * This function is a bit interesting in the error handling. We can safely
1240 * ship the old lmm to the client in case of failure, since it uses the same
1241 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1242 * reason. We will not delete the old lmm data until we have written the
1243 * new format lmm data in fsfilt_set_md(). */
1244 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1245 struct lov_mds_md *lmm, int lmm_size,
1246 __u64 connect_flags)
1248 struct lov_stripe_md *lsm = NULL;
1253 if (((connect_flags & OBD_CONNECT_LOV_V3) == 0) &&
1254 (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)) {
1255 /* client does not support LOV_MAGIC_V3, so we have to convert
1257 * we convert the lmm from v3 to v1
1258 * and return the new size (which is smaller)
1259 * the caller supports this way to return the new size */
1262 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1263 /* lmm_stripe_count for non reg files is not used or -1 */
1264 if (!S_ISREG(inode->i_mode)) {
1265 new_lmm_size = lov_mds_md_size(0, LOV_MAGIC_V1);
1267 __u32 lmm_stripe_count;
1269 lmm_stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
1270 new_lmm_size = lov_mds_md_size(lmm_stripe_count,
1272 /* move the objects to the new place */
1273 memmove(lmm->lmm_objects,
1274 ((struct lov_mds_md_v3 *)lmm)->lmm_objects,
1275 lmm_stripe_count * sizeof(struct lov_ost_data_v1));
1277 /* even if new size is smaller than old one,
1278 * this should not generate memory leak */
1279 RETURN(new_lmm_size);
1282 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1283 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3 ||
1284 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1287 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1288 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1291 rc = obd_unpackmd(obd->u.mds.mds_lov_exp, &lsm, lmm, lmm_size);
1295 rc = obd_packmd(obd->u.mds.mds_lov_exp, &lmm, lsm);
1297 GOTO(conv_free, rc);
1300 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1301 if (IS_ERR(handle)) {
1302 rc = PTR_ERR(handle);
1303 GOTO(conv_free, rc);
1306 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1308 err = fsfilt_commit(obd, inode, handle, 0);
1310 rc = err ? err : lmm_size;
1311 GOTO(conv_free, rc);
1313 obd_free_memmd(obd->u.mds.mds_lov_exp, &lsm);