1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mdt/mdt_recovery.c
5 * Lustre Metadata Target (mdt) recovery-related methods
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
9 * Author: Pershin Mike <tappro@clusterfs.com>
11 * This file is part of the Lustre file system, http://www.lustre.org
12 * Lustre is a trademark of Cluster File Systems, Inc.
14 * You may have signed or agreed to another license before downloading
15 * this software. If so, you are bound by the terms and conditions
16 * of that agreement, and the following does not apply to you. See the
17 * LICENSE file included with this distribution for more information.
19 * If you did not agree to a different license, then this copy of Lustre
20 * is open source software; you can redistribute it and/or modify it
21 * under the terms of version 2 of the GNU General Public License as
22 * published by the Free Software Foundation.
24 * In either case, Lustre is distributed in the hope that it will be
25 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "mdt_internal.h"
/* Forward declaration: mdt_server_data_init() calls this function before
 * its definition further down in the file. */
36 static int mdt_server_data_update(const struct lu_context *ctx,
37 struct mdt_device *mdt);
39 /* TODO: maybe this pair should be defined in dt_object.c */
/*
 * Read a record from a dt object (callers in this file pass
 * mdt->mdt_last_rcvd).
 *
 * Asserts that dt is non-NULL and forwards ctx, dt, buf, count and pos
 * unchanged to the dbo_read method of dt->do_body_ops, storing the result
 * in rc.
 *
 * NOTE(review): what is done with rc after the call is not visible in this
 * view of the file - confirm how short reads and errors are reported.
 */
40 static int mdt_record_read(const struct lu_context *ctx,
41 struct dt_object *dt, void *buf,
42 size_t count, loff_t *pos)
46 LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
48 rc = dt->do_body_ops->dbo_read(ctx, dt, buf, count, pos);
/*
 * Write a record to a dt object (callers in this file pass
 * mdt->mdt_last_rcvd) inside the transaction th.
 *
 * Asserts that dt is non-NULL and forwards ctx, dt, buf, count, pos and th
 * unchanged to the dbo_write method of dt->do_body_ops, storing the result
 * in rc.
 *
 * NOTE(review): what is done with rc after the call is not visible in this
 * view of the file - confirm how short writes and errors are reported.
 */
57 static int mdt_record_write(const struct lu_context *ctx,
58 struct dt_object *dt, const void *buf,
59 size_t count, loff_t *pos, struct thandle *th)
63 LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
65 rc = dt->do_body_ops->dbo_write(ctx, dt, buf, count, pos, th);
72 /* Credits reserved for one last_rcvd record write: passed to
 * mdt_trans_start() by the writers in this file and added to tp_credits
 * by mdt_txn_start_cb(). */
75 MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
78 static struct thandle* mdt_trans_start(const struct lu_context *ctx,
79 struct mdt_device *mdt, int credits)
81 struct mdt_thread_info *mti;
84 mti = lu_context_key_get(ctx, &mdt_thread_key);
85 p = &mti->mti_txn_param;
86 p->tp_credits = credits;
87 return mdt->mdt_bottom->dd_ops->dt_trans_start(ctx, mdt->mdt_bottom, p);
90 static void mdt_trans_stop(const struct lu_context *ctx,
91 struct mdt_device *mdt, struct thandle *th)
93 mdt->mdt_bottom->dd_ops->dt_trans_stop(ctx, th);
96 /* last_rcvd handling */
97 static inline void msd_le_to_cpu(struct mdt_server_data *buf,
98 struct mdt_server_data *msd)
100 memcpy(msd->msd_uuid, buf->msd_uuid, sizeof (msd->msd_uuid));
101 msd->msd_last_transno = le64_to_cpu(buf->msd_last_transno);
102 msd->msd_mount_count = le64_to_cpu(buf->msd_mount_count);
103 msd->msd_feature_compat = le32_to_cpu(buf->msd_feature_compat);
104 msd->msd_feature_rocompat = le32_to_cpu(buf->msd_feature_rocompat);
105 msd->msd_feature_incompat = le32_to_cpu(buf->msd_feature_incompat);
106 msd->msd_server_size = le32_to_cpu(buf->msd_server_size);
107 msd->msd_client_start = le32_to_cpu(buf->msd_client_start);
108 msd->msd_client_size = le16_to_cpu(buf->msd_client_size);
111 static inline void msd_cpu_to_le(struct mdt_server_data *msd,
112 struct mdt_server_data *buf)
114 memcpy(buf->msd_uuid, msd->msd_uuid, sizeof (msd->msd_uuid));
115 buf->msd_last_transno = cpu_to_le64(msd->msd_last_transno);
116 buf->msd_mount_count = cpu_to_le64(msd->msd_mount_count);
117 buf->msd_feature_compat = cpu_to_le32(msd->msd_feature_compat);
118 buf->msd_feature_rocompat = cpu_to_le32(msd->msd_feature_rocompat);
119 buf->msd_feature_incompat = cpu_to_le32(msd->msd_feature_incompat);
120 buf->msd_server_size = cpu_to_le32(msd->msd_server_size);
121 buf->msd_client_start = cpu_to_le32(msd->msd_client_start);
122 buf->msd_client_size = cpu_to_le16(msd->msd_client_size);
125 static inline void mcd_le_to_cpu(struct mdt_client_data *buf,
126 struct mdt_client_data *mcd)
128 memcpy(mcd->mcd_uuid, buf->mcd_uuid, sizeof (mcd->mcd_uuid));
129 mcd->mcd_last_transno = le64_to_cpu(buf->mcd_last_transno);
130 mcd->mcd_last_xid = le64_to_cpu(buf->mcd_last_xid);
131 mcd->mcd_last_result = le32_to_cpu(buf->mcd_last_result);
132 mcd->mcd_last_data = le32_to_cpu(buf->mcd_last_data);
133 mcd->mcd_last_close_transno = le64_to_cpu(buf->mcd_last_close_transno);
134 mcd->mcd_last_close_xid = le64_to_cpu(buf->mcd_last_close_xid);
135 mcd->mcd_last_close_result = le32_to_cpu(buf->mcd_last_close_result);
138 static inline void mcd_cpu_to_le(struct mdt_client_data *mcd,
139 struct mdt_client_data *buf)
141 memcpy(buf->mcd_uuid, mcd->mcd_uuid, sizeof (mcd->mcd_uuid));
142 buf->mcd_last_transno = cpu_to_le64(mcd->mcd_last_transno);
143 buf->mcd_last_xid = cpu_to_le64(mcd->mcd_last_xid);
144 buf->mcd_last_result = cpu_to_le32(mcd->mcd_last_result);
145 buf->mcd_last_data = cpu_to_le32(mcd->mcd_last_data);
146 buf->mcd_last_close_transno = cpu_to_le64(mcd->mcd_last_close_transno);
147 buf->mcd_last_close_xid = cpu_to_le64(mcd->mcd_last_close_xid);
148 buf->mcd_last_close_result = cpu_to_le32(mcd->mcd_last_close_result);
/*
 * Read the server header record of last_rcvd into msd.
 *
 * The raw record is read with mdt_record_read() into a temporary
 * struct mdt_server_data and then converted with msd_le_to_cpu(), so the
 * converted fields of msd are in CPU byte order.  The outcome is logged
 * at D_INFO.
 *
 * NOTE(review): the setup of tmp and off and the return statement are not
 * visible in this view of the file.
 */
