1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
/*
 * Debug helper: log, at D_INFO level, the words of the dirty-page bitmap
 * and the entries of the objid pages, tagged with \a label.
 *
 * \param label  text identifying the caller, printed in the first message
 * \param obd    MDS device whose u.mds objid state is dumped
 *
 * A NULL bitmap or a NULL page array is reported through CERROR.
 */
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 CDEBUG(D_INFO, "dump from %s\n", label);
/* nothing can be dumped without the dirty bitmap */
62 if (mds->mds_lov_page_dirty == NULL) {
63 CERROR("NULL bitmap!\n");
/*
 * NOTE(review): the bound below is size / BITS_PER_LONG + 1, so when size
 * is an exact multiple of BITS_PER_LONG one extra word of data[] is read;
 * confirm the bitmap allocation always provides that word.
 */
67 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68 CDEBUG(D_INFO, "%u - %lx\n", i,
69 mds->mds_lov_page_dirty->data[i]);
/* the objid pages hang off this array, so it must exist as well */
71 if (mds->mds_lov_page_array == NULL) {
72 CERROR("not init page array!\n");
/* visit every slot of the page array; data is the objid page of slot i */
76 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77 obd_id *data = mds->mds_lov_page_array[i];
/* print the entries of this page one by one */
82 for(j=0; j < OBJID_PER_PAGE(); j++) {
85 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Set up the in-memory objid bookkeeping of the MDS and open the on-disk
 * LOV_OBJID file.
 *
 * Allocates the dirty-page bitmap (MDS_LOV_OBJID_PAGES_COUNT bits) and the
 * page pointer array (MDS_LOV_OBJID_PAGES_COUNT pointers), then opens
 * LOV_OBJID with O_RDWR | O_CREAT and stores the file in
 * mds->mds_lov_objid_filp.
 *
 * \param obd  MDS device; obd->u.mds is initialised
 *
 * \retval -ENOMEM        the page pointer array could not be allocated
 * \retval -ENOENT        LOV_OBJID is not a regular file
 * \retval PTR_ERR(file)  the open itself failed
 */
93 int mds_lov_init_objids(struct obd_device *obd)
95 struct mds_obd *mds = &obd->u.mds;
/* byte size of the page pointer array; reused when freeing it on error */
96 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* an objid page must hold a whole number of obd_id entries */
101 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
103 mds->mds_lov_page_dirty =
104 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
105 if (mds->mds_lov_page_dirty == NULL)
109 OBD_ALLOC(mds->mds_lov_page_array, size);
110 if (mds->mds_lov_page_array == NULL)
111 GOTO(err_free_bitmap, rc = -ENOMEM);
113 /* open and test the lov objd file */
114 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/*
 * NOTE(review): rc is printed in the message below, while the only
 * assignment visible here is the one inside the following GOTO; confirm
 * rc already holds PTR_ERR(file) when the message is emitted.
 */
117 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
118 GOTO(err_free, rc = PTR_ERR(file));
/* the objid store has to be a plain file */
120 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
121 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
122 file->f_dentry->d_inode->i_mode);
123 GOTO(err_open, rc = -ENOENT);
/* success: keep the open file for later reads and writes of objids */
125 mds->mds_lov_objid_filp = file;
/*
 * Error unwinding, presumably entered through the err_open, err_free and
 * err_free_bitmap targets used above: close the file, then free the page
 * array and the bitmap.
 */
129 if (filp_close((struct file *)file, 0))
130 CERROR("can't close %s after error\n", LOV_OBJID);
132 OBD_FREE(mds->mds_lov_page_array, size);
134 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Release everything used for objid tracking: the objid pages, the page
 * pointer array, the LOV_OBJID file and the dirty-page bitmap.
 *
 * \param obd  MDS device whose u.mds objid state is torn down
 */
139 void mds_lov_destroy_objids(struct obd_device *obd)
141 struct mds_obd *mds = &obd->u.mds;
/* free each objid page first, then the array that pointed to them */
145 if (mds->mds_lov_page_array != NULL) {
146 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
147 obd_id *data = mds->mds_lov_page_array[i];
149 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
151 OBD_FREE(mds->mds_lov_page_array,
152 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
/* close the objid file; the pointer is cleared whatever filp_close returns */
155 if (mds->mds_lov_objid_filp) {
156 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
157 mds->mds_lov_objid_filp = NULL;
159 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
162 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
167 * There are currently two ways to learn the OST count and the maximum OST index:
168 * first - after an OST has connected to the MDS and the sync process has finished;
169 * second - from the lmm during recovery, in the case where the MDS has no configs
170 * and the OST is not registered with the MGS.
172 * \param mds pointer to mds structure
173 * \param index maximum OST index
175 * \retval -ENOMEM if there is not enough memory for a new page
176 * \retval 0 if the update succeeded
/*
 * Make sure the objid page covering OST \a index exists and refresh the
 * bookkeeping derived from the set of known OSTs: the highest index seen
 * (with its page and slot), the objid count, and the maximum md and cookie
 * sizes.
 */
178 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/*
 * Split the index into a page number and a slot inside that page.
 * NOTE(review): no check of page against MDS_LOV_OBJID_PAGES_COUNT is
 * visible here - confirm callers bound the index.
 */
180 __u32 page = index / OBJID_PER_PAGE();
181 __u32 off = index % OBJID_PER_PAGE();
182 obd_id *data = mds->mds_lov_page_array[page];
/* presumably only done when the page does not exist yet - TODO confirm */
185 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189 mds->mds_lov_page_array[page] = data;
/* remember the highest index seen so far and where it is stored */
192 if (index > mds->mds_lov_objid_max_index) {
193 mds->mds_lov_objid_lastpage = page;
194 mds->mds_lov_objid_lastidx = off;
195 mds->mds_lov_objid_max_index = index;
198 /* workaround - New target not in objids file; increase mdsize */
199 /* ld_tgt_count is used as the max index everywhere, despite its name. */
200 if (data[off] == 0) {
/* one more known target; the stripe count is capped at LOV_MAX_STRIPE_COUNT */
204 mds->mds_lov_objid_count++;
205 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
206 mds->mds_lov_objid_count);
/* sizes are recomputed for that many stripes using the V3 layout */
208 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
209 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
211 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
212 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
213 mds->mds_max_cookiesize);
/*
 * Walk the stripes described by \a lmm and call mds_lov_update_max_ost()
 * for the OST index of each one, holding obd->obd_dev_sem around the loop.
 *
 * \param obd  MDS device
 * \param lmm  on-disk striping descriptor (little-endian fields); both the
 *             v1 and the v3 structures are read, any other magic is
 *             reported through CERROR
 */
220 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
222 struct lov_ost_data_v1 *data;
227 /* if we create file without objects - lmm is NULL */
/* pick the stripe count and the object array according to the layout magic */
231 switch (le32_to_cpu(lmm->lmm_magic)) {
233 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
234 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
237 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
238 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
241 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
/* obd_dev_sem is held while the objid pages and counters are updated */
246 cfs_mutex_down(&obd->obd_dev_sem);
247 for (j = 0; j < count; j++) {
/* i is the OST index of stripe j */
248 __u32 i = le32_to_cpu(data[j].l_ost_idx);
249 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
254 cfs_mutex_up(&obd->obd_dev_sem);
258 EXPORT_SYMBOL(mds_lov_prepare_objids);
261 * Write an llog orphan record about a lost OST object.
262 * A special lsm with a single stripe is allocated; the caller should deallocate it.
/*
 * Record \a count lost objects, starting at object \a id on OST \a idx,
 * by calling mds_log_op_orphan() with a single-stripe lsm.
 *
 * \param obd      MDS device
 * \param lsmp     in/out lsm pointer; the lsm is obtained through
 *                 obd_alloc_memmd() (presumably only when *lsmp is NULL -
 *                 TODO confirm)
 * \param stripes  receives the stripe count the lsm had before it was
 *                 forced to 1
 * \param id       first lost object id
 * \param count    number of lost objects
 * \param idx      OST index the objects belong to
 */
265 static int mds_log_lost_precreated(struct obd_device *obd,
266 struct lov_stripe_md **lsmp, int *stripes,
267 obd_id id, obd_count count, int idx)
269 struct lov_stripe_md *lsm = *lsmp;
274 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
277 /* need only one stripe, save old value */
278 *stripes = lsm->lsm_stripe_count;
279 lsm->lsm_stripe_count = 1;
/* describe the lost range through the first (and only) stripe */
283 lsm->lsm_oinfo[0]->loi_id = id;
284 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
285 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
287 rc = mds_log_op_orphan(obd, lsm, count);
/*
 * Compare the object id of every stripe in \a lmm with the last id stored
 * for its OST.  When the new id is larger, the page holding that OST is
 * marked dirty; during recovery a gap between the two ids is additionally
 * logged as lost precreated objects.
 *
 * \param obd  MDS device
 * \param lmm  on-disk striping descriptor (little-endian fields); the v1
 *             and v3 structures are read, any other magic is reported
 *             through CERROR
 */
291 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
293 struct mds_obd *mds = &obd->u.mds;
295 struct lov_ost_data_v1 *obj;
/* allocated by mds_log_lost_precreated() the first time a gap is logged */
296 struct lov_stripe_md *lsm = NULL;
301 /* if we create file without objects - lmm is NULL */
305 switch (le32_to_cpu(lmm->lmm_magic)) {
307 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
308 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
311 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
312 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
315 CERROR("Unknow lmm type %X !\n",
316 le32_to_cpu(lmm->lmm_magic));
320 for (j = 0; j < count; j++) {
/* i: OST index of stripe j; page/idx: where its last id is stored */
321 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
322 obd_id id = le64_to_cpu(obj[j].l_object_id);
323 __u32 page = i / OBJID_PER_PAGE();
324 __u32 idx = i % OBJID_PER_PAGE();
327 data = mds->mds_lov_page_array[page];
329 CDEBUG(D_INODE,"update last object for ost %u"
330 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
331 if (id > data[idx]) {
/*
 * Number of ids skipped between the stored id and the new one.
 * NOTE(review): the difference of two obd_id values is narrowed to
 * int here and later printed with %u - confirm this cannot truncate.
 */
332 int lost = id - data[idx] - 1;
333 /* we might have lost precreated objects due to VBR */
334 if (lost > 0 && obd->obd_recovering) {
335 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
336 if (!obd->obd_version_recov)
337 CERROR("Unexpected gap in objids\n");
338 /* lsm is allocated if NULL */
339 mds_log_lost_precreated(obd, &lsm, &stripes,
340 data[idx]+1, lost, i);
343 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
347 /* restore stripes number */
348 lsm->lsm_stripe_count = stripes;
349 obd_free_memmd(mds->mds_lov_exp, &lsm);
354 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Account for a run of objids that was just read into \a data: loop over
 * the entries, bump mds->mds_lov_objid_count, and recompute the maximum md
 * and cookie sizes from the resulting count.
 *
 * NOTE(review): the condition under which an entry is counted is not
 * visible in this excerpt - confirm before relying on this summary.
 */
356 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
362 for (i = 0; i < count; i++) {
366 mds->mds_lov_objid_count++;
/* the stripe count is capped at LOV_MAX_STRIPE_COUNT */
369 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
370 mds->mds_lov_objid_count);
/* sizes are recomputed for that many stripes using the V3 layout */
372 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
373 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
375 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
376 "%d/%d\n", stripes, mds->mds_max_mdsize,
377 mds->mds_max_cookiesize);
/*
 * Load the whole LOV_OBJID file into freshly allocated objid pages, one
 * page of MDS_LOV_ALLOC_SIZE bytes at a time, feeding each page to
 * mds_lov_update_from_read(), and record the position of the last entry
 * that was read.
 *
 * \param obd  MDS device; obd->u.mds.mds_lov_objid_filp is the file read
 *
 * \retval -ENOMEM  an objid page could not be allocated
 * \retval -EIO     mds_lov_update_from_read() reported a failure
 */
383 static int mds_lov_read_objids(struct obd_device *obd)
385 struct mds_obd *mds = &obd->u.mds;
387 int i, rc = 0, count = 0, page = 0;
391 /* Read everything in the file, even if our current lov desc
392 has fewer targets. Old targets not in the lov descriptor
393 during mds setup may still have valid objids. */
394 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* number of pages needed to cover the file, rounding up */
398 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
399 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
400 for (i = 0; i < page; i++) {
/* off_old keeps the offset before the read to measure how far it advanced */
402 loff_t off_old = off;
/* every slot is expected to be empty before it is filled from disk */
404 LASSERT(mds->mds_lov_page_array[i] == NULL);
405 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
406 if (mds->mds_lov_page_array[i] == NULL)
407 GOTO(out, rc = -ENOMEM);
409 data = mds->mds_lov_page_array[i];
411 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
412 MDS_LOV_ALLOC_SIZE, &off);
414 CERROR("Error reading objids %d\n", rc);
417 if (off == off_old) /* hole is read */
418 off += MDS_LOV_ALLOC_SIZE;
/* number of obd_id entries covered by the distance the offset moved */
420 count = (off - off_old) / sizeof(obd_id);
421 if (mds_lov_update_from_read(mds, data, count)) {
422 CERROR("Can't update mds data\n");
423 GOTO(out, rc = -EIO);
/* position of the last entry: last page, last slot counted on that page */
426 mds->mds_lov_objid_lastpage = page - 1;
427 mds->mds_lov_objid_lastidx = count - 1;
429 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
430 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
432 mds_lov_dump_objids("read",obd);
/*
 * Write every objid page whose bit is set in the dirty-page bitmap to the
 * LOV_OBJID file at offset page_number * MDS_LOV_ALLOC_SIZE.
 *
 * \param obd  MDS device; obd->u.mds holds the pages, bitmap and file
 */
437 int mds_lov_write_objids(struct obd_device *obd)
439 struct mds_obd *mds = &obd->u.mds;
/* nothing is dirty: presumably returns early - TODO confirm */
443 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
446 mds_lov_dump_objids("write", obd);
448 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
449 obd_id *data = mds->mds_lov_page_array[i];
450 unsigned int size = MDS_LOV_ALLOC_SIZE;
451 loff_t off = i * size;
/* a dirty bit without a backing page is treated as a fatal inconsistency */
453 LASSERT(data != NULL);
/* the bit is cleared before writing; a zero return means it was not set */
455 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
458 /* check for particaly filled last page */
459 if (i == mds->mds_lov_objid_lastpage)
460 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
462 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
463 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
/* the page is marked dirty again, presumably only when the write failed */
466 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
475 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Refresh the stored last object id of OST idx when needed: if the stored
 * value is below 2, or the LOV export reports OBD_CONNECT_SKIP_ORPHAN in
 * its connect flags, the id is fetched through KEY_LAST_ID directly into
 * the slot of the objid page.  The page is then marked dirty.
 */
477 static int mds_lov_get_objid(struct obd_device * obd,
480 struct mds_obd *mds = &obd->u.mds;
481 struct obd_export *lov_exp = mds->mds_lov_exp;
/* locate the page and the slot that hold the id of this OST */
490 page = idx / OBJID_PER_PAGE();
491 off = idx % OBJID_PER_PAGE();
492 data = mds->mds_lov_page_array[page];
/* ask the LOV export for its connect flags */
494 size = sizeof(__u64);
496 rc = obd_get_info(lov_exp, sizeof(KEY_CONNECT_FLAG), KEY_CONNECT_FLAG,
497 &size, &connect_flags, NULL);
501 if (data[off] < 2 || connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
502 /* We never read this lastid; ask the osc */
503 struct obd_id_info lastid;
505 size = sizeof(lastid);
/* the answer is written straight into the objid page */
507 lastid.data = &data[off];
508 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
509 &size, &lastid, NULL);
513 /* workaround for clean filter */
517 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
519 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
520 idx, data, page, off, data[off]);
/*
 * Issue an obd_create() flagged OBD_FL_DELORPHAN on the LOV export so
 * that orphan objects are reconciled (see the comment in the body).
 *
 * \param mds       MDS state; mds_lov_page_array must already exist
 * \param ost_uuid  when not NULL, stored in oti.oti_ost_uuid before the call
 */
525 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
528 struct obdo oa = { 0 };
529 struct obd_trans_info oti = {0};
530 struct lov_stripe_md *empty_ea = NULL;
533 LASSERT(mds->mds_lov_page_array != NULL);
535 /* This create will in fact either create or destroy: If the OST is
536 * missing objects below this ID, they will be created. If it finds
537 * objects above this ID, they will be removed. */
538 memset(&oa, 0, sizeof(oa));
539 oa.o_flags = OBD_FL_DELORPHAN;
540 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
/* only the flags and the group (sequence) fields are declared valid */
541 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
542 if (ost_uuid != NULL)
543 oti.oti_ost_uuid = ost_uuid;
545 rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
/*
 * Send an obd_id_info to the LOV export under KEY_NEXT_ID.
 *
 * \param obd  MDS device; must not be in recovery (asserted)
 * \param idx  OST index (presumably placed in info - TODO confirm)
 * \param id   pointer to the object id (presumably placed in info -
 *             TODO confirm)
 */
551 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
553 struct mds_obd *mds = &obd->u.mds;
555 struct obd_id_info info;
558 LASSERT(!obd->obd_recovering);
562 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
563 KEY_NEXT_ID, sizeof(info), &info, NULL);
565 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
571 /* Update the lov desc for a new size lov. */
/*
 * Fetch the LOV descriptor (KEY_LOVDESC) from the LOV export, copy it into
 * mds->mds_lov_desc, make sure objid bookkeeping exists for OST \a idx,
 * re-initialise the llogs and, when an upcall owner is registered, forward
 * event \a ev to it.
 *
 * \param obd   MDS device
 * \param idx   OST index the notification is about
 * \param uuid  target uuid, used only in the debug message
 * \param ev    event forwarded to the upcall
 */
572 static int mds_lov_update_desc(struct obd_device *obd, int idx,
573 struct obd_uuid *uuid, enum obd_notify_event ev)
575 struct mds_obd *mds = &obd->u.mds;
577 __u32 valsize = sizeof(mds->mds_lov_desc);
/* ld is a temporary buffer; it is freed again at the end of the function */
581 OBD_ALLOC(ld, sizeof(*ld));
585 rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
590 /* Don't change the mds_lov_desc until the objids size matches the
592 mds->mds_lov_desc = *ld;
593 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
594 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
596 cfs_mutex_down(&obd->obd_dev_sem);
597 rc = mds_lov_update_max_ost(mds, idx);
598 cfs_mutex_up(&obd->obd_dev_sem);
602 /* If we added a target we have to reconnect the llogs */
603 /* We only _need_ to do this at first add (idx), or the first time
604 after recovery. However, it should now be safe to call anytime. */
605 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
609 /*XXX this notifies the MDD until lov handling use old mds code */
610 if (obd->obd_upcall.onu_owner) {
611 LASSERT(obd->obd_upcall.onu_upcall != NULL);
612 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
613 obd->obd_upcall.onu_owner,
614 &mds->mds_obt.obt_mount_count);
/* release the temporary descriptor buffer */
617 OBD_FREE(ld, sizeof(*ld));
621 /* Inform MDS about new/updated target */
/*
 * Bring the MDS view of OST \a idx up to date: refresh the LOV descriptor,
 * validate the index against ld_tgt_count, obtain the last object id and
 * hand it to mds_lov_set_one_nextid().
 *
 * \param obd      MDS device
 * \param watched  device the notification is about; its client target
 *                 uuid is passed on to mds_lov_update_desc()
 * \param idx      OST index
 * \param ev       event passed on to mds_lov_update_desc()
 *
 * \retval -EINVAL  idx is not below mds_lov_desc.ld_tgt_count
 */
622 static int mds_lov_update_mds(struct obd_device *obd,
623 struct obd_device *watched,
624 __u32 idx, enum obd_notify_event ev)
626 struct mds_obd *mds = &obd->u.mds;
634 /* Don't let anyone else mess with mds_lov_objids now */
635 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
639 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
640 idx, obd->obd_recovering, obd->obd_async_recov,
641 mds->mds_lov_desc.ld_tgt_count);
643 /* idx is set as data from lov_notify. */
644 if (obd->obd_recovering)
/* an index at or beyond the target count is rejected */
647 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
648 CERROR("index %d > count %d!\n", idx,
649 mds->mds_lov_desc.ld_tgt_count);
650 GOTO(out, rc = -EINVAL);
653 rc = mds_lov_get_objid(obd, idx);
/* locate the slot that now holds the last id of this OST */
657 page = idx / OBJID_PER_PAGE();
658 off = idx % OBJID_PER_PAGE();
659 data = mds->mds_lov_page_array[page];
661 /* We have read this lastid from disk; tell the osc.
662 Don't call this during recovery. */
663 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
665 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
666 /* Don't abort the rest of the sync */
669 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
676 /* update the LOV-OSC knowledge of the last used object id's */
/*
 * Attach the MDS to the LOV device named \a lov_name: look the device up,
 * read the stored objids, register the MDS as observer of the LOV,
 * initialise the llogs and connect to the LOV with the connect data built
 * below.
 *
 * mds->mds_lov_obd doubles as a state marker: an ERR_PTR value records an
 * earlier failure and is returned again as PTR_ERR on later calls.
 *
 * \param obd       MDS device
 * \param lov_name  name of the LOV device to connect to
 *
 * \retval -ENOMEM  the connect data could not be allocated
 */
677 int mds_lov_connect(struct obd_device *obd, char * lov_name)
679 struct mds_obd *mds = &obd->u.mds;
680 struct obd_connect_data *data;
/* a previous attempt failed: report the same error again */
684 if (IS_ERR(mds->mds_lov_obd))
685 RETURN(PTR_ERR(mds->mds_lov_obd));
/* a LOV device is already recorded */
687 if (mds->mds_lov_obd)
690 mds->mds_lov_obd = class_name2obd(lov_name);
691 if (!mds->mds_lov_obd) {
692 CERROR("MDS cannot locate LOV %s\n", lov_name);
693 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
/* the objids are read while holding obd_dev_sem */
697 cfs_mutex_down(&obd->obd_dev_sem);
698 rc = mds_lov_read_objids(obd);
699 cfs_mutex_up(&obd->obd_dev_sem);
701 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
705 rc = obd_register_observer(mds->mds_lov_obd, obd);
707 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
712 /* try init too early */
713 rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
717 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
/* connect data is needed only for the obd_connect() call and freed after it */
719 OBD_ALLOC(data, sizeof(*data));
721 GOTO(err_exit, rc = -ENOMEM);
723 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
724 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
725 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
726 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
727 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
728 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
729 OBD_CONNECT_SOM | OBD_CONNECT_FULL20;
730 #ifdef HAVE_LRU_RESIZE_SUPPORT
731 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
733 data->ocd_version = LUSTRE_VERSION_CODE;
734 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
735 /* send max bytes per rpc */
736 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
737 /* send the list of supported checksum types */
738 data->ocd_cksum_types = OBD_CKSUM_ALL;
739 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
740 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
741 OBD_FREE(data, sizeof(*data));
743 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
744 mds->mds_lov_obd = ERR_PTR(rc);
748 /* I want to see a callback happen when the OBD moves to a
749 * "For General Use" state, and that's when we'll call
750 * set_nextid(). The class driver can help us here, because
751 * it can use the obd_recovering flag to determine when the
752 * the OBD is full available. */
753 /* MDD device will care about that
754 if (!obd->obd_recovering)
755 rc = mds_postrecov(obd);
760 mds->mds_lov_exp = NULL;
761 mds->mds_lov_obd = ERR_PTR(rc);
/*
 * Begin detaching the MDS from its LOV: when a valid LOV device and export
 * are present, the observer registration is removed and obd_force is set
 * on the LOV device (the reason is given in the comment in the body).
 *
 * \param obd  MDS device
 */
765 int mds_lov_disconnect(struct obd_device *obd)
767 struct mds_obd *mds = &obd->u.mds;
/* nothing to do when the LOV was never found or never connected */
771 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
772 obd_register_observer(mds->mds_lov_obd, NULL);
774 /* The actual disconnect of the mds_lov will be called from
775 * class_disconnect_exports from mds_lov_clean. So we have to
776 * ensure that class_cleanup doesn't fail due to the extra ref
777 * we're holding now. The mechanism to do that already exists -
778 * the obd_force flag. We'll drop the final ref to the
779 * mds_lov_exp in mds_cleanup. */
780 mds->mds_lov_obd->obd_force = 1;
/*
 * Arguments handed from mds_lov_start_synchronize() to the sync routine,
 * either directly or through a newly started kernel thread.
 */
786 struct mds_lov_sync_info {
787 struct obd_device *mlsi_obd; /* device doing the sync; its u.mds state is used */
788 struct obd_device *mlsi_watched; /* target osc */
789 __u32 mlsi_index; /* index of target */
790 enum obd_notify_event mlsi_ev; /* event type */
/*
 * Send the two entries of mds->mds_capa_keys to the LOV export under
 * KEY_CAPA_KEY, one obd_set_info_async() call per key.
 *
 * \param mds   MDS state holding the keys and the LOV export
 * \param uuid  target uuid carried in the mds_capa_info that is sent
 */
793 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
795 struct mds_capa_info info = { .uuid = uuid };
796 struct lustre_capa_key *key;
/* no keys are configured: presumably nothing to send - TODO confirm */
801 if (!mds->mds_capa_keys)
804 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
/* exactly two key slots are walked */
805 for (i = 0; i < 2; i++) {
806 key = &mds->mds_capa_keys[i];
807 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
810 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
811 KEY_CAPA_KEY, sizeof(info), &info, NULL);
813 DEBUG_CAPA_KEY(D_ERROR, key,
814 "propagate failed (rc = %d) for", rc);
822 /* We only sync one osc at a time, so that we don't have to hold
823 any kind of lock on the whole mds_lov_desc, which may change
824 (grow) as a result of mds_lov_add_ost. This also avoids any
825 kind of mismatch between the lov_desc and the mds_lov_desc,
826 which are not in lock-step during lov_add_obd */
/*
 * Synchronise the MDS with one target.  In order: update the MDS state for
 * the target, send the MDS group under KEY_MDS_CONN, propagate the
 * capability keys, connect the LLOG_MDS_OST_ORIG_CTXT llog context, and
 * clear orphans on the target.  On failure the target is reported
 * inactive to the LOV unless one of the devices involved is stopping.
 *
 * \param data  struct mds_lov_sync_info describing the device, the watched
 *              target, its index and the event
 *
 * \retval -ENODEV  the device is stopping or failed, or (presumably) no
 *                  llog context was found - TODO confirm the second case
 */
827 static int __mds_lov_synchronize(void *data)
829 struct mds_lov_sync_info *mlsi = data;
830 struct obd_device *obd = mlsi->mlsi_obd;
831 struct obd_device *watched = mlsi->mlsi_watched;
832 struct mds_obd *mds = &obd->u.mds;
833 struct obd_uuid *uuid;
834 __u32 idx = mlsi->mlsi_index;
835 enum obd_notify_event ev = mlsi->mlsi_ev;
836 struct mds_group_info mgi;
837 struct llog_ctxt *ctxt;
845 uuid = &watched->u.cli.cl_target_uuid;
/* mds_notify_lock is held for reading until the sync steps are done */
848 cfs_down_read(&mds->mds_notify_lock);
849 if (obd->obd_stopping || obd->obd_fail)
850 GOTO(out, rc = -ENODEV);
852 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
853 rc = mds_lov_update_mds(obd, watched, idx, ev);
855 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
/* mgi, carrying the group of this MDS, is sent under KEY_MDS_CONN */
858 mgi.group = mdt_to_obd_objseq(mds->mds_id);
861 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
862 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
865 /* propagate capability keys */
866 rc = mds_propagate_capa_keys(mds, uuid);
870 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
872 GOTO(out, rc = -ENODEV);
874 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
875 rc = llog_connect(ctxt, NULL, NULL, uuid);
878 CERROR("%s failed at llog_origin_connect: %d\n",
879 obd_uuid2str(uuid), rc);
883 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
884 obd->obd_name, obd_uuid2str(uuid));
885 rc = mds_lov_clear_orphans(mds, uuid);
887 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
888 obd_uuid2str(uuid), rc);
892 #ifdef HAVE_QUOTA_SUPPORT
893 if (obd->obd_upcall.onu_owner) {
895 * This is a hack for mds_notify->mdd_notify. When the mds obd
896 * in mdd is removed, This hack should be removed.
898 LASSERT(obd->obd_upcall.onu_upcall != NULL);
899 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
900 obd->obd_upcall.onu_owner,NULL);
905 cfs_up_read(&mds->mds_notify_lock);
907 /* Deactivate it for safety */
908 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* no notification is sent when any of the devices involved is stopping */
910 if (!obd->obd_stopping && mds->mds_lov_obd &&
911 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
912 obd_notify(mds->mds_lov_obd, watched,
913 OBD_NOTIFY_INACTIVE, NULL);
/* releases a reference tagged "mds_lov_synchronize" on the device */
916 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point for a background sync: names the thread
 * "ll_sync_NN" after the target index, daemonizes it, and runs
 * __mds_lov_synchronize() on \a data.
 *
 * \param data  struct mds_lov_sync_info for the target to synchronise
 */
920 int mds_lov_synchronize(void *data)
922 struct mds_lov_sync_info *mlsi = data;
925 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
926 cfs_daemonize_ctxt(name);
928 RETURN(__mds_lov_synchronize(data));
/*
 * Package the sync arguments and start synchronising with one target.
 * For every event other than OBD_NOTIFY_SYNC the work is handed to a new
 * kernel thread running mds_lov_synchronize(); for OBD_NOTIFY_SYNC
 * __mds_lov_synchronize() is called directly in the caller context.
 *
 * \param obd      MDS device; a reference tagged "mds_lov_synchronize" is
 *                 taken on it before the sync is started
 * \param watched  target device
 * \param data     pointer to the __u32 target index
 * \param ev       event deciding between background and direct sync
 *
 * NOTE(review): data is dereferenced without a NULL check visible here,
 * and the release of mlsi (in particular when the thread cannot be
 * started) is not visible in this excerpt - confirm both.
 */
931 int mds_lov_start_synchronize(struct obd_device *obd,
932 struct obd_device *watched,
933 void *data, enum obd_notify_event ev)
935 struct mds_lov_sync_info *mlsi;
937 struct obd_uuid *uuid;
941 uuid = &watched->u.cli.cl_target_uuid;
943 OBD_ALLOC(mlsi, sizeof(*mlsi));
948 mlsi->mlsi_obd = obd;
949 mlsi->mlsi_watched = watched;
950 mlsi->mlsi_index = *(__u32 *)data;
953 /* Although class_export_get(obd->obd_self_export) would lock
954 the MDS in place, since it's only a self-export
955 it doesn't lock the LOV in place. The LOV can be disconnected
956 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
957 Simply taking an export ref on the LOV doesn't help, because it's
958 still disconnected. Taking an obd reference insures that we don't
959 disconnect the LOV. This of course means a cleanup won't
960 finish for as long as the sync is blocking. */
961 class_incref(obd, "mds_lov_synchronize", obd);
963 if (ev != OBD_NOTIFY_SYNC) {
964 /* Synchronize in the background */
965 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
966 CLONE_VM | CLONE_FILES);
968 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* the thread did not start, so the reference taken above is dropped here */
970 class_decref(obd, "mds_lov_synchronize", obd);
972 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
973 "thread=%d\n", obd->obd_name,
974 mlsi->mlsi_index, rc);
/* direct path: run the sync in the caller context */
978 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification entry point of the MDS for events about a watched device.
 * OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK are
 * handled here.  While the MDS is recovering only the LOV descriptor is
 * updated; the other visible path starts a synchronisation with the
 * target.
 *
 * \param obd      MDS device
 * \param watched  device the event is about; its type name is compared
 *                 with LUSTRE_OSC_NAME
 * \param ev       event type
 * \param data     pointer to the __u32 LOV index of the target
 */
984 int mds_notify(struct obd_device *obd, struct obd_device *watched,
985 enum obd_notify_event ev, void *data)
990 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
993 /* We only handle these: */
994 case OBD_NOTIFY_ACTIVE:
995 /* lov want one or more _active_ targets for work */
996 /* activate event should be pass lov idx as argument */
997 case OBD_NOTIFY_SYNC:
998 case OBD_NOTIFY_SYNC_NONBLOCK:
999 /* sync event should be pass lov idx as argument */
/* a watched device of any other type is reported as unexpected */
1005 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1006 CERROR("unexpected notification of %s %s!\n",
1007 watched->obd_type->typ_name, watched->obd_name);
1011 if (obd->obd_recovering) {
1012 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1014 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1015 /* We still have to fix the lov descriptor for ost's added
1016 after the mdt in the config log. They didn't make it into
1018 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1019 &watched->u.cli.cl_target_uuid, ev);
1021 rc = mds_lov_start_synchronize(obd, watched, data, ev);