1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Merge a caller-supplied array of per-target object ids into the MDS copy.
 *
 * For every index below mds->mds_lov_desc.ld_tgt_count the stored value in
 * mds->mds_lov_objids is raised to ids[i] when ids[i] is larger; stored
 * values are never lowered.  The walk runs with mds_lov_objids_sem held for
 * read and mds_lov_objids_lock (spinlock) held, after which the local
 * `dirty` value is stored into mds->mds_lov_objids_dirty.
 *
 * NOTE(review): the declaration and updates of `dirty` are not visible in
 * this listing.  If it starts at 0 and is only set when an id grows, the
 * unconditional store below can clear a dirty flag set by an earlier,
 * not-yet-written update -- confirm.
 */
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
46 struct mds_obd *mds = &obd->u.mds;
51 down_read(&mds->mds_lov_objids_sem);
53 spin_lock(&mds->mds_lov_objids_lock);
54 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) {
55 if (ids[i] > (mds->mds_lov_objids)[i]) {
56 (mds->mds_lov_objids)[i] = ids[i];
60 mds->mds_lov_objids_dirty = dirty;
62 spin_unlock(&mds->mds_lov_objids_lock);
64 up_read(&mds->mds_lov_objids_sem);
68 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the saved last-object-id array from the file mds_lov_objid_filp.
 *
 * Asserts that no ids are loaded yet (mds_lov_objids_count == 0) and that
 * nothing is dirty.  The buffer size is the current size of the file's
 * inode; the whole file is read with fsfilt_read_record() starting at `off`.
 * On success the buffer becomes mds->mds_lov_objids and the entry count is
 * size / sizeof(*ids); each entry is then logged at D_INFO.
 *
 * An -ENOMEM exit through the `out` label is visible; the remaining error
 * handling is not visible in this listing.
 */
70 static int mds_lov_read_objids(struct obd_device *obd)
72 struct mds_obd *mds = &obd->u.mds;
78 LASSERT(!mds->mds_lov_objids_count);
79 LASSERT(!mds->mds_lov_objids_dirty);
81 /* Read everything in the file, even if our current lov desc
82 has fewer targets. Old targets not in the lov descriptor
83 during mds setup may still have valid objids. */
84 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
90 GOTO(out, rc = -ENOMEM);
92 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
95 CERROR("Error reading objids %d\n", rc);
98 mds->mds_lov_objids = ids;
99 mds->mds_lov_objids_count = size / sizeof(*ids);
101 for (i = 0; i < mds->mds_lov_objids_count; i++) {
102 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
103 mds->mds_lov_objids[i], i);
/*
 * Write the in-memory object id array to mds_lov_objid_filp if it is dirty.
 *
 * The dirty flag is tested and cleared under mds_lov_objids_lock.  All
 * mds_lov_objids_count entries are logged at D_INFO and written with
 * fsfilt_write_record() (tgts * sizeof(obd_id) bytes).  Waiters on
 * mds_lov_objids_wait are signalled; mds_lov_update_desc() in this file
 * waits on that queue for dirty == 0 before freeing the old array.
 *
 * NOTE(review): the conditions guarding the wait-queue signal and the
 * "dirty = 1" store near the end are not visible in this listing;
 * presumably the flag is restored when the write fails -- confirm.
 */
109 /* Caller must hold mds->mds_lov_objids_sem. */
110 int mds_lov_write_objids(struct obd_device *obd)
112 struct mds_obd *mds = &obd->u.mds;
117 spin_lock(&mds->mds_lov_objids_lock);
118 if (!mds->mds_lov_objids_dirty) {
119 spin_unlock(&mds->mds_lov_objids_lock);
122 mds->mds_lov_objids_dirty = 0;
123 spin_unlock(&mds->mds_lov_objids_lock);
125 tgts = mds->mds_lov_objids_count;
129 for (i = 0; i < tgts; i++)
130 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
131 mds->mds_lov_objids[i], i);
133 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
134 mds->mds_lov_objids, tgts * sizeof(obd_id),
138 cfs_waitq_signal(&mds->mds_lov_objids_wait);
140 mds->mds_lov_objids_dirty = 1;
145 EXPORT_SYMBOL(mds_lov_write_objids);
147 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
151 struct obd_trans_info oti = {0};
152 struct lov_stripe_md *empty_ea = NULL;
155 LASSERT(mds->mds_lov_objids != NULL);
157 /* This create will in fact either create or destroy: If the OST is
158 * missing objects below this ID, they will be created. If it finds
159 * objects above this ID, they will be removed. */
160 memset(&oa, 0, sizeof(oa));
161 oa.o_flags = OBD_FL_DELORPHAN;
162 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
163 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
164 if (ost_uuid != NULL) {
165 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
166 oa.o_valid |= OBD_MD_FLINLINE;
168 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Push the MDS's array of last used object ids to the LOV.
 *
 * Must not run during recovery (asserted).  Nothing is sent when no ids are
 * loaded (mds_lov_objids_count == 0).  Otherwise the first
 * mds_lov_desc.ld_tgt_count entries of mds->mds_lov_objids are passed to
 * obd_set_info_async() under KEY_NEXT_ID, and a failure is logged with
 * CERROR.
 *
 * NOTE(review): the key length here is sizeof(KEY_NEXT_ID), while other
 * obd_set_info_async() calls in this file pass strlen(KEY_...); confirm
 * which length the callee expects.
 */
173 /* update the LOV-OSC knowledge of the last used object id's */
174 int mds_lov_set_nextid(struct obd_device *obd)
176 struct mds_obd *mds = &obd->u.mds;
180 LASSERT(!obd->obd_recovering);
181 if (mds->mds_lov_objids_count == 0)
184 LASSERT(mds->mds_lov_objids != NULL);
186 /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change
187 * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all
188 * target can be connected at start time */
189 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
191 mds->mds_lov_desc.ld_tgt_count *
192 sizeof(*mds->mds_lov_objids),
193 mds->mds_lov_objids, NULL);
196 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refresh mds->mds_lov_desc from the LOV and resize dependent state.
 *
 * Steps visible in this listing:
 *  - allocate a temporary lov descriptor `ld` and fetch KEY_LOVDESC from
 *    the export `lov` with obd_get_info();
 *  - if ld->ld_tgt_count exceeds mds_lov_objids_count, allocate a zeroed
 *    array of ld_tgt_count ids, copy the old ids into it, wait on
 *    mds_lov_objids_wait until mds_lov_objids_dirty == 0, free the old
 *    array, then install the new array and count;
 *  - copy *ld into mds->mds_lov_desc;
 *  - recompute mds_max_mdsize and mds_max_cookiesize for
 *    min(LOV_MAX_STRIPE_COUNT, max(ld_tgt_count, mds_lov_objids_count))
 *    stripes;
 *  - call llog_cat_initialize() with obd_dev_sem held;
 *  - invoke obd->obd_upcall.onu_upcall when an upcall owner is set;
 *  - free `ld`.
 *
 * The callers in this file take mds_lov_objids_sem for write around this
 * call.
 */
202 /* Update the lov desc for a new size lov. */
203 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
205 struct mds_obd *mds = &obd->u.mds;
207 __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
211 OBD_ALLOC(ld, sizeof(*ld));
215 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
220 /* The size of the LOV target table may have increased. */
221 if (ld->ld_tgt_count > mds->mds_lov_objids_count) {
224 size = ld->ld_tgt_count * sizeof(obd_id);
226 OBD_ALLOC(ids, size);
228 GOTO(out, rc = -ENOMEM);
229 memset(ids, 0, size);
231 /* write lock is enough for protect from access to
232 * old pointer in mds_lov_update_objids and dirty == 0
233 * is enough for protect from access in write_objids */
234 if (mds->mds_lov_objids) {
235 struct l_wait_info lwi = { 0 };
236 obd_id *old_ids = mds->mds_lov_objids;
238 memcpy(ids, mds->mds_lov_objids,
239 mds_lov_objids_size(mds));
241 l_wait_event(mds->mds_lov_objids_wait,
242 mds->mds_lov_objids_dirty == 0, &lwi);
244 OBD_FREE(old_ids, mds_lov_objids_size(mds));
246 mds->mds_lov_objids = ids;
247 mds->mds_lov_objids_count = ld->ld_tgt_count;
250 /* Don't change the mds_lov_desc until the objids size matches the
252 mds->mds_lov_desc = *ld;
253 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
254 mds->mds_lov_desc.ld_tgt_count);
256 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
257 max(mds->mds_lov_desc.ld_tgt_count,
258 mds->mds_lov_objids_count));
259 mds->mds_max_mdsize = lov_mds_md_size(stripes);
260 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
261 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
262 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
265 /* If we added a target we have to reconnect the llogs */
266 /* We only _need_ to do this at first add (idx), or the first time
267 after recovery. However, it should now be safe to call anytime. */
268 mutex_down(&obd->obd_dev_sem);
269 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
270 mutex_up(&obd->obd_dev_sem);
272 /*XXX this notifies the MDD until lov handling use old mds code */
273 if (obd->obd_upcall.onu_owner) {
274 LASSERT(obd->obd_upcall.onu_upcall != NULL);
275 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
276 obd->obd_upcall.onu_owner);
279 OBD_FREE(ld, sizeof(*ld));
/*
 * MDSLOV_NO_INDEX marks "target index unknown".  It is -1 and is compared
 * against __u32 index values in this file, so those comparisons rely on the
 * usual integer conversion of -1 to the unsigned type.
 *
 * mds_lov_update_mds(): bring the MDS view of target `idx` up to date.
 *
 * Visible flow:
 *  - take mds_lov_objids_sem for write, refresh the descriptor with
 *    mds_lov_update_desc(), then downgrade the semaphore to read;
 *  - when idx is MDSLOV_NO_INDEX or the device is recovering, take a
 *    branch whose body is not visible in this listing;
 *  - reject idx >= mds_lov_desc.ld_tgt_count with -EINVAL (the message
 *    prints ">" although the test is ">=");
 *  - read "last_id" from the watched device's self export;
 *  - under mds_lov_objids_lock: if the stored id for idx is 0 or greater
 *    than lastid, replace it with lastid and mark the array dirty;
 *    otherwise call mds_lov_set_nextid() to tell the OSC the stored id;
 *  - release the read side of mds_lov_objids_sem.
 */
284 #define MDSLOV_NO_INDEX -1
286 /* Inform MDS about new/updated target */
287 static int mds_lov_update_mds(struct obd_device *obd,
288 struct obd_device *watched,
289 __u32 idx, struct obd_uuid *uuid)
291 struct mds_obd *mds = &obd->u.mds;
295 __u32 size = sizeof(lastid);
299 old_count = mds->mds_lov_desc.ld_tgt_count;
300 down_write(&mds->mds_lov_objids_sem);
301 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
302 downgrade_write(&mds->mds_lov_objids_sem);
306 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
307 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
308 mds->mds_lov_desc.ld_tgt_count);
310 /* idx is set as data from lov_notify. */
311 if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
314 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
315 CERROR("index %d > count %d!\n", idx,
316 mds->mds_lov_desc.ld_tgt_count);
317 GOTO(out, rc = -EINVAL);
319 rc = obd_get_info(watched->obd_self_export, sizeof("last_id"),
320 "last_id", &size, &lastid);
324 spin_lock(&mds->mds_lov_objids_lock);
325 if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) {
326 /* last id not init or corrupted - use data from osc */
327 mds->mds_lov_objids[idx] = lastid;
328 mds->mds_lov_objids_dirty = 1;
329 spin_unlock(&mds->mds_lov_objids_lock);
330 /* not need write immediately, mark for write for avoid
333 spin_unlock(&mds->mds_lov_objids_lock);
334 /* We have read this lastid from disk; tell the osc.
335 Don't call this during recovery. */
336 rc = mds_lov_set_nextid(obd);
339 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
340 mds->mds_lov_objids[idx], idx);
343 up_read(&mds->mds_lov_objids_sem);
/*
 * Rebuild mds->mds_lov_objids from the "last_id" values reported by the LOV.
 *
 * If fewer ids are held than mds_lov_desc.ld_tgt_count, a new array sized
 * for ld_tgt_count is allocated and the old one freed.  "last_id" is then
 * fetched through mds_osc_exp directly into mds->mds_lov_objids, every
 * entry is logged with CWARN, the array is marked dirty and written with
 * mds_lov_write_objids(); a write failure is reported with CERROR.
 *
 * NOTE(review): no copy of the old ids into the new array is visible
 * before the old array is freed; presumably acceptable because every
 * entry is overwritten by the "last_id" reply -- confirm.
 */
347 int mds_update_objids_from_lastid(struct obd_device *obd)
349 struct mds_obd *mds = &obd->u.mds;
353 if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) {
356 size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
357 OBD_ALLOC(ids, size);
359 GOTO(out, rc = -ENOMEM);
361 OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
362 mds->mds_lov_objids = ids;
363 mds->mds_lov_objids_count = size / sizeof(obd_id);
366 size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
367 rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"),
368 "last_id", &size, mds->mds_lov_objids);
370 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
371 CWARN("got last object "LPU64" from OST %d\n",
372 mds->mds_lov_objids[i], i);
373 mds->mds_lov_objids_dirty = 1;
374 rc = mds_lov_write_objids(obd);
376 CERROR("got last objids from OSTs, but error "
377 "writing objids file: %d\n", rc);
/*
 * Connect the MDS to the LOV device named lov_name.
 *
 * Visible flow:
 *  - a previous failure is remembered as an ERR_PTR in mds->mds_osc_obd and
 *    returned again; an existing non-error mds_osc_obd takes a branch
 *    whose body is not visible in this listing;
 *  - look the LOV up by name (ERR_PTR(-ENOTCONN) is stored when missing);
 *  - build the obd_connect_data (version, index, request portal, 64-bit
 *    quota and OSS capability flags, plus LRU resize when compiled in; the
 *    group is mds_id + FILTER_GROUP_MDS0) and call obd_connect();
 *  - register this MDS as observer of the LOV;
 *  - set obd_no_conn so new clients are refused for now;
 *  - with mds_lov_objids_sem held for write: read the saved object ids,
 *    refresh the lov descriptor and, when the descriptor has more targets
 *    than saved ids, call mds_update_objids_from_lastid();
 *  - on the error path: release the semaphore, unregister the observer,
 *    disconnect the export and store ERR_PTR(rc) in mds_osc_obd.
 *
 * NOTE(review): the return value of mds_update_objids_from_lastid() is
 * not used in the visible code -- confirm that ignoring a failure there
 * is intended.
 */
384 /* Connect the MDS to the LOV named lov_name and load the object ids. */
385 int mds_lov_connect(struct obd_device *obd, char * lov_name)
387 struct mds_obd *mds = &obd->u.mds;
388 struct lustre_handle conn = {0,};
389 struct obd_connect_data *data;
393 if (IS_ERR(mds->mds_osc_obd))
394 RETURN(PTR_ERR(mds->mds_osc_obd));
396 if (mds->mds_osc_obd)
399 mds->mds_osc_obd = class_name2obd(lov_name);
400 if (!mds->mds_osc_obd) {
401 CERROR("MDS cannot locate LOV %s\n", lov_name);
402 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
406 OBD_ALLOC(data, sizeof(*data));
409 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
410 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
411 OBD_CONNECT_OSS_CAPA;
412 #ifdef HAVE_LRU_RESIZE_SUPPORT
413 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
415 data->ocd_version = LUSTRE_VERSION_CODE;
416 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
417 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
418 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
419 OBD_FREE(data, sizeof(*data));
421 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
422 mds->mds_osc_obd = ERR_PTR(rc);
425 mds->mds_osc_exp = class_conn2export(&conn);
427 rc = obd_register_observer(mds->mds_osc_obd, obd);
429 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
431 GOTO(err_discon, rc);
434 /* Deny new client connections until we are sure we have some OSTs */
435 obd->obd_no_conn = 1;
437 down_write(&mds->mds_lov_objids_sem);
438 rc = mds_lov_read_objids(obd);
440 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
444 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
448 /* If we're mounting this code for the first time on an existing FS,
449 * we need to populate the objids array from the real OST values */
450 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) {
451 mds_update_objids_from_lastid(obd);
453 up_write(&mds->mds_lov_objids_sem);
455 /* I want to see a callback happen when the OBD moves to a
456 * "For General Use" state, and that's when we'll call
457 * set_nextid(). The class driver can help us here, because
458 * it can use the obd_recovering flag to determine when the
459 * the OBD is full available. */
460 /* MDD device will care about that
461 if (!obd->obd_recovering)
462 rc = mds_postrecov(obd);
467 up_write(&mds->mds_lov_objids_sem);
468 obd_register_observer(mds->mds_osc_obd, NULL);
470 obd_disconnect(mds->mds_osc_exp);
471 mds->mds_osc_exp = NULL;
472 mds->mds_osc_obd = ERR_PTR(rc);
476 int mds_lov_disconnect(struct obd_device *obd)
478 struct mds_obd *mds = &obd->u.mds;
482 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
483 obd_register_observer(mds->mds_osc_obd, NULL);
485 /* The actual disconnect of the mds_lov will be called from
486 * class_disconnect_exports from mds_lov_clean. So we have to
487 * ensure that class_cleanup doesn't fail due to the extra ref
488 * we're holding now. The mechanism to do that already exists -
489 * the obd_force flag. We'll drop the final ref to the
490 * mds_osc_exp in mds_cleanup. */
491 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device behind `exp`.
 *
 * `karg` is interpreted as a struct obd_ioctl_data.  Commands visible in
 * this listing and what their visible code does:
 *  - OBD_IOC_RECORD: create the config llog named ioc_inlbuf1, keep its
 *    handle in mds->mds_cfg_llh and initialise it as a plain log;
 *  - OBD_IOC_ENDRECORD: close mds_cfg_llh and clear the pointer;
 *  - OBD_IOC_CLEAR_LOG: open the named config llog, destroy it and free
 *    the handle;
 *  - OBD_IOC_DORECORD: copy ioc_plen1 bytes from the user buffer ioc_pbuf1
 *    and append them to mds_cfg_llh as an OBD_CFG_REC record (only
 *    LUSTRE_CFG_TYPE is accepted);
 *  - OBD_IOC_PARSE / OBD_IOC_DUMP_LOG: parse or dump the config llog named
 *    ioc_inlbuf1;
 *  - a sync command: fsfilt_sync() on the backing superblock;
 *  - OBD_IOC_SET_READONLY: start and commit a transaction, sync, then
 *    lvfs_set_rdonly();
 *  - OBD_IOC_CATLOGLIST: llog_catalog_list() for ld_tgt_count catalogs;
 *  - OBD_IOC_LLOG_CHECK / _CANCEL / _REMOVE: finish the llogs, run
 *    llog_ioctl(), re-initialise the catalogs and resend KEY_MDS_CONN;
 *  - OBD_IOC_LLOG_INFO / _PRINT: llog_ioctl() on the config context;
 *  - OBD_IOC_ABORT_RECOVERY: target_stop_recovery_thread();
 *  - anything else is logged as an unknown command.
 *
 * File-system work is bracketed by push_ctxt()/pop_ctxt() on the device's
 * lvfs context.  Several case labels, error branches and return
 * statements are not visible in this listing.
 */
497 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
498 void *karg, void *uarg)
500 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
501 struct obd_device *obd = exp->exp_obd;
502 struct mds_obd *mds = &obd->u.mds;
503 struct obd_ioctl_data *data = karg;
504 struct lvfs_run_ctxt saved;
508 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
511 case OBD_IOC_RECORD: {
512 char *name = data->ioc_inlbuf1;
513 if (mds->mds_cfg_llh)
516 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
517 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
518 &mds->mds_cfg_llh, NULL, name);
520 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
523 mds->mds_cfg_llh = NULL;
524 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
529 case OBD_IOC_ENDRECORD: {
530 if (!mds->mds_cfg_llh)
533 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
534 rc = llog_close(mds->mds_cfg_llh);
535 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
537 mds->mds_cfg_llh = NULL;
541 case OBD_IOC_CLEAR_LOG: {
542 char *name = data->ioc_inlbuf1;
543 if (mds->mds_cfg_llh)
546 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
547 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
548 &mds->mds_cfg_llh, NULL, name);
550 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
553 rc = llog_destroy(mds->mds_cfg_llh);
554 llog_free_handle(mds->mds_cfg_llh);
556 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
558 mds->mds_cfg_llh = NULL;
562 case OBD_IOC_DORECORD: {
564 struct llog_rec_hdr rec;
565 if (!mds->mds_cfg_llh)
568 rec.lrh_len = llog_data_len(data->ioc_plen1);
570 if (data->ioc_type == LUSTRE_CFG_TYPE) {
571 rec.lrh_type = OBD_CFG_REC;
573 CERROR("unknown cfg record type:%d \n", data->ioc_type);
577 OBD_ALLOC(cfg_buf, data->ioc_plen1);
580 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
582 OBD_FREE(cfg_buf, data->ioc_plen1);
586 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
587 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
589 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
591 OBD_FREE(cfg_buf, data->ioc_plen1);
595 case OBD_IOC_PARSE: {
596 struct llog_ctxt *ctxt =
597 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
598 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
599 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
600 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
607 case OBD_IOC_DUMP_LOG: {
608 struct llog_ctxt *ctxt =
609 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
610 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
611 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
612 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
620 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
621 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
625 case OBD_IOC_SET_READONLY: {
627 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
628 BDEVNAME_DECLARE_STORAGE(tmp);
629 CERROR("*** setting device %s read-only ***\n",
630 ll_bdevname(obd->u.obt.obt_sb, tmp));
632 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
634 rc = fsfilt_commit(obd, inode, handle, 1);
636 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
637 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
639 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
643 case OBD_IOC_CATLOGLIST: {
644 int count = mds->mds_lov_desc.ld_tgt_count;
645 rc = llog_catalog_list(obd, count, data);
649 case OBD_IOC_LLOG_CHECK:
650 case OBD_IOC_LLOG_CANCEL:
651 case OBD_IOC_LLOG_REMOVE: {
652 struct llog_ctxt *ctxt =
653 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
657 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
658 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
659 rc = llog_ioctl(ctxt, cmd, data);
660 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
661 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
662 group = FILTER_GROUP_MDS0 + mds->mds_id;
663 rc2 = obd_set_info_async(mds->mds_osc_exp,
664 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
665 sizeof(group), &group, NULL);
670 case OBD_IOC_LLOG_INFO:
671 case OBD_IOC_LLOG_PRINT: {
672 struct llog_ctxt *ctxt =
673 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
675 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
676 rc = llog_ioctl(ctxt, cmd, data);
677 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
682 case OBD_IOC_ABORT_RECOVERY:
683 CERROR("aborting recovery for device %s\n", obd->obd_name);
684 target_stop_recovery_thread(obd);
688 CDEBUG(D_INFO, "unknown command %x\n", cmd);
695 struct mds_lov_sync_info {
696 struct obd_device *mlsi_obd; /* the lov device to sync */
697 struct obd_device *mlsi_watched; /* target osc */
698 __u32 mlsi_index; /* index of target */
/*
 * Send the MDS capability keys to the LOV.
 *
 * Nothing is sent when mds->mds_capa_keys is not set.  Otherwise the two
 * entries mds_capa_keys[0] and mds_capa_keys[1] are each passed to
 * obd_set_info_async() under KEY_CAPA_KEY; a failure is reported through
 * DEBUG_CAPA_KEY at D_ERROR.  How the loop ends after a failure is not
 * visible in this listing.
 */
701 static int mds_propagate_capa_keys(struct mds_obd *mds)
703 struct lustre_capa_key *key;
708 if (!mds->mds_capa_keys)
711 for (i = 0; i < 2; i++) {
712 key = &mds->mds_capa_keys[i];
713 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
715 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
716 KEY_CAPA_KEY, sizeof(*key), key, NULL);
718 DEBUG_CAPA_KEY(D_ERROR, key,
719 "propagate failed (rc = %d) for", rc);
/*
 * Synchronize the MDS with one watched target.
 *
 * `data` is a struct mds_lov_sync_info.  Its fields are copied into locals
 * and the structure is freed here, so the caller must not touch it after
 * this function has started.
 *
 * Visible sequence: mds_lov_update_mds() for the target; send KEY_MDS_CONN
 * with the MDS group (FILTER_GROUP_MDS0 + mds_id); propagate the
 * capability keys; llog_connect() on the LLOG_MDS_OST_ORIG_CTXT context;
 * print a console message; give up with -ENODEV when the device is
 * stopping; mds_lov_clear_orphans() for the target uuid; finally invoke
 * obd->obd_upcall.onu_upcall when an upcall owner is registered.  The
 * checks between the steps and the cleanup at the `out` label are not
 * visible in this listing.
 */
727 /* We only sync one osc at a time, so that we don't have to hold
728 any kind of lock on the whole mds_lov_desc, which may change
729 (grow) as a result of mds_lov_add_ost. This also avoids any
730 kind of mismatch between the lov_desc and the mds_lov_desc,
731 which are not in lock-step during lov_add_obd */
732 static int __mds_lov_synchronize(void *data)
734 struct mds_lov_sync_info *mlsi = data;
735 struct obd_device *obd = mlsi->mlsi_obd;
736 struct obd_device *watched = mlsi->mlsi_watched;
737 struct mds_obd *mds = &obd->u.mds;
738 struct obd_uuid *uuid;
739 __u32 idx = mlsi->mlsi_index;
740 struct mds_group_info mgi;
744 OBD_FREE(mlsi, sizeof(*mlsi));
748 uuid = &watched->u.cli.cl_target_uuid;
751 rc = mds_lov_update_mds(obd, watched, idx, uuid);
754 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
756 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
757 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
761 /* propagate capability keys */
762 rc = mds_propagate_capa_keys(mds);
766 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
767 mds->mds_lov_desc.ld_tgt_count,
771 CERROR("%s: failed at llog_origin_connect: %d\n",
776 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
777 obd->obd_name, obd_uuid2str(uuid));
779 * FIXME: this obd_stopping was useless,
780 * since obd in mdt layer was set
782 if (obd->obd_stopping)
783 GOTO(out, rc = -ENODEV);
785 rc = mds_lov_clear_orphans(mds, uuid);
787 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
792 if (obd->obd_upcall.onu_owner) {
794 * This is a hack for mds_notify->mdd_notify. When the mds obd
795 * in mdd is removed, This hack should be removed.
797 LASSERT(obd->obd_upcall.onu_upcall != NULL);
798 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
799 obd->obd_upcall.onu_owner);
807 int mds_lov_synchronize(void *data)
809 struct mds_lov_sync_info *mlsi = data;
812 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
813 /* There is still a watched target,
814 but we don't know its index */
815 sprintf(name, "ll_sync_tgt");
817 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
818 ptlrpc_daemonize(name);
820 RETURN(__mds_lov_synchronize(data));
/*
 * Start a synchronization of the MDS with the target `watched`.
 *
 * A struct mds_lov_sync_info is allocated and filled with obd, watched and
 * the target index: *(__u32 *)data on one branch, MDSLOV_NO_INDEX on the
 * other (the test selecting the branch is not visible in this listing).
 * In the background case a kernel thread running mds_lov_synchronize() is
 * started with cfs_kernel_thread(); otherwise __mds_lov_synchronize() is
 * called directly.  In both cases the sync code owns and frees the info
 * structure.
 *
 * NOTE(review): the CDEBUG after a successful cfs_kernel_thread() reads
 * mlsi->mlsi_index, but __mds_lov_synchronize() frees mlsi right after
 * copying its fields, so this read may race with that free.  Logging a
 * copy of the index taken before the thread is started would avoid it --
 * confirm.
 */
823 int mds_lov_start_synchronize(struct obd_device *obd,
824 struct obd_device *watched,
825 void *data, int nonblock)
827 struct mds_lov_sync_info *mlsi;
834 OBD_ALLOC(mlsi, sizeof(*mlsi));
838 mlsi->mlsi_obd = obd;
839 mlsi->mlsi_watched = watched;
841 mlsi->mlsi_index = *(__u32 *)data;
843 mlsi->mlsi_index = MDSLOV_NO_INDEX;
845 /* Although class_export_get(obd->obd_self_export) would lock
846 the MDS in place, since it's only a self-export
847 it doesn't lock the LOV in place. The LOV can be disconnected
848 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
849 Simply taking an export ref on the LOV doesn't help, because it's
850 still disconnected. Taking an obd reference insures that we don't
851 disconnect the LOV. This of course means a cleanup won't
852 finish for as long as the sync is blocking. */
856 /* Synchronize in the background */
857 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
858 CLONE_VM | CLONE_FILES);
860 CERROR("%s: error starting mds_lov_synchronize: %d\n",
864 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
865 "thread=%d\n", obd->obd_name,
866 mlsi->mlsi_index, rc);
870 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback: the LOV reports event `ev` about device `watched`.
 *
 * Visible handling:
 *  - OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK
 *    continue to the synchronization code below;
 *  - OBD_NOTIFY_CONFIG clears obd_no_conn, opening the MDS for clients;
 *  - a watched device whose type name is not LUSTRE_OSC_NAME is reported
 *    with CERROR;
 *  - while the MDS is recovering, orphans are not reset: only the lov
 *    descriptor is refreshed (under mds_lov_objids_sem held for write) and
 *    the llog catalogs are initialised for the target uuid (under
 *    obd_dev_sem);
 *  - otherwise mds_lov_start_synchronize() is called, blocking only for
 *    OBD_NOTIFY_SYNC, followed by lquota_recovery().
 *
 * The return statements of the individual branches are not visible in
 * this listing.
 */
876 int mds_notify(struct obd_device *obd, struct obd_device *watched,
877 enum obd_notify_event ev, void *data)
879 struct mds_obd *mds = &obd->u.mds;
884 /* We only handle these: */
885 case OBD_NOTIFY_ACTIVE:
886 case OBD_NOTIFY_SYNC:
887 case OBD_NOTIFY_SYNC_NONBLOCK:
889 case OBD_NOTIFY_CONFIG:
890 /* Open for clients */
891 obd->obd_no_conn = 0;
896 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
897 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
898 CERROR("unexpected notification of %s %s!\n",
899 watched->obd_type->typ_name, watched->obd_name);
903 if (obd->obd_recovering) {
904 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
906 obd_uuid2str(&watched->u.cli.cl_target_uuid));
907 /* We still have to fix the lov descriptor for ost's added
908 after the mdt in the config log. They didn't make it into
910 down_write(&mds->mds_lov_objids_sem);
911 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
912 up_write(&mds->mds_lov_objids_sem);
915 /* We should update init llog here too for replay unlink and
916 * possiable llog init race when recovery complete */
917 mutex_down(&obd->obd_dev_sem);
918 llog_cat_initialize(obd, NULL,
919 obd->u.mds.mds_lov_desc.ld_tgt_count,
920 &watched->u.cli.cl_target_uuid);
921 mutex_up(&obd->obd_dev_sem);
925 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
926 rc = mds_lov_start_synchronize(obd, watched, data,
927 !(ev == OBD_NOTIFY_SYNC));
929 lquota_recovery(mds_quota_interface_ref, obd);
934 /* Convert the on-disk LOV EA structre.
935 * We always try to convert from an old LOV EA format to the common in-memory
936 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
937 * then convert back to the new on-disk format and save it back to disk
938 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
939 * to convert it each time this inode is accessed.
941 * This function is a bit interesting in the error handling. We can safely
942 * ship the old lmm to the client in case of failure, since it uses the same
943 * obd_unpackmd() code and can do the conversion if the MDS fails for some
944 * reason. We will not delete the old lmm data until we have written the
945 * new format lmm data in fsfilt_set_md(). */
946 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
947 struct lov_mds_md *lmm, int lmm_size)
949 struct lov_stripe_md *lsm = NULL;
954 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
955 le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
958 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
959 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
962 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
966 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
971 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
972 if (IS_ERR(handle)) {
973 rc = PTR_ERR(handle);
977 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
979 err = fsfilt_commit(obd, inode, handle, 0);
981 rc = err ? err : lmm_size;
984 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
989 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
990 struct lov_desc *desc)
993 for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
994 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
995 le64_to_cpu(lmm->lmm_objects[i].l_object_id);
998 EXPORT_SYMBOL(mds_objids_from_lmm);