151 static int mdt_last_rcvd_header_read(const struct lu_context *ctx,
152 struct mdt_device *mdt,
153 struct mdt_server_data *msd)
155 struct mdt_thread_info *mti;
156 struct mdt_server_data *tmp;
160 mti = lu_context_key_get(ctx, &mdt_thread_key);
161 /* scratch record that receives the raw data before byte-swapping */
165 rc = mdt_record_read(ctx, mdt->mdt_last_rcvd,
166 tmp, sizeof(*tmp), off);
168 msd_le_to_cpu(tmp, msd);
170 CDEBUG(D_INFO, "read last_rcvd header rc = %d:\n"
172 "last_transno = "LPU64"\n",
175 msd->msd_last_transno);
/*
 * Write msd to last_rcvd as the server header record.
 *
 * A transaction with MDT_TXN_LAST_RCVD_WRITE_CREDITS credits is started,
 * msd is converted into a temporary little-endian copy with
 * msd_cpu_to_le(), that copy is written with mdt_record_write(), and the
 * transaction is stopped.  The outcome is logged at D_INFO.
 *
 * NOTE(review): the check of th after mdt_trans_start(), the setup of tmp
 * and off, and the return statement are not visible in this view.
 */
179 static int mdt_last_rcvd_header_write(const struct lu_context *ctx,
180 struct mdt_device *mdt,
181 struct mdt_server_data *msd)
183 struct mdt_thread_info *mti;
184 struct mdt_server_data *tmp;
189 mti = lu_context_key_get(ctx, &mdt_thread_key);
191 th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
195 /* scratch record holding the little-endian copy that is written out */
200 msd_cpu_to_le(msd, tmp);
202 rc = mdt_record_write(ctx, mdt->mdt_last_rcvd,
203 tmp, sizeof(*tmp), off, th);
205 mdt_trans_stop(ctx, mdt, th);
207 CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
209 "last_transno = "LPU64"\n",
212 msd->msd_last_transno);
/*
 * Read one per-client record from last_rcvd at *off into mcd.
 *
 * The raw record is read with mdt_record_read() into a temporary
 * struct mdt_client_data and converted with mcd_le_to_cpu().  The record
 * contents are then logged at D_INFO.
 *
 * NOTE(review): the setup of tmp and the return statement are not visible
 * in this view of the file.
 */
216 static int mdt_last_rcvd_read(const struct lu_context *ctx,
217 struct mdt_device *mdt,
218 struct mdt_client_data *mcd, loff_t *off)
220 struct mdt_thread_info *mti;
221 struct mdt_client_data *tmp;
224 mti = lu_context_key_get(ctx, &mdt_thread_key);
226 rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, tmp, sizeof(*tmp), off);
228 mcd_le_to_cpu(tmp, mcd);
/* NOTE(review): in "(int)*off - sizeof(*tmp)" below the cast applies to
 * *off only, so the subtraction has the type of sizeof while the format
 * uses %d; "(int)(*off - sizeof(*tmp))" is probably what was meant.  The
 * expression also presumes the read advanced *off by one record. */
