1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Raise the cached per-target last-object ids to the values supplied in ids.
 * An entry of mds_lov_objids is overwritten only when the supplied id is
 * larger, and mds_lov_objids_dirty is set whenever an entry changes.
 * Exactly mds_lov_desc.ld_tgt_count entries of ids are read.
 * NOTE(review): some lines of this function are absent from this listing,
 * so any locking around the loop cannot be confirmed from here.
 */
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
46 struct mds_obd *mds = &obd->u.mds;
51 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52 if (ids[i] > (mds->mds_lov_objids)[i]) {
53 (mds->mds_lov_objids)[i] = ids[i];
54 mds->mds_lov_objids_dirty = 1;
59 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the saved last-object ids from mds_lov_objid_filp into
 * mds->mds_lov_objids.
 * Must run before any ids are cached: the two LASSERTs below require that
 * mds_lov_objids_size and mds_lov_objids_dirty are still zero.  The read is
 * sized by i_size_read() of the file's inode, and mds_lov_objids_in_file is
 * set to the number of ids that size holds.
 * NOTE(review): the buffer allocation and the error-return path are on lines
 * missing from this listing - confirm what happens to mds_lov_objids when
 * the read fails.
 */
61 static int mds_lov_read_objids(struct obd_device *obd)
63 struct mds_obd *mds = &obd->u.mds;
69 LASSERT(!mds->mds_lov_objids_size);
70 LASSERT(!mds->mds_lov_objids_dirty);
72 /* Read everything in the file, even if our current lov desc
73 has fewer targets. Old targets not in the lov descriptor
74 during mds setup may still have valid objids. */
75 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
82 mds->mds_lov_objids = ids;
83 mds->mds_lov_objids_size = size;
85 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
87 CERROR("Error reading objids %d\n", rc);
91 mds->mds_lov_objids_in_file = size / sizeof(*ids);
93 for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95 mds->mds_lov_objids[i], i);
/*
 * Write the cached last-object ids back to mds_lov_objid_filp.
 * mds_lov_objids_dirty is tested first (the action taken when it is clear is
 * on a line missing from this listing - presumably an early return).
 * The number of ids written is the larger of mds_lov_desc.ld_tgt_count and
 * mds_lov_objids_in_file, so entries that came from the file are rewritten
 * even if the descriptor currently has fewer targets.
 * NOTE(review): mds_lov_objids_dirty is cleared after the write; whether
 * that is conditional on success cannot be told from the visible lines.
 */
