1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_MDS
48 #include <linux/module.h>
49 #include <lustre_mds.h>
50 #include <lustre/lustre_idl.h>
51 #include <obd_class.h>
53 #include <lustre_lib.h>
54 #include <lustre_fsfilt.h>
56 #include "mds_internal.h"
/* Forward declaration: mds_allow_cli() is defined further down in this file
 * but is called from mds_iocontrol() before that definition. */
58 static void mds_allow_cli(struct obd_device *obd, unsigned long flag);
/* Debug helper: dump the objid bookkeeping of an MDS at D_INFO level.
 *
 * \param label  free-form tag identifying the caller ("dump from %s")
 * \param obd    MDS device whose u.mds state is dumped
 *
 * Logs an error instead of dumping when the dirty bitmap or the page array
 * has not been set up. */
60 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
62 struct mds_obd *mds = &obd->u.mds;
65 CDEBUG(D_INFO, "dump from %s\n", label);
66 if (mds->mds_lov_page_dirty == NULL) {
67 CERROR("NULL bitmap!\n");
/* dump the dirty bitmap one long word at a time */
71 for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
72 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
74 if (mds->mds_lov_page_array == NULL) {
75 CERROR("not init page array!\n");
/* walk every page slot and log the objid entries it holds */
79 for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
80 obd_id *data = mds->mds_lov_page_array[i];
85 for(j=0; j < OBJID_PER_PAGE(); j++) {
88 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/* Set up the objid bookkeeping of an MDS device.
 *
 * Allocates the dirty-page bitmap and the array of page pointers, then
 * opens (creating if needed) the LOV_OBJID file, checks that it is a regular
 * file and stores it in mds->mds_lov_objid_filp.
 *
 * The error path visible at the end closes the file, frees the page array
 * and frees the bitmap.
 *
 * \retval -ENOMEM        page array could not be allocated
 * \retval PTR_ERR(file)  LOV_OBJID could not be opened or created
 * \retval -ENOENT        LOV_OBJID exists but is not a regular file */
96 int mds_lov_init_objids(struct obd_device *obd)
98 struct mds_obd *mds = &obd->u.mds;
99 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* compile-time check: a page must hold a whole number of obd_id entries */
104 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
107 if (mds->mds_lov_page_dirty == NULL)
111 OBD_ALLOC(mds->mds_lov_page_array, size);
112 if (mds->mds_lov_page_array == NULL)
113 GOTO(err_free_bitmap, rc = -ENOMEM);
115 /* open and test the lov objid file */
116 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed here while the GOTO below is what assigns
 * PTR_ERR(file); confirm rc is already set when this message is emitted. */
119 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
120 GOTO(err_free, rc = PTR_ERR(file));
122 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
123 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
124 file->f_dentry->d_inode->i_mode);
125 GOTO(err_open, rc = -ENOENT);
127 mds->mds_lov_objid_filp = file;
/* error unwinding: close the file, then release page array and bitmap */
131 if (filp_close((struct file *)file, 0))
132 CERROR("can't close %s after error\n", LOV_OBJID);
134 OBD_FREE(mds->mds_lov_page_array, size);
136 FREE_BITMAP(mds->mds_lov_page_dirty);
140 EXPORT_SYMBOL(mds_lov_init_objids);
/* Tear down the objid bookkeeping of an MDS device.
 *
 * Frees the objid pages and the page pointer array, closes the objid file
 * (logging an error if the close reports one) and frees the dirty bitmap. */
142 void mds_lov_destroy_objids(struct obd_device *obd)
144 struct mds_obd *mds = &obd->u.mds;
148 if (mds->mds_lov_page_array != NULL) {
149 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150 obd_id *data = mds->mds_lov_page_array[i];
152 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
/* size matches the allocation made in mds_lov_init_objids() */
154 OBD_FREE(mds->mds_lov_page_array,
155 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
158 if (mds->mds_lov_objid_filp) {
159 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
/* forget the file pointer regardless of the close result */
160 mds->mds_lov_objid_filp = NULL;
162 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165 FREE_BITMAP(mds->mds_lov_page_dirty);
168 EXPORT_SYMBOL(mds_lov_destroy_objids);
171 * There are currently two ways to learn the OST count and the maximum OST index:
172 * first - after an OST is connected to the MDS and the sync process has finished;
173 * second - from the lmm during recovery, when the MDS has no configs
174 * and the OST is not registered with the MGS.
176 * \param mds pointer to mds structure
177 * \param index maximum ost index
179 * \retval -ENOMEM if there is not enough memory for a new page
180 * \retval 0 if the update succeeded
/* Make sure the objid slot for OST index exists and account for the target.
 *
 * Looks up the objid page that holds index, allocating the page on demand,
 * raises the recorded maximum index (with its page and offset) when index
 * exceeds it, and for a slot that still holds 0 bumps mds_lov_objid_count
 * and recomputes mds_max_mdsize and mds_max_cookiesize from the new count,
 * capped at LOV_MAX_STRIPE_COUNT. */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
184 __u32 page = index / OBJID_PER_PAGE();
185 __u32 off = index % OBJID_PER_PAGE();
186 obd_id *data = mds->mds_lov_page_array[page];
/* objid pages are allocated lazily and then published in the page array */
189 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
193 mds->mds_lov_page_array[page] = data;
/* remember the highest index seen together with its page and offset */
196 if (index > mds->mds_lov_objid_max_index) {
197 mds->mds_lov_objid_lastpage = page;
198 mds->mds_lov_objid_lastidx = off;
199 mds->mds_lov_objid_max_index = index;
202 /* workaround - New target not in objids file; increase mdsize */
203 /* ld_tgt_count is used as the max index everywhere, despite its name. */
204 if (data[off] == 0) {
208 mds->mds_lov_objid_count++;
209 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
210 mds->mds_lov_objid_count);
212 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
213 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
/* argument order follows the format string: stripe count first, then the
 * two sizes (same order as the message in mds_lov_update_from_read) */
214 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d "
215 "stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216 mds->mds_max_cookiesize);
/* Report whether the objid slot of OST index holds a value greater than 0.
 *
 * \retval 1  slot value is > 0
 * \retval 0  slot value is 0 */
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
225 __u32 page = index / OBJID_PER_PAGE();
226 __u32 off = index % OBJID_PER_PAGE();
227 obd_id *data = mds->mds_lov_page_array[page];
/* NOTE(review): data is dereferenced directly; presumably the page has
 * already been allocated by the caller path - confirm. */
229 return (data[off] > 0);
/* Make sure objid slots exist for every OST referenced by a striping EA.
 *
 * With obd_dev_sem held, walks the stripes of lmm and calls
 * mds_lov_update_max_ost() for the OST index of each stripe. The object
 * array is taken from the V3 layout when lmm_magic is LOV_MAGIC_V3 and from
 * the base lov_mds_md layout otherwise. */
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
236 struct lov_ost_data_v1 *lmm_objects;
238 /* if we create file without objects - lmm is NULL */
242 mutex_down(&obd->obd_dev_sem);
243 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
244 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
246 lmm_objects = lmm->lmm_objects;
/* EA fields are little-endian on disk, hence the le32_to_cpu conversions */
248 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
249 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
250 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
255 mutex_up(&obd->obd_dev_sem);
259 EXPORT_SYMBOL(mds_lov_prepare_objids);
262 * Write an llog orphan record about a lost OST object.
263 * A special lsm is allocated with a single stripe; the caller should deallocate it
/* Log count lost precreated objects, starting at object id, on OST idx.
 *
 * \param obd      MDS device
 * \param lsmp     in/out: single-stripe lsm used for the record
 * \param stripes  out: receives the original lsm_stripe_count so the caller
 *                 can restore it before freeing the lsm
 * \param id       first lost object id
 * \param count    number of lost objects, passed to mds_log_op_orphan()
 * \param idx      OST index the objects belong to */
266 static int mds_log_lost_precreated(struct obd_device *obd,
267 struct lov_stripe_md **lsmp, int *stripes,
268 obd_id id, obd_count count, int idx)
270 struct lov_stripe_md *lsm = *lsmp;
/* presumably only reached when no lsm was passed in - confirm */
275 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
278 /* need only one stripe, save old value */
279 *stripes = lsm->lsm_stripe_count;
280 lsm->lsm_stripe_count = 1;
/* describe the lost range through the single stripe of the lsm */
284 lsm->lsm_oinfo[0]->loi_id = id;
285 lsm->lsm_oinfo[0]->loi_gr = 0; /* needed in 2.0 */
286 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
288 rc = mds_log_op_orphan(obd, lsm, count);
/* Compare the object ids in a striping EA with the last ids known per OST.
 *
 * For every stripe of lmm whose object id is greater than the stored value
 * of its OST, the page holding that slot is marked dirty. When
 * HAVE_DELAYED_RECOVERY is not defined and the device is recovering, a gap
 * between the stored id and the new id is logged as lost precreated
 * objects through mds_log_lost_precreated(). */
292 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
294 struct mds_obd *mds = &obd->u.mds;
296 struct lov_ost_data_v1 *lmm_objects;
297 #ifndef HAVE_DELAYED_RECOVERY
298 struct lov_stripe_md *lsm = NULL;
303 /* if we create file without objects - lmm is NULL */
307 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
308 lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
310 lmm_objects = lmm->lmm_objects;
312 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
313 __u32 i = le32_to_cpu(lmm_objects[j].l_ost_idx);
314 obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
/* locate the slot of OST i inside the paged objid table */
315 __u32 page = i / OBJID_PER_PAGE();
316 __u32 idx = i % OBJID_PER_PAGE();
319 data = mds->mds_lov_page_array[page];
321 CDEBUG(D_INODE,"update last object for ost %u"
322 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
323 if (id > data[idx]) {
324 #ifndef HAVE_DELAYED_RECOVERY
/* NOTE(review): the 64-bit id difference is stored in an int and later
 * printed with %u - confirm the gap can never exceed the int range. */
325 int lost = id - data[idx] - 1;
326 /* we might have lost precreated objects due to VBR */
327 if (lost > 0 && obd->obd_recovering) {
328 CDEBUG(D_HA, "GAP in objids is %u\n", lost);
329 if (!obd->obd_version_recov)
330 CWARN("Unexpected gap in objids\n");
331 /* lsm is allocated if NULL */
332 mds_log_lost_precreated(obd, &lsm, &stripes,
333 data[idx] + 1, lost, i);
337 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
340 #ifndef HAVE_DELAYED_RECOVERY
342 /* restore stripes number */
343 lsm->lsm_stripe_count = stripes;
344 obd_free_memmd(mds->mds_osc_exp, &lsm);
350 EXPORT_SYMBOL(mds_lov_update_objids);
/* Account for objid entries that were just read from the objid file.
 *
 * Iterates over count entries of data; for the entries it accounts for,
 * mds_lov_objid_count is incremented. mds_max_mdsize and
 * mds_max_cookiesize are then recomputed from the resulting count, capped
 * at LOV_MAX_STRIPE_COUNT. */
352 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
358 for(i = 0; i < count; i++) {
362 mds->mds_lov_objid_count++;
363 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
364 mds->mds_lov_objid_count);
366 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
367 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
368 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
369 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/* Load the contents of the objid file into the in-memory page array.
 *
 * The number of pages is derived from the file size. Each page is
 * allocated (its slot must still be NULL), filled with up to one page of
 * obd_id entries via fsfilt_read_record() and handed to
 * mds_lov_update_from_read(). The index of the last page and the entry
 * count of the last read are recorded afterwards.
 *
 * \retval -ENOMEM  a page could not be allocated
 * \retval -EIO     mds_lov_update_from_read() reported a failure */
375 static int mds_lov_read_objids(struct obd_device *obd)
377 struct mds_obd *mds = &obd->u.mds;
379 int i, rc = 0, count = 0, page = 0;
383 /* Read everything in the file, even if our current lov desc
384 has fewer targets. Old targets not in the lov descriptor
385 during mds setup may still have valid objids. */
386 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* number of pages needed to cover the file, rounded up by one */
390 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
391 CDEBUG(D_INFO, "file size %d pages %d\n", (int)size, page);
392 for (i = 0; i < page; i++) {
394 loff_t off_old = off;
396 LASSERT(mds->mds_lov_page_array[i] == NULL);
397 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
398 if (mds->mds_lov_page_array[i] == NULL)
399 GOTO(out, rc = -ENOMEM);
401 data = mds->mds_lov_page_array[i];
403 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
404 OBJID_PER_PAGE()*sizeof(obd_id), &off);
406 CERROR("Error reading objids %d\n", rc);
/* entries read = how far the file offset advanced / entry size */
410 count = (off - off_old) / sizeof(obd_id);
411 if (mds_lov_update_from_read(mds, data, count)) {
412 CERROR("Can't update mds data\n");
413 GOTO(out, rc = -EIO);
419 mds->mds_lov_objid_lastpage = i;
420 mds->mds_lov_objid_lastidx = count;
422 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
423 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
425 mds_lov_dump_objids("read",obd);
/* Write the dirty objid pages back to the objid file.
 *
 * For every bit set in mds_lov_page_dirty the bit is cleared and the
 * matching page is written with fsfilt_write_record() at byte offset
 * i * (entries per page * sizeof(obd_id)). The last page is written only up
 * to and including entry mds_lov_objid_lastidx. */
430 int mds_lov_write_objids(struct obd_device *obd)
432 struct mds_obd *mds = &obd->u.mds;
436 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
439 mds_lov_dump_objids("write", obd);
441 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
442 obd_id *data = mds->mds_lov_page_array[i];
443 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
444 loff_t off = i * size;
446 LASSERT(data != NULL);
448 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
451 /* check for partially filled last page */
452 if (i == mds->mds_lov_objid_lastpage)
453 size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
455 CDEBUG(D_INFO,"write %lld - %u\n", off, size);
456 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
/* presumably the failure path: the page is marked dirty again so that a
 * later call retries it - confirm against the surrounding condition */
459 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
468 EXPORT_SYMBOL(mds_lov_write_objids);
/* Make sure the last object id of OST idx is known, asking the OSC if needed.
 *
 * The slot of idx is located in the paged objid table. In the branch shown
 * (taken, among other conditions, when the export was connected without
 * OBD_CONNECT_SKIP_ORPHAN) the last id is fetched from the OSC with
 * obd_get_info(KEY_LAST_ID) straight into the slot and the page is marked
 * dirty. */
470 static int mds_lov_get_objid(struct obd_device * obd,
473 struct mds_obd *mds = &obd->u.mds;
474 struct obd_export *osc_exp = mds->mds_osc_exp;
481 LASSERT(osc_exp != NULL);
483 page = idx / OBJID_PER_PAGE();
484 off = idx % OBJID_PER_PAGE();
486 data = mds->mds_lov_page_array[page];
488 !(osc_exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN)) {
489 /* We never read this lastid; ask the osc */
490 struct obd_id_info lastid;
491 __u32 size = sizeof(lastid);
/* the reply is written directly into the objid slot */
494 lastid.data = &data[off];
495 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
496 KEY_LAST_ID, &size, &lastid, NULL);
500 /* workaround for clean filter */
504 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
506 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
507 idx, data, page, off, data[off]);
/* Ask the OSTs to reconcile their objects with the ids known to the MDS.
 *
 * Issues obd_create() on the LOV export with OBD_FL_DELORPHAN set.
 *
 * \param mds       MDS state; its page array must already be allocated
 * \param ost_uuid  when not NULL, stored in oti.oti_ost_uuid
 *                  (presumably restricts the request to that OST - confirm) */
512 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
515 struct obdo oa = { 0 };
516 struct obd_trans_info oti = {0};
517 struct lov_stripe_md *empty_ea = NULL;
520 LASSERT(mds->mds_lov_page_array != NULL);
522 /* This create will in fact either create or destroy: If the OST is
523 * missing objects below this ID, they will be created. If it finds
524 * objects above this ID, they will be removed. */
/* NOTE(review): oa is already zero-initialized at its declaration, so this
 * memset looks redundant. */
525 memset(&oa, 0, sizeof(oa));
526 oa.o_flags = OBD_FL_DELORPHAN;
527 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
528 if (ost_uuid != NULL)
529 oti.oti_ost_uuid = ost_uuid;
531 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/* Push the next object id of one OST to the OSC layer.
 *
 * Sends KEY_NEXT_ID through obd_set_info_async() on the LOV export and logs
 * an error when that fails. Must not be called while the device is
 * recovering (asserted). */
537 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
539 struct mds_obd *mds = &obd->u.mds;
541 struct obd_id_info info;
544 LASSERT(!obd->obd_recovering);
548 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
549 KEY_NEXT_ID, sizeof(info), &info, NULL);
551 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
557 /* Update the lov desc for a new size lov. */
/* Fetches the current LOV descriptor with obd_get_info(KEY_LOVDESC) into a
 * temporary buffer, copies it into mds->mds_lov_desc, makes sure an objid
 * slot exists for index (under obd_dev_sem) and re-initializes the llogs
 * for that index. The temporary buffer is freed before returning.
 *
 * \param obd    MDS device
 * \param index  OST index of the target that triggered the update
 * \param uuid   UUID of that target, used for the debug message only
 *
 * \retval -ENOMEM  the objid slot could not be set up */
558 static int mds_lov_update_desc(struct obd_device *obd, __u32 index,
559 struct obd_uuid *uuid)
561 struct mds_obd *mds = &obd->u.mds;
563 __u32 valsize = sizeof(mds->mds_lov_desc);
567 OBD_ALLOC(ld, sizeof(*ld));
571 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
576 /* Don't change the mds_lov_desc until the objids size matches the
578 mds->mds_lov_desc = *ld;
579 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
580 mds->mds_lov_desc.ld_tgt_count, index, uuid->uuid);
582 mutex_down(&obd->obd_dev_sem);
583 rc = mds_lov_update_max_ost(mds, index);
584 mutex_up(&obd->obd_dev_sem);
586 GOTO(out, rc = -ENOMEM);
588 /* If we added a target we have to reconnect the llogs */
589 /* We only _need_ to do this at first add (idx), or the first time
590 after recovery. However, it should now be safe to call anytime. */
591 rc = obd_llog_init(obd, obd, (void *)&index);
594 OBD_FREE(ld, sizeof(*ld));
599 /* Inform MDS about new/updated target */
/* Asserts that the objid slot of idx is initialized, rejects an idx that is
 * not below mds_lov_desc.ld_tgt_count, obtains the last object id for the
 * target with mds_lov_get_objid() and forwards it to the OSC through
 * mds_lov_set_one_nextid().
 *
 * \retval -EINVAL  idx is >= the target count of the LOV descriptor */
600 static int mds_lov_update_mds(struct obd_device *obd,
601 struct obd_device *watched,
604 struct mds_obd *mds = &obd->u.mds;
613 /* Don't let anyone else mess with mds_lov_objids now */
614 old_count = mds->mds_lov_desc.ld_tgt_count;
615 LASSERT(mds_lov_objinit(mds, idx));
617 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
618 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
619 mds->mds_lov_desc.ld_tgt_count);
621 /* idx is set as data from lov_notify. */
622 if (obd->obd_recovering)
/* NOTE(review): the check is ">=" while the message below prints ">" */
625 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
626 CERROR("index %d > count %d!\n", idx,
627 mds->mds_lov_desc.ld_tgt_count);
628 GOTO(out, rc = -EINVAL);
631 rc = mds_lov_get_objid(obd, idx);
633 CERROR("Failed to get objid - %d\n", rc);
/* locate the slot of idx inside the paged objid table */
637 page = idx / OBJID_PER_PAGE();
638 off = idx % OBJID_PER_PAGE();
639 data = mds->mds_lov_page_array[page];
640 /* We have read this lastid from disk; tell the osc.
641 Don't call this during recovery. */
642 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
644 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
645 /* Don't abort the rest of the sync */
649 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/* Connect the MDS to the LOV named lov_name.
 *
 * Steps visible here: return the stored error when a previous attempt left
 * an ERR_PTR in mds_osc_obd; look the LOV up by name; read the objid file
 * under obd_dev_sem; block client connections; register the MDS as observer
 * of the LOV; ask the LOV to send OBD_NOTIFY_CREATE for existing targets;
 * connect with the listed connect flags. On the error path mds_osc_exp is
 * cleared and the error is remembered in mds_osc_obd as an ERR_PTR.
 *
 * \retval -ENOTCONN  no obd device named lov_name was found */
655 /* update the LOV-OSC knowledge of the last used object id's */
656 int mds_lov_connect(struct obd_device *obd, char * lov_name)
658 struct mds_obd *mds = &obd->u.mds;
659 struct lustre_handle conn = {0,};
660 struct obd_connect_data *data;
/* a previous failure is remembered as an ERR_PTR in mds_osc_obd */
664 if (IS_ERR(mds->mds_osc_obd))
665 RETURN(PTR_ERR(mds->mds_osc_obd));
667 if (mds->mds_osc_obd)
670 mds->mds_osc_obd = class_name2obd(lov_name);
671 if (!mds->mds_osc_obd) {
672 CERROR("MDS cannot locate LOV %s\n", lov_name);
673 GOTO(error_exit, rc = -ENOTCONN);
676 mutex_down(&obd->obd_dev_sem);
677 rc = mds_lov_read_objids(obd);
678 mutex_up(&obd->obd_dev_sem);
680 CERROR("cannot read lov_objids: rc = %d\n", rc);
681 GOTO(error_exit, rc);
684 /* Deny new client connections until we are sure we have some OSTs */
685 obd->obd_no_conn = 1;
687 rc = obd_register_observer(mds->mds_osc_obd, obd);
689 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
691 GOTO(error_exit, rc);
694 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
696 obd_notify(obd->u.mds.mds_osc_obd, NULL, OBD_NOTIFY_CREATE, NULL);
698 OBD_ALLOC(data, sizeof(*data));
701 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
702 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT |
703 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_MDS |
704 OBD_CONNECT_SKIP_ORPHAN;
705 #ifdef HAVE_LRU_RESIZE_SUPPORT
706 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
708 data->ocd_version = LUSTRE_VERSION_CODE;
709 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
710 rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, &mds->mds_osc_exp);
711 OBD_FREE(data, sizeof(*data));
713 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
714 GOTO(error_exit, rc);
716 /* we not want postrecov in case clean fs, in other cases postrecov will
717 * be called from ldlm. otherwise we can call postrecov twice - in case
723 mds->mds_osc_exp = NULL;
724 mds->mds_osc_obd = ERR_PTR(rc);
/* Detach the MDS from its LOV before cleanup.
 *
 * When a LOV is connected (mds_osc_obd is not an error pointer and
 * mds_osc_exp is set), the MDS is unregistered as observer and obd_force is
 * set on the LOV device, for the reason given in the comment below. */
728 int mds_lov_disconnect(struct obd_device *obd)
730 struct mds_obd *mds = &obd->u.mds;
734 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
735 obd_register_observer(mds->mds_osc_obd, NULL);
737 /* The actual disconnect of the mds_lov will be called from
738 * class_disconnect_exports from mds_lov_clean. So we have to
739 * ensure that class_cleanup doesn't fail due to the extra ref
740 * we're holding now. The mechanism to do that already exists -
741 * the obd_force flag. We'll drop the final ref to the
742 * mds_osc_exp in mds_cleanup. */
743 mds->mds_osc_obd->obd_force = 1;
/* ioctl entry point of the MDS device.
 *
 * \param cmd   ioctl command, selects one of the cases below
 * \param exp   export the ioctl arrived on; exp->exp_obd is the MDS device
 * \param len   not used by the code shown here
 * \param karg  kernel copy of the ioctl data (struct obd_ioctl_data)
 * \param uarg  not used by the code shown here
 *
 * Most handlers switch to the lvfs context of the device with push_ctxt()
 * around their file system work and restore it with pop_ctxt(). */
749 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
750 void *karg, void *uarg)
752 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
753 struct obd_device *obd = exp->exp_obd;
754 struct mds_obd *mds = &obd->u.mds;
755 struct obd_ioctl_data *data = karg;
756 struct lvfs_run_ctxt saved;
760 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* start recording a config llog named by ioc_inlbuf1; the open handle is
 * kept in mds->mds_cfg_llh until OBD_IOC_ENDRECORD */
763 case OBD_IOC_RECORD: {
764 char *name = data->ioc_inlbuf1;
765 struct llog_ctxt *ctxt;
767 if (mds->mds_cfg_llh)
770 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
771 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
772 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
775 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
778 mds->mds_cfg_llh = NULL;
779 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* stop recording: close the config llog handle and forget it */
784 case OBD_IOC_ENDRECORD: {
785 if (!mds->mds_cfg_llh)
788 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
789 rc = llog_close(mds->mds_cfg_llh);
790 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
792 mds->mds_cfg_llh = NULL;
/* open the config llog named by ioc_inlbuf1, destroy it and free the handle */
796 case OBD_IOC_CLEAR_LOG: {
797 char *name = data->ioc_inlbuf1;
798 struct llog_ctxt *ctxt;
799 if (mds->mds_cfg_llh)
802 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
803 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
804 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
807 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
810 rc = llog_destroy(mds->mds_cfg_llh);
811 llog_free_handle(mds->mds_cfg_llh);
813 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
815 mds->mds_cfg_llh = NULL;
/* append one config record, copied from user space, to the llog that is
 * currently being recorded */
819 case OBD_IOC_DORECORD: {
821 struct llog_rec_hdr rec;
822 if (!mds->mds_cfg_llh)
825 rec.lrh_len = llog_data_len(data->ioc_plen1);
827 if (data->ioc_type == LUSTRE_CFG_TYPE) {
828 rec.lrh_type = OBD_CFG_REC;
830 CERROR("unknown cfg record type:%d \n", data->ioc_type);
834 OBD_ALLOC(cfg_buf, data->ioc_plen1);
837 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
839 OBD_FREE(cfg_buf, data->ioc_plen1);
843 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
844 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
846 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
848 OBD_FREE(cfg_buf, data->ioc_plen1);
/* parse the config llog named by ioc_inlbuf1 */
852 case OBD_IOC_PARSE: {
853 struct llog_ctxt *ctxt =
854 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
855 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
856 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
857 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* dump the config llog named by ioc_inlbuf1 */
865 case OBD_IOC_DUMP_LOG: {
866 struct llog_ctxt *ctxt =
867 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
868 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
869 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
870 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* sync the backing file system of the MDS */
879 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
880 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* run a transaction on the root inode, sync, then mark the backing device
 * read-only */
884 case OBD_IOC_SET_READONLY: {
886 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
887 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
888 obd->obd_name, obd->u.obt.obt_sb->s_id);
890 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
892 rc = fsfilt_commit(obd, inode, handle, 1);
894 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
895 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
897 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* list llog catalogs; the count passed is the LOV target count */
901 case OBD_IOC_CATLOGLIST: {
902 int count = mds->mds_lov_desc.ld_tgt_count;
903 rc = llog_catalog_list(obd, count, data);
/* llog maintenance commands: the llogs are finished before llog_ioctl()
 * runs and initialized again afterwards */
907 case OBD_IOC_LLOG_CHECK:
908 case OBD_IOC_LLOG_CANCEL:
909 case OBD_IOC_LLOG_REMOVE: {
910 struct llog_ctxt *ctxt =
911 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
914 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
915 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
916 rc = llog_ioctl(ctxt, cmd, data);
917 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
919 rc = obd_llog_init(obd, obd, NULL);
921 rc2 = obd_set_info_async(mds->mds_osc_exp,
922 sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
/* llog query commands: forwarded to llog_ioctl() as they are */
928 case OBD_IOC_LLOG_INFO:
929 case OBD_IOC_LLOG_PRINT: {
930 struct llog_ctxt *ctxt =
931 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
933 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
934 rc = llog_ioctl(ctxt, cmd, data);
935 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
/* abort recovery, then re-evaluate whether clients may connect */
941 case OBD_IOC_ABORT_RECOVERY:
942 LCONSOLE_WARN("%s: Aborting recovery.\n", obd->obd_name);
943 target_abort_recovery(obd);
944 /* obd_recovering has been changed */
945 mds_allow_cli(obd, 0);
/* look up the object for the fid in ioc_inlbuf1 under a lock and return
 * its version through ioc_inlbuf2 */
948 case OBD_IOC_GET_OBJ_VERSION: {
949 struct ll_fid *fid = (struct ll_fid *) data->ioc_inlbuf1;
950 struct dentry *dentry;
951 struct lustre_handle lockh;
955 dentry = mds_fid2locked_dentry(obd, fid, NULL, lockm, &lockh,
956 NULL, 0, MDS_INODELOCK_UPDATE);
957 if (IS_ERR(dentry)) {
958 rc = PTR_ERR(dentry);
961 version = fsfilt_get_version(obd, dentry->d_inode);
962 ldlm_lock_decref(&lockh, lockm);
967 *(__u64 *) data->ioc_inlbuf2 = version;
972 CDEBUG(D_INFO, "unknown command %x\n", cmd);
979 /* Collect the preconditions we need to allow client connects */
/* Each CONFIG_* bit present in flag latches the matching mds_fl_* field to 1.
 * Client connections are then opened (obd_no_conn = 0) once the config log
 * and target flags are both set and the device is either not recovering or
 * already synced. Calling with flag == 0 only re-evaluates the condition. */
980 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
982 if (flag & CONFIG_LOG)
983 obd->u.mds.mds_fl_cfglog = 1;
984 if (flag & CONFIG_SYNC)
985 obd->u.mds.mds_fl_synced = 1;
986 if (flag & CONFIG_TARGET)
987 obd->u.mds.mds_fl_target = 1;
988 if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_target &&
989 (!obd->obd_recovering || obd->u.mds.mds_fl_synced))
990 /* Open for clients */
991 obd->obd_no_conn = 0;
/* Argument bundle for one synchronization run: allocated and filled in by
 * mds_lov_start_synchronize(), freed by __mds_lov_synchronize(). */
994 struct mds_lov_sync_info {
995 struct obd_device *mlsi_obd; /* the lov device to sync */
996 struct obd_device *mlsi_watched; /* target osc */
997 __u32 mlsi_index; /* index of target */
1000 /* We only sync one osc at a time, so that we don't have to hold
1001 any kind of lock on the whole mds_lov_desc, which may change
1002 (grow) as a result of mds_lov_add_ost. This also avoids any
1003 kind of mismatch between the lov_desc and the mds_lov_desc,
1004 which are not in lock-step during lov_add_obd */
/* Synchronize the MDS with one OST target.
 *
 * data is a struct mds_lov_sync_info, which is freed here after its fields
 * have been copied out. Steps: mds_lov_update_mds(), KEY_MDS_CONN
 * notification, llog_connect() on the MDS-OST origin context, and
 * mds_lov_clear_orphans() for the target. In the failure branch the target
 * is reported inactive to the LOV unless one of the devices is stopping; in
 * the other branch client connects are allowed via
 * mds_allow_cli(CONFIG_SYNC).
 *
 * \retval -ENODEV  the MDS is stopping */
1005 static int __mds_lov_synchronize(void *data)
1007 struct mds_lov_sync_info *mlsi = data;
1008 struct obd_device *obd = mlsi->mlsi_obd;
1009 struct obd_device *watched = mlsi->mlsi_watched;
1010 struct mds_obd *mds = &obd->u.mds;
1011 struct obd_uuid *uuid;
1012 __u32 idx = mlsi->mlsi_index;
1013 struct llog_ctxt *ctxt;
/* every field was copied into locals above, so the bundle can go */
1017 OBD_FREE(mlsi, sizeof(*mlsi));
1021 uuid = &watched->u.cli.cl_target_uuid;
1024 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
1026 rc = mds_lov_update_mds(obd, watched, idx);
1028 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
1032 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
1033 KEY_MDS_CONN, 0, uuid, NULL);
1037 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1041 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
1043 rc = llog_connect(ctxt, NULL, NULL, uuid);
1044 llog_ctxt_put(ctxt);
1047 CERROR("%s failed at llog_origin_connect: %d\n",
1048 obd_uuid2str(uuid), rc);
1052 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
1053 obd->obd_name, obd_uuid2str(uuid));
1055 if (obd->obd_stopping)
1056 GOTO(out, rc = -ENODEV);
1058 rc = mds_lov_clear_orphans(mds, uuid);
1060 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
1061 obd_uuid2str(uuid), rc);
1068 /* Deactivate it for safety */
1069 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
1071 if (!obd->obd_stopping && mds->mds_osc_obd &&
1072 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
1073 obd_notify(mds->mds_osc_obd, watched,
1074 OBD_NOTIFY_INACTIVE, NULL);
1076 /* We've successfully synced at least 1 OST and are ready
1077 to handle client requests */
1078 mds_allow_cli(obd, CONFIG_SYNC);
/* Thread entry point for a background synchronization: names the thread
 * "ll_sync_NN" after the target index, daemonizes it and then runs
 * __mds_lov_synchronize() on the same argument. */
1085 int mds_lov_synchronize(void *data)
1087 struct mds_lov_sync_info *mlsi = data;
1090 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1091 cfs_daemonize_ctxt(name);
1093 RETURN(__mds_lov_synchronize(data));
/* Start synchronizing the MDS with one OST target.
 *
 * \param obd       MDS device
 * \param watched   OSC device of the target
 * \param data      points to the __u32 LOV index of the target
 * \param nonblock  presumably selects between the background thread and the
 *                  inline call shown below - confirm
 *
 * A struct mds_lov_sync_info is allocated and filled in; the work is done
 * either in a kernel thread running mds_lov_synchronize() or inline through
 * __mds_lov_synchronize(). */
1096 int mds_lov_start_synchronize(struct obd_device *obd,
1097 struct obd_device *watched,
1098 void *data, int nonblock)
1100 struct mds_lov_sync_info *mlsi;
1102 struct obd_uuid *uuid;
1106 uuid = &watched->u.cli.cl_target_uuid;
1108 OBD_ALLOC(mlsi, sizeof(*mlsi));
1113 mlsi->mlsi_obd = obd;
1114 mlsi->mlsi_watched = watched;
1115 mlsi->mlsi_index = *(__u32 *)data;
1117 /* Although class_export_get(obd->obd_self_export) would lock
1118 the MDS in place, since it's only a self-export
1119 it doesn't lock the LOV in place. The LOV can be disconnected
1120 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1121 Simply taking an export ref on the LOV doesn't help, because it's
1122 still disconnected. Taking an obd reference insures that we don't
1123 disconnect the LOV. This of course means a cleanup won't
1124 finish for as long as the sync is blocking. */
1128 /* Synchronize in the background */
1129 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1130 CLONE_VM | CLONE_FILES);
1132 CERROR("%s: error starting mds_lov_synchronize: %d\n",
1136 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1137 "thread=%d\n", obd->obd_name,
1138 mlsi->mlsi_index, rc);
/* synchronous variant: run the sync in the context of the caller */
1142 rc = __mds_lov_synchronize((void *)mlsi);
/* Observer callback through which the MDS receives events about watched
 * devices.
 *
 * \param obd      MDS device
 * \param watched  device the event is about
 * \param ev       event type
 * \param data     event argument: a pointer to the __u32 LOV index for the
 *                 create/active/sync events, a CONFIG_* flag value cast to
 *                 a pointer for OBD_NOTIFY_CONFIG
 *
 * For the events that reach the end of the function, the watched device
 * must be an OSC; a synchronization is started for it unless the MDS is
 * still in recovery. */
1148 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1149 enum obd_notify_event ev, void *data)
1154 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1157 case OBD_NOTIFY_CREATE:
1158 CWARN("MDS %s: add target %s\n",obd->obd_name,
1159 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1160 /* We still have to fix the lov descriptor for ost's */
1162 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1163 &watched->u.cli.cl_target_uuid);
1165 /* We only handle these: */
1166 case OBD_NOTIFY_ACTIVE:
1167 /* lov want one or more _active_ targets for work */
1168 mds_allow_cli(obd, CONFIG_TARGET);
1169 /* activate event should be pass lov idx as argument */
1170 case OBD_NOTIFY_SYNC:
1171 case OBD_NOTIFY_SYNC_NONBLOCK:
1172 /* sync event should be pass lov idx as argument */
1174 case OBD_NOTIFY_CONFIG:
1175 mds_allow_cli(obd, (unsigned long)data);
1176 /* call this only when config is processed and stale_export_age
1177 * value is configured */
1178 class_disconnect_expired_exports(obd);
1179 /* quota_type has been processed, we can now handle
1180 * incoming quota requests */
1181 QUOTA_MASTER_READY(&obd->u.obt.obt_qctxt);
1186 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1187 CERROR("unexpected notification of %s %s!\n",
1188 watched->obd_type->typ_name, watched->obd_name);
1192 if (obd->obd_recovering) {
1193 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1195 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1196 mds_allow_cli(obd, CONFIG_SYNC);
/* only OBD_NOTIFY_SYNC is passed with nonblock == 0 */
1200 rc = mds_lov_start_synchronize(obd, watched, data,
1201 !(ev == OBD_NOTIFY_SYNC));
1203 if (likely(obd->obd_stopping == 0))
1204 lquota_recovery(mds_quota_interface_ref, obd);
/* Fill lmm with the default striping taken from the LOV descriptor of the
 * MDS: V1 magic, default object group, and the pattern, default stripe size
 * and default stripe count of the descriptor.
 *
 * Stores sizeof(struct lov_mds_md) in *size and returns the same value. */
1209 int mds_get_default_md(struct obd_device *obd, struct lov_mds_md *lmm,
1212 struct lov_desc *ldesc;
1215 ldesc = &obd->u.mds.mds_lov_desc;
/* NOTE(review): ldesc is the address of an embedded member, so this
 * assertion cannot fail. */
1216 LASSERT(ldesc != NULL);
1221 lmm->lmm_magic = LOV_MAGIC_V1;
1222 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1223 lmm->lmm_pattern = ldesc->ld_pattern;
1224 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
1225 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
1226 *size = sizeof(struct lov_mds_md);
1228 RETURN(sizeof(struct lov_mds_md));
1231 /* Convert the on-disk LOV EA structure.
1232 * We always try to convert from an old LOV EA format to the common in-memory
1233 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1234 * then convert back to the new on-disk format and save it back to disk
1235 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1236 * to convert it each time this inode is accessed.
1238 * This function is a bit interesting in the error handling. We can safely
1239 * ship the old lmm to the client in case of failure, since it uses the same
1240 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1241 * reason. We will not delete the old lmm data until we have written the
1242 * new format lmm data in fsfilt_set_md(). */
/* Two paths are visible below:
 *  - the client lacks OBD_CONNECT_LOV_V3 and the EA is V3: the EA is
 *    rewritten in place as V1 and the new, smaller size is returned;
 *  - otherwise an EA whose magic is not V1, V3 or JOIN is unpacked, packed
 *    again and written back with fsfilt_set_md() inside a transaction. */
1243 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1244 struct lov_mds_md *lmm, int lmm_size,
1245 __u64 connect_flags)
1247 struct lov_stripe_md *lsm = NULL;
1252 if (((connect_flags & OBD_CONNECT_LOV_V3) == 0) &&
1253 (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)) {
1254 /* client does not support LOV_MAGIC_V3, so we have to convert
1256 * we convert the lmm from v3 to v1
1257 * and return the new size (which is smaller)
1258 * the caller supports this way to return the new size */
1261 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1262 /* lmm_stripe_count for non reg files is not used or -1 */
1263 if (!S_ISREG(inode->i_mode)) {
1264 new_lmm_size = lov_mds_md_size(0, LOV_MAGIC_V1);
1266 __u32 lmm_stripe_count;
1268 lmm_stripe_count = le32_to_cpu(lmm->lmm_stripe_count);
1269 new_lmm_size = lov_mds_md_size(lmm_stripe_count,
1271 /* move the objects to the new place */
/* source and destination overlap inside the same buffer, hence memmove */
1272 memmove(lmm->lmm_objects,
1273 ((struct lov_mds_md_v3 *)lmm)->lmm_objects,
1274 lmm_stripe_count * sizeof(struct lov_ost_data_v1));
1276 /* even if new size is smaller than old one,
1277 * this should not generate memory leak */
1278 RETURN(new_lmm_size);
1281 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
1282 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3 ||
1283 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1286 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1287 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1290 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1294 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1296 GOTO(conv_free, rc);
1299 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1300 if (IS_ERR(handle)) {
1301 rc = PTR_ERR(handle);
1302 GOTO(conv_free, rc);
1305 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1307 err = fsfilt_commit(obd, inode, handle, 0);
/* a commit error is returned in preference to the EA size */
1309 rc = err ? err : lmm_size;
1310 GOTO(conv_free, rc);
1312 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);