230 CDEBUG(D_INFO, "read mcd @%d rc = %d:\n"
232 "last_transno = "LPU64"\n"
233 "last_xid = "LPU64"\n"
236 "last_close_transno = "LPU64"\n"
237 "last_close_xid = "LPU64"\n"
238 "last_close_result = %d\n",
239 (int)*off - sizeof(*tmp),
242 mcd->mcd_last_transno,
244 mcd->mcd_last_result,
246 mcd->mcd_last_close_transno,
247 mcd->mcd_last_close_xid,
248 mcd->mcd_last_close_result);
/*
 * Write the per-client record mcd to last_rcvd at *off inside the
 * transaction th.
 *
 * mcd is converted into a temporary little-endian copy with
 * mcd_cpu_to_le() and that copy is written with mdt_record_write().  The
 * record contents are then logged at D_INFO.
 *
 * NOTE(review): the setup of tmp and the return statement are not visible
 * in this view of the file.
 */
253 static int mdt_last_rcvd_write(const struct lu_context *ctx,
254 struct mdt_device *mdt,
255 struct mdt_client_data *mcd,
256 loff_t *off, struct thandle *th)
258 struct mdt_thread_info *mti;
259 struct mdt_client_data *tmp;
263 mti = lu_context_key_get(ctx, &mdt_thread_key);
266 mcd_cpu_to_le(mcd, tmp);
268 rc = mdt_record_write(ctx, mdt->mdt_last_rcvd,
269 tmp, sizeof(*tmp), off, th);
/* NOTE(review): in "(int)*off - sizeof(*tmp)" below the cast applies to
 * *off only, so the subtraction has the type of sizeof while the format
 * uses %d; "(int)(*off - sizeof(*tmp))" is probably what was meant.  The
 * expression also presumes the write advanced *off by one record. */
271 CDEBUG(D_INFO, "write mcd @%d rc = %d:\n"
273 "last_transno = "LPU64"\n"
274 "last_xid = "LPU64"\n"
277 "last_close_transno = "LPU64"\n"
278 "last_close_xid = "LPU64"\n"
279 "last_close_result = %d\n",
280 (int)*off - sizeof(*tmp),
283 mcd->mcd_last_transno,
285 mcd->mcd_last_result,
287 mcd->mcd_last_close_transno,
288 mcd->mcd_last_close_xid,
289 mcd->mcd_last_close_result);
/*
 * Walk the per-client records stored in last_rcvd and set up an export for
 * the clients found there.
 *
 * last_size is the size of last_rcvd; the loop runs while off < last_size.
 *
 * Records whose uuid starts with NUL are reported as zeroed slots.  For
 * the others an export is created with class_new_export(), registered
 * with mdt_client_add() at the same slot index, flagged exp_replay_needed
 * and counted in obd_recoverable_clients and obd_max_recoverable_clients.
 * mdt_last_transno is raised to the highest client last_transno seen.
 *
 * NOTE(review): mcd starts out NULL here; where it gets its storage, the
 * error paths and the return value are not visible in this view.
 */
294 static int mdt_clients_data_init(const struct lu_context *ctx,
295 struct mdt_device *mdt,
296 unsigned long last_size)
298 struct mdt_server_data *msd = &mdt->mdt_msd;
299 struct mdt_client_data *mcd = NULL;
300 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
306 /* When we do a clean MDS shutdown, we save the last_transno into
307 * the header. If we find clients with higher last_transno values
308 * then those clients may need recovery done. */
310 for (cl_idx = 0, off = msd->msd_client_start;
311 off < last_size; cl_idx++) {
313 struct obd_export *exp;
314 struct mdt_export_data *med;
/* the slot offset is recomputed from cl_idx on every pass */
322 off = msd->msd_client_start +
323 cl_idx * msd->msd_client_size;
325 rc = mdt_last_rcvd_read(ctx, mdt, mcd, &off);
327 CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
328 LAST_RCVD, cl_idx, off, rc);
330 break; /* read error shouldn't cause startup to fail */
333 if (mcd->mcd_uuid[0] == '\0') {
334 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
339 last_transno = mcd_last_transno(mcd);
341 /* These exports are cleaned up by mdt_obd_disconnect(), so
342 * they need to be set up like real exports as
343 * mdt_obd_connect() does.
345 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
346 " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
347 last_transno, msd->msd_last_transno,
350 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
354 /* FIXME: Do we really want to return error? */
357 med = &exp->exp_mdt_data;
359 rc = mdt_client_add(ctx, mdt, med, cl_idx);
360 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
362 exp->exp_replay_needed = 1;
363 exp->exp_connecting = 0;
364 obd->obd_recoverable_clients++;
365 obd->obd_max_recoverable_clients++;
366 class_export_put(exp);
368 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
369 cl_idx, last_transno);
370 /* protect __u64 value update */
371 spin_lock(&mdt->mdt_transno_lock);
372 mdt->mdt_last_transno = max(last_transno,
373 mdt->mdt_last_transno);
374 spin_unlock(&mdt->mdt_transno_lock);
/*
 * Initialise the in-memory server data (mdt->mdt_msd) from last_rcvd and
 * prepare recovery state.
 *
 * The size of last_rcvd decides between two paths: an empty file gets
 * default header values, otherwise the header is read and checked (uuid,
 * incompat and rocompat feature masks).  Afterwards mdt_last_transno is
 * seeded from the header, the layout values are dumped at D_INODE and
 * sanity-checked, the client records are loaded with
 * mdt_clients_data_init(), recovery is armed when recoverable clients
 * exist, the mount count is bumped and the header is written back with
 * mdt_server_data_update().
 *
 * NOTE(review): several declarations, the checks of rc, the out and
 * err_client labels and the return statement are not visible in this view.
 */
