1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * wangdi <wangdi@clusterfs.com>
11 * This file is part of the Lustre file system, http://www.lustre.org
12 * Lustre is a trademark of Cluster File Systems, Inc.
14 * You may have signed or agreed to another license before downloading
15 * this software. If so, you are bound by the terms and conditions
16 * of that agreement, and the following does not apply to you. See the
17 * LICENSE file included with this distribution for more information.
19 * If you did not agree to a different license, then this copy of Lustre
20 * is open source software; you can redistribute it and/or modify it
21 * under the terms of version 2 of the GNU General Public License as
22 * published by the Free Software Foundation.
24 * In either case, Lustre is distributed in the hope that it will be
25 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
41 #include <lprocfs_status.h>
43 #include <lu_object.h>
44 #include <md_object.h>
45 #include <dt_object.h>
46 #include <lustre_mds.h>
48 #include "mdd_internal.h"
/* Name that mdd_lov_init() passes to dt_store_open() to open the object
 * holding the saved per-target object ids. */
50 static const char mdd_lov_objid_name[] = "lov_objid";
/*
 * Load the saved object ids from lov_info->mdd_lov_objid_obj into a newly
 * allocated array.
 *
 * The object's attributes are fetched to learn its size (la_size); a size of
 * zero is checked for before anything is allocated.  Otherwise la_size bytes
 * are allocated, the array and its byte size are recorded in
 * mdd_lov_objids / mdd_lov_objids_size, the whole object body is read into
 * the array, and mdd_lov_objids_in_file is set to the number of entries read.
 *
 * Precondition (asserted): no objid array exists yet and nothing is dirty.
 */
52 static int mdd_lov_read_objids(const struct lu_context *ctxt,
53 struct mdd_device *mdd)
55 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
56 struct dt_object *obj_ids = lov_info->mdd_lov_objid_obj;
/* Per-context scratch attribute buffer; only la_size is used below. */
57 struct lu_attr *lu_attr = &mdd_ctx_info(ctxt)->mti_attr;
62 LASSERT(!lov_info->mdd_lov_objids_size);
63 LASSERT(!lov_info->mdd_lov_objids_dirty);
65 /* Read everything in the file, even if our current lov desc
66 has fewer targets. Old targets not in the lov descriptor
67 during mds setup may still have valid objids. */
69 rc = obj_ids->do_ops->do_attr_get(ctxt, obj_ids, lu_attr);
73 if (lu_attr->la_size == 0)
76 OBD_ALLOC(ids, lu_attr->la_size);
/* The array is published in lov_info before the read is attempted. */
80 lov_info->mdd_lov_objids = ids;
81 lov_info->mdd_lov_objids_size = lu_attr->la_size;
84 rc = obj_ids->do_body_ops->dbo_read(ctxt, obj_ids, ids,
85 lu_attr->la_size, &off);
87 CERROR("Error reading objids %d\n", rc);
/* NOTE(review): integer division; a la_size that is not a multiple of
 * sizeof(*ids) would silently drop the partial entry - confirm the object
 * is always written in whole entries. */
91 lov_info->mdd_lov_objids_in_file = lu_attr->la_size / sizeof(*ids);
93 for (i = 0; i < lov_info->mdd_lov_objids_in_file; i++) {
94 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95 lov_info->mdd_lov_objids[i], i);
101 /* Update the lov desc for a new size lov.
 *
 * Fetches the current LOV descriptor from the LOV obd with
 * obd_get_info(KEY_LOVDESC) into the per-context scratch descriptor
 * (mti_ld).  If the objid array is empty, or smaller than
 * ld_tgt_count * sizeof(obd_id) bytes, a larger zero-filled array is
 * allocated, the old contents are copied into it and the old array is freed.
 * Afterwards the fetched descriptor is copied into mdd_lov_desc, and
 * mdd_max_mdsize / mdd_max_cookiesize are recomputed for
 * min(LOV_MAX_STRIPE_COUNT, max(ld_tgt_count, mdd_lov_objids_in_file))
 * stripes.
 *
 * Allocation failure yields -ENOMEM through the "out" label.
 */