100 int mds_lov_write_objids(struct obd_device *obd)
102 struct mds_obd *mds = &obd->u.mds;
107 if (!mds->mds_lov_objids_dirty)
110 tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
115 for (i = 0; i < tgts; i++)
116 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117 mds->mds_lov_objids[i], i);
119 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120 mds->mds_lov_objids, tgts * sizeof(obd_id),
123 mds->mds_lov_objids_dirty = 0;
129 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Issue a "delete orphans" create request through mds_osc_exp.
 * A zeroed obdo is flagged OBD_FL_DELORPHAN for this MDS's object group
 * (FILTER_GROUP_MDS0 + mds_id) and handed to obd_create().
 * When ost_uuid is non-NULL it is copied into o_inline and OBD_MD_FLINLINE
 * is set; NOTE(review): presumably this restricts the request to that one
 * OST - confirm against the LOV create path.
 * Requires mds_lov_objids to be allocated (asserted below).
 */
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
135 struct obd_trans_info oti = {0};
136 struct lov_stripe_md *empty_ea = NULL;
139 LASSERT(mds->mds_lov_objids != NULL);
141 /* This create will in fact either create or destroy: If the OST is
142 * missing objects below this ID, they will be created. If it finds
143 * objects above this ID, they will be removed. */
144 memset(&oa, 0, sizeof(oa));
145 oa.o_flags = OBD_FL_DELORPHAN;
146 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148 if (ost_uuid != NULL) {
149 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150 oa.o_valid |= OBD_MD_FLINLINE;
152 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Preconditions (all asserted below): recovery has finished, mds_lov_objids
 * is allocated, and the caller holds obd->obd_dev_sem.
 * Sends the first mds_lov_desc.ld_tgt_count entries of mds_lov_objids to
 * mds_osc_exp under KEY_NEXT_ID with obd_set_info_async(); a failure is
 * reported with CERROR.
 */
157 /* update the LOV-OSC knowledge of the last used object id's */
158 int mds_lov_set_nextid(struct obd_device *obd)
160 struct mds_obd *mds = &obd->u.mds;
164 LASSERT(!obd->obd_recovering);
165 LASSERT(mds->mds_lov_objids != NULL);
167 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
168 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
170 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
172 mds->mds_lov_desc.ld_tgt_count *
173 sizeof(*mds->mds_lov_objids),
174 mds->mds_lov_objids, NULL);
177 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Fetches the current lov_desc from the lov export (KEY_LOVDESC) into a
 * temporary buffer and, when the objid array is empty or too small for the
 * new target count, replaces it with a larger zeroed array (entry count
 * rounded up by powers of two), copying the old contents and freeing the old
 * buffer.  The new descriptor is then stored in mds_lov_desc and
 * mds_max_mdsize / mds_max_cookiesize are recomputed from the stripe count.
 * Finally the llog catalogs are re-initialised for the new target count and,
 * if an upcall owner is registered, the upcall is invoked.
 * The callers visible in this file hold obd_dev_sem around this call.
 * NOTE(review): the "updated max_mdsize/max_cookiesize" CDEBUG formats
 * "for %d stripes" first, but its first argument is mds_max_mdsize; the
 * final argument is on a line missing from this listing - check the order.
 */
