1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
/*
 * Debug helper: log the dirty-page bitmap words and the per-page objid
 * entries of the MDS in @obd at D_INFO level.  @label is echoed in the
 * "dump from" line so the log shows which caller requested the dump.
 */
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
/* Test whether D_INFO messages are enabled at all.
 * NOTE(review): the statement guarded by this test is not visible here;
 * presumably an early return that skips the loops below -- confirm. */
61 if ((libcfs_debug & D_INFO) == 0)
64 CDEBUG(D_INFO, "dump from %s\n", label);
65 if (mds->mds_lov_page_dirty == NULL) {
66 CERROR("NULL bitmap!\n");
/* Prints size / BITS_PER_LONG + 1 words of the bitmap.
 * NOTE(review): confirm data[] really holds that many words when size is
 * an exact multiple of BITS_PER_LONG. */
70 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71 CDEBUG(D_INFO, "%u - %lx\n", i,
72 mds->mds_lov_page_dirty->data[i]);
74 if (mds->mds_lov_page_array == NULL) {
75 CERROR("not init page array!\n");
/* Visit every slot of the page array, then every objid index in a page. */
79 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80 obd_id *data = mds->mds_lov_page_array[i];
85 for(j=0; j < OBJID_PER_PAGE(); j++) {
88 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Set up the objid bookkeeping for the MDS in @obd: allocate the dirty-page
 * bitmap and the array of page pointers (both sized by
 * MDS_LOV_OBJID_PAGES_COUNT), then open (creating if needed) the LOV_OBJID
 * file and remember it in mds_lov_objid_filp.
 *
 * Error codes visible in this function: -ENOMEM when the page array cannot
 * be allocated, PTR_ERR(file) when the open fails, -ENOENT when LOV_OBJID is
 * not a regular file.  The tail of the function closes the file, frees the
 * page array and frees the bitmap; presumably these are the err_open /
 * err_free / err_free_bitmap unwind targets named in the GOTO()s -- confirm.
 */
96 int mds_lov_init_objids(struct obd_device *obd)
98 struct mds_obd *mds = &obd->u.mds;
99 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* An objid page must hold a whole number of obd_id entries. */
104 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106 mds->mds_lov_page_dirty =
107 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108 if (mds->mds_lov_page_dirty == NULL)
112 OBD_ALLOC(mds->mds_lov_page_array, size);
113 if (mds->mds_lov_page_array == NULL)
114 GOTO(err_free_bitmap, rc = -ENOMEM);
116 /* open and test the lov objid file */
117 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed here while the GOTO() on the next line is
 * the visible place where it receives PTR_ERR(file); confirm rc is already
 * set when the message is emitted. */
120 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121 GOTO(err_free, rc = PTR_ERR(file));
123 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125 file->f_dentry->d_inode->i_mode);
126 GOTO(err_open, rc = -ENOENT);
128 mds->mds_lov_objid_filp = file;
/* Cleanup statements: release resources in reverse order of acquisition. */
132 if (filp_close((struct file *)file, 0))
133 CERROR("can't close %s after error\n", LOV_OBJID);
135 OBD_FREE(mds->mds_lov_page_array, size);
137 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Release the objid bookkeeping of the MDS in @obd: free each objid page
 * and the page-pointer array, close the LOV_OBJID file if one is open, and
 * free the dirty-page bitmap.
 */
142 void mds_lov_destroy_objids(struct obd_device *obd)
144 struct mds_obd *mds = &obd->u.mds;
148 if (mds->mds_lov_page_array != NULL) {
149 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150 obd_id *data = mds->mds_lov_page_array[i];
/* NOTE(review): any guard around this free (e.g. skipping slots that were
 * never allocated) is not visible here -- confirm. */
152 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
/* Same size expression as the allocation in mds_lov_init_objids(). */
154 OBD_FREE(mds->mds_lov_page_array,
155 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
158 if (mds->mds_lov_objid_filp) {
159 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
/* The pointer is cleared whether or not the close succeeded. */
160 mds->mds_lov_objid_filp = NULL;
162 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
165 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
170 * There are currently two ways to learn the OST count and max OST index:
171 * first - after an OST is connected to the MDS and the sync process has finished;
172 * second - from the lmm during recovery, in the case when the MDS has no configs
173 * and the OST isn't registered in the MGS.
175 * \param mds pointer to mds structure
176 * \param index maximum ost index
178 * \retval -ENOMEM if there is not enough memory for a new page
179 * \retval 0 if the update succeeded
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* Split the OST index into (page, offset-within-page) coordinates. */
183 __u32 page = index / OBJID_PER_PAGE();
184 __u32 off = index % OBJID_PER_PAGE();
185 obd_id *data = mds->mds_lov_page_array[page];
/* Allocate a page for this index and publish it in the page array.
 * NOTE(review): the condition guarding the allocation and its failure
 * check are not visible here; presumably it runs only when the slot was
 * still NULL -- confirm. */
188 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
192 mds->mds_lov_page_array[page] = data;
/* Remember the highest index seen and where it lives; the write path
 * uses lastpage/lastidx to size the final, partially filled page. */
195 if (index > mds->mds_lov_objid_max_index) {
196 mds->mds_lov_objid_lastpage = page;
197 mds->mds_lov_objid_lastidx = off;
198 mds->mds_lov_objid_max_index = index;
201 /* workaround - New target not in objids file; increase mdsize */
202 /* ld_tgt_count is used as the max index everywhere, despite its name. */
203 if (data[off] == 0) {
/* The slot is still zero: count the target and recompute the maximum EA
 * and cookie sizes for min(LOV_MAX_STRIPE_COUNT, objid_count) stripes. */
207 mds->mds_lov_objid_count++;
208 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
209 mds->mds_lov_objid_count);
211 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
212 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
214 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
215 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
216 mds->mds_max_cookiesize);
/*
 * Return non-zero when the objid slot for OST @index holds a value greater
 * than zero, and 0 otherwise.  The page for @index is dereferenced without
 * a NULL check, so it must already be present in mds_lov_page_array.
 */
223 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
225 __u32 page = index / OBJID_PER_PAGE();
226 __u32 off = index % OBJID_PER_PAGE();
227 obd_id *data = mds->mds_lov_page_array[page];
229 return (data[off] > 0);
/*
 * Make sure objid slots exist for every OST index referenced by @lmm.
 * The stripe count and object array are read from the v1 or the v3 layout
 * structure, selected by a switch on lmm_magic, and
 * mds_lov_update_max_ost() is called for each stripe's l_ost_idx while
 * obd_dev_sem is held.
 * NOTE(review): the return statements are not visible here; the error
 * value returned for an unknown magic or a failed update must be confirmed.
 */
232 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
234 struct lov_ost_data_v1 *data;
239 /* if we create file without objects - lmm is NULL */
/* All on-disk lmm fields are little-endian and converted before use. */
243 switch (le32_to_cpu(lmm->lmm_magic)) {
245 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
246 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
249 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
250 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
253 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* mds_lov_update_desc() takes the same semaphore around its call to
 * mds_lov_update_max_ost(). */
258 cfs_mutex_down(&obd->obd_dev_sem);
259 for (j = 0; j < count; j++) {
260 __u32 i = le32_to_cpu(data[j].l_ost_idx);
261 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
266 cfs_mutex_up(&obd->obd_dev_sem);
270 EXPORT_SYMBOL(mds_lov_prepare_objids);
273 * Write an llog orphan record about lost OST objects.
274 * A special lsm is allocated with a single stripe; the caller should deallocate it.
277 static int mds_log_lost_precreated(struct obd_device *obd,
278 struct lov_stripe_md **lsmp, int *stripes,
279 obd_id id, obd_count count, int idx)
/* Start from the caller's stripe md. */
281 struct lov_stripe_md *lsm = *lsmp;
/* Obtain a stripe md from the LOV export.
 * NOTE(review): the guard around this call, its error check and the store
 * back into *lsmp are not visible here; the call site's comment says the
 * lsm is allocated when it is NULL -- confirm. */
286 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
289 /* need only one stripe, save old value */
290 *stripes = lsm->lsm_stripe_count;
291 lsm->lsm_stripe_count = 1;
/* Describe the lost range with stripe 0: first object id, the object
 * sequence derived from this MDS's id, and the OST index. */
295 lsm->lsm_oinfo[0]->loi_id = id;
296 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
297 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
/* @count is passed through to the orphan log operation. */
299 rc = mds_log_op_orphan(obd, lsm, count);
/*
 * Compare the object ids carried in @lmm with the last ids stored per OST.
 * For each stripe whose id is greater than the stored one, the objid page
 * is marked dirty; when @obd is recovering and objects were skipped, the
 * gap is logged through mds_log_lost_precreated().  A stripe md used for
 * that logging has its stripe count restored and is freed at the end.
 * NOTE(review): the statement that stores the new id into the slot is not
 * visible here; presumably it sits just before cfs_bitmap_set() -- confirm.
 */
303 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
305 struct mds_obd *mds = &obd->u.mds;
307 struct lov_ost_data_v1 *obj;
308 struct lov_stripe_md *lsm = NULL;
313 /* if we create file without objects - lmm is NULL */
/* Pick stripe count and object array from the v1 or v3 layout. */
317 switch (le32_to_cpu(lmm->lmm_magic)) {
319 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
320 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
323 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
324 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
327 CERROR("Unknow lmm type %X !\n",
328 le32_to_cpu(lmm->lmm_magic));
332 for (j = 0; j < count; j++) {
333 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
334 obd_id id = le64_to_cpu(obj[j].l_object_id);
335 __u32 page = i / OBJID_PER_PAGE();
336 __u32 idx = i % OBJID_PER_PAGE();
/* No NULL check on the page is visible here; mds_lov_prepare_objids()
 * presumably has to run first for the same lmm -- confirm. */
339 data = mds->mds_lov_page_array[page];
341 CDEBUG(D_INODE,"update last object for ost %u"
342 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
343 if (id > data[idx]) {
/* Number of ids skipped between the stored id and the new one.
 * NOTE(review): the difference of two obd_id values is narrowed to int
 * and later printed with %u -- confirm the gap always fits. */
344 int lost = id - data[idx] - 1;
345 /* we might have lost precreated objects due to VBR */
346 if (lost > 0 && obd->obd_recovering) {
347 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
348 if (!obd->obd_version_recov)
349 CERROR("Unexpected gap in objids\n");
350 /* lsm is allocated if NULL */
/* The return value of the logging helper is not used here. */
351 mds_log_lost_precreated(obd, &lsm, &stripes,
352 data[idx]+1, lost, i);
355 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
359 /* restore stripes number */
360 lsm->lsm_stripe_count = stripes;
361 obd_free_memmd(mds->mds_lov_exp, &lsm);
366 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Account for a batch of objids just read from disk.  mds_lov_objid_count
 * is incremented inside a loop that runs count times, then mds_max_mdsize
 * and mds_max_cookiesize are recomputed for
 * min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
 * NOTE(review): the per-entry condition that decides whether an entry is
 * counted, and the return value, are not visible here.
 */
368 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
374 for (i = 0; i < count; i++) {
378 mds->mds_lov_objid_count++;
381 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
382 mds->mds_lov_objid_count);
/* Same sizing computation as in mds_lov_update_max_ost(). */
384 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
385 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
387 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
388 "%d/%d\n", stripes, mds->mds_max_mdsize,
389 mds->mds_max_cookiesize);
/*
 * Load the LOV_OBJID file into memory, one MDS_LOV_ALLOC_SIZE page at a
 * time.  Each page is allocated into mds_lov_page_array[i], filled with
 * fsfilt_read_record() and handed to mds_lov_update_from_read(); afterwards
 * mds_lov_objid_lastpage / mds_lov_objid_lastidx describe the last entry
 * read.  Visible error codes: -ENOMEM (page allocation) and -EIO (update
 * failed).  mds_lov_connect() calls this with obd_dev_sem held.
 */
395 static int mds_lov_read_objids(struct obd_device *obd)
397 struct mds_obd *mds = &obd->u.mds;
399 int i, rc = 0, count = 0, page = 0;
403 /* Read everything in the file, even if our current lov desc
404 has fewer targets. Old targets not in the lov descriptor
405 during mds setup may still have valid objids. */
406 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* Number of pages needed to cover the file, rounded up. */
410 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
411 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
412 for (i = 0; i < page; i++) {
/* Remember the offset before the read so the consumed length can be
 * derived from how far 'off' advanced. */
414 loff_t off_old = off;
/* A slot that is already populated is treated as a fatal error. */
416 LASSERT(mds->mds_lov_page_array[i] == NULL);
417 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
418 if (mds->mds_lov_page_array[i] == NULL)
419 GOTO(out, rc = -ENOMEM);
421 data = mds->mds_lov_page_array[i];
423 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
424 MDS_LOV_ALLOC_SIZE, &off);
426 CERROR("Error reading objids %d\n", rc);
/* If the offset did not move, step over a whole page by hand. */
429 if (off == off_old) /* hole is read */
430 off += MDS_LOV_ALLOC_SIZE;
/* Entries covered by this iteration, derived from the offset advance. */
432 count = (off - off_old) / sizeof(obd_id);
433 if (mds_lov_update_from_read(mds, data, count)) {
434 CERROR("Can't update mds data\n");
435 GOTO(out, rc = -EIO);
/* 'count' still holds the entry count of the last page read.
 * NOTE(review): with page == 0 these would become -1; presumably an
 * earlier check handles an empty file -- confirm. */
438 mds->mds_lov_objid_lastpage = page - 1;
439 mds->mds_lov_objid_lastidx = count - 1;
441 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
442 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
444 mds_lov_dump_objids("read",obd);
/*
 * Flush dirty objid pages to the LOV_OBJID file.  For every bit set in
 * mds_lov_page_dirty the bit is cleared and the page is written at file
 * offset i * MDS_LOV_ALLOC_SIZE; the page at mds_lov_objid_lastpage is
 * written only up to and including mds_lov_objid_lastidx.
 * NOTE(review): the return statements are not visible here.
 */
449 int mds_lov_write_objids(struct obd_device *obd)
451 struct mds_obd *mds = &obd->u.mds;
/* NOTE(review): the statement taken when nothing is dirty is not visible;
 * presumably an early successful return -- confirm. */
455 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
458 mds_lov_dump_objids("write", obd);
460 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
461 obd_id *data = mds->mds_lov_page_array[i];
462 unsigned int size = MDS_LOV_ALLOC_SIZE;
463 loff_t off = i * size;
/* A dirty bit without a backing page is treated as a fatal error. */
465 LASSERT(data != NULL);
/* Clear the bit before writing; if it was already clear the page is
 * presumably skipped (the guarded statement is not visible). */
467 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
470 /* check for partially filled last page */
471 if (i == mds->mds_lov_objid_lastpage)
472 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
474 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
/* Write the page.  The cfs_bitmap_set() that follows marks the page dirty
 * again -- presumably only when the write failed, so that it is retried
 * later (the guard is not visible here; confirm). */
475 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
478 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
487 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make the in-memory last objid for OST index idx available.  The visible
 * code looks up the slot for idx and, in the branch commented "We never
 * read this lastid", queries the LOV export with KEY_LAST_ID, passing the
 * slot's address as the destination, then marks the page dirty and logs
 * the resulting value.
 * NOTE(review): the branch condition, the remaining fields of 'lastid',
 * the error handling and the "clean filter" workaround body are not
 * visible here.
 */
489 static int mds_lov_get_objid(struct obd_device * obd,
492 struct mds_obd *mds = &obd->u.mds;
493 struct obd_export *lov_exp = mds->mds_lov_exp;
/* Locate the slot: page number and offset within that page. */
501 page = idx / OBJID_PER_PAGE();
502 off = idx % OBJID_PER_PAGE();
503 data = mds->mds_lov_page_array[page];
506 /* We never read this lastid; ask the osc */
507 struct obd_id_info lastid;
509 size = sizeof(lastid);
/* Point the request at the slot itself so the answer lands in place. */
511 lastid.data = &data[off];
512 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
513 &size, &lastid, NULL);
517 /* workaround for clean filter */
/* The slot may have changed, so schedule the page for writing. */
521 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
523 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
524 idx, data, page, off, data[off]);
/*
 * Ask the LOV to reconcile orphan objects.  Issues obd_create() on the LOV
 * export with OBD_FL_DELORPHAN set in the obdo flags and with the object
 * sequence derived from this MDS's id.  When @ost_uuid is non-NULL it is
 * placed in the transaction info, presumably limiting the operation to
 * that OST -- confirm against the LOV create path.
 * Requires the objid page array to be initialised (asserted).
 */
529 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
532 struct obdo oa = { 0 };
533 struct obd_trans_info oti = {0};
534 struct lov_stripe_md *empty_ea = NULL;
537 LASSERT(mds->mds_lov_page_array != NULL);
539 /* This create will in fact either create or destroy: If the OST is
540 * missing objects below this ID, they will be created. If it finds
541 * objects above this ID, they will be removed. */
/* oa was already zero-initialised at its declaration; this memset
 * repeats that. */
542 memset(&oa, 0, sizeof(oa));
543 oa.o_flags = OBD_FL_DELORPHAN;
544 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
/* Declare which obdo fields are meaningful: flags and group. */
545 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
546 if (ost_uuid != NULL)
547 oti.oti_ost_uuid = ost_uuid;
549 rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
/*
 * Send KEY_NEXT_ID for one OST to the LOV export through
 * obd_set_info_async() and log an error on failure.  Must not be called
 * while @obd is recovering (asserted).
 * NOTE(review): the statements that fill 'info' from @idx and @id are not
 * visible here.
 */
555 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
557 struct mds_obd *mds = &obd->u.mds;
559 struct obd_id_info info;
562 LASSERT(!obd->obd_recovering);
/* The key length passed is sizeof(KEY_NEXT_ID), i.e. it includes the
 * terminating NUL of the key string. */
566 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
567 KEY_NEXT_ID, sizeof(info), &info, NULL);
569 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refresh mds->mds_lov_desc from the LOV and register OST index @idx.
 * Steps visible below: fetch KEY_LOVDESC through obd_get_info() using a
 * temporary descriptor, copy it into mds_lov_desc, call
 * mds_lov_update_max_ost() for @idx under obd_dev_sem, re-initialise the
 * llogs with obd_llog_init(), and free the temporary descriptor.
 * In the visible code @uuid is only used in the debug message.
 */
575 /* Update the lov desc for a new size lov. */
576 static int mds_lov_update_desc(struct obd_device *obd, int idx,
577 struct obd_uuid *uuid)
579 struct mds_obd *mds = &obd->u.mds;
581 __u32 valsize = sizeof(mds->mds_lov_desc);
/* Temporary descriptor, released by the OBD_FREE() at the end. */
585 OBD_ALLOC(ld, sizeof(*ld));
589 rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
594 /* Don't change the mds_lov_desc until the objids size matches the
596 mds->mds_lov_desc = *ld;
597 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
598 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
600 cfs_mutex_down(&obd->obd_dev_sem);
601 rc = mds_lov_update_max_ost(mds, idx);
602 cfs_mutex_up(&obd->obd_dev_sem);
606 /* If we added a target we have to reconnect the llogs */
607 /* We only _need_ to do this at first add (idx), or the first time
608 after recovery. However, it should now be safe to call anytime. */
609 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
614 OBD_FREE(ld, sizeof(*ld));
/*
 * Bring the MDS's view of one OST up to date.  Asserts that the objid slot
 * for idx is initialised, rejects an idx that is not below ld_tgt_count
 * with -EINVAL, obtains the last objid through mds_lov_get_objid(), and
 * passes the stored id to mds_lov_set_one_nextid().
 * In the visible code @watched is not used.
 */
618 /* Inform MDS about new/updated target */
619 static int mds_lov_update_mds(struct obd_device *obd,
620 struct obd_device *watched,
623 struct mds_obd *mds = &obd->u.mds;
/* mds_lov_objinit() is true only when the slot's value is > 0. */
630 LASSERT(mds_lov_objinit(mds, idx));
632 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
633 idx, obd->obd_recovering, obd->obd_async_recov,
634 mds->mds_lov_desc.ld_tgt_count);
636 /* idx is set as data from lov_notify. */
/* NOTE(review): the statement taken while recovering is not visible here;
 * presumably the function stops early, since a later comment says next-id
 * must not be set during recovery -- confirm. */
637 if (obd->obd_recovering)
/* NOTE(review): the test is ">=" while the message below prints ">". */
640 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
641 CERROR("index %d > count %d!\n", idx,
642 mds->mds_lov_desc.ld_tgt_count);
643 GOTO(out, rc = -EINVAL);
646 rc = mds_lov_get_objid(obd, idx);
/* Locate the slot that now holds the last id for this OST. */
650 page = idx / OBJID_PER_PAGE();
651 off = idx % OBJID_PER_PAGE();
652 data = mds->mds_lov_page_array[page];
654 /* We have read this lastid from disk; tell the osc.
655 Don't call this during recovery. */
656 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
658 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
659 /* Don't abort the rest of the sync */
662 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/*
 * Locate the LOV device named @lov_name and connect the MDS to it.
 * Sequence visible below: return the recorded error if mds_lov_obd already
 * holds an ERR_PTR; look the LOV up by name; read the objid file under
 * obd_dev_sem; register @obd as observer of the LOV; ask the LOV to
 * generate OBD_NOTIFY_CREATE events; build the connect data and call
 * obd_connect().  On lookup or connect failure mds_lov_obd is set to an
 * ERR_PTR, which the check at the top reports on later calls.
 */
669 /* update the LOV-OSC knowledge of the last used object id's */
670 int mds_lov_connect(struct obd_device *obd, char * lov_name)
672 struct mds_obd *mds = &obd->u.mds;
673 struct obd_connect_data *data;
/* An earlier attempt failed and left an ERR_PTR: report it again. */
677 if (IS_ERR(mds->mds_lov_obd))
678 RETURN(PTR_ERR(mds->mds_lov_obd));
/* NOTE(review): the statement taken when a LOV is already set is not
 * visible here; presumably an early successful return -- confirm. */
680 if (mds->mds_lov_obd)
683 mds->mds_lov_obd = class_name2obd(lov_name);
684 if (!mds->mds_lov_obd) {
685 CERROR("MDS cannot locate LOV %s\n", lov_name);
686 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
/* The objid file is read with obd_dev_sem held. */
690 cfs_mutex_down(&obd->obd_dev_sem);
691 rc = mds_lov_read_objids(obd);
692 cfs_mutex_up(&obd->obd_dev_sem);
694 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
698 rc = obd_register_observer(mds->mds_lov_obd, obd);
700 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
705 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
707 obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
709 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
711 OBD_ALLOC(data, sizeof(*data));
713 GOTO(err_exit, rc = -ENOMEM);
715 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
716 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
717 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
718 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
719 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
720 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
721 OBD_CONNECT_SOM | OBD_CONNECT_FULL20;
722 #ifdef HAVE_LRU_RESIZE_SUPPORT
723 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
725 data->ocd_version = LUSTRE_VERSION_CODE;
726 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
727 /* send max bytes per rpc */
728 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
729 /* send the list of supported checksum types */
730 data->ocd_cksum_types = OBD_CKSUM_ALL;
731 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
/* The connect data is freed right after the call, whatever its result. */
732 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
733 OBD_FREE(data, sizeof(*data));
735 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
736 mds->mds_lov_obd = ERR_PTR(rc);
740 /* I want to see a callback happen when the OBD moves to a
741 * "For General Use" state, and that's when we'll call
742 * set_nextid(). The class driver can help us here, because
743 * it can use the obd_recovering flag to determine when the
744 * the OBD is full available. */
745 /* MDD device will care about that
746 if (!obd->obd_recovering)
747 rc = mds_postrecov(obd);
752 mds->mds_lov_exp = NULL;
753 mds->mds_lov_obd = ERR_PTR(rc);
/*
 * Stop observing the LOV and flag the LOV device for forced cleanup.
 * Acts only when mds_lov_obd is not an ERR_PTR and an export to the LOV
 * exists.
 * NOTE(review): the return value is not visible here.
 */
757 int mds_lov_disconnect(struct obd_device *obd)
759 struct mds_obd *mds = &obd->u.mds;
763 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
/* Passing NULL as the observer unregisters this MDS from the LOV. */
764 obd_register_observer(mds->mds_lov_obd, NULL);
766 /* The actual disconnect of the mds_lov will be called from
767 * class_disconnect_exports from mds_lov_clean. So we have to
768 * ensure that class_cleanup doesn't fail due to the extra ref
769 * we're holding now. The mechanism to do that already exists -
770 * the obd_force flag. We'll drop the final ref to the
771 * mds_lov_exp in mds_cleanup. */
772 mds->mds_lov_obd->obd_force = 1;
/*
 * Work item describing one OST synchronisation.  It is allocated and
 * filled by mds_lov_start_synchronize() and consumed by
 * __mds_lov_synchronize().
 * NOTE(review): mlsi_obd is assigned the obd passed to
 * mds_lov_start_synchronize() and is later used through obd->u.mds, so it
 * appears to be the MDS device rather than the LOV device its comment
 * mentions -- confirm and reword.
 */
778 struct mds_lov_sync_info {
779 struct obd_device *mlsi_obd; /* the lov device to sync */
780 struct obd_device *mlsi_watched; /* target osc */
781 __u32 mlsi_index; /* index of target */
/*
 * Send the MDS capability keys to the LOV export for the target @uuid.
 * The loop visits the two entries of mds_capa_keys and calls
 * obd_set_info_async() with KEY_CAPA_KEY; a failure is logged together
 * with the key.
 * NOTE(review): the statement taken when mds_capa_keys is NULL, the store
 * of the current key into 'info' and the return value are not visible.
 */
784 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
/* The request carries the target uuid from the start. */
786 struct mds_capa_info info = { .uuid = uuid };
787 struct lustre_capa_key *key;
792 if (!mds->mds_capa_keys)
/* Fault-injection hook with a timeout argument of 5. */
795 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
796 for (i = 0; i < 2; i++) {
797 key = &mds->mds_capa_keys[i];
798 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
801 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
802 KEY_CAPA_KEY, sizeof(info), &info, NULL);
804 DEBUG_CAPA_KEY(D_ERROR, key,
805 "propagate failed (rc = %d) for", rc);
/*
 * Synchronise one OST with the MDS.  @data is a struct mds_lov_sync_info
 * naming the obd to work on, the watched OSC and the target index.
 * Steps visible below, performed with mds_notify_lock held for read:
 *   - stop with -ENODEV if the obd is stopping or failing;
 *   - mds_lov_update_mds() for the target index;
 *   - send KEY_MDS_CONN with the MDS group to the LOV export;
 *   - propagate the capability keys;
 *   - connect the LLOG_MDS_OST_ORIG_CTXT llog context to the target;
 *   - mds_lov_clear_orphans() for the target;
 *   - with HAVE_QUOTA_SUPPORT, issue an OBD_NOTIFY_QUOTA upcall.
 * After the lock is released the target is reported to the LOV as
 * OBD_NOTIFY_INACTIVE (presumably only when rc is non-zero; the guard is
 * not visible here), and the obd reference taken in
 * mds_lov_start_synchronize() under the same tag is dropped.
 */
813 /* We only sync one osc at a time, so that we don't have to hold
814 any kind of lock on the whole mds_lov_desc, which may change
815 (grow) as a result of mds_lov_add_ost. This also avoids any
816 kind of mismatch between the lov_desc and the mds_lov_desc,
817 which are not in lock-step during lov_add_obd */
818 static int __mds_lov_synchronize(void *data)
820 struct mds_lov_sync_info *mlsi = data;
821 struct obd_device *obd = mlsi->mlsi_obd;
822 struct obd_device *watched = mlsi->mlsi_watched;
823 struct mds_obd *mds = &obd->u.mds;
824 struct obd_uuid *uuid;
825 __u32 idx = mlsi->mlsi_index;
826 struct mds_group_info mgi;
827 struct llog_ctxt *ctxt;
/* The uuid of the target behind the watched OSC, used for all messages. */
835 uuid = &watched->u.cli.cl_target_uuid;
838 cfs_down_read(&mds->mds_notify_lock);
839 if (obd->obd_stopping || obd->obd_fail)
840 GOTO(out, rc = -ENODEV);
842 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
843 rc = mds_lov_update_mds(obd, watched, idx);
845 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
/* Same sequence value as the one used when connecting to the LOV. */
848 mgi.group = mdt_to_obd_objseq(mds->mds_id);
851 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
852 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
855 /* propagate capability keys */
856 rc = mds_propagate_capa_keys(mds, uuid);
860 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
862 GOTO(out, rc = -ENODEV);
/* Fault-injection hook with a timeout argument of 60. */
864 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
865 rc = llog_connect(ctxt, NULL, NULL, uuid);
868 CERROR("%s failed at llog_origin_connect: %d\n",
869 obd_uuid2str(uuid), rc);
873 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
874 obd->obd_name, obd_uuid2str(uuid));
876 rc = mds_lov_clear_orphans(mds, uuid);
878 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
879 obd_uuid2str(uuid), rc);
883 #ifdef HAVE_QUOTA_SUPPORT
884 if (obd->obd_upcall.onu_owner) {
886 * This is a hack for mds_notify->mdd_notify. When the mds obd
887 * in mdd is removed, This hack should be removed.
889 LASSERT(obd->obd_upcall.onu_upcall != NULL);
890 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
891 obd->obd_upcall.onu_owner,NULL);
896 cfs_up_read(&mds->mds_notify_lock);
898 /* Deactivate it for safety */
899 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* Only notify when neither the MDS, the LOV nor the watched OSC is
 * already stopping. */
901 if (!obd->obd_stopping && mds->mds_lov_obd &&
902 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
903 obd_notify(mds->mds_lov_obd, watched,
904 OBD_NOTIFY_INACTIVE, NULL);
907 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point passed to cfs_kernel_thread() by
 * mds_lov_start_synchronize(): names the thread "ll_sync_NN" after the
 * target index, daemonizes it, and runs __mds_lov_synchronize() on @data,
 * returning its result.
 */
911 int mds_lov_synchronize(void *data)
913 struct mds_lov_sync_info *mlsi = data;
/* Bounded by sizeof(name); the index is printed with at least 2 digits. */
916 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
917 cfs_daemonize_ctxt(name);
919 RETURN(__mds_lov_synchronize(data));
/*
 * Start synchronising the OST behind @watched with the MDS @obd.
 * @data points to the __u32 target index.  A sync-info record is allocated
 * and filled, an obd reference is taken on @obd, and the work is then
 * either handed to a new kernel thread (any event other than
 * OBD_NOTIFY_SYNC) or run in the caller's context through
 * __mds_lov_synchronize().  In the branch that reports a thread start
 * error the obd reference is dropped again.
 * NOTE(review): the allocation failure check and the return statements are
 * not visible here.
 */
922 int mds_lov_start_synchronize(struct obd_device *obd,
923 struct obd_device *watched,
924 void *data, enum obd_notify_event ev)
926 struct mds_lov_sync_info *mlsi;
928 struct obd_uuid *uuid;
932 uuid = &watched->u.cli.cl_target_uuid;
934 OBD_ALLOC(mlsi, sizeof(*mlsi));
939 mlsi->mlsi_obd = obd;
940 mlsi->mlsi_watched = watched;
/* @data is dereferenced unconditionally, so it must not be NULL here. */
941 mlsi->mlsi_index = *(__u32 *)data;
943 /* Although class_export_get(obd->obd_self_export) would lock
944 the MDS in place, since it's only a self-export
945 it doesn't lock the LOV in place. The LOV can be disconnected
946 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
947 Simply taking an export ref on the LOV doesn't help, because it's
948 still disconnected. Taking an obd reference insures that we don't
949 disconnect the LOV. This of course means a cleanup won't
950 finish for as long as the sync is blocking. */
/* Released by the class_decref() with the same tag in
 * __mds_lov_synchronize(), or below when the thread cannot be started. */
951 class_incref(obd, "mds_lov_synchronize", obd);
953 if (ev != OBD_NOTIFY_SYNC) {
954 /* Synchronize in the background */
955 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
956 CLONE_VM | CLONE_FILES);
958 CERROR("%s: error starting mds_lov_synchronize: %d\n",
960 class_decref(obd, "mds_lov_synchronize", obd);
/* On this path rc is logged as the thread value. */
962 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
963 "thread=%d\n", obd->obd_name,
964 mlsi->mlsi_index, rc);
/* Run the synchronisation in the caller's context. */
968 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Handle a notification @ev about the device @watched; presumably this is
 * the MDS's notify handler, called because the MDS registers itself as
 * observer of the LOV -- confirm.  Behaviour visible below:
 *   - a @watched whose type name is not LUSTRE_OSC_NAME is reported as an
 *     unexpected notification;
 *   - when obd_upcall has an owner, the event is forwarded through
 *     onu_upcall together with the mount count;
 *   - OBD_NOTIFY_CREATE updates the lov descriptor for the index in @data;
 *   - for the ACTIVE / SYNC / SYNC_NONBLOCK cases the descriptor is updated
 *     in the recovery branch, and mds_lov_start_synchronize() is called.
 * NOTE(review): the switch statement itself, the branch exits and the
 * return value are not visible here, and the function may continue beyond
 * the last line shown.
 */
974 int mds_notify(struct obd_device *obd, struct obd_device *watched,
975 enum obd_notify_event ev, void *data)
977 struct mds_obd *mds = &obd->u.mds;
981 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
/* Only notifications coming from OSC devices are expected here. */
983 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
984 CERROR("unexpected notification of %s %s!\n",
985 watched->obd_type->typ_name, watched->obd_name);
989 /*XXX this notifies the MDD until lov handling use old mds code
992 if (obd->obd_upcall.onu_owner) {
993 LASSERT(obd->obd_upcall.onu_upcall != NULL);
994 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
995 obd->obd_upcall.onu_owner,
996 &mds->mds_obt.obt_mount_count);
1000 /* We only handle these: */
1001 case OBD_NOTIFY_CREATE:
1002 CWARN("MDS %s: add target %s\n",obd->obd_name,
1003 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1004 /* We still have to fix the lov descriptor for ost's */
1006 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1007 &watched->u.cli.cl_target_uuid);
1009 case OBD_NOTIFY_ACTIVE:
1010 /* lov want one or more _active_ targets for work */
1011 /* activate event should be pass lov idx as argument */
1012 case OBD_NOTIFY_SYNC:
1013 case OBD_NOTIFY_SYNC_NONBLOCK:
1014 /* sync event should be pass lov idx as argument */
1020 if (obd->obd_recovering) {
1021 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1023 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1024 /* We still have to fix the lov descriptor for ost's added
1025 after the mdt in the config log. They didn't make it into
1027 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1028 &watched->u.cli.cl_target_uuid);
1030 rc = mds_lov_start_synchronize(obd, watched, data, ev);