382 static int mdt_server_data_init(const struct lu_context *ctx,
383 struct mdt_device *mdt)
385 struct mdt_server_data *msd = &mdt->mdt_msd;
386 struct mdt_client_data *mcd = NULL;
387 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
388 struct mdt_thread_info *mti;
389 struct dt_object *obj;
391 unsigned long last_rcvd_size;
396 /* ensure padding in the struct is the correct size */
397 CLASSERT(offsetof(struct mdt_server_data, msd_padding) +
398 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
399 CLASSERT(offsetof(struct mdt_client_data, mcd_padding) +
400 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
402 mti = lu_context_key_get(ctx, &mdt_thread_key);
403 LASSERT(mti != NULL);
404 la = &mti->mti_attr.ma_attr;
/* fetch the attributes of last_rcvd under its read lock to learn its size */
406 obj = mdt->mdt_last_rcvd;
407 obj->do_ops->do_read_lock(ctx, obj);
408 rc = obj->do_ops->do_attr_get(ctx, mdt->mdt_last_rcvd, la);
409 obj->do_ops->do_read_unlock(ctx, obj);
413 last_rcvd_size = (unsigned long)la->la_size;
/* an empty last_rcvd is treated as a new disk: fill in default values */
415 if (last_rcvd_size == 0) {
416 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
418 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
419 sizeof(msd->msd_uuid));
420 msd->msd_last_transno = 0;
421 msd->msd_mount_count = 0;
422 msd->msd_server_size = LR_SERVER_SIZE;
423 msd->msd_client_start = LR_CLIENT_START;
424 msd->msd_client_size = LR_CLIENT_SIZE;
425 msd->msd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
426 msd->msd_feature_incompat = OBD_INCOMPAT_MDT |
427 OBD_INCOMPAT_COMMON_LR;
429 LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
430 rc = mdt_last_rcvd_header_read(ctx, mdt, msd);
432 CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
/* refuse to start when the uuid stored on disk differs from this obd */
435 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
436 LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
437 " disk %s. Were the /dev/ assignments "
439 obd->obd_uuid.uuid, msd->msd_uuid);
440 GOTO(out, rc = -EINVAL);
443 mount_count = msd->msd_mount_count;
/* NOTE(review): at this point msd holds CPU-order values (assigned
 * directly on the new-disk path, converted by msd_le_to_cpu() on the read
 * path), yet the feature checks below apply cpu_to_le32() to the masks and
 * le32_to_cpu() to the fields in the messages.  That looks wrong on
 * big-endian hosts - confirm. */
445 if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
446 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
447 obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
449 GOTO(out, rc = -EINVAL);
451 if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
452 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
453 obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
455 /* Do something like remount filesystem read-only */
456 GOTO(out, rc = -EINVAL);
/* NOTE(review): the old-format path below reads msd->msd_unused, but the
 * visible msd_le_to_cpu() never fills that field from the on-disk record -
 * confirm that it holds the intended value here. */
458 if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
459 CDEBUG(D_WARNING, "using old last_rcvd format\n");
460 msd->msd_mount_count = msd->msd_last_transno;
461 msd->msd_last_transno = msd->msd_unused;
462 /* If we update the last_rcvd, we can never go back to
463 an old install, so leave this in the old format for now.
464 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
468 msd->msd_feature_compat = OBD_COMPAT_MDT;
470 spin_lock(&mdt->mdt_transno_lock);
471 mdt->mdt_last_transno = msd->msd_last_transno;
472 spin_unlock(&mdt->mdt_transno_lock);
474 CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
475 CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
476 obd->obd_name, mdt->mdt_last_transno);
477 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
478 obd->obd_name, mount_count + 1);
479 CDEBUG(D_INODE, "%s: server data size: %u\n",
480 obd->obd_name, msd->msd_server_size);
481 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
482 obd->obd_name, msd->msd_client_start);
483 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
484 obd->obd_name, msd->msd_client_size);
485 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
486 obd->obd_name, last_rcvd_size);
487 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
488 last_rcvd_size <= msd->msd_client_start ? 0 :
489 (last_rcvd_size - msd->msd_client_start) /
490 msd->msd_client_size);
491 CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
493 if (!msd->msd_server_size || !msd->msd_client_start ||
494 !msd->msd_client_size) {
495 CERROR("Bad last_rcvd contents!\n");
496 GOTO(out, rc = -EINVAL);
499 rc = mdt_clients_data_init(ctx, mdt, last_rcvd_size);
501 GOTO(err_client, rc);
503 spin_lock(&mdt->mdt_transno_lock);
504 /* obd_last_committed is used for compatibility
505 * with other lustre recovery code */
506 obd->obd_last_committed = mdt->mdt_last_transno;
507 spin_unlock(&mdt->mdt_transno_lock);
/* clients were found in last_rcvd: mark the obd as recovering */
509 if (obd->obd_recoverable_clients) {
510 CWARN("RECOVERY: service %s, %d recoverable clients, "
511 "last_transno "LPU64"\n", obd->obd_name,
512 obd->obd_recoverable_clients, mdt->mdt_last_transno);
513 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
514 obd->obd_recovering = 1;
515 obd->obd_recovery_start = CURRENT_SECONDS;
516 /* Only used for lprocfs_status */
517 obd->obd_recovery_end = obd->obd_recovery_start +
518 OBD_RECOVERY_TIMEOUT;
521 mdt->mdt_mount_count++;
522 msd->msd_mount_count = mdt->mdt_mount_count;
524 /* save it, so mount count and last_transno is current */
525 rc = mdt_server_data_update(ctx, mdt);
527 GOTO(err_client, rc);
532 class_disconnect_exports(obd);
/*
 * Refresh the in-memory server data with the current last transno and
 * write the header back to last_rcvd.
 *
 * mdt_last_transno is copied into msd_last_transno under
 * mdt_transno_lock; the record is then written with
 * mdt_last_rcvd_header_write().
 *
 * NOTE(review): the return statement is not visible in this view.
 */
537 static int mdt_server_data_update(const struct lu_context *ctx,
538 struct mdt_device *mdt)
540 struct mdt_server_data *msd = &mdt->mdt_msd;
544 CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
545 mdt->mdt_mount_count, mdt->mdt_last_transno);
547 spin_lock(&mdt->mdt_transno_lock);
548 msd->msd_last_transno = mdt->mdt_last_transno;
549 spin_unlock(&mdt->mdt_transno_lock);
551 rc = mdt_last_rcvd_header_write(ctx, mdt, msd);
/*
 * Allocate a last_rcvd slot for a new client and write its record.
 *
 * Under mdt_client_bitmap_lock the first zero bit below LR_MAX_CLIENTS is
 * looked up and set.  The slot index and the matching file offset
 * (msd_client_start + index * msd_client_size) are stored in med, and the
 * client record med->med_mcd is written at that offset inside its own
 * transaction.
 *
 * NOTE(review): the error returns, the check of th and the final return
 * statement are not visible in this view of the file.
 */
555 int mdt_client_new(const struct lu_context *ctx,
556 struct mdt_device *mdt,
557 struct mdt_export_data *med)
559 unsigned long *bitmap = mdt->mdt_client_bitmap;
560 struct mdt_client_data *mcd = med->med_mcd;
561 struct mdt_server_data *msd = &mdt->mdt_msd;
562 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
563 struct mdt_thread_info *mti;
570 LASSERT(bitmap != NULL);
/* special case: the client uuid equals this target's own uuid (the action
 * taken is not visible in this view) */
571 if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
573 mti = lu_context_key_get(ctx, &mdt_thread_key);
574 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
575 * there's no need for extra complication here
577 spin_lock(&mdt->mdt_client_bitmap_lock);
578 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
579 if (cl_idx >= LR_MAX_CLIENTS ||
580 MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
581 CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
582 spin_unlock(&mdt->mdt_client_bitmap_lock);
585 set_bit(cl_idx, bitmap);
586 spin_unlock(&mdt->mdt_client_bitmap_lock);
588 CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
589 cl_idx, med->med_mcd->mcd_uuid);
591 med->med_lr_idx = cl_idx;
592 med->med_lr_off = msd->msd_client_start +
593 (cl_idx * msd->msd_client_size);
594 init_mutex(&med->med_mcd_lock);
596 LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
597 /* write new client data */
598 off = med->med_lr_off;
599 th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
603 rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
/* NOTE(review): sizeof(*mcd) is a size_t but is printed with %u below */
604 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
605 cl_idx, med->med_lr_off, sizeof(*mcd));
606 mdt_trans_stop(ctx, mdt, th);
/* Add client data to the MDS for a client that already owns a slot in the
 * last_rcvd file: cl_idx is known, so the slot is marked in the bitmap and
 * its offset is computed from the index.  New clients, for which a free
 * slot has to be located in the bitmap, go through mdt_client_new().
 *
 * It should not be possible to fail adding an existing client - otherwise
 * the mdt_clients_data_init() callsite needs to be fixed.
 */
/*
 * Register the existing last_rcvd slot cl_idx for the export data med:
 * the slot is marked in the client bitmap and its index and file offset
 * (msd_client_start + cl_idx * msd_client_size) are stored in med.
 *
 * NOTE(review): the return statements are not visible in this view.
 */
619 int mdt_client_add(const struct lu_context *ctx,
620 struct mdt_device *mdt,
621 struct mdt_export_data *med, int cl_idx)
623 unsigned long *bitmap = mdt->mdt_client_bitmap;
624 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
625 struct mdt_server_data *msd = &mdt->mdt_msd;
629 LASSERT(bitmap != NULL);
630 LASSERTF(cl_idx >= 0, "%d\n", cl_idx);
/* special case: the client uuid equals this target's own uuid (the action
 * taken is not visible in this view) */
632 if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
635 spin_lock(&mdt->mdt_client_bitmap_lock);
/* a bit that is already set is reported with CERROR (the handling that
 * follows is not visible in this view) */
636 if (test_and_set_bit(cl_idx, bitmap)) {
637 CERROR("MDS client %d: bit already set in bitmap!!\n",
641 spin_unlock(&mdt->mdt_client_bitmap_lock);
643 CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
644 cl_idx, med->med_mcd->mcd_uuid);
646 med->med_lr_idx = cl_idx;
647 med->med_lr_off = msd->msd_client_start +
648 (cl_idx * msd->msd_client_size);
649 init_mutex(&med->med_mcd_lock);
651 LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
/*
 * Release the last_rcvd slot of a client.
 *
 * The client record is zeroed in memory and written back to last_rcvd
 * inside its own transaction (under med_mcd_lock), the slot bit is
 * cleared in the client bitmap, and finally the server header is
 * refreshed with mdt_server_data_update().
 *
 * NOTE(review): several conditions, the label targeted by
 * GOTO(free, ...) and the return statement are not visible in this view.
 */
656 int mdt_client_del(const struct lu_context *ctx,
657 struct mdt_device *mdt,
658 struct mdt_export_data *med)
660 struct mdt_client_data *mcd = med->med_mcd;
661 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
670 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
671 if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
674 CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
675 med->med_lr_idx, med->med_lr_off);
677 off = med->med_lr_off;
679 /* Don't clear med_lr_idx here as it is likely also unset. At worst
680 * we leak a client slot that will be cleaned on the next recovery. */
682 CERROR("client idx %d has offset %lld\n",
683 med->med_lr_idx, off);
684 GOTO(free, rc = -EINVAL);
687 /* Clear the bit _after_ zeroing out the client so we don't
688 race with mdt_client_add and zero out new clients.*/
689 if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
690 CERROR("MDT client %u: bit already clear in bitmap!!\n",
695 th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
697 GOTO(free, rc = PTR_ERR(th));
699 mutex_down(&med->med_mcd_lock);
700 memset(mcd, 0, sizeof *mcd);
702 rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
703 mutex_up(&med->med_mcd_lock);
704 mdt_trans_stop(ctx, mdt, th);
706 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
707 "zeroing out client idx %u in %s rc %d\n",
708 med->med_lr_idx, LAST_RCVD, rc);
710 spin_lock(&mdt->mdt_client_bitmap_lock);
711 clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
712 spin_unlock(&mdt->mdt_client_bitmap_lock);
713 /* Make sure the server's last_transno is up to date. Do this
714 * after the client is freed so we know all the client's
715 * transactions have been committed. */
/* NOTE(review): the result of mdt_server_data_update() is not checked */
716 mdt_server_data_update(ctx, mdt);
/*
 * last_rcvd & last_committed update callbacks
 */