183 /* Update the lov desc for a new size lov. */
184 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
186 struct mds_obd *mds = &obd->u.mds;
188 __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
192 OBD_ALLOC(ld, sizeof(*ld));
196 rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
201 /* The size of the LOV target table may have increased. */
202 size = ld->ld_tgt_count * sizeof(obd_id);
203 if ((mds->mds_lov_objids_size == 0) ||
204 (size > mds->mds_lov_objids_size)) {
207 /* add room by powers of 2 */
209 while (size < ld->ld_tgt_count)
211 size = size * sizeof(obd_id);
213 OBD_ALLOC(ids, size);
215 GOTO(out, rc = -ENOMEM);
216 memset(ids, 0, size);
217 if (mds->mds_lov_objids_size) {
218 obd_id *old_ids = mds->mds_lov_objids;
219 memcpy(ids, mds->mds_lov_objids,
220 mds->mds_lov_objids_size);
221 mds->mds_lov_objids = ids;
222 OBD_FREE(old_ids, mds->mds_lov_objids_size);
224 mds->mds_lov_objids = ids;
225 mds->mds_lov_objids_size = size;
228 /* Don't change the mds_lov_desc until the objids size matches the
230 mds->mds_lov_desc = *ld;
231 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
232 mds->mds_lov_desc.ld_tgt_count);
234 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
235 max(mds->mds_lov_desc.ld_tgt_count,
236 mds->mds_lov_objids_in_file));
237 mds->mds_max_mdsize = lov_mds_md_size(stripes);
238 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
239 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
240 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
243 /* If we added a target we have to reconnect the llogs */
244 /* We only _need_ to do this at first add (idx), or the first time
245 after recovery. However, it should now be safe to call anytime. */
246 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
248 /*XXX this notifies the MDD until lov handling use old mds code */
249 if (obd->obd_upcall.onu_owner) {
250 LASSERT(obd->obd_upcall.onu_upcall != NULL);
251 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252 obd->obd_upcall.onu_owner);
255 OBD_FREE(ld, sizeof(*ld));
/*
 * Index value used when a notification carries no target index
 * (mds_lov_start_synchronize stores it in mlsi_index in that case).
 */
260 #define MDSLOV_NO_INDEX -1
/*
 * Holds obd_dev_sem for the whole update.  The lov descriptor is refreshed
 * with mds_lov_update_desc(); then, unless idx is MDSLOV_NO_INDEX or the
 * device is recovering, the objid for target idx is brought in sync:
 *  - idx at or beyond the descriptor's target count fails with -EINVAL;
 *  - idx not covered by the objids file: the last id is queried from the
 *    watched device ("last_id"), cached, marked dirty and written out;
 *  - otherwise (per the in-code comment) the ids read from disk are pushed
 *    to the OSC with mds_lov_set_nextid(), and a failure there is logged.
 * NOTE(review): the uuid parameter is not referenced on any line visible in
 * this listing.
 */
262 /* Inform MDS about new/updated target */
263 static int mds_lov_update_mds(struct obd_device *obd,
264 struct obd_device *watched,
265 __u32 idx, struct obd_uuid *uuid)
267 struct mds_obd *mds = &obd->u.mds;
272 /* Don't let anyone else mess with mds_lov_objids now */
273 mutex_down(&obd->obd_dev_sem);
275 old_count = mds->mds_lov_desc.ld_tgt_count;
276 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
280 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
281 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
282 mds->mds_lov_desc.ld_tgt_count);
284 /* idx is set as data from lov_notify. */
285 if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
288 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
289 CERROR("index %d > count %d!\n", idx,
290 mds->mds_lov_desc.ld_tgt_count);
291 GOTO(out, rc = -EINVAL);
294 if (idx >= mds->mds_lov_objids_in_file) {
295 /* We never read this lastid; ask the osc */
297 __u32 size = sizeof(lastid);
298 rc = obd_get_info(watched->obd_self_export, strlen("last_id"),
299 "last_id", &size, &lastid);
302 mds->mds_lov_objids[idx] = lastid;
303 mds->mds_lov_objids_dirty = 1;
304 mds_lov_write_objids(obd);
306 /* We have read this lastid from disk; tell the osc.
307 Don't call this during recovery. */
308 rc = mds_lov_set_nextid(obd);
310 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
311 /* Don't abort the rest of the sync */
316 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
317 mds->mds_lov_objids[idx], idx, rc);
319 mutex_up(&obd->obd_dev_sem);
/*
 * If an earlier attempt left an ERR_PTR in mds_osc_obd, that error is
 * returned.  Otherwise the LOV is looked up by name, connected to with the
 * connect flags set below, and this obd is registered as its observer.
 * Then, with new client connections denied (obd_no_conn = 1) and obd_dev_sem
 * held, the objids file is read, the lov descriptor is refreshed and the llog
 * catalogs are initialised.  If the descriptor has more targets than the file
 * had ids, the last ids are fetched from the OSTs ("last_id") and written out.
 * On the error path visible at the end, the observer is unregistered, the
 * export is disconnected and mds_osc_obd is set to ERR_PTR(rc).
 */
323 /* Connect the MDS to the LOV device named by lov_name. */
324 int mds_lov_connect(struct obd_device *obd, char * lov_name)
326 struct mds_obd *mds = &obd->u.mds;
327 struct lustre_handle conn = {0,};
328 struct obd_connect_data *data;
332 if (IS_ERR(mds->mds_osc_obd))
333 RETURN(PTR_ERR(mds->mds_osc_obd));
335 if (mds->mds_osc_obd)
338 mds->mds_osc_obd = class_name2obd(lov_name);
339 if (!mds->mds_osc_obd) {
340 CERROR("MDS cannot locate LOV %s\n", lov_name);
341 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
345 OBD_ALLOC(data, sizeof(*data));
348 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
349 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
350 OBD_CONNECT_OSS_CAPA;
351 #ifdef HAVE_LRU_RESIZE_SUPPORT
352 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
354 data->ocd_version = LUSTRE_VERSION_CODE;
355 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
356 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
357 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
358 OBD_FREE(data, sizeof(*data));
360 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
361 mds->mds_osc_obd = ERR_PTR(rc);
364 mds->mds_osc_exp = class_conn2export(&conn);
366 rc = obd_register_observer(mds->mds_osc_obd, obd);
368 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
370 GOTO(err_discon, rc);
373 /* Deny new client connections until we are sure we have some OSTs */
374 obd->obd_no_conn = 1;
376 mutex_down(&obd->obd_dev_sem);
377 rc = mds_lov_read_objids(obd);
379 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
383 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
387 /* tgt_count may be 0! */
388 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
390 CERROR("failed to initialize catalog %d\n", rc);
394 /* If we're mounting this code for the first time on an existing FS,
395 * we need to populate the objids array from the real OST values */
396 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
397 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
398 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
399 "last_id", &size, mds->mds_lov_objids);
401 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
402 CWARN("got last object "LPU64" from OST %d\n",
403 mds->mds_lov_objids[i], i);
404 mds->mds_lov_objids_dirty = 1;
405 rc = mds_lov_write_objids(obd);
407 CERROR("got last objids from OSTs, but error "
408 "writing objids file: %d\n", rc);
411 mutex_up(&obd->obd_dev_sem);
413 /* I want to see a callback happen when the OBD moves to a
414 * "For General Use" state, and that's when we'll call
415 * set_nextid(). The class driver can help us here, because
416 * it can use the obd_recovering flag to determine when the
417 * OBD is fully available. */
418 /* MDD device will care about that
419 if (!obd->obd_recovering)
420 rc = mds_postrecov(obd);
425 mutex_up(&obd->obd_dev_sem);
426 obd_register_observer(mds->mds_osc_obd, NULL);
428 obd_disconnect(mds->mds_osc_exp);
429 mds->mds_osc_exp = NULL;
430 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Stop observing the LOV ahead of cleanup.  Acts only when mds_osc_obd is
 * not an ERR_PTR and an export exists: the observer registration is removed
 * and obd_force is set on the LOV device.  The export itself is not
 * disconnected here; the comment below explains where that happens.
 */
434 int mds_lov_disconnect(struct obd_device *obd)
436 struct mds_obd *mds = &obd->u.mds;
440 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
441 obd_register_observer(mds->mds_osc_obd, NULL);
443 /* The actual disconnect of the mds_lov will be called from
444 * class_disconnect_exports from mds_lov_clean. So we have to
445 * ensure that class_cleanup doesn't fail due to the extra ref
446 * we're holding now. The mechanism to do that already exists -
447 * the obd_force flag. We'll drop the final ref to the
448 * mds_osc_exp in mds_cleanup. */
449 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl entry point for the MDS device behind exp.  karg is used as a
 * struct obd_ioctl_data.  Commands whose case labels are visible here:
 *   OBD_IOC_RECORD - create and init the config llog named by ioc_inlbuf1
 *       into mds_cfg_llh (checked first for an already-open handle);
 *   OBD_IOC_ENDRECORD - close mds_cfg_llh and reset it to NULL;
 *   OBD_IOC_CLEAR_LOG - create, init and destroy the named config llog;
 *   OBD_IOC_DORECORD - copy ioc_plen1 bytes from user space (ioc_pbuf1)
 *       and append them to mds_cfg_llh as an OBD_CFG_REC record;
 *   OBD_IOC_PARSE, OBD_IOC_DUMP_LOG - parse or dump the named config llog;
 *   OBD_IOC_SET_READONLY - start and commit a transaction, sync, then
 *       lvfs_set_rdonly();
 *   OBD_IOC_CATLOGLIST - llog_catalog_list() for the current target count;
 *   OBD_IOC_LLOG_CHECK, _CANCEL, _REMOVE - llog_ioctl() between
 *       obd_llog_finish() and llog_cat_initialize(), then KEY_MDS_CONN is
 *       re-sent to mds_osc_exp;
 *   OBD_IOC_LLOG_INFO, OBD_IOC_LLOG_PRINT - llog_ioctl();
 *   OBD_IOC_ABORT_RECOVERY - target_stop_recovery_thread().
 * One further path calls fsfilt_sync() ("syncing mds"); its case label is on
 * a line missing from this listing.  An "unknown command" message is logged
 * at D_INFO, presumably from the default case.
 * The llog and filesystem calls are bracketed by push_ctxt()/pop_ctxt().
 * NOTE(review): uarg is not referenced on any visible line.
 */
455 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
456 void *karg, void *uarg)
458 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
459 struct obd_device *obd = exp->exp_obd;
460 struct mds_obd *mds = &obd->u.mds;
461 struct obd_ioctl_data *data = karg;
462 struct lvfs_run_ctxt saved;
466 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
469 case OBD_IOC_RECORD: {
470 char *name = data->ioc_inlbuf1;
471 if (mds->mds_cfg_llh)
474 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
475 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
476 &mds->mds_cfg_llh, NULL, name);
478 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
481 mds->mds_cfg_llh = NULL;
482 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
487 case OBD_IOC_ENDRECORD: {
488 if (!mds->mds_cfg_llh)
491 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
492 rc = llog_close(mds->mds_cfg_llh);
493 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
495 mds->mds_cfg_llh = NULL;
499 case OBD_IOC_CLEAR_LOG: {
500 char *name = data->ioc_inlbuf1;
501 if (mds->mds_cfg_llh)
504 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
505 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
506 &mds->mds_cfg_llh, NULL, name);
508 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
511 rc = llog_destroy(mds->mds_cfg_llh);
512 llog_free_handle(mds->mds_cfg_llh);
514 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
516 mds->mds_cfg_llh = NULL;
520 case OBD_IOC_DORECORD: {
522 struct llog_rec_hdr rec;
523 if (!mds->mds_cfg_llh)
526 rec.lrh_len = llog_data_len(data->ioc_plen1);
528 if (data->ioc_type == LUSTRE_CFG_TYPE) {
529 rec.lrh_type = OBD_CFG_REC;
531 CERROR("unknown cfg record type:%d \n", data->ioc_type);
535 OBD_ALLOC(cfg_buf, data->ioc_plen1);
538 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
540 OBD_FREE(cfg_buf, data->ioc_plen1);
544 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
545 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
547 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
549 OBD_FREE(cfg_buf, data->ioc_plen1);
553 case OBD_IOC_PARSE: {
554 struct llog_ctxt *ctxt =
555 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
556 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
558 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
565 case OBD_IOC_DUMP_LOG: {
566 struct llog_ctxt *ctxt =
567 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
568 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
569 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
570 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
578 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
579 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
583 case OBD_IOC_SET_READONLY: {
585 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
586 BDEVNAME_DECLARE_STORAGE(tmp);
587 CERROR("*** setting device %s read-only ***\n",
588 ll_bdevname(obd->u.obt.obt_sb, tmp));
590 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
592 rc = fsfilt_commit(obd, inode, handle, 1);
594 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
595 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
597 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
601 case OBD_IOC_CATLOGLIST: {
602 int count = mds->mds_lov_desc.ld_tgt_count;
603 rc = llog_catalog_list(obd, count, data);
607 case OBD_IOC_LLOG_CHECK:
608 case OBD_IOC_LLOG_CANCEL:
609 case OBD_IOC_LLOG_REMOVE: {
610 struct llog_ctxt *ctxt =
611 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
615 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
616 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
617 rc = llog_ioctl(ctxt, cmd, data);
618 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
619 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
620 group = FILTER_GROUP_MDS0 + mds->mds_id;
621 rc2 = obd_set_info_async(mds->mds_osc_exp,
622 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
623 sizeof(group), &group, NULL);
628 case OBD_IOC_LLOG_INFO:
629 case OBD_IOC_LLOG_PRINT: {
630 struct llog_ctxt *ctxt =
631 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
633 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
634 rc = llog_ioctl(ctxt, cmd, data);
635 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
640 case OBD_IOC_ABORT_RECOVERY:
641 CERROR("aborting recovery for device %s\n", obd->obd_name);
642 target_stop_recovery_thread(obd);
646 CDEBUG(D_INFO, "unknown command %x\n", cmd);
/*
 * Records which preconditions have been met and reopens the device for
 * client connections.  CONFIG_LOG in flag sets mds_fl_cfglog and CONFIG_SYNC
 * sets mds_fl_synced.  obd_no_conn is cleared as soon as mds_fl_cfglog is
 * set; the additional mds_fl_synced test is commented out (bz11778).
 */
653 /* Collect the preconditions we need to allow client connects */
654 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
656 if (flag & CONFIG_LOG)
657 obd->u.mds.mds_fl_cfglog = 1;
658 if (flag & CONFIG_SYNC)
659 obd->u.mds.mds_fl_synced = 1;
660 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
661 /* Open for clients */
662 obd->obd_no_conn = 0;
/*
 * Arguments handed to __mds_lov_synchronize(), which frees the structure
 * after copying the fields to locals.
 * NOTE(review): mlsi_obd is described below as "the lov device", but
 * __mds_lov_synchronize() reads obd->u.mds from it and
 * mds_lov_start_synchronize() stores its own obd argument there - it looks
 * like the MDS device; confirm and correct the field comment.
 */
665 struct mds_lov_sync_info {
666 struct obd_device *mlsi_obd; /* the lov device to sync */
667 struct obd_device *mlsi_watched; /* target osc */
668 __u32 mlsi_index; /* index of target */
/*
 * Push both entries of mds_capa_keys to mds_osc_exp under KEY_CAPA_KEY.
 * mds_capa_keys is tested first (the action taken when it is NULL is on a
 * line missing from this listing).  A failed obd_set_info_async() is
 * reported through DEBUG_CAPA_KEY at D_ERROR.
 * NOTE(review): whether the loop stops at the first failure cannot be told
 * from the visible lines.
 */
671 static int mds_propagate_capa_keys(struct mds_obd *mds)
673 struct lustre_capa_key *key;
678 if (!mds->mds_capa_keys)
681 for (i = 0; i < 2; i++) {
682 key = &mds->mds_capa_keys[i];
683 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
685 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
686 KEY_CAPA_KEY, sizeof(*key), key, NULL);
688 DEBUG_CAPA_KEY(D_ERROR, key,
689 "propagate failed (rc = %d) for", rc);
/*
 * data is a struct mds_lov_sync_info; its fields are copied to locals in the
 * declarations and the structure is then freed.
 * Steps, in order: mds_lov_update_mds() for the watched target, a
 * KEY_MDS_CONN set_info carrying this MDS's group, capability key
 * propagation, llog_connect(), orphan cleanup for the target (abandoned with
 * -ENODEV when obd_stopping is set), and the notify upcall if an owner is
 * registered.
 * On failure the error is logged and, unless the MDS, the LOV or the watched
 * device is stopping, the LOV is notified OBD_NOTIFY_INACTIVE for the target.
 */
697 /* We only sync one osc at a time, so that we don't have to hold
698 any kind of lock on the whole mds_lov_desc, which may change
699 (grow) as a result of mds_lov_add_ost. This also avoids any
700 kind of mismatch between the lov_desc and the mds_lov_desc,
701 which are not in lock-step during lov_add_obd */
702 static int __mds_lov_synchronize(void *data)
704 struct mds_lov_sync_info *mlsi = data;
705 struct obd_device *obd = mlsi->mlsi_obd;
706 struct obd_device *watched = mlsi->mlsi_watched;
707 struct mds_obd *mds = &obd->u.mds;
708 struct obd_uuid *uuid;
709 __u32 idx = mlsi->mlsi_index;
710 struct mds_group_info mgi;
714 OBD_FREE(mlsi, sizeof(*mlsi));
718 uuid = &watched->u.cli.cl_target_uuid;
721 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
723 rc = mds_lov_update_mds(obd, watched, idx, uuid);
725 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
728 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
731 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
732 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
736 /* propagate capability keys */
737 rc = mds_propagate_capa_keys(mds);
741 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
742 mds->mds_lov_desc.ld_tgt_count,
746 CERROR("%s failed at llog_origin_connect: %d\n",
747 obd_uuid2str(uuid), rc);
751 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
752 obd->obd_name, obd_uuid2str(uuid));
754 * FIXME: this obd_stopping was useless,
755 * since obd in mdt layer was set
757 if (obd->obd_stopping)
758 GOTO(out, rc = -ENODEV);
760 rc = mds_lov_clear_orphans(mds, uuid);
762 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
763 obd_uuid2str(uuid), rc);
767 if (obd->obd_upcall.onu_owner) {
769 * This is a hack for mds_notify->mdd_notify. When the mds obd
770 * in mdd is removed, This hack should be removed.
772 LASSERT(obd->obd_upcall.onu_upcall != NULL);
773 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
774 obd->obd_upcall.onu_owner);
779 /* Deactivate it for safety */
780 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
782 if (!obd->obd_stopping && mds->mds_osc_obd &&
783 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
784 obd_notify(mds->mds_osc_obd, watched,
785 OBD_NOTIFY_INACTIVE, NULL);
/*
 * Thread entry point for a background sync.  The thread is named
 * "ll_sync_tgt" when the index is MDSLOV_NO_INDEX, otherwise "ll_sync_"
 * followed by the zero-padded two-digit index; it is then daemonized with
 * ptlrpc_daemonize() and runs __mds_lov_synchronize() on data, returning its
 * result.
 */
792 int mds_lov_synchronize(void *data)
794 struct mds_lov_sync_info *mlsi = data;
797 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
798 /* There is still a watched target,
799 but we don't know its index */
800 sprintf(name, "ll_sync_tgt");
802 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
803 ptlrpc_daemonize(name);
805 RETURN(__mds_lov_synchronize(data));
/*
 * Packages obd, watched and the optional target index into a newly allocated
 * mds_lov_sync_info and runs the synchronization.  The index is read from
 * data as a __u32, or set to MDSLOV_NO_INDEX; the test choosing between the
 * two is on a line missing from this listing (presumably data != NULL).
 * The visible lines either start mds_lov_synchronize() with
 * cfs_kernel_thread() or call __mds_lov_synchronize() directly.
 * NOTE(review): the branch on nonblock itself is not visible here.
 */
808 int mds_lov_start_synchronize(struct obd_device *obd,
809 struct obd_device *watched,
810 void *data, int nonblock)
812 struct mds_lov_sync_info *mlsi;
819 OBD_ALLOC(mlsi, sizeof(*mlsi));
823 mlsi->mlsi_obd = obd;
824 mlsi->mlsi_watched = watched;
826 mlsi->mlsi_index = *(__u32 *)data;
828 mlsi->mlsi_index = MDSLOV_NO_INDEX;
830 /* Although class_export_get(obd->obd_self_export) would lock
831 the MDS in place, since it's only a self-export
832 it doesn't lock the LOV in place. The LOV can be disconnected
833 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
834 Simply taking an export ref on the LOV doesn't help, because it's
835 still disconnected. Taking an obd reference ensures that we don't
836 disconnect the LOV. This of course means a cleanup won't
837 finish for as long as the sync is blocking. */
841 /* Synchronize in the background */
842 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
843 CLONE_VM | CLONE_FILES);
845 CERROR("%s: error starting mds_lov_synchronize: %d\n",
849 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
850 "thread=%d\n", obd->obd_name,
851 mlsi->mlsi_index, rc);
855 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback for events on a watched device.
 * OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK go on to
 * the synchronization logic below; OBD_NOTIFY_CONFIG only passes data, cast
 * to flags, to mds_allow_cli().  The watched device must be of type
 * LUSTRE_OSC_NAME, otherwise an error is logged.
 * While the MDS is recovering, only the lov descriptor and the llog catalogs
 * are refreshed under obd_dev_sem and CONFIG_SYNC is recorded.  Otherwise
 * mds_lov_start_synchronize() is called with nonblock set for every event
 * except OBD_NOTIFY_SYNC.  lquota_recovery() is also called on a visible
 * line near the end.
 */
861 int mds_notify(struct obd_device *obd, struct obd_device *watched,
862 enum obd_notify_event ev, void *data)
868 /* We only handle these: */
869 case OBD_NOTIFY_ACTIVE:
870 case OBD_NOTIFY_SYNC:
871 case OBD_NOTIFY_SYNC_NONBLOCK:
873 case OBD_NOTIFY_CONFIG:
874 mds_allow_cli(obd, (unsigned int)data);
879 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
880 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
881 CERROR("unexpected notification of %s %s!\n",
882 watched->obd_type->typ_name, watched->obd_name);
886 if (obd->obd_recovering) {
887 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
889 obd_uuid2str(&watched->u.cli.cl_target_uuid));
890 /* We still have to fix the lov descriptor for ost's added
891 after the mdt in the config log. They didn't make it into
893 mutex_down(&obd->obd_dev_sem);
894 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
896 mutex_up(&obd->obd_dev_sem);
899 /* We should update init llog here too for replay unlink and
900 * possible llog init race when recovery completes */
901 llog_cat_initialize(obd, NULL,
902 obd->u.mds.mds_lov_desc.ld_tgt_count,
903 &watched->u.cli.cl_target_uuid);
904 mutex_up(&obd->obd_dev_sem);
905 mds_allow_cli(obd, CONFIG_SYNC);
909 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
910 rc = mds_lov_start_synchronize(obd, watched, data,
911 !(ev == OBD_NOTIFY_SYNC));
913 lquota_recovery(mds_quota_interface_ref, obd);
918 /* Convert the on-disk LOV EA structure.
919 * We always try to convert from an old LOV EA format to the common in-memory
920 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
921 * then convert back to the new on-disk format and save it back to disk
922 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
923 * to convert it each time this inode is accessed.
925 * This function is a bit interesting in the error handling. We can safely
926 * ship the old lmm to the client in case of failure, since it uses the same
927 * obd_unpackmd() code and can do the conversion if the MDS fails for some
928 * reason. We will not delete the old lmm data until we have written the
929 * new format lmm data in fsfilt_set_md(). */
/*
 * obd      - MDS device; its mds_osc_exp is used for unpack/pack.
 * inode    - inode whose "lov" EA is rewritten.
 * lmm      - on-disk (little-endian) LOV EA as read from the inode.
 * lmm_size - size of lmm in bytes.
 *
 * An EA whose magic is already LOV_MAGIC or LOV_MAGIC_JOIN is not converted
 * (the statement taken in that case is on a line missing from this listing -
 * presumably an early return).  Otherwise the EA is unpacked, repacked and
 * stored with fsfilt_set_md() inside an FSFILT_OP_SETATTR transaction.
 * The visible result assignment yields the commit error if there is one,
 * else lmm_size.
 */
930 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
931 struct lov_mds_md *lmm, int lmm_size)
933 struct lov_stripe_md *lsm = NULL;
/* lmm_magic is little-endian on disk: convert the field itself before each
 * comparison.  Converting the comparison result instead, as the second test
 * used to, compares an unswapped magic on big-endian hosts. */
938 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
939 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
942 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
943 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
946 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
950 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
955 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
956 if (IS_ERR(handle)) {
957 rc = PTR_ERR(handle);
961 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
963 err = fsfilt_commit(obd, inode, handle, 0);
965 rc = err ? err : lmm_size;
968 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
/*
 * Scatter the object ids of a striped file into a per-OST array: for each of
 * the lmm_stripe_count stripes, ids[l_ost_idx] is set to that stripe's
 * l_object_id (count, index and id are converted from little-endian).
 * NOTE(review): no visible line checks l_ost_idx against the size of ids,
 * and desc is not referenced on any visible line - confirm that callers
 * guarantee ids is large enough.
 */
973 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
974 struct lov_desc *desc)
977 for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
978 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
979 le64_to_cpu(lmm->lmm_objects[i].l_object_id);
982 EXPORT_SYMBOL(mds_objids_from_lmm);