102 static int mdd_lov_update_desc(const struct lu_context *ctxt,
103 struct mdd_device *mdd)
105 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
106 __u32 size, stripes, valsize = sizeof(lov_info->mdd_lov_desc);
108 struct obd_device *lov_obd = lov_info->mdd_lov_obd;
112 ld = &mdd_ctx_info(ctxt)->mti_ld;
/* The key length passed here includes the terminating NUL. */
114 rc = obd_get_info(lov_obd->obd_self_export, strlen(KEY_LOVDESC) + 1,
115 KEY_LOVDESC, &valsize, ld);
119 /* The size of the LOV target table may have increased. */
120 size = ld->ld_tgt_count * sizeof(obd_id);
121 if ((lov_info->mdd_lov_objids_size == 0) ||
122 (size > lov_info->mdd_lov_objids_size)) {
125 /* add room by powers of 2 */
127 while (size < ld->ld_tgt_count)
129 size = size * sizeof(obd_id);
131 OBD_ALLOC(ids, size);
133 GOTO(out, rc = -ENOMEM);
134 memset(ids, 0, size);
/* Carry the existing ids over, then free the old array using its old byte
 * size; mdd_lov_objids_size is only updated after the free. */
135 if (lov_info->mdd_lov_objids_size) {
136 obd_id *old_ids = lov_info->mdd_lov_objids;
137 memcpy(ids, lov_info->mdd_lov_objids,
138 lov_info->mdd_lov_objids_size);
139 lov_info->mdd_lov_objids = ids;
140 OBD_FREE(old_ids, lov_info->mdd_lov_objids_size);
142 lov_info->mdd_lov_objids = ids;
143 lov_info->mdd_lov_objids_size = size;
146 /* Don't change the mds_lov_desc until the objids size matches the
148 lov_info->mdd_lov_desc = *ld;
149 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
150 lov_info->mdd_lov_desc.ld_tgt_count);
152 stripes = min((__u32)LOV_MAX_STRIPE_COUNT,
153 max(lov_info->mdd_lov_desc.ld_tgt_count,
154 lov_info->mdd_lov_objids_in_file));
155 mdd->mdd_max_mdsize = lov_mds_md_size(stripes);
156 mdd->mdd_max_cookiesize = stripes * sizeof(struct llog_cookie);
157 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d\n",
158 mdd->mdd_max_mdsize, mdd->mdd_max_cookiesize);
/*
 * Write the in-memory objid array back to its backing object.
 *
 * The dirty flag is tested first.  The number of entries written is
 * max(ld_tgt_count, mdd_lov_objids_in_file), so ids read from the file for
 * targets that the current descriptor does not cover are still written.
 * Each entry is logged at D_INFO, the array is written with dbo_write(),
 * and mdd_lov_objids_dirty is cleared.
 *
 * NOTE(review): confirm whether the dirty flag is cleared only when the
 * write succeeds.
 */
163 int mdd_lov_write_objids(const struct lu_context *ctxt,
164 struct mdd_lov_info *lov_info)
169 if (!lov_info->mdd_lov_objids_dirty)
172 tgts = max(lov_info->mdd_lov_desc.ld_tgt_count,
173 lov_info->mdd_lov_objids_in_file);
177 for (i = 0; i < tgts; i++)
178 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
179 lov_info->mdd_lov_objids[i], i);
/* tgts entries of sizeof(obd_id) bytes each are written in one call. */
181 rc = ids_obj->do_body_ops->dbo_write(ctxt, ids_obj,
182 lov_info->mdd_lov_objids,
183 tgts * sizeof(obd_id), &off);
185 lov_info->mdd_lov_objids_dirty = 0;
/*
 * Connect mdd to its LOV and bring the saved objid state up to date.
 *
 * Steps, in order:
 *  - allocate an obd_connect_data asking for OBD_CONNECT_VERSION,
 *    OBD_CONNECT_INDEX and OBD_CONNECT_REQPORTAL, call obd_connect() on
 *    mdd_lov_obd with mdd_lov_uuid, then free the connect data;
 *  - on connect failure, log it and store ERR_PTR(rc) in mdd_lov_obd;
 *  - load the saved objids (mdd_lov_read_objids) and refresh the descriptor
 *    (mdd_lov_update_desc);
 *  - if the descriptor now has more targets than the objid file had entries,
 *    fetch "last_id" values from the LOV into mdd_lov_objids, mark the array
 *    dirty and write it out;
 *  - obd_disconnect() is called at the end (NOTE(review): presumably only on
 *    the error path - confirm).
 *
 * lov_name is only used in the error message.
 */
192 static int mdd_lov_connect(const struct lu_context *ctxt,
193 struct mdd_device *mdd, char *lov_name)
195 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
196 struct lustre_handle conn = {0,};
197 struct obd_connect_data *data;
202 OBD_ALLOC(data, sizeof(*data));
205 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
206 OBD_CONNECT_REQPORTAL;
207 data->ocd_version = LUSTRE_VERSION_CODE;
208 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
209 rc = obd_connect(&conn, lov_info->mdd_lov_obd, &lov_info->mdd_lov_uuid,
211 OBD_FREE(data, sizeof(*data));
213 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
/* The error is remembered in mdd_lov_obd; mdd_lov_init() tests for it. */
214 lov_info->mdd_lov_obd = ERR_PTR(rc);
218 /* open and test the lov objid file */
220 rc = mdd_lov_read_objids(ctxt, mdd);
222 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
226 rc = mdd_lov_update_desc(ctxt, mdd);
/* NOTE(review): "obd" and "mds" used below are not declared among this
 * function's visible locals - presumably this call is compiled out;
 * confirm. */
230 /* tgt_count may be 0! */
231 rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
233 CERROR("failed to initialize catalog %d\n", rc);
237 /* If we're mounting this code for the first time on an existing FS,
238 * we need to populate the objids array from the real OST values */
239 if (lov_info->mdd_lov_desc.ld_tgt_count >
240 lov_info->mdd_lov_objids_in_file) {
241 int size = sizeof(obd_id) * lov_info->mdd_lov_desc.ld_tgt_count;
/* NOTE(review): the key length here is strlen("last_id") without the
 * terminating NUL, while the KEY_LOVDESC lookup in mdd_lov_update_desc()
 * passes strlen() + 1 - confirm which form the LOV expects. */
244 rc = obd_get_info(lov_info->mdd_lov_obd->obd_self_export,
245 strlen("last_id"), "last_id", &size,
246 lov_info->mdd_lov_objids);
248 for (i = 0; i < lov_info->mdd_lov_desc.ld_tgt_count; i++)
249 CWARN("got last object "LPU64" from OST %d\n",
250 lov_info->mdd_lov_objids[i], i);
251 lov_info->mdd_lov_objids_dirty = 1;
252 rc = mdd_lov_write_objids(ctxt, lov_info);
254 CERROR("got last objids from OSTs, but error "
255 "writing objids file: %d\n", rc);
258 /* I want to see a callback happen when the OBD moves to a
259 * "For General Use" state, and that's when we'll call
260 * set_nextid(). The class driver can help us here, because
261 * it can use the obd_recovering flag to determine when the
262 * OBD is fully available. */
264 if (!obd->obd_recovering)
265 rc = mds_postrecov(obd);
269 obd_disconnect(lov_info->mdd_lov_obd->obd_self_export);
/*
 * Tear down mdd's LOV state: finalizes the objid object with
 * dt_object_fini().  ctxt is not used by the visible code.
 *
 * NOTE(review): mdd_lov_init() also calls this from what looks like its
 * cleanup path, where mdd_lov_objid_obj may not have been set yet - confirm
 * dt_object_fini() tolerates that.  The objid array allocated by
 * mdd_lov_read_objids() is not released here - confirm it is freed
 * elsewhere.
 */
273 int mdd_lov_fini(const struct lu_context *ctxt, struct mdd_device *mdd)
275 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
277 dt_object_fini(lov_info->mdd_lov_objid_obj);
/*
 * Set up mdd's LOV state from a configuration record.
 *
 *  - If mdd_lov_obd already holds an error pointer (as stored by this
 *    function or by mdd_lov_connect()), return that error immediately.
 *  - Look up the LOV obd named by cfg string 3 (asserted non-NULL); if it
 *    cannot be found, log it and store ERR_PTR(-ENOTCONN) in mdd_lov_obd.
 *  - Open the objid object named mdd_lov_objid_name with dt_store_open()
 *    and keep it in mdd_lov_objid_obj (its fid goes to mdd_lov_objid_fid).
 *  - Take the LOV uuid from cfg string 1 and connect via mdd_lov_connect().
 *  - Look up the obd named by cfg string 0 and register it as observer of
 *    the LOV with obd_register_observer().
 *
 * mdd_lov_fini() is called at the end (NOTE(review): presumably only when
 * one of the steps above failed - confirm).
 */
281 int mdd_lov_init(const struct lu_context *ctxt, struct mdd_device *mdd,
282 struct lustre_cfg *cfg)
284 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
285 struct dt_object *obj_id;
286 struct obd_device *obd = NULL;
287 char *lov_name = NULL, *srv = NULL;
291 if (IS_ERR(lov_info->mdd_lov_obd))
292 RETURN(PTR_ERR(lov_info->mdd_lov_obd));
294 lov_name = lustre_cfg_string(cfg, 3);
295 LASSERTF(lov_name != NULL, "MDD need lov \n");
296 lov_info->mdd_lov_obd = class_name2obd(lov_name);
297 if (!lov_info->mdd_lov_obd) {
298 CERROR("MDS cannot locate LOV %s\n", lov_name);
299 lov_info->mdd_lov_obd = ERR_PTR(-ENOTCONN);
303 obj_id = dt_store_open(ctxt, mdd->mdd_child, mdd_lov_objid_name,
304 &lov_info->mdd_lov_objid_fid);
306 rc = PTR_ERR(obj_id);
310 LASSERT(obj_id != NULL);
311 lov_info->mdd_lov_objid_obj = obj_id;
313 obd_str2uuid(&lov_info->mdd_lov_uuid, lustre_cfg_string(cfg, 1));
315 rc = mdd_lov_connect(ctxt, mdd, lov_name);
319 /* register the obd server for lov */
320 srv = lustre_cfg_string(cfg, 0);
321 obd = class_name2obd(srv);
323 CERROR("No such OBD %s\n", srv);
326 rc = obd_register_observer(lov_info->mdd_lov_obd, obd);
328 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
335 mdd_lov_fini(ctxt, mdd);
339 /* update the LOV-OSC knowledge of the last used object id's
 *
 * Sends the whole mdd_lov_objids array to the LOV export with
 * obd_set_info_async() under KEY_NEXT_ID, passing ld_tgt_count from the
 * cached descriptor alongside it and no request set (NULL).
 * Asserts that the objid array exists; logs an error on failure.
 */
340 int mdd_lov_set_nextid(struct mdd_device *mdd)
342 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
346 LASSERT(lov_info->mdd_lov_objids != NULL);
348 rc = obd_set_info_async(lov_info->mdd_lov_obd->obd_self_export,
349 strlen(KEY_NEXT_ID), KEY_NEXT_ID,
350 lov_info->mdd_lov_desc.ld_tgt_count,
351 lov_info->mdd_lov_objids, NULL);
354 CERROR ("mdd_lov_set_nextid failed (%d)\n", rc);
/*
 * Arguments for one target synchronization.  Allocated and filled in by
 * mdd_lov_start_synchronize(); unpacked and freed by
 * __mdd_lov_synchronize().
 */
359 struct mdd_lov_sync_info {
360 struct lu_context *mlsi_ctxt; /* context the sync runs under */
361 struct lu_device *mlsi_ld; /* the lov device to sync */
362 struct obd_device *mlsi_watched; /* target osc */
363 __u32 mlsi_index; /* index of target, or MDSLOV_NO_INDEX */
/* Target index meaning "no specific target": stored in mlsi_index by
 * mdd_lov_start_synchronize() and tested for in mdd_lov_update_mds().
 * NOTE(review): expands to an unparenthesized -1 and is stored in / compared
 * with __u32 values, so it relies on the conversion to the maximum __u32
 * value - confirm this is intended. */
366 #define MDSLOV_NO_INDEX -1
368 /* Inform MDS about new/updated target
 *
 * Refreshes the cached LOV descriptor with mdd_lov_update_desc() and then,
 * when idx is a real index (not MDSLOV_NO_INDEX):
 *  - logs an error if idx is not below the descriptor's ld_tgt_count;
 *  - if idx is beyond what was read from the objid file, asks the watched
 *    obd for its "last_id", stores it in mdd_lov_objids[idx], marks the
 *    array dirty and writes it out;
 *  - otherwise pushes the saved ids to the LOV with mdd_lov_set_nextid().
 */
369 static int mdd_lov_update_mds(struct lu_context *ctxt,
370 struct lu_device *ld,
371 struct obd_device *watched,
374 struct mdd_device *mdd = lu2mdd_dev(ld);
375 struct mdd_lov_info *lov_info = &mdd->mdd_lov_info;
380 old_count = lov_info->mdd_lov_desc.ld_tgt_count;
381 rc = mdd_lov_update_desc(ctxt, mdd);
386 * idx is set as data from lov_notify.
387 * XXX did not consider recovery here
389 if (idx != MDSLOV_NO_INDEX) {
390 if (idx >= lov_info->mdd_lov_desc.ld_tgt_count) {
391 CERROR("index %d > count %d!\n", idx,
392 lov_info->mdd_lov_desc.ld_tgt_count);
396 if (idx >= lov_info->mdd_lov_objids_in_file) {
397 /* We never read this lastid; ask the osc */
399 __u32 size = sizeof(lastid);
400 rc = obd_get_info(watched->obd_self_export,
402 "last_id", &size, &lastid);
405 lov_info->mdd_lov_objids[idx] = lastid;
406 lov_info->mdd_lov_objids_dirty = 1;
/* NOTE(review): the return value of the write is ignored here - confirm
 * that a failed write is acceptable at this point. */
407 mdd_lov_write_objids(ctxt, lov_info);
409 /* We have read this lastid from disk; tell the osc.
410 Don't call this during recovery. */
411 rc = mdd_lov_set_nextid(mdd);
414 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
415 lov_info->mdd_lov_objids[idx], idx);
/*
 * Ask the LOV to reconcile orphan objects, optionally for a single OST.
 *
 * Builds a zeroed obdo carrying OBD_FL_DELORPHAN in o_flags and issues
 * obd_create() on the LOV's self export.  When ost_uuid is non-NULL it is
 * copied into o_inline and OBD_MD_FLINLINE is set in o_valid.
 * Asserts that the objid array exists.
 */
421 static int mdd_lov_clear_orphans(struct mdd_lov_info *mli,
422 struct obd_uuid *ost_uuid)
426 struct obd_trans_info oti = {0};
427 struct lov_stripe_md *empty_ea = NULL;
430 LASSERT(mli->mdd_lov_objids != NULL);
432 /* This create will in fact either create or destroy: If the OST is
433 * missing objects below this ID, they will be created. If it finds
434 * objects above this ID, they will be removed. */
435 memset(&oa, 0, sizeof(oa));
436 oa.o_valid = OBD_MD_FLFLAGS;
437 oa.o_flags = OBD_FL_DELORPHAN;
438 if (ost_uuid != NULL) {
/* NOTE(review): assumes o_inline can hold sizeof(*ost_uuid) bytes -
 * confirm against the obdo definition. */
439 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
440 oa.o_valid |= OBD_MD_FLINLINE;
442 rc = obd_create(mli->mdd_lov_obd->obd_self_export, &oa,
/*
 * Synchronize mdd with one watched target.
 *
 * data is a struct mdd_lov_sync_info; its fields are copied to locals and
 * the structure is then freed here, so the caller must not touch it again.
 * The work is: mdd_lov_update_mds() for the target index, then
 * obd_set_info_async(KEY_MDS_CONN) on the LOV with the target's uuid
 * (watched->u.cli.cl_target_uuid), then mdd_lov_clear_orphans() for that
 * uuid.  Failures of the last two steps are logged.
 */
448 /* We only sync one osc at a time, so that we don't have to hold
449 any kind of lock on the whole mds_lov_desc, which may change
450 (grow) as a result of mds_lov_add_ost. This also avoids any
451 kind of mismatch between the lov_desc and the mds_lov_desc,
452 which are not in lock-step during lov_add_obd */
453 static int __mdd_lov_synchronize(void *data)
455 struct mdd_lov_sync_info *mlsi = data;
456 struct lu_device *ld = mlsi->mlsi_ld;
457 struct obd_device *watched = mlsi->mlsi_watched;
458 struct lu_context *ctxt = mlsi->mlsi_ctxt;
459 struct mdd_device *mdd = lu2mdd_dev(ld);
460 struct obd_uuid *uuid;
461 __u32 idx = mlsi->mlsi_index;
/* Everything needed has been copied out above; release the request. */
465 OBD_FREE(mlsi, sizeof(*mlsi));
469 uuid = &watched->u.cli.cl_target_uuid;
472 rc = mdd_lov_update_mds(ctxt, ld, watched, idx);
476 rc = obd_set_info_async(mdd->mdd_lov_info.mdd_lov_obd->obd_self_export,
477 strlen(KEY_MDS_CONN), KEY_MDS_CONN, 0, uuid,
480 CERROR("failed at obd_set_info_async: %d\n", rc);
484 rc = mdd_lov_clear_orphans(&mdd->mdd_lov_info, uuid);
486 CERROR("failed at mds_lov_clear_orphans: %d\n", rc);
/*
 * Thread entry point for a background sync: names the thread
 * "ll_mlov_sync_<index>", daemonizes it with ptlrpc_daemonize() and runs
 * __mdd_lov_synchronize() on data (a struct mdd_lov_sync_info).
 *
 * NOTE(review): sprintf() is unbounded.  With mlsi_index ==
 * MDSLOV_NO_INDEX the formatted name is 23 characters plus the NUL, so
 * confirm that "name" is declared with at least 24 bytes, or switch to
 * snprintf().
 */
495 int mdd_lov_synchronize(void *data)
497 struct mdd_lov_sync_info *mlsi = data;
500 sprintf(name, "ll_mlov_sync_%02u", mlsi->mlsi_index);
501 ptlrpc_daemonize(name);
503 RETURN(__mdd_lov_synchronize(data));
/*
 * Package a sync request for one watched target and run it.
 *
 * Allocates a struct mdd_lov_sync_info and fills in the context (casting
 * away const), the watched obd and the target index, which is either
 * *(__u32 *)data or MDSLOV_NO_INDEX (NOTE(review): presumably the latter
 * when data is NULL - confirm).  The structure is handed to
 * __mdd_lov_synchronize(), which frees it.
 *
 * Both a cfs_kernel_thread(mdd_lov_synchronize, ...) launch and direct
 * calls to __mdd_lov_synchronize() appear below, and the FIXME at the end
 * says the nonblocking variant is not implemented.  NOTE(review): confirm
 * which of these paths is actually compiled in and how "nonblock" is
 * honoured.
 */
506 int mdd_lov_start_synchronize(const struct lu_context *ctxt,
507 struct lu_device *ld,
508 struct obd_device *watched,
509 void *data, int nonblock)
511 struct mdd_lov_sync_info *mlsi;
518 OBD_ALLOC(mlsi, sizeof(*mlsi));
522 mlsi->mlsi_ctxt = (struct lu_context *)ctxt;
524 mlsi->mlsi_watched = watched;
526 mlsi->mlsi_index = *(__u32 *)data;
528 mlsi->mlsi_index = MDSLOV_NO_INDEX;
530 /* Although class_export_get(obd->obd_self_export) would lock
531 the MDS in place, since it's only a self-export
532 it doesn't lock the LOV in place. The LOV can be disconnected
533 during MDS precleanup, leaving nothing for __mdd_lov_synchronize.
534 Simply taking an export ref on the LOV doesn't help, because it's
535 still disconnected. Taking an obd reference ensures that we don't
536 disconnect the LOV. This of course means a cleanup won't
537 finish for as long as the sync is blocking. */
541 /* Synchronize in the background */
542 rc = cfs_kernel_thread(mdd_lov_synchronize, mlsi,
543 CLONE_VM | CLONE_FILES);
545 CERROR("error starting mdd_lov_synchronize: %d\n", rc);
548 CDEBUG(D_HA, "mdd_lov_synchronize idx=%d thread=%d\n",
549 mlsi->mlsi_index, rc);
553 rc = __mdd_lov_synchronize((void *)mlsi);
556 /* FIXME: the nonblocking lov sync is not implemented here because ctxt
557 * cannot be shared; maybe we need a ref_count for ctxt */
558 rc = __mdd_lov_synchronize((void *)mlsi);
/*
 * Notification handler for events on a watched obd.
 *
 * Only OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK are
 * handled.  The watched obd must be of type LUSTRE_OSC_NAME, otherwise an
 * error is logged.  While the top-level obd is in recovery, orphans are not
 * reset and only the LOV descriptor is refreshed with
 * mdd_lov_update_desc().  Otherwise mdd_lov_start_synchronize() is called
 * with data and nonblock set for every event except OBD_NOTIFY_SYNC.
 */
563 int mdd_notify(const struct lu_context *ctxt, struct lu_device *ld,
564 struct obd_device *watched, enum obd_notify_event ev,
567 struct mdd_device *mdd = lu2mdd_dev(ld);
/* The recovery state is read from the obd of the site's top device. */
568 struct obd_device *obd = ld->ld_site->ls_top_dev->ld_obd;
573 /* We only handle these: */
574 case OBD_NOTIFY_ACTIVE:
575 case OBD_NOTIFY_SYNC:
576 case OBD_NOTIFY_SYNC_NONBLOCK:
582 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
584 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
585 CERROR("unexpected notification of %s %s!\n",
586 watched->obd_type->typ_name, watched->obd_name);
590 /* FIXME later: recovery handling has not been designed yet */
591 if (obd->obd_recovering) {
592 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
594 obd_uuid2str(&watched->u.cli.cl_target_uuid));
595 /* We still have to fix the lov descriptor for ost's added
596 after the mdt in the config log. They didn't make it into
598 rc = mdd_lov_update_desc(ctxt, mdd);
602 rc = mdd_lov_start_synchronize(ctxt, ld, watched, data,
603 !(ev == OBD_NOTIFY_SYNC));
/*
 * Read EA data of obj into the caller's buffer.
 *
 * Calls do_xattr_get() on the dt object underneath obj, with md as the
 * destination and *md_size as its size.  A read failure is logged.
 * NOTE(review): how "lock" is used and whether *md_size is updated with the
 * size actually read cannot be told from the visible lines - confirm.
 */
607 static int mdd_get_md(const struct lu_context *ctxt, struct md_object *obj,
608 void *md, int *md_size, int lock)
610 struct dt_object *next;
614 next = mdd_object_child(md2mdd_obj(obj));
615 rc = next->do_ops->do_xattr_get(ctxt, next, md, *md_size,
618 CERROR("Error %d reading eadata \n", rc);
621 /*FIXME convert lov EA necessary for this version?*/
/*
 * When child is a directory, copy stripe information from pobj to it:
 * the EA is read from pobj with mdd_get_md() and stored on child under
 * MDS_LOV_MD_NAME with mdd_xattr_set().  A failure is logged.
 *
 * NOTE(review): lmm is a pointer to the per-context mti_lmm buffer, so
 * sizeof(lmm) is the size of a pointer, not of the buffer, and &lmm hands
 * mdd_get_md() the address of the local pointer variable rather than the
 * buffer.  This looks like a bug; sizeof(*lmm) and lmm were probably
 * intended - confirm before changing.
 */
631 int mdd_lov_set_md(const struct lu_context *ctxt, struct md_object *pobj,
632 struct md_object *child)
634 struct dt_object *next = mdd_object_child(md2mdd_obj(child));
638 if (dt_is_dir(ctxt, next)) {
639 struct lov_mds_md *lmm = &mdd_ctx_info(ctxt)->mti_lmm;
640 int lmm_size = sizeof(lmm);
641 rc = mdd_get_md(ctxt, pobj, &lmm, &lmm_size, 1);
643 rc = mdd_xattr_set(ctxt, child, lmm, lmm_size, MDS_LOV_MD_NAME);
645 CERROR("error on copy stripe info: rc = %d\n", rc);
651 int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
652 struct mdd_object *child)
654 struct mdd_lov_info *mli = &mdd->mdd_lov_info;
656 struct lov_mds_md *lmm = NULL;
657 struct lov_stripe_md *lsm = NULL;
658 int rc = 0, lmm_size;
663 oa->o_uid = 0; /* must have 0 uid / gid on OST */
665 oa->o_mode = S_IFREG | 0600;
666 oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
667 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
670 rc = obd_create(mli->mdd_lov_obd->obd_self_export, oa, &lsm, NULL);
674 rc = obd_packmd(mli->mdd_lov_obd->obd_self_export, &lmm, lsm);
676 CERROR("cannot pack lsm, err = %d\n", rc);
681 rc = mdd_xattr_set(ctxt, &child->mod_obj, lmm, lmm_size,