/*
 * Record the outcome of the current request in the client's last_rcvd
 * slot.
 *
 * Under med_mcd_lock the transno, xid and result are stored in the
 * mcd_last_close_* fields for MDS_CLOSE, and in the mcd_last_* fields
 * plus mcd_last_data in the other visible branch.  The record is then
 * written with mdt_last_rcvd_write() using the transaction handle th.
 *
 * NOTE(review): the second parameter line, the assignment of mcd, the
 * condition guarding the offset CERROR and the return statement are not
 * visible in this view of the file.
 */
728 static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
731 struct mdt_device *mdt = mti->mti_mdt;
732 struct ptlrpc_request *req = mdt_info_req(mti);
733 struct mdt_export_data *med;
734 struct mdt_client_data *mcd;
737 __s32 rc = th->th_result;
741 LASSERT(req->rq_export);
743 med = &req->rq_export->exp_mdt_data;
746 /* if the export has already been failed, we have no last_rcvd slot */
747 if (req->rq_export->exp_failed) {
748 CWARN("commit transaction for disconnected client %s: rc %d\n",
749 req->rq_export->exp_client_uuid.uuid, rc);
755 off = med->med_lr_off;
756 mutex_down(&med->med_mcd_lock);
757 if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
758 mcd->mcd_last_close_transno = mti->mti_transno;
759 mcd->mcd_last_close_xid = req->rq_xid;
760 mcd->mcd_last_close_result = rc;
762 mcd->mcd_last_transno = mti->mti_transno;
763 mcd->mcd_last_xid = req->rq_xid;
764 mcd->mcd_last_result = rc;
765 /*XXX: save intent_disposition in mdt_thread_info?
766 * also there is bug - intent_disposition is __u64,
767 * see struct ldlm_reply->lock_policy_res1; */
768 mcd->mcd_last_data = mti->mti_opdata;
771 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
774 err = mdt_last_rcvd_write(mti->mti_ctxt, mdt, mcd, &off, th);
776 mutex_up(&med->med_mcd_lock);
/* Context keys defined elsewhere; used with lu_context_key_get() in this
 * file to reach the mdt_txn_info of a transaction and the mdt_thread_info
 * of the current context. */
780 extern struct lu_context_key mdt_txn_key;
781 extern struct lu_context_key mdt_thread_key;
783 /* add credits for last_rcvd update */
784 static int mdt_txn_start_cb(const struct lu_context *ctx,
785 struct txn_param *param, void *cookie)
787 param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
791 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
793 return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
796 /* Update last_rcvd records with latest transaction data.
 *
 * Transaction-stop callback; cookie is the mdt_device.  A transno is
 * chosen under mdt_transno_lock: a failed transaction with a non-zero
 * transno has it reset to 0 after logging, a transno of 0 is replaced by
 * the next value of mdt_last_transno, and a preset transno raises
 * mdt_last_transno when it is larger.  The transno is then stored in the
 * request, its reply message and the mdt_txn_info of the transaction, and
 * mdt_last_rcvd_update() is called.
 *
 * NOTE(review): the early-return statements are not visible in this view
 * of the file. */
797 static int mdt_txn_stop_cb(const struct lu_context *ctx,
798 struct thandle *txn, void *cookie)
800 struct mdt_device *mdt = cookie;
801 struct mdt_txn_info *txi;
802 struct mdt_thread_info *mti;
803 struct ptlrpc_request *req;
805 /* transno in two contexts - for commit_cb and for thread */
806 txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
807 mti = lu_context_key_get(ctx, &mdt_thread_key);
808 req = mdt_info_req(mti);
810 if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
811 txi->txi_transno = 0;
815 if (mti->mti_has_trans) {
816 CWARN("More than one transaction "LPU64"\n", mti->mti_transno);
820 mti->mti_has_trans = 1;
821 /*TODO: checks for recovery cases, see mds_finish_transno */
/* NOTE(review): in the visible lines everything from here to the
 * spin_unlock() below, including the LASSERT and the reply updates, runs
 * with mdt_transno_lock held */
822 spin_lock(&mdt->mdt_transno_lock);
823 if (txn->th_result != 0) {
824 if (mti->mti_transno != 0) {
825 CERROR("Replay transno "LPU64" failed: rc %i\n",
826 mti->mti_transno, txn->th_result);
827 mti->mti_transno = 0;
829 } else if (mti->mti_transno == 0) {
830 mti->mti_transno = ++ mdt->mdt_last_transno;
832 /* should be replay */
833 if (mti->mti_transno > mdt->mdt_last_transno)
834 mdt->mdt_last_transno = mti->mti_transno;
837 /* sometimes the reply message has not been successfully packed */
/* NOTE(review): req was already tested against NULL in the first check of
 * this function */
838 LASSERT(req != NULL && req->rq_repmsg != NULL);
840 /* filling reply data */
841 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
842 mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
844 req->rq_transno = mti->mti_transno;
845 lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
846 target_committed_to_req(req);
847 lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
848 /* save transno for the commit callback */
849 txi->txi_transno = mti->mti_transno;
850 spin_unlock(&mdt->mdt_transno_lock);
852 return mdt_last_rcvd_update(mti, txn);
855 /* Commit callback: updates the last_committed value.
 *
 * cookie is the mdt_device.  When the transno saved in the mdt_txn_info
 * of the transaction is greater than obd_last_committed, the latter is
 * advanced under mdt_transno_lock and ptlrpc_commit_replies() is called
 * after the lock has been dropped.  A D_HA message is logged at the end.
 *
 * NOTE(review): the return statement is not visible in this view. */
856 static int mdt_txn_commit_cb(const struct lu_context *ctx,
857 struct thandle *txn, void *cookie)
859 struct mdt_device *mdt = cookie;
860 struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
861 struct mdt_txn_info *txi;
863 txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
865 /* copy of obd_transno_commit_cb() but with locking */
866 spin_lock(&mdt->mdt_transno_lock);
867 if (txi->txi_transno > obd->obd_last_committed) {
868 obd->obd_last_committed = txi->txi_transno;
869 spin_unlock(&mdt->mdt_transno_lock);
870 ptlrpc_commit_replies(obd);
872 spin_unlock(&mdt->mdt_transno_lock);
874 CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
875 obd->obd_name, txi->txi_transno);
/*
 * Set up the recovery state of the MDT.
 *
 * The transaction callbacks (start, stop, commit) are registered on the
 * bottom device with mdt as cookie, the LAST_RCVD object is opened with
 * dt_store_open() and stored in mdt->mdt_last_rcvd, and the server data
 * is initialised with mdt_server_data_init().  The obd_lvfs_ctxt of obd is
 * then filled from the current task.
 *
 * NOTE(review): the conditions around the error handling and the return
 * statement are not visible in this view; confirm that the transaction
 * callback registered here is removed again when setup fails.
 */
880 int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt,
881 struct obd_device *obd)
883 struct lu_fid last_fid;
884 struct dt_object *last;
888 /* prepare transactions callbacks */
889 mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
890 mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
891 mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
892 mdt->mdt_txn_cb.dtc_cookie = mdt;
894 dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
896 last = dt_store_open(ctx, mdt->mdt_bottom,
897 LAST_RCVD, &last_fid);
899 mdt->mdt_last_rcvd = last;
900 rc = mdt_server_data_init(ctx, mdt);
902 lu_object_put(ctx, &last->do_lu);
903 mdt->mdt_last_rcvd = NULL;
907 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
910 OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
911 obd->obd_lvfs_ctxt.pwdmnt = current->fs->pwdmnt;
912 obd->obd_lvfs_ctxt.pwd = current->fs->pwd;
913 obd->obd_lvfs_ctxt.fs = get_ds();
/*
 * Undo mdt_fs_setup(): unregister the transaction callbacks, disconnect
 * all exports of the obd and drop the reference on the last_rcvd object
 * if one is held, leaving mdt->mdt_last_rcvd set to NULL.
 */
919 void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
921 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
923 /* remove transaction callback */
924 dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
926 class_disconnect_exports(obd); /* cleans up client info too */
927 if (mdt->mdt_last_rcvd)
928 lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
929 mdt->mdt_last_rcvd = NULL;
932 /* reconstruction code */
933 void mdt_req_from_mcd(struct ptlrpc_request *req,
934 struct mdt_client_data *mcd)
936 DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
937 mcd->mcd_last_transno, mcd->mcd_last_result);
939 if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
940 req->rq_transno = mcd->mcd_last_close_transno;
941 req->rq_status = mcd->mcd_last_close_result;
942 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
943 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
945 req->rq_transno = mcd->mcd_last_transno;
946 req->rq_status = mcd->mcd_last_result;
947 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
948 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
950 //mds_steal_ack_locks(req);
953 static void mdt_reconstruct_generic(struct mdt_thread_info *mti,
954 struct mdt_lock_handle *lhc)
956 struct ptlrpc_request *req = mdt_info_req(mti);
957 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
959 return mdt_req_from_mcd(req, med->med_mcd);
/*
 * Reconstruct the reply of a create request.
 *
 * Transno and status are restored with mdt_req_from_mcd().  The child
 * object is then looked up by rr_fid2, its attributes are fetched with
 * mo_attr_get() and packed into the reply body together with the child
 * fid; -EREMOTE from mo_attr_get() sets OBD_MD_MDS in body->valid.
 *
 * NOTE(review): the check made after mdt_req_from_mcd() and the handling
 * of rc are not visible in this view of the file.
 */
962 static void mdt_reconstruct_create(struct mdt_thread_info *mti,
963 struct mdt_lock_handle *lhc)
965 struct ptlrpc_request *req = mdt_info_req(mti);
966 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
967 struct mdt_device *mdt = mti->mti_mdt;
968 struct mdt_object *child;
969 struct mdt_body *body;
972 mdt_req_from_mcd(req, med->med_mcd);
976 /* if no error, so child was created with requested fid */
977 child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
/* a failed lookup is treated as a fatal assertion, not as an error */
978 LASSERT(!IS_ERR(child));
980 body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
981 rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
982 &mti->mti_attr, &mti->mti_uc);
983 if (rc == -EREMOTE) {
984 /* object was created on remote server */
986 body->valid |= OBD_MD_MDS;
988 mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(child));
989 mdt_body_reverse_idmap(mti, body);
990 mdt_object_put(mti->mti_ctxt, child);
/*
 * Reconstruct the reply of a setattr request.
 *
 * Transno and status are restored with mdt_req_from_mcd().  The object is
 * then looked up by rr_fid1, its attributes are fetched with
 * mo_attr_get() and packed into the reply body together with the object
 * fid.
 *
 * NOTE(review): the check made after mdt_req_from_mcd() is not visible in
 * this view of the file.
 */
993 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
994 struct mdt_lock_handle *lhc)
996 struct ptlrpc_request *req = mdt_info_req(mti);
997 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
998 struct mdt_device *mdt = mti->mti_mdt;
999 struct mdt_object *obj;
1000 struct mdt_body *body;
1002 mdt_req_from_mcd(req, med->med_mcd);
1006 body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
1007 obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
/* a failed lookup is treated as a fatal assertion, not as an error */
1008 LASSERT(!IS_ERR(obj));
/* NOTE(review): the result of mo_attr_get() is not checked here */
1009 mo_attr_get(mti->mti_ctxt, mdt_object_child(obj),
1010 &mti->mti_attr, &mti->mti_uc);
1011 mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(obj));
1012 mdt_body_reverse_idmap(mti, body);
1014 /* Don't return OST-specific attributes if we didn't just set them */
/* NOTE(review): rec is not declared in the visible lines of this
 * function; presumably the statements below are disabled by a guard that
 * is not visible in this view - confirm */
