4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53 #include <lustre_log.h>
55 #include "mds_internal.h"
/**
 * Debug helper: write the dirty-page bitmap and the per-page object ids
 * kept in obd->u.mds to the D_INFO debug log.
 *
 * \param label  text inserted into the leading dump-from log line
 * \param obd    device whose mds state is dumped
 *
 * The function first tests whether D_INFO is enabled in libcfs_debug, and
 * it reports a NULL bitmap or a NULL page array through CERROR.
 * NOTE(review): the statements taken after those three tests are not
 * visible here - presumably early returns; confirm against the full file.
 */
57 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
59 struct mds_obd *mds = &obd->u.mds;
62 if ((libcfs_debug & D_INFO) == 0)
65 CDEBUG(D_INFO, "dump from %s\n", label);
66 if (mds->mds_lov_page_dirty == NULL) {
67 CERROR("NULL bitmap!\n");
/* one log line per word of the bitmap: size / BITS_PER_LONG + 1 words */
71 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
72 CDEBUG(D_INFO, "%u - %lx\n", i,
73 mds->mds_lov_page_dirty->data[i]);
75 if (mds->mds_lov_page_array == NULL) {
76 CERROR("not init page array!\n");
/* walk every page slot, then every objid slot inside the page */
80 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
81 obd_id *data = mds->mds_lov_page_array[i];
86 for(j=0; j < OBJID_PER_PAGE(); j++) {
89 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/**
 * Set up the objid bookkeeping of an MDS device.
 *
 * Visible steps, in order:
 *  - allocate the dirty bitmap with MDS_LOV_OBJID_PAGES_COUNT bits;
 *  - allocate the page pointer array (MDS_LOV_OBJID_PAGES_COUNT pointers);
 *  - open LOV_OBJID with O_RDWR | O_CREAT and mode 0644;
 *  - require the opened file to be a regular file;
 *  - remember the file in mds->mds_lov_objid_filp.
 *
 * \param obd  device whose u.mds state is initialised
 *
 * \retval -ENOMEM        page array allocation failed
 * \retval PTR_ERR(file)  LOV_OBJID could not be opened or created
 * \retval -ENOENT        LOV_OBJID exists but is not a regular file
 *
 * NOTE(review): the success return and the result of a failed bitmap
 * allocation are on lines not visible here.
 */
97 int mds_lov_init_objids(struct obd_device *obd)
99 struct mds_obd *mds = &obd->u.mds;
100 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* compile-time check: a page holds a whole number of obd_id entries */
105 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
107 mds->mds_lov_page_dirty =
108 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
109 if (mds->mds_lov_page_dirty == NULL)
113 OBD_ALLOC(mds->mds_lov_page_array, size);
114 if (mds->mds_lov_page_array == NULL)
115 GOTO(err_free_bitmap, rc = -ENOMEM);
117 /* open and test the lov objd file */
118 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
121 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
122 GOTO(err_free, rc = PTR_ERR(file));
124 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
125 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
126 file->f_dentry->d_inode->i_mode);
127 GOTO(err_open, rc = -ENOENT);
129 mds->mds_lov_objid_filp = file;
/*
 * Unwind path: close the file, free the page array, free the bitmap.
 * Presumably these follow the err_open, err_free and err_free_bitmap
 * labels named by the GOTO calls above - the label lines are not visible.
 */
133 if (filp_close((struct file *)file, 0))
134 CERROR("can't close %s after error\n", LOV_OBJID);
136 OBD_FREE(mds->mds_lov_page_array, size);
138 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
/**
 * Release the objid bookkeeping of an MDS device.
 *
 * Frees the objid pages (MDS_LOV_ALLOC_SIZE each) and the page pointer
 * array, closes mds_lov_objid_filp and clears the pointer, and frees the
 * dirty bitmap.
 *
 * \param obd  device whose u.mds state is torn down
 *
 * NOTE(review): the per-page guard before OBD_FREE and the test that
 * selects the CERROR after filp_close are on lines not visible here.
 */
143 void mds_lov_destroy_objids(struct obd_device *obd)
145 struct mds_obd *mds = &obd->u.mds;
149 if (mds->mds_lov_page_array != NULL) {
150 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
151 obd_id *data = mds->mds_lov_page_array[i];
153 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
155 OBD_FREE(mds->mds_lov_page_array,
156 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
159 if (mds->mds_lov_objid_filp) {
160 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
/* the pointer is cleared regardless of the close result */
161 mds->mds_lov_objid_filp = NULL;
163 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
166 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
171 * There are currently two ways to learn the OST count and the max OST index:
172 * first - after an OST is connected to the MDS and the sync process has finished;
173 * second - from the lmm during recovery, in case the MDS does not have the configs
174 * and the OST isn't registered in the MGS.
176 * \param mds pointer to mds structure
177 * \param index maximum ost index
179 * \retval -ENOMEM not enough memory for a new page
180 * \retval 0 update succeeded
/*
 * Record OST index in the in-memory objid tables of mds.
 *
 * Visible steps:
 *  - split index into a page number and an offset using OBJID_PER_PAGE();
 *  - allocate a page of MDS_LOV_ALLOC_SIZE and store it in
 *    mds_lov_page_array[page] (presumably only when that slot is still
 *    NULL - the guarding test is not visible here);
 *  - when index exceeds mds_lov_objid_max_index, advance lastpage,
 *    lastidx and max_index;
 *  - when the slot for index still holds 0, count one more target and
 *    recompute mds_max_mdsize and mds_max_cookiesize from the new count.
 *
 * NOTE(review): no visible line checks page against
 * MDS_LOV_OBJID_PAGES_COUNT before indexing mds_lov_page_array - confirm
 * that callers bound index.
 */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
184 __u32 page = index / OBJID_PER_PAGE();
185 __u32 off = index % OBJID_PER_PAGE();
186 obd_id *data = mds->mds_lov_page_array[page];
189 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
193 mds->mds_lov_page_array[page] = data;
196 if (index > mds->mds_lov_objid_max_index) {
197 mds->mds_lov_objid_lastpage = page;
198 mds->mds_lov_objid_lastidx = off;
199 mds->mds_lov_objid_max_index = index;
202 /* workaround - New target not in objids file; increase mdsize */
203 /* ld_tgt_count is used as the max index everywhere, despite its name. */
204 if (data[off] == 0) {
208 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
210 mds->mds_lov_objid_count++;
/* stripe count is capped both by the EA size limit and by the target count */
211 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
212 mds->mds_lov_objid_count);
214 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
215 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
217 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
218 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
219 mds->mds_max_cookiesize);
/**
 * Tell whether the objid slot for an OST index holds a non-zero value.
 *
 * \param mds    mds state holding mds_lov_page_array
 * \param index  OST index to look up
 *
 * \retval 1  the slot value is greater than 0
 * \retval 0  the slot value is 0
 *
 * NOTE(review): the page pointer is dereferenced without a visible NULL
 * check - confirm that callers only pass indices whose page exists.
 */
226 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
228 __u32 page = index / OBJID_PER_PAGE();
229 __u32 off = index % OBJID_PER_PAGE();
230 obd_id *data = mds->mds_lov_page_array[page];
232 return (data[off] > 0);
/**
 * Make sure the objid tables have a slot for every OST referenced by lmm.
 *
 * The stripe count and the object array are taken from the v1 or the v3
 * layout of lmm, selected by lmm_magic; an unrecognised magic is reported
 * through CERROR. Then, with obd_dev_mutex held, mds_lov_update_max_ost()
 * is called for the l_ost_idx of every stripe.
 *
 * \param obd  MDS device
 * \param lmm  on-disk striping descriptor, little-endian fields
 *
 * NOTE(review): the NULL-lmm handling, the case labels, the reaction to a
 * failing mds_lov_update_max_ost() and the return value are on lines not
 * visible here.
 */
235 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
237 struct lov_ost_data_v1 *data;
242 /* if we create file without objects - lmm is NULL */
246 switch (le32_to_cpu(lmm->lmm_magic)) {
248 count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
249 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
252 count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
253 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
256 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* the table updates below run with obd_dev_mutex held */
261 cfs_mutex_lock(&obd->obd_dev_mutex);
262 for (j = 0; j < count; j++) {
263 __u32 i = le32_to_cpu(data[j].l_ost_idx);
264 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
269 cfs_mutex_unlock(&obd->obd_dev_mutex);
273 EXPORT_SYMBOL(mds_lov_prepare_objids);
276 * Write an llog orphan record about lost OST objects.
277 * A special lsm is allocated with a single stripe; the caller should deallocate it.
/*
 * Log an orphan record for a run of lost precreated objects on one OST.
 *
 * obd      MDS device
 * lsmp     in/out pointer to the lsm used for the record
 * stripes  out: the stripe count the lsm had before it was forced to 1
 * id       first object id of the run, stored in loi_id
 * count    number passed on to mds_log_op_orphan()
 * idx      OST index, stored in loi_ost_idx
 *
 * The lsm is obtained through obd_alloc_memmd() on mds_lov_exp, its stripe
 * count is saved into *stripes and set to 1, the single oinfo entry is
 * filled in, and mds_log_op_orphan() is called.
 *
 * NOTE(review): the condition guarding the allocation, the error checks,
 * the write-back to *lsmp and the return statement are on lines not
 * visible here.
 */
280 static int mds_log_lost_precreated(struct obd_device *obd,
281 struct lov_stripe_md **lsmp, __u16 *stripes,
282 obd_id id, obd_count count, int idx)
284 struct lov_stripe_md *lsm = *lsmp;
289 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
292 /* need only one stripe, save old value */
293 *stripes = lsm->lsm_stripe_count;
294 lsm->lsm_stripe_count = 1;
298 lsm->lsm_oinfo[0]->loi_id = id;
299 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
300 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
302 rc = mds_log_op_orphan(obd, lsm, count);
/**
 * Compare the object ids referenced by lmm with the last ids stored per
 * OST in the objid tables.
 *
 * For every stripe the OST index and the object id are decoded from the
 * v1 or v3 layout. When the id is greater than the stored value, the gap
 * between them is computed; a positive gap seen while obd_recovering is
 * set is logged and handed to mds_log_lost_precreated(). The page holding
 * the slot is marked dirty in mds_lov_page_dirty. At the end the lsm used
 * for orphan logging gets its stripe count restored and is freed.
 *
 * \param obd  MDS device
 * \param lmm  on-disk striping descriptor, little-endian fields
 *
 * NOTE(review): the store of the new id into the table, the NULL-lmm
 * handling and the guard around the final lsm cleanup are on lines not
 * visible here.
 * NOTE(review): lost is a signed int computed from obd_id values decoded
 * with le64_to_cpu, and it is logged with an unsigned conversion - check
 * whether a very large gap can truncate or turn negative.
 */
306 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
308 struct mds_obd *mds = &obd->u.mds;
310 struct lov_ost_data_v1 *obj;
311 struct lov_stripe_md *lsm = NULL;
316 /* if we create file without objects - lmm is NULL */
320 switch (le32_to_cpu(lmm->lmm_magic)) {
322 count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
323 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
326 count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
327 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
330 CERROR("Unknow lmm type %X !\n",
331 le32_to_cpu(lmm->lmm_magic));
335 for (j = 0; j < count; j++) {
336 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
337 obd_id id = le64_to_cpu(obj[j].l_object_id);
338 __u32 page = i / OBJID_PER_PAGE();
339 __u32 idx = i % OBJID_PER_PAGE();
342 data = mds->mds_lov_page_array[page];
344 CDEBUG(D_INODE,"update last object for ost %u"
345 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
346 if (id > data[idx]) {
/* number of ids strictly between the stored id and the new one */
347 int lost = id - data[idx] - 1;
348 /* we might have lost precreated objects due to VBR */
349 if (lost > 0 && obd->obd_recovering) {
350 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
351 if (!obd->obd_version_recov)
352 CERROR("Unexpected gap in objids\n");
353 /* lsm is allocated if NULL */
354 mds_log_lost_precreated(obd, &lsm, &stripes,
355 data[idx]+1, lost, i);
358 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
362 /* restore stripes number */
363 lsm->lsm_stripe_count = stripes;
364 obd_free_memmd(mds->mds_lov_exp, &lsm);
369 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Account for a batch of objid entries that was just read from disk.
 *
 * Walks count entries, increments mds_lov_objid_count inside the loop, and
 * then recomputes mds_max_mdsize and mds_max_cookiesize from the stripe
 * count allowed by osd_max_ea_size and by the new target count.
 *
 * NOTE(review): the rest of the parameter list, the condition under which
 * an entry is counted (presumably a non-zero objid) and the return value
 * are on lines not visible here.
 */
371 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
374 __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
377 for (i = 0; i < count; i++) {
381 mds->mds_lov_objid_count++;
384 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
385 mds->mds_lov_objid_count);
387 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
388 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
390 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
391 "%d/%d\n", stripes, mds->mds_max_mdsize,
392 mds->mds_max_cookiesize);
/**
 * Load the whole objid file into freshly allocated in-memory pages.
 *
 * The file size is read from the inode behind mds_lov_objid_filp and
 * rounded up to whole pages of MDS_LOV_ALLOC_SIZE. For every page the slot
 * in mds_lov_page_array must still be NULL (LASSERT); a page is allocated,
 * filled through fsfilt_read_record(), and handed to
 * mds_lov_update_from_read() together with the number of obd_id entries
 * the read advanced over. Afterwards lastpage and lastidx are set from the
 * page count and from the entry count of the last iteration, and the
 * tables are dumped.
 *
 * \param obd  MDS device
 *
 * \retval -ENOMEM  a page could not be allocated
 * \retval -EIO     mds_lov_update_from_read() reported a failure
 *
 * NOTE(review): the handling of an empty file, the test after
 * fsfilt_read_record() and the final return are on lines not visible here.
 */
398 static int mds_lov_read_objids(struct obd_device *obd)
400 struct mds_obd *mds = &obd->u.mds;
402 int i, rc = 0, count = 0, page = 0;
406 /* Read everything in the file, even if our current lov desc
407 has fewer targets. Old targets not in the lov descriptor
408 during mds setup may still have valid objids. */
409 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* number of pages needed for the file, rounded up */
413 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
414 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
415 for (i = 0; i < page; i++) {
417 loff_t off_old = off;
419 LASSERT(mds->mds_lov_page_array[i] == NULL);
420 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
421 if (mds->mds_lov_page_array[i] == NULL)
422 GOTO(out, rc = -ENOMEM);
424 data = mds->mds_lov_page_array[i];
426 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
427 MDS_LOV_ALLOC_SIZE, &off);
429 CERROR("Error reading objids %d\n", rc);
/* an unchanged offset is treated as a hole: skip one full page */
432 if (off == off_old) /* hole is read */
433 off += MDS_LOV_ALLOC_SIZE;
/* entries covered by this iteration, derived from the offset advance */
435 count = (off - off_old) / sizeof(obd_id);
436 if (mds_lov_update_from_read(mds, data, count)) {
437 CERROR("Can't update mds data\n");
438 GOTO(out, rc = -EIO);
441 mds->mds_lov_objid_lastpage = page - 1;
442 mds->mds_lov_objid_lastidx = count - 1;
444 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
445 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
447 mds_lov_dump_objids("read",obd);
/**
 * Write every dirty objid page back to the objid file.
 *
 * The dirty bitmap is checked for emptiness first. Otherwise the tables
 * are dumped and, for each set bit, the bit is cleared with
 * test-and-clear and the page is written through fsfilt_write_record() at
 * file offset page index times MDS_LOV_ALLOC_SIZE. The last page is
 * written only up to mds_lov_objid_lastidx. A visible cfs_bitmap_set()
 * marks a page dirty again after the write call.
 *
 * \param obd  MDS device
 *
 * NOTE(review): the statement taken for an empty bitmap, the remaining
 * arguments of fsfilt_write_record(), the condition under which the page
 * is re-marked dirty (presumably a failed write) and the return value are
 * on lines not visible here.
 */
452 int mds_lov_write_objids(struct obd_device *obd)
454 struct mds_obd *mds = &obd->u.mds;
458 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
461 mds_lov_dump_objids("write", obd);
463 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
464 obd_id *data = mds->mds_lov_page_array[i];
465 unsigned int size = MDS_LOV_ALLOC_SIZE;
466 loff_t off = i * size;
/* a dirty bit without a backing page is treated as a fatal inconsistency */
468 LASSERT(data != NULL);
470 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
473 /* check for partially filled last page */
474 if (i == mds->mds_lov_objid_lastpage)
475 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
477 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
478 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
481 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
490 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Obtain the last object id for one OST index.
 *
 * The slot for idx is located in mds_lov_page_array. In the branch marked
 * by the existing comment as never read, the value is requested from the
 * LOV export through obd_get_info() with KEY_LAST_ID, with the answer
 * written directly into the table slot; the page is then marked dirty and
 * the result is logged.
 *
 * NOTE(review): the rest of the parameter list, the condition that selects
 * the obd_get_info() branch, the remaining lastid fields, the body of the
 * clean-filter workaround and the return value are on lines not visible
 * here.
 */
492 static int mds_lov_get_objid(struct obd_device * obd,
495 struct mds_obd *mds = &obd->u.mds;
496 struct obd_export *lov_exp = mds->mds_lov_exp;
504 page = idx / OBJID_PER_PAGE();
505 off = idx % OBJID_PER_PAGE();
506 data = mds->mds_lov_page_array[page];
509 /* We never read this lastid; ask the osc */
510 struct obd_id_info lastid;
512 size = sizeof(lastid);
/* the reply lands straight in the table slot for this index */
514 lastid.data = &data[off];
515 rc = obd_get_info(NULL, lov_exp, sizeof(KEY_LAST_ID),
516 KEY_LAST_ID, &size, &lastid, NULL);
520 /* workaround for clean filter */
524 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
526 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
527 idx, data, page, off, data[off]);
/**
 * Ask the OSTs behind the LOV export to reconcile their objects with the
 * MDS by issuing an obd_create() flagged OBD_FL_DELORPHAN.
 *
 * \param mds       mds state; its page array must already be set up
 * \param ost_uuid  optional target; when non-NULL it is stored in
 *                  oti.oti_ost_uuid before the call
 *
 * The request carries the object sequence derived from mds_id and marks
 * the flags and group fields as valid.
 *
 * NOTE(review): the handling of rc, any release of empty_ea and the return
 * statement are on lines not visible here.
 */
532 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
535 struct obdo oa = { 0 };
536 struct obd_trans_info oti = {0};
537 struct lov_stripe_md *empty_ea = NULL;
540 LASSERT(mds->mds_lov_page_array != NULL);
542 /* This create will in fact either create or destroy: If the OST is
543 * missing objects below this ID, they will be created. If it finds
544 * objects above this ID, they will be removed. */
545 memset(&oa, 0, sizeof(oa));
546 oa.o_flags = OBD_FL_DELORPHAN;
547 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
548 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
549 if (ost_uuid != NULL)
550 oti.oti_ost_uuid = ost_uuid;
552 rc = obd_create(NULL, mds->mds_lov_exp, &oa, &empty_ea, &oti);
/*
 * Send the next object id for one OST to the LOV export.
 *
 * Must not be called while obd_recovering is set (LASSERT). The value is
 * passed through obd_set_info_async() with KEY_NEXT_ID and an obd_id_info
 * payload; a failure is reported through CERROR.
 *
 * NOTE(review): the lines that fill info from idx and id, the test that
 * selects the CERROR, its remaining arguments and the return value are not
 * visible here.
 */
558 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
560 struct mds_obd *mds = &obd->u.mds;
562 struct obd_id_info info;
565 LASSERT(!obd->obd_recovering);
569 rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
570 KEY_NEXT_ID, sizeof(info), &info, NULL);
572 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
578 /* Update the lov desc for a new size lov. */
/*
 * Refresh the cached LOV descriptor after a target was added.
 *
 * obd   MDS device
 * idx   index of the target; also passed by address to obd_llog_init()
 * uuid  uuid of the target, used in the debug message
 *
 * Visible steps: allocate a temporary descriptor, fetch KEY_LOVDESC from
 * the LOV export into it, copy it over mds->mds_lov_desc, register idx in
 * the objid tables through mds_lov_update_max_ost() with obd_dev_mutex
 * held, call obd_llog_init(), and free the temporary descriptor.
 *
 * NOTE(review): the declaration of ld, the error checks between these
 * steps and the return statement are on lines not visible here.
 */
579 static int mds_lov_update_desc(struct obd_device *obd, int idx,
580 struct obd_uuid *uuid)
582 struct mds_obd *mds = &obd->u.mds;
584 __u32 valsize = sizeof(mds->mds_lov_desc);
588 OBD_ALLOC(ld, sizeof(*ld));
592 rc = obd_get_info(NULL, mds->mds_lov_exp, sizeof(KEY_LOVDESC),
593 KEY_LOVDESC, &valsize, ld, NULL);
597 /* Don't change the mds_lov_desc until the objids size matches the
599 mds->mds_lov_desc = *ld;
600 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
601 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
603 cfs_mutex_lock(&obd->obd_dev_mutex);
604 rc = mds_lov_update_max_ost(mds, idx);
605 cfs_mutex_unlock(&obd->obd_dev_mutex);
609 /* If we added a target we have to reconnect the llogs */
610 /* We only _need_ to do this at first add (idx), or the first time
611 after recovery. However, it should now be safe to call anytime. */
612 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
617 OBD_FREE(ld, sizeof(*ld));
621 /* Inform MDS about new/updated target */
/*
 * Bring the MDS view of one target up to date and pass the next object id
 * on to the LOV export.
 *
 * The objid slot for idx must already be initialised (LASSERT on
 * mds_lov_objinit()). An idx that is not below ld_tgt_count is rejected
 * with -EINVAL. Otherwise the last id is obtained through
 * mds_lov_get_objid() and sent on through mds_lov_set_one_nextid(); per
 * the existing comment a failure of the latter does not abort the rest of
 * the sync.
 *
 * NOTE(review): the third parameter, the statement taken while
 * obd_recovering is set, the checks on rc and the return statement are on
 * lines not visible here.
 */
622 static int mds_lov_update_mds(struct obd_device *obd,
623 struct obd_device *watched,
626 struct mds_obd *mds = &obd->u.mds;
633 LASSERT(mds_lov_objinit(mds, idx));
635 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
636 idx, obd->obd_recovering, obd->obd_async_recov,
637 mds->mds_lov_desc.ld_tgt_count);
639 /* idx is set as data from lov_notify. */
640 if (obd->obd_recovering)
643 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
644 CERROR("index %d > count %d!\n", idx,
645 mds->mds_lov_desc.ld_tgt_count);
646 GOTO(out, rc = -EINVAL);
649 rc = mds_lov_get_objid(obd, idx);
653 page = idx / OBJID_PER_PAGE();
654 off = idx % OBJID_PER_PAGE();
655 data = mds->mds_lov_page_array[page];
657 /* We have read this lastid from disk; tell the osc.
658 Don't call this during recovery. */
659 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
661 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
662 /* Don't abort the rest of the sync */
665 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
672 /* update the LOV-OSC knowledge of the last used object id's */
/**
 * Connect the MDS device to the LOV device named lov_name.
 *
 * \param obd       MDS device
 * \param lov_name  name looked up through class_name2obd()
 *
 * Visible steps, in order:
 *  - when mds_lov_obd already holds an error pointer, return that error;
 *  - look the LOV device up by name; when it is missing, store
 *    ERR_PTR(-ENOTCONN) in mds_lov_obd;
 *  - read the objid file through mds_lov_read_objids() with obd_dev_mutex
 *    held;
 *  - register obd as observer of the LOV device;
 *  - send an OBD_NOTIFY_CREATE notification to the LOV device;
 *  - set lov_sp_me to LUSTRE_SP_MDT;
 *  - build an obd_connect_data with the connect flags, version, group,
 *    max EA size, brw size and checksum types, then call obd_connect() and
 *    free the connect data;
 *  - when the connect fails, store ERR_PTR(rc) in mds_lov_obd.
 *
 * \retval -ENOMEM  the connect data could not be allocated
 *
 * The last two visible statements clear mds_lov_exp and store ERR_PTR(rc)
 * in mds_lov_obd; presumably they form the err_exit path named by the GOTO.
 *
 * NOTE(review): the statement taken when mds_lov_obd is already set, the
 * checks on rc after each step, the end of the preprocessor conditional
 * and the return statements are on lines not visible here.
 */
673 int mds_lov_connect(struct obd_device *obd, char * lov_name)
675 struct mds_obd *mds = &obd->u.mds;
676 struct obd_connect_data *data;
680 if (IS_ERR(mds->mds_lov_obd))
681 RETURN(PTR_ERR(mds->mds_lov_obd));
683 if (mds->mds_lov_obd)
686 mds->mds_lov_obd = class_name2obd(lov_name);
687 if (!mds->mds_lov_obd) {
688 CERROR("MDS cannot locate LOV %s\n", lov_name);
689 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
693 cfs_mutex_lock(&obd->obd_dev_mutex);
694 rc = mds_lov_read_objids(obd);
695 cfs_mutex_unlock(&obd->obd_dev_mutex);
697 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
701 rc = obd_register_observer(mds->mds_lov_obd, obd);
703 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
708 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
710 obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
712 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
714 OBD_ALLOC(data, sizeof(*data));
716 GOTO(err_exit, rc = -ENOMEM);
718 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
719 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
720 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FULL20 |
721 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
722 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
723 OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
724 #ifdef HAVE_LRU_RESIZE_SUPPORT
725 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
727 data->ocd_version = LUSTRE_VERSION_CODE;
728 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
729 data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
731 /* send max bytes per rpc */
732 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
733 /* send the list of supported checksum types */
734 data->ocd_cksum_types = cksum_types_supported_client();
735 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
736 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
737 OBD_FREE(data, sizeof(*data));
739 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
740 mds->mds_lov_obd = ERR_PTR(rc);
744 /* I want to see a callback happen when the OBD moves to a
745 * "For General Use" state, and that's when we'll call
746 * set_nextid(). The class driver can help us here, because
747 * it can use the obd_recovering flag to determine when the
748 * the OBD is full available. */
749 /* MDD device will care about that
750 if (!obd->obd_recovering)
751 rc = mds_postrecov(obd);
756 mds->mds_lov_exp = NULL;
757 mds->mds_lov_obd = ERR_PTR(rc);
/**
 * Detach the MDS device from its LOV device.
 *
 * Only acts when mds_lov_obd is not an error pointer and mds_lov_exp is
 * set: the observer registration on the LOV device is removed and
 * obd_force is set on the LOV device, for the reason given in the comment
 * inside the function.
 *
 * \param obd  MDS device
 *
 * NOTE(review): the return value is on lines not visible here.
 */
761 int mds_lov_disconnect(struct obd_device *obd)
763 struct mds_obd *mds = &obd->u.mds;
767 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
768 obd_register_observer(mds->mds_lov_obd, NULL);
770 /* The actual disconnect of the mds_lov will be called from
771 * class_disconnect_exports from mds_lov_clean. So we have to
772 * ensure that class_cleanup doesn't fail due to the extra ref
773 * we're holding now. The mechanism to do that already exists -
774 * the obd_force flag. We'll drop the final ref to the
775 * mds_lov_exp in mds_cleanup. */
776 mds->mds_lov_obd->obd_force = 1;
/*
 * Argument bundle for one target synchronization. It is filled in by
 * mds_lov_start_synchronize() and read back by __mds_lov_synchronize()
 * through an untyped data pointer.
 */
782 struct mds_lov_sync_info {
783 struct obd_device *mlsi_obd; /* the lov device to sync */
784 struct obd_device *mlsi_watched; /* target osc */
785 __u32 mlsi_index; /* index of target */
/*
 * Send the capability keys of the MDS to the target identified by uuid.
 *
 * Nothing is sent when mds_capa_keys is not set. Otherwise the first two
 * entries of mds_capa_keys are each passed to the LOV export through
 * obd_set_info_async() with KEY_CAPA_KEY; a failure is logged with
 * DEBUG_CAPA_KEY at D_ERROR level.
 *
 * NOTE(review): the statement taken when there are no keys, the line that
 * places the current key into info, the test that selects the failure log
 * and the return value are on lines not visible here.
 */
788 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
790 struct mds_capa_info info = { .uuid = uuid };
791 struct lustre_capa_key *key;
796 if (!mds->mds_capa_keys)
799 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
800 for (i = 0; i < 2; i++) {
801 key = &mds->mds_capa_keys[i];
802 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
805 rc = obd_set_info_async(NULL, mds->mds_lov_exp,
806 sizeof(KEY_CAPA_KEY), KEY_CAPA_KEY,
807 sizeof(info), &info, NULL);
809 DEBUG_CAPA_KEY(D_ERROR, key,
810 "propagate failed (rc = %d) for", rc);
818 /* We only sync one osc at a time, so that we don't have to hold
819 any kind of lock on the whole mds_lov_desc, which may change
820 (grow) as a result of mds_lov_add_ost. This also avoids any
821 kind of mismatch between the lov_desc and the mds_lov_desc,
822 which are not in lock-step during lov_add_obd */
/*
 * Worker that synchronizes the MDS with one target.
 *
 * data points to a struct mds_lov_sync_info. With mds_notify_lock held for
 * reading, the visible sequence is:
 *  - bail out with -ENODEV when the device is stopping or failed;
 *  - mds_lov_update_mds() for the target index;
 *  - obd_set_info_async() with KEY_MDS_CONN and the group derived from
 *    mds_id;
 *  - mds_propagate_capa_keys();
 *  - llog_get_context() for LLOG_MDS_OST_ORIG_CTXT followed by
 *    llog_connect();
 *  - mds_lov_clear_orphans() for the target uuid.
 * In the failure path the target is reported to the LOV device as
 * OBD_NOTIFY_INACTIVE unless one of the involved devices is stopping.
 * Finally the lock is released and the obd reference tagged
 * mds_lov_synchronize is dropped with class_decref().
 *
 * NOTE(review): the checks on rc between the steps, the release of mlsi
 * and of the llog context, the out label and the return statement are on
 * lines not visible here.
 */
823 static int __mds_lov_synchronize(void *data)
825 struct mds_lov_sync_info *mlsi = data;
826 struct obd_device *obd = mlsi->mlsi_obd;
827 struct obd_device *watched = mlsi->mlsi_watched;
828 struct mds_obd *mds = &obd->u.mds;
829 struct obd_uuid *uuid;
830 __u32 idx = mlsi->mlsi_index;
831 struct mds_group_info mgi;
832 struct llog_ctxt *ctxt;
840 uuid = &watched->u.cli.cl_target_uuid;
843 cfs_down_read(&mds->mds_notify_lock);
844 if (obd->obd_stopping || obd->obd_fail)
845 GOTO(out, rc = -ENODEV);
847 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
848 rc = mds_lov_update_mds(obd, watched, idx);
850 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
853 mgi.group = mdt_to_obd_objseq(mds->mds_id);
856 rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
857 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
860 /* propagate capability keys */
861 rc = mds_propagate_capa_keys(mds, uuid);
865 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
867 GOTO(out, rc = -ENODEV);
869 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
870 rc = llog_connect(ctxt, NULL, NULL, uuid);
873 CERROR("%s failed at llog_origin_connect: %d\n",
874 obd_uuid2str(uuid), rc);
878 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
879 obd->obd_name, obd_uuid2str(uuid));
881 rc = mds_lov_clear_orphans(mds, uuid);
883 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
884 obd_uuid2str(uuid), rc);
891 /* Deactivate it for safety */
892 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
894 if (!obd->obd_stopping && mds->mds_lov_obd &&
895 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
896 obd_notify(mds->mds_lov_obd, watched,
897 OBD_NOTIFY_INACTIVE, NULL);
899 cfs_up_read(&mds->mds_notify_lock);
/* drops the reference tagged mds_lov_synchronize */
901 class_decref(obd, "mds_lov_synchronize", obd);
/**
 * Thread entry point for a background synchronization.
 *
 * Builds a thread name from the target index using the ll_sync_ prefix,
 * daemonizes under that name, and then runs __mds_lov_synchronize() on the
 * same argument, returning its result.
 *
 * \param data  pointer to a struct mds_lov_sync_info
 *
 * NOTE(review): the declaration of the name buffer is on a line not
 * visible here.
 */
905 int mds_lov_synchronize(void *data)
907 struct mds_lov_sync_info *mlsi = data;
910 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
911 cfs_daemonize_ctxt(name);
913 RETURN(__mds_lov_synchronize(data));
/**
 * Start the synchronization of one target, in a thread or inline.
 *
 * \param obd      MDS device
 * \param watched  target device being synchronized
 * \param data     pointer to a __u32 holding the target index
 * \param ev       notify event that triggered the synchronization
 *
 * A struct mds_lov_sync_info is allocated and filled in, and an obd
 * reference tagged mds_lov_synchronize is taken. For every event other
 * than OBD_NOTIFY_SYNC a thread running mds_lov_synchronize() is created;
 * the visible error branch logs the failure and drops the reference again.
 * For OBD_NOTIFY_SYNC the work is done by calling __mds_lov_synchronize()
 * directly.
 *
 * NOTE(review): the check after OBD_ALLOC, the remaining arguments of
 * cfs_create_thread(), the test on its result, any release of mlsi on
 * failure and the return statement are on lines not visible here.
 */
916 int mds_lov_start_synchronize(struct obd_device *obd,
917 struct obd_device *watched,
918 void *data, enum obd_notify_event ev)
920 struct mds_lov_sync_info *mlsi;
922 struct obd_uuid *uuid;
926 uuid = &watched->u.cli.cl_target_uuid;
928 OBD_ALLOC(mlsi, sizeof(*mlsi));
933 mlsi->mlsi_obd = obd;
934 mlsi->mlsi_watched = watched;
935 mlsi->mlsi_index = *(__u32 *)data;
937 /* Although class_export_get(obd->obd_self_export) would lock
938 the MDS in place, since it's only a self-export
939 it doesn't lock the LOV in place. The LOV can be disconnected
940 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
941 Simply taking an export ref on the LOV doesn't help, because it's
942 still disconnected. Taking an obd reference insures that we don't
943 disconnect the LOV. This of course means a cleanup won't
944 finish for as long as the sync is blocking. */
945 class_incref(obd, "mds_lov_synchronize", obd);
947 if (ev != OBD_NOTIFY_SYNC) {
948 /* Synchronize in the background */
949 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
952 CERROR("%s: error starting mds_lov_synchronize: %d\n",
954 class_decref(obd, "mds_lov_synchronize", obd);
956 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
957 "thread=%d\n", obd->obd_name,
958 mlsi->mlsi_index, rc);
/* blocking variant: run the worker in the calling context */
962 rc = __mds_lov_synchronize((void *)mlsi);
968 int mds_notify(struct obd_device *obd, struct obd_device *watched,
969 enum obd_notify_event ev, void *data)
971 struct mds_obd *mds = &obd->u.mds;
975 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
977 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
978 CERROR("unexpected notification of %s %s!\n",
979 watched->obd_type->typ_name, watched->obd_name);
983 /*XXX this notifies the MDD until lov handling use old mds code
986 if (obd->obd_upcall.onu_owner) {
987 LASSERT(obd->obd_upcall.onu_upcall != NULL);
988 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
989 obd->obd_upcall.onu_owner,
990 &mds->mds_obt.obt_mount_count);
994 /* We only handle these: */
995 case OBD_NOTIFY_CREATE:
996 CDEBUG(D_CONFIG, "%s: add target %s\n", obd->obd_name,
997 obd_uuid2str(&watched->u.cli.cl_target_uuid));
998 /* We still have to fix the lov descriptor for ost's */
1000 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1001 &watched->u.cli.cl_target_uuid);
1003 case OBD_NOTIFY_ACTIVE:
1004 /* lov want one or more _active_ targets for work */
1005 /* activate event should be pass lov idx as argument */
1006 case OBD_NOTIFY_SYNC:
1007 case OBD_NOTIFY_SYNC_NONBLOCK:
1008 /* sync event should be pass lov idx as argument */
1014 if (obd->obd_recovering) {
1015 CDEBUG(D_CONFIG, "%s: Is in recovery, "
1016 "not resetting orphans on %s\n",
1018 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1019 /* We still have to fix the lov descriptor for ost's added
1020 after the mdt in the config log. They didn't make it into
1022 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1023 &watched->u.cli.cl_target_uuid);
1025 rc = mds_lov_start_synchronize(obd, watched, data, ev);