1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
/*
 * Debug helper: logs the dirty-page bitmap and the entries of the objid
 * page array of obd's MDS at D_INFO level.
 *
 * label - text printed in the "dump from" header line
 * obd   - device whose u.mds state is dumped
 */
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
/* Test whether D_INFO debugging is switched off (presumably an early
 * return follows - confirm). */
61 if ((libcfs_debug & D_INFO) == 0)
64 CDEBUG(D_INFO, "dump from %s\n", label);
/* Without a bitmap only an error is logged. */
65 if (mds->mds_lov_page_dirty == NULL) {
66 CERROR("NULL bitmap!\n");
/* Print the bitmap one long word per line. */
70 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71 CDEBUG(D_INFO, "%u - %lx\n", i,
72 mds->mds_lov_page_dirty->data[i]);
/* Likewise the page array must have been set up. */
74 if (mds->mds_lov_page_array == NULL) {
75 CERROR("not init page array!\n");
/* Visit every page slot and print the objid entries of each page. */
79 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80 obd_id *data = mds->mds_lov_page_array[i];
85 for(j=0; j < OBJID_PER_PAGE(); j++) {
88 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Sets up the objid bookkeeping of obd's MDS: allocates the dirty-page
 * bitmap (MDS_LOV_OBJID_PAGES_COUNT bits) and the page pointer array, then
 * opens (creating it if needed) the LOV_OBJID file and stores the handle in
 * mds_lov_objid_filp.
 *
 * Error codes visible here: -ENOMEM when the page array cannot be
 * allocated, PTR_ERR(file) when the open fails, -ENOENT when LOV_OBJID is
 * not a regular file. The unwind code at the end closes the file and frees
 * the page array and the bitmap.
 */
96 int mds_lov_init_objids(struct obd_device *obd)
98 struct mds_obd *mds = &obd->u.mds;
/* Size in bytes of the page pointer array. */
99 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* A page allocation must hold a whole number of obd_id entries. */
104 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106 mds->mds_lov_page_dirty =
107 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108 if (mds->mds_lov_page_dirty == NULL)
112 OBD_ALLOC(mds->mds_lov_page_array, size);
113 if (mds->mds_lov_page_array == NULL)
114 GOTO(err_free_bitmap, rc = -ENOMEM);
116 /* open the lov objid file and check that it is a regular file */
117 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
120 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121 GOTO(err_free, rc = PTR_ERR(file));
123 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125 file->f_dentry->d_inode->i_mode);
126 GOTO(err_open, rc = -ENOENT);
/* Keep the open file for later reads and writes of the objids. */
128 mds->mds_lov_objid_filp = file;
/* Error unwind: release resources in reverse order of acquisition. */
132 if (filp_close((struct file *)file, 0))
133 CERROR("can't close %s after error\n", LOV_OBJID);
135 OBD_FREE(mds->mds_lov_page_array, size);
137 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Releases what the objid bookkeeping of obd's MDS holds: the objid pages
 * (MDS_LOV_ALLOC_SIZE each) and the page pointer array, the LOV_OBJID file
 * handle, and the dirty-page bitmap.
 */
142 void mds_lov_destroy_objids(struct obd_device *obd)
144 struct mds_obd *mds = &obd->u.mds;
/* Free the individual pages first, then the array that points to them. */
148 if (mds->mds_lov_page_array != NULL) {
149 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150 obd_id *data = mds->mds_lov_page_array[i];
152 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
154 OBD_FREE(mds->mds_lov_page_array,
155 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
/* Close the objid file; the handle is cleared before the close result is
 * reported. */
158 if (mds->mds_lov_objid_filp) {
159 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160 mds->mds_lov_objid_filp = NULL;
162 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
170 * There are currently two ways to learn the ost count and the max ost index:
171 * first - after an ost is connected to the mds and the sync process has finished;
172 * second - from the lmm during recovery, in case the mds does not have configs
173 * and the ost isn't registered in the mgs.
175 * \param mds pointer to mds structure
176 * \param index maximum ost index
178 * \retval -ENOMEM if there is not enough memory for a new page
179 * \retval 0 if the update succeeded
/* Makes room for OST 'index' in the objid pages and refreshes the
 * max-index and max_mdsize/max_cookiesize bookkeeping of 'mds'. */
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* Split the OST index into a page number and a slot within that page. */
183 __u32 page = index / OBJID_PER_PAGE();
184 __u32 off = index % OBJID_PER_PAGE();
185 obd_id *data = mds->mds_lov_page_array[page];
/* Allocate the page and publish it in the page array (presumably only
 * when the slot was still empty - confirm). */
188 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
192 mds->mds_lov_page_array[page] = data;
/* Track the highest index seen together with its page/slot position. */
195 if (index > mds->mds_lov_objid_max_index) {
196 mds->mds_lov_objid_lastpage = page;
197 mds->mds_lov_objid_lastidx = off;
198 mds->mds_lov_objid_max_index = index;
201 /* workaround - New target not in objids file; increase mdsize */
202 /* ld_tgt_count is used as the max index everywhere, despite its name. */
203 if (data[off] == 0) {
/* One more known target: recompute the sizes for the new stripe count,
 * capped at LOV_MAX_STRIPE_COUNT. */
207 mds->mds_lov_objid_count++;
208 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
209 mds->mds_lov_objid_count);
211 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
212 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
214 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
215 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216 mds->mds_max_cookiesize);
/*
 * Reports whether the objid slot of OST 'index' holds a value greater than
 * zero: returns 1 if it does, 0 otherwise.
 * NOTE(review): no NULL check of the page pointer is visible before it is
 * dereferenced - confirm that callers guarantee the page exists.
 */
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
225 __u32 page = index / OBJID_PER_PAGE();
226 __u32 off = index % OBJID_PER_PAGE();
227 obd_id *data = mds->mds_lov_page_array[page];
229 return (data[off] > 0);
/*
 * Walks the stripe objects of 'lmm' (v1 or v3 layout, selected by
 * lmm_magic) and calls mds_lov_update_max_ost() for the OST index of every
 * stripe. The loop runs with obd->obd_dev_sem held.
 */
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
234 struct lov_ost_data_v1 *data;
239 /* if we create file without objects - lmm is NULL */
/* Pick the stripe count and object array that match the on-disk layout. */
243 switch (le32_to_cpu(lmm->lmm_magic)) {
245 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
246 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
249 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
250 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
253 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
258 cfs_mutex_down(&obd->obd_dev_sem);
259 for (j = 0; j < count; j++) {
/* l_ost_idx is stored little-endian in the lmm. */
260 __u32 i = le32_to_cpu(data[j].l_ost_idx);
261 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
266 cfs_mutex_up(&obd->obd_dev_sem);
270 EXPORT_SYMBOL(mds_lov_prepare_objids);
273 * Write an llog orphan record about lost ost objects.
274 * A special lsm is allocated with a single stripe; the caller should deallocate it.
/* Logs 'count' lost objects starting at 'id' on OST 'idx' through a
 * one-stripe lsm; '*stripes' receives the lsm's previous stripe count. */
277 static int mds_log_lost_precreated(struct obd_device *obd,
278 struct lov_stripe_md **lsmp, int *stripes,
279 obd_id id, obd_count count, int idx)
281 struct lov_stripe_md *lsm = *lsmp;
/* Allocate the lsm via the LOV export (presumably only when the caller
 * passed a NULL lsm - confirm). */
286 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
289 /* need only one stripe, save old value */
290 *stripes = lsm->lsm_stripe_count;
291 lsm->lsm_stripe_count = 1;
/* Describe the lost range through stripe 0. */
295 lsm->lsm_oinfo[0]->loi_id = id;
296 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
297 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
299 rc = mds_log_op_orphan(obd, lsm, count);
/*
 * Compares the object id of every stripe in 'lmm' (v1 or v3 layout) with
 * the cached last id of the stripe's OST. When the id in the lmm is greater
 * the page holding that slot is marked dirty; during recovery a gap between
 * the two ids is additionally logged as lost precreated objects through
 * mds_log_lost_precreated().
 */
303 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
305 struct mds_obd *mds = &obd->u.mds;
307 struct lov_ost_data_v1 *obj;
308 struct lov_stripe_md *lsm = NULL;
313 /* if we create file without objects - lmm is NULL */
/* Pick the stripe count and object array that match the on-disk layout. */
317 switch (le32_to_cpu(lmm->lmm_magic)) {
319 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
320 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
323 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
324 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
327 CERROR("Unknow lmm type %X !\n",
328 le32_to_cpu(lmm->lmm_magic));
332 for (j = 0; j < count; j++) {
/* i: OST index, id: object id; page/idx locate the OST's objid slot. */
333 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
334 obd_id id = le64_to_cpu(obj[j].l_object_id);
335 __u32 page = i / OBJID_PER_PAGE();
336 __u32 idx = i % OBJID_PER_PAGE();
339 data = mds->mds_lov_page_array[page];
341 CDEBUG(D_INODE,"update last object for ost %u"
342 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
343 if (id > data[idx]) {
/* NOTE(review): 'lost' is an int computed from obd_id values (printed
 * elsewhere with LPU64) and is itself printed with %u - confirm the
 * width and signedness are intended. */
344 int lost = id - data[idx] - 1;
345 /* we might have lost precreated objects due to VBR */
346 if (lost > 0 && obd->obd_recovering) {
347 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
348 if (!obd->obd_version_recov)
349 CERROR("Unexpected gap in objids\n");
350 /* lsm is allocated if NULL */
351 mds_log_lost_precreated(obd, &lsm, &stripes,
352 data[idx]+1, lost, i);
/* The page must be written back by mds_lov_write_objids(). */
355 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
359 /* restore stripes number */
360 lsm->lsm_stripe_count = stripes;
361 obd_free_memmd(mds->mds_lov_exp, &lsm);
366 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Accounts for a page of objids that was just read: the loop over the first
 * 'count' entries of 'data' increments mds_lov_objid_count (the per-entry
 * condition is not visible here - confirm), after which mds_max_mdsize and
 * mds_max_cookiesize are recomputed for
 * min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 */
368 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
374 for (i = 0; i < count; i++) {
378 mds->mds_lov_objid_count++;
381 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
382 mds->mds_lov_objid_count);
384 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
385 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
387 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388 "%d/%d\n", stripes, mds->mds_max_mdsize,
389 mds->mds_max_cookiesize);
/*
 * Loads the LOV_OBJID file into freshly allocated pages of
 * mds_lov_page_array, one MDS_LOV_ALLOC_SIZE record per page, passes each
 * page to mds_lov_update_from_read(), and finally records the last page and
 * last index that were read.
 *
 * Error codes visible here: -ENOMEM when a page cannot be allocated, -EIO
 * when mds_lov_update_from_read() fails.
 */
395 static int mds_lov_read_objids(struct obd_device *obd)
397 struct mds_obd *mds = &obd->u.mds;
399 int i, rc = 0, count = 0, page = 0;
403 /* Read everything in the file, even if our current lov desc
404 has fewer targets. Old targets not in the lov descriptor
405 during mds setup may still have valid objids. */
406 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* Number of pages needed for the file, rounding the size up. */
410 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
411 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
412 for (i = 0; i < page; i++) {
/* Remember where this read started so its length can be derived below. */
414 loff_t off_old = off;
/* Each page slot is expected to be unused at this point. */
416 LASSERT(mds->mds_lov_page_array[i] == NULL);
417 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
418 if (mds->mds_lov_page_array[i] == NULL)
419 GOTO(out, rc = -ENOMEM);
421 data = mds->mds_lov_page_array[i];
423 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
424 MDS_LOV_ALLOC_SIZE, &off);
426 CERROR("Error reading objids %d\n", rc);
/* An unchanged offset is treated as a hole: step over one full page. */
429 if (off == off_old) /* hole is read */
430 off += MDS_LOV_ALLOC_SIZE;
/* Entries covered by this page, derived from how far the offset moved. */
432 count = (off - off_old) / sizeof(obd_id);
433 if (mds_lov_update_from_read(mds, data, count)) {
434 CERROR("Can't update mds data\n");
435 GOTO(out, rc = -EIO);
/* Position of the last entry read: last page and last index in it. */
438 mds->mds_lov_objid_lastpage = page - 1;
439 mds->mds_lov_objid_lastidx = count - 1;
441 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
442 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
444 mds_lov_dump_objids("read",obd);
/*
 * Writes every objid page flagged in mds_lov_page_dirty to the LOV_OBJID
 * file at offset page_index * MDS_LOV_ALLOC_SIZE. A page's dirty bit is
 * cleared before it is written; the last page is written only up to the
 * last used index.
 */
449 int mds_lov_write_objids(struct obd_device *obd)
451 struct mds_obd *mds = &obd->u.mds;
/* Check for the case that no page is dirty. */
455 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
458 mds_lov_dump_objids("write", obd);
460 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
461 obd_id *data = mds->mds_lov_page_array[i];
462 unsigned int size = MDS_LOV_ALLOC_SIZE;
463 loff_t off = i * size;
/* A dirty bit implies that the page has been allocated. */
465 LASSERT(data != NULL);
/* Clear the bit first; proceed only if it was actually set. */
467 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
470 /* check for partially filled last page */
471 if (i == mds->mds_lov_objid_lastpage)
472 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
474 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
475 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
/* Flag the page dirty again (presumably only when the write failed -
 * confirm). */
478 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
487 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Looks up the objid slot of OST 'idx'. On the path shown here the last id
 * is requested from the LOV export with KEY_LAST_ID; the reply is stored
 * directly into the slot (lastid.data points at it) and the page is marked
 * dirty.
 */
489 static int mds_lov_get_objid(struct obd_device * obd,
492 struct mds_obd *mds = &obd->u.mds;
493 struct obd_export *lov_exp = mds->mds_lov_exp;
/* Locate the page and the slot within it for this OST index. */
501 page = idx / OBJID_PER_PAGE();
502 off = idx % OBJID_PER_PAGE();
503 data = mds->mds_lov_page_array[page];
506 /* We never read this lastid; ask the osc */
507 struct obd_id_info lastid;
509 size = sizeof(lastid);
511 lastid.data = &data[off];
512 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
513 &size, &lastid, NULL);
517 /* workaround for clean filter */
/* The slot changed, so the page has to be written back. */
521 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
523 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
524 idx, data, page, off, data[off]);
/*
 * Issues an obd_create() with OBD_FL_DELORPHAN set through the LOV export
 * so that the OST objects are reconciled with the MDS's last ids.
 *
 * ost_uuid - when not NULL it is passed along in oti.oti_ost_uuid
 */
529 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
532 struct obdo oa = { 0 };
533 struct obd_trans_info oti = {0};
534 struct lov_stripe_md *empty_ea = NULL;
537 LASSERT(mds->mds_lov_page_array != NULL);
539 /* This create will in fact either create or destroy: If the OST is
540 * missing objects below this ID, they will be created. If it finds
541 * objects above this ID, they will be removed. */
/* oa was already zero-initialised at its declaration; this repeats it. */
542 memset(&oa, 0, sizeof(oa));
543 oa.o_flags = OBD_FL_DELORPHAN;
544 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
545 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
546 if (ost_uuid != NULL)
547 oti.oti_ost_uuid = ost_uuid;
549 rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
/*
 * Sends a KEY_NEXT_ID request for OST 'idx' to the LOV export through
 * obd_set_info_async(), passing a struct obd_id_info. The function asserts
 * that obd is not recovering.
 */
555 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
557 struct mds_obd *mds = &obd->u.mds;
559 struct obd_id_info info;
562 LASSERT(!obd->obd_recovering);
566 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
567 KEY_NEXT_ID, sizeof(info), &info, NULL);
569 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refreshes mds_lov_desc from the LOV (KEY_LOVDESC), makes sure the objid
 * bookkeeping covers OST 'idx' via mds_lov_update_max_ost() under
 * obd_dev_sem, and re-initialises the llogs with obd_llog_init().
 *
 * idx  - index of the added/updated OST
 * uuid - uuid of that OST; only used for the debug message
 */
576 static int mds_lov_update_desc(struct obd_device *obd, int idx,
577 struct obd_uuid *uuid)
579 struct mds_obd *mds = &obd->u.mds;
581 __u32 valsize = sizeof(mds->mds_lov_desc);
/* Temporary descriptor buffer; freed at the end of the function. */
585 OBD_ALLOC(ld, sizeof(*ld));
589 rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
594 /* Don't change the mds_lov_desc until the objids size matches the
596 mds->mds_lov_desc = *ld;
597 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
598 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
600 cfs_mutex_down(&obd->obd_dev_sem);
601 rc = mds_lov_update_max_ost(mds, idx);
602 cfs_mutex_up(&obd->obd_dev_sem);
606 /* If we added a target we have to reconnect the llogs */
607 /* We only _need_ to do this at first add (idx), or the first time
608 after recovery. However, it should now be safe to call anytime. */
609 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
/* Release the temporary descriptor buffer. */
614 OBD_FREE(ld, sizeof(*ld));
/*
 * Inform MDS about new/updated target.
 *
 * Asserts that the objid slot of 'idx' is initialised, rejects an index at
 * or beyond mds_lov_desc.ld_tgt_count with -EINVAL, obtains the last id
 * through mds_lov_get_objid() and then passes it to the OSC with
 * mds_lov_set_one_nextid().
 */
619 static int mds_lov_update_mds(struct obd_device *obd,
620 struct obd_device *watched,
623 struct mds_obd *mds = &obd->u.mds;
630 LASSERT(mds_lov_objinit(mds, idx));
632 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
633 idx, obd->obd_recovering, obd->obd_async_recov,
634 mds->mds_lov_desc.ld_tgt_count);
636 /* idx is set as data from lov_notify. */
637 if (obd->obd_recovering)
640 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
641 CERROR("index %d > count %d!\n", idx,
642 mds->mds_lov_desc.ld_tgt_count);
643 GOTO(out, rc = -EINVAL);
646 rc = mds_lov_get_objid(obd, idx);
/* Locate the slot that now holds the last id of this OST. */
650 page = idx / OBJID_PER_PAGE();
651 off = idx % OBJID_PER_PAGE();
652 data = mds->mds_lov_page_array[page];
654 /* We have read this lastid from disk; tell the osc.
655 Don't call this during recovery. */
656 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
658 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
659 /* Don't abort the rest of the sync */
662 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/*
 * Connects the MDS to the LOV named 'lov_name': resolves the LOV device,
 * loads the objid file with mds_lov_read_objids() under obd_dev_sem,
 * registers the MDS as observer of the LOV, asks the LOV for
 * OBD_NOTIFY_CREATE events and finally calls obd_connect() with the connect
 * data assembled below.
 *
 * mds_lov_obd doubles as a state marker: an ERR_PTR stored there by an
 * earlier failure is returned again as the error code on the next call.
 */
670 int mds_lov_connect(struct obd_device *obd, char * lov_name)
672 struct mds_obd *mds = &obd->u.mds;
673 struct obd_connect_data *data;
/* A previous attempt failed: report the same error again. */
677 if (IS_ERR(mds->mds_lov_obd))
678 RETURN(PTR_ERR(mds->mds_lov_obd));
/* A LOV device is already recorded. */
680 if (mds->mds_lov_obd)
683 mds->mds_lov_obd = class_name2obd(lov_name);
684 if (!mds->mds_lov_obd) {
685 CERROR("MDS cannot locate LOV %s\n", lov_name);
686 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
/* Load the stored objids while holding the device semaphore. */
690 cfs_mutex_down(&obd->obd_dev_sem);
691 rc = mds_lov_read_objids(obd);
692 cfs_mutex_up(&obd->obd_dev_sem);
694 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
/* From now on the LOV notifies this MDS about its targets. */
698 rc = obd_register_observer(mds->mds_lov_obd, obd);
700 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
705 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
707 obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
709 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
711 OBD_ALLOC(data, sizeof(*data));
713 GOTO(err_exit, rc = -ENOMEM);
715 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
716 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
717 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
718 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
719 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
720 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
721 OBD_CONNECT_SOM | OBD_CONNECT_FULL20 |
722 OBD_CONNECT_64BITHASH;
723 #ifdef HAVE_LRU_RESIZE_SUPPORT
724 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
726 data->ocd_version = LUSTRE_VERSION_CODE;
727 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
728 /* send max bytes per rpc */
729 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
730 /* send the list of supported checksum types */
731 data->ocd_cksum_types = OBD_CKSUM_ALL;
732 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
733 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
734 OBD_FREE(data, sizeof(*data));
736 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
737 mds->mds_lov_obd = ERR_PTR(rc);
741 /* I want to see a callback happen when the OBD moves to a
742 * "For General Use" state, and that's when we'll call
743 * set_nextid(). The class driver can help us here, because
744 * it can use the obd_recovering flag to determine when
745 * the OBD is fully available. */
746 /* MDD device will care about that
747 if (!obd->obd_recovering)
748 rc = mds_postrecov(obd);
753 mds->mds_lov_exp = NULL;
754 mds->mds_lov_obd = ERR_PTR(rc);
/*
 * Detaches the MDS from its LOV: when a valid LOV device and export are
 * recorded, the observer registration is removed and obd_force is set on
 * the LOV device.
 */
758 int mds_lov_disconnect(struct obd_device *obd)
760 struct mds_obd *mds = &obd->u.mds;
/* Skip when connecting failed earlier (ERR_PTR) or no export exists. */
764 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
765 obd_register_observer(mds->mds_lov_obd, NULL);
767 /* The actual disconnect of the mds_lov will be called from
768 * class_disconnect_exports from mds_lov_clean. So we have to
769 * ensure that class_cleanup doesn't fail due to the extra ref
770 * we're holding now. The mechanism to do that already exists -
771 * the obd_force flag. We'll drop the final ref to the
772 * mds_lov_exp in mds_cleanup. */
773 mds->mds_lov_obd->obd_force = 1;
/*
 * Argument bundle filled by mds_lov_start_synchronize() and consumed by
 * mds_lov_synchronize() / __mds_lov_synchronize().
 */
779 struct mds_lov_sync_info {
780 struct obd_device *mlsi_obd; /* device whose u.mds state is synced */
781 struct obd_device *mlsi_watched; /* target osc */
782 __u32 mlsi_index; /* index of target */
/*
 * Sends the two entries of mds->mds_capa_keys to the LOV export, one
 * KEY_CAPA_KEY obd_set_info_async() call per key, with info.uuid set to
 * 'uuid'. A failed propagation is logged with the affected key.
 */
785 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
787 struct mds_capa_info info = { .uuid = uuid };
788 struct lustre_capa_key *key;
/* Check for the case that no capability keys are set up. */
793 if (!mds->mds_capa_keys)
796 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
797 for (i = 0; i < 2; i++) {
798 key = &mds->mds_capa_keys[i];
799 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
802 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
803 KEY_CAPA_KEY, sizeof(info), &info, NULL);
805 DEBUG_CAPA_KEY(D_ERROR, key,
806 "propagate failed (rc = %d) for", rc);
/*
 * Synchronizes the MDS with one OSC target described by 'data' (a
 * struct mds_lov_sync_info). With mds_notify_lock held for reading it
 * updates the MDS objid state, sends KEY_MDS_CONN, propagates the
 * capability keys, connects the llog and clears orphans. On failure the
 * target is reported inactive to the LOV. The obd reference named
 * "mds_lov_synchronize" is dropped at the end.
 *
 * We only sync one osc at a time, so that we don't have to hold
 * any kind of lock on the whole mds_lov_desc, which may change
 * (grow) as a result of mds_lov_add_ost. This also avoids any
 * kind of mismatch between the lov_desc and the mds_lov_desc,
 * which are not in lock-step during lov_add_obd
 */
819 static int __mds_lov_synchronize(void *data)
821 struct mds_lov_sync_info *mlsi = data;
822 struct obd_device *obd = mlsi->mlsi_obd;
823 struct obd_device *watched = mlsi->mlsi_watched;
824 struct mds_obd *mds = &obd->u.mds;
825 struct obd_uuid *uuid;
826 __u32 idx = mlsi->mlsi_index;
827 struct mds_group_info mgi;
828 struct llog_ctxt *ctxt;
836 uuid = &watched->u.cli.cl_target_uuid;
839 cfs_down_read(&mds->mds_notify_lock);
/* Nothing to sync when the device is going away or has failed. */
840 if (obd->obd_stopping || obd->obd_fail)
841 GOTO(out, rc = -ENODEV);
843 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
844 rc = mds_lov_update_mds(obd, watched, idx);
846 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
/* Send KEY_MDS_CONN with this MDS's group through the LOV export. */
849 mgi.group = mdt_to_obd_objseq(mds->mds_id);
852 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
853 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
856 /* propagate capability keys */
857 rc = mds_propagate_capa_keys(mds, uuid);
861 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
863 GOTO(out, rc = -ENODEV);
865 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
866 rc = llog_connect(ctxt, NULL, NULL, uuid);
869 CERROR("%s failed at llog_origin_connect: %d\n",
870 obd_uuid2str(uuid), rc);
874 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
875 obd->obd_name, obd_uuid2str(uuid));
877 rc = mds_lov_clear_orphans(mds, uuid);
879 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
880 obd_uuid2str(uuid), rc);
884 #ifdef HAVE_QUOTA_SUPPORT
885 if (obd->obd_upcall.onu_owner) {
887 * This is a hack for mds_notify->mdd_notify. When the mds obd
888 * in mdd is removed, This hack should be removed.
890 LASSERT(obd->obd_upcall.onu_upcall != NULL);
891 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
892 obd->obd_upcall.onu_owner,NULL);
897 cfs_up_read(&mds->mds_notify_lock);
899 /* Deactivate it for safety */
900 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* Only notify the LOV when neither it, the MDS nor the target is
 * stopping. */
902 if (!obd->obd_stopping && mds->mds_lov_obd &&
903 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
904 obd_notify(mds->mds_lov_obd, watched,
905 OBD_NOTIFY_INACTIVE, NULL);
/* Pairs with the class_incref() in mds_lov_start_synchronize(). */
908 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point for a background sync: names the thread
 * "ll_sync_<index>", daemonizes it and runs __mds_lov_synchronize() on
 * 'data' (a struct mds_lov_sync_info).
 */
912 int mds_lov_synchronize(void *data)
914 struct mds_lov_sync_info *mlsi = data;
917 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
918 cfs_daemonize_ctxt(name);
920 RETURN(__mds_lov_synchronize(data));
/*
 * Starts a sync of the MDS with the target 'watched'. A
 * struct mds_lov_sync_info is allocated and filled (the index is read from
 * 'data' as a __u32) and an obd reference is taken. For any event other
 * than OBD_NOTIFY_SYNC the work is handed to a new kernel thread running
 * mds_lov_synchronize(); otherwise __mds_lov_synchronize() is called
 * directly.
 */
923 int mds_lov_start_synchronize(struct obd_device *obd,
924 struct obd_device *watched,
925 void *data, enum obd_notify_event ev)
927 struct mds_lov_sync_info *mlsi;
929 struct obd_uuid *uuid;
933 uuid = &watched->u.cli.cl_target_uuid;
935 OBD_ALLOC(mlsi, sizeof(*mlsi));
940 mlsi->mlsi_obd = obd;
941 mlsi->mlsi_watched = watched;
942 mlsi->mlsi_index = *(__u32 *)data;
944 /* Although class_export_get(obd->obd_self_export) would lock
945 the MDS in place, since it's only a self-export
946 it doesn't lock the LOV in place. The LOV can be disconnected
947 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
948 Simply taking an export ref on the LOV doesn't help, because it's
949 still disconnected. Taking an obd reference insures that we don't
950 disconnect the LOV. This of course means a cleanup won't
951 finish for as long as the sync is blocking. */
952 class_incref(obd, "mds_lov_synchronize", obd);
954 if (ev != OBD_NOTIFY_SYNC) {
955 /* Synchronize in the background */
956 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
957 CLONE_VM | CLONE_FILES);
959 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* The thread did not start, so drop the reference taken above. */
961 class_decref(obd, "mds_lov_synchronize", obd);
963 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
964 "thread=%d\n", obd->obd_name,
965 mlsi->mlsi_index, rc);
/* Blocking variant: run the sync in the caller's context. */
969 rc = __mds_lov_synchronize((void *)mlsi);
975 int mds_notify(struct obd_device *obd, struct obd_device *watched,
976 enum obd_notify_event ev, void *data)
978 struct mds_obd *mds = &obd->u.mds;
982 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
984 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
985 CERROR("unexpected notification of %s %s!\n",
986 watched->obd_type->typ_name, watched->obd_name);
990 /*XXX this notifies the MDD until lov handling use old mds code
993 if (obd->obd_upcall.onu_owner) {
994 LASSERT(obd->obd_upcall.onu_upcall != NULL);
995 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
996 obd->obd_upcall.onu_owner,
997 &mds->mds_obt.obt_mount_count);
1001 /* We only handle these: */
1002 case OBD_NOTIFY_CREATE:
1003 CWARN("MDS %s: add target %s\n",obd->obd_name,
1004 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1005 /* We still have to fix the lov descriptor for ost's */
1007 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1008 &watched->u.cli.cl_target_uuid);
1010 case OBD_NOTIFY_ACTIVE:
1011 /* lov want one or more _active_ targets for work */
1012 /* activate event should be pass lov idx as argument */
1013 case OBD_NOTIFY_SYNC:
1014 case OBD_NOTIFY_SYNC_NONBLOCK:
1015 /* sync event should be pass lov idx as argument */
1021 if (obd->obd_recovering) {
1022 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1024 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1025 /* We still have to fix the lov descriptor for ost's added
1026 after the mdt in the config log. They didn't make it into
1028 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1029 &watched->u.cli.cl_target_uuid);
1031 rc = mds_lov_start_synchronize(obd, watched, data, ev);