1016 if (rec->ur_iattr.ia_valid & ATTR_SIZE)
1017 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
1018 if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
1019 body->valid |= OBD_MD_FLMTIME;
1020 if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
1021 body->valid |= OBD_MD_FLATIME;
1023 mdt_object_put(mti->mti_ctxt, obj);
1026 static void mdt_reconstruct_with_shrink(struct mdt_thread_info *mti,
1027 struct mdt_lock_handle *lhc)
1029 mdt_reconstruct_generic(mti, lhc);
1030 mdt_shrink_reply(mti, REPLY_REC_OFF + 1);
1033 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,
1034 struct mdt_lock_handle *lhc);
1036 static mdt_reconstructor reconstructors[REINT_MAX] = {
1037 [REINT_SETATTR] = mdt_reconstruct_setattr,
1038 [REINT_CREATE] = mdt_reconstruct_create,
1039 [REINT_LINK] = mdt_reconstruct_generic,
1040 [REINT_UNLINK] = mdt_reconstruct_with_shrink,
1041 [REINT_RENAME] = mdt_reconstruct_with_shrink,
1042 [REINT_OPEN] = mdt_reconstruct_open
/*
 * Dispatch reply reconstruction to the handler registered in
 * reconstructors[] for the reint opcode of the current request
 * (mti->mti_rr.rr_opcode).
 *
 * NOTE(review): the opcode indexes reconstructors[] without a visible
 * range or NULL check - confirm that it is validated before this call.
 */
1045 void mdt_reconstruct(struct mdt_thread_info *mti,
1046 struct mdt_lock_handle *lhc)
1049 reconstructors[mti->mti_rr.rr_opcode](mti, lhc);