1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mdt/mdt_recovery.c
5 * Lustre Metadata Target (mdt) recovery-related methods
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
9 * Author; Pershin Mike <tappro@clusterfs.com>
11 * This file is part of the Lustre file system, http://www.lustre.org
12 * Lustre is a trademark of Cluster File Systems, Inc.
14 * You may have signed or agreed to another license before downloading
15 * this software. If so, you are bound by the terms and conditions
16 * of that agreement, and the following does not apply to you. See the
17 * LICENSE file included with this distribution for more information.
19 * If you did not agree to a different license, then this copy of Lustre
20 * is open source software; you can redistribute it and/or modify it
21 * under the terms of version 2 of the GNU General Public License as
22 * published by the Free Software Foundation.
24 * In either case, Lustre is distributed in the hope that it will be
25 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "mdt_internal.h"
36 static int mdt_update_server_data(const struct lu_context *ctx,
37 struct mdt_device *mdt);
39 /* last_rcvd handling */
40 static inline int mdt_read_last_rcvd_header (const struct lu_context *ctx,
41 struct mdt_device *mdt,
42 struct mdt_server_data *msd)
47 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx,
51 if (rc == sizeof(*msd))
58 static inline int mdt_write_last_rcvd_header(const struct lu_context *ctx,
59 struct mdt_device *mdt,
60 struct mdt_server_data *msd,
66 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
70 if (rc == sizeof(*msd))
77 static inline int mdt_read_last_rcvd(const struct lu_context *ctx,
78 struct mdt_device *mdt,
79 struct mdt_client_data *mcd, loff_t *off)
83 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx, mdt->mdt_last_rcvd,
84 mcd, sizeof(*mcd), off);
85 if (rc == sizeof(*mcd))
92 static inline int mdt_write_last_rcvd(const struct lu_context *ctx,
93 struct mdt_device *mdt,
94 struct mdt_client_data *mcd,
95 loff_t *off, struct thandle *th)
99 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
103 if (rc == sizeof(*mcd))
110 static int mdt_init_clients_data(const struct lu_context *ctx,
111 struct mdt_device *mdt,
112 unsigned long last_size)
114 struct mdt_server_data *msd = &mdt->mdt_msd;
115 struct mdt_client_data *mcd;
116 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
122 /* When we do a clean MDS shutdown, we save the last_transno into
123 * the header. If we find clients with higher last_transno values
124 * then those clients may need recovery done. */
128 RETURN(rc = -ENOMEM);
130 for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
131 off < last_size; cl_idx++) {
133 struct obd_export *exp;
134 struct mdt_export_data *med;
136 off = le32_to_cpu(msd->msd_client_start) +
137 cl_idx * le16_to_cpu(msd->msd_client_size);
139 rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off);
141 CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
142 LAST_RCVD, cl_idx, off, rc);
143 break; /* read error shouldn't cause startup to fail */
146 if (mcd->mcd_uuid[0] == '\0') {
147 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
152 last_transno = le64_to_cpu(mcd->mcd_last_transno);
154 /* These exports are cleaned up by mdt_obd_disconnect(), so
155 * they need to be set up like real exports as
156 * mdt_obd_connect() does.
158 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
159 " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
160 last_transno, le64_to_cpu(msd->msd_last_transno),
161 le64_to_cpu(mcd->mcd_last_xid));
163 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
165 GOTO(err_client, rc = PTR_ERR(exp));
167 med = &exp->exp_mdt_data;
169 rc = mdt_client_add(ctx, mdt, med, cl_idx);
170 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
172 exp->exp_replay_needed = 1;
173 exp->exp_connecting = 0;
174 obd->obd_recoverable_clients++;
175 obd->obd_max_recoverable_clients++;
176 class_export_put(exp);
178 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
179 cl_idx, last_transno);
181 spin_lock(&mdt->mdt_transno_lock);
182 if (last_transno > mdt->mdt_last_transno)
183 mdt->mdt_last_transno = last_transno;
184 spin_unlock(&mdt->mdt_transno_lock);
193 static int mdt_init_server_data(const struct lu_context *ctx,
194 struct mdt_device *mdt)
196 struct mdt_server_data *msd = &mdt->mdt_msd;
197 struct mdt_client_data *mcd = NULL;
198 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
199 struct mdt_thread_info *mti;
201 unsigned long last_rcvd_size = 0;
206 /* ensure padding in the struct is the correct size */
207 LASSERT(offsetof(struct mdt_server_data, msd_padding) +
208 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
209 LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
210 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
212 mti = lu_context_key_get(ctx, &mdt_thread_key);
213 LASSERT(mti != NULL);
214 la = &mti->mti_attr.ma_attr;
216 rc = mdt->mdt_last_rcvd->do_ops->do_attr_get(ctx,
217 mdt->mdt_last_rcvd, la);
221 last_rcvd_size = la->la_size;
223 if (last_rcvd_size == 0) {
224 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
226 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
227 sizeof(msd->msd_uuid));
228 msd->msd_last_transno = 0;
229 msd->msd_mount_count = 0;
230 msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
231 msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
232 msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
233 msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
234 msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
235 OBD_INCOMPAT_COMMON_LR);
237 rc = mdt_read_last_rcvd_header(mti->mti_ctxt, mdt, msd);
239 CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
242 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
243 LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
244 " disk %s. Were the /dev/ assignments "
246 obd->obd_uuid.uuid, msd->msd_uuid);
247 GOTO(out, rc = -EINVAL);
250 mount_count = le64_to_cpu(msd->msd_mount_count);
252 if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
253 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
254 obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
256 GOTO(out, rc = -EINVAL);
258 if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
259 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
260 obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
262 /* Do something like remount filesystem read-only */
263 GOTO(out, rc = -EINVAL);
265 if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
266 CDEBUG(D_WARNING, "using old last_rcvd format\n");
267 msd->msd_mount_count = msd->msd_last_transno;
268 msd->msd_last_transno = msd->msd_unused;
269 /* If we update the last_rcvd, we can never go back to
270 an old install, so leave this in the old format for now.
271 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
274 msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
276 spin_lock(&mdt->mdt_transno_lock);
277 mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
278 spin_unlock(&mdt->mdt_transno_lock);
280 CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
281 obd->obd_name, mdt->mdt_last_transno);
282 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
283 obd->obd_name, mount_count + 1);
284 CDEBUG(D_INODE, "%s: server data size: %u\n",
285 obd->obd_name, le32_to_cpu(msd->msd_server_size));
286 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
287 obd->obd_name, le32_to_cpu(msd->msd_client_start));
288 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
289 obd->obd_name, le32_to_cpu(msd->msd_client_size));
290 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
291 obd->obd_name, last_rcvd_size);
292 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
293 last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
294 (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
295 le16_to_cpu(msd->msd_client_size));
297 if (!msd->msd_server_size || !msd->msd_client_start ||
298 !msd->msd_client_size) {
299 CERROR("Bad last_rcvd contents!\n");
300 GOTO(out, rc = -EINVAL);
303 rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size);
305 GOTO(err_client, rc);
307 spin_lock(&mdt->mdt_transno_lock);
308 /* obd_last_committed is used for compatibility
309 * with other lustre recovery code */
310 obd->obd_last_committed = mdt->mdt_last_transno;
311 spin_unlock(&mdt->mdt_transno_lock);
313 if (obd->obd_recoverable_clients) {
314 CWARN("RECOVERY: service %s, %d recoverable clients, "
315 "last_transno "LPU64"\n", obd->obd_name,
316 obd->obd_recoverable_clients, mdt->mdt_last_transno);
317 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
318 obd->obd_recovering = 1;
319 obd->obd_recovery_start = CURRENT_SECONDS;
320 /* Only used for lprocfs_status */
321 obd->obd_recovery_end = obd->obd_recovery_start +
322 OBD_RECOVERY_TIMEOUT;
325 mdt->mdt_mount_count++;
326 msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
328 /* save it, so mount count and last_transno is current */
329 rc = mdt_update_server_data(ctx, mdt);
331 GOTO(err_client, rc);
336 class_disconnect_exports(obd);
341 static int mdt_update_server_data(const struct lu_context *ctx,
342 struct mdt_device *mdt)
344 struct mdt_server_data *msd = &mdt->mdt_msd;
348 CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
349 mdt->mdt_mount_count, mdt->mdt_last_transno);
351 spin_lock(&mdt->mdt_transno_lock);
352 msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
353 spin_unlock(&mdt->mdt_transno_lock);
355 rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL);
359 /* Add client data to the MDS. We use a bitmap to locate a free space
360 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
361 * Otherwise, we just have to read the data from the last_rcvd file and
362 * we know its offset.
364 * It should not be possible to fail adding an existing client - otherwise
365 * mdt_init_server_data() callsite needs to be fixed.
367 int mdt_client_add(const struct lu_context *ctx,
368 struct mdt_device *mdt,
369 struct mdt_export_data *med, int cl_idx)
371 unsigned long *bitmap = mdt->mdt_client_bitmap;
372 struct mdt_client_data *mcd = med->med_mcd;
373 struct mdt_server_data *msd = &mdt->mdt_msd;
374 int new_client = (cl_idx == -1);
378 LASSERT(bitmap != NULL);
379 LASSERTF(cl_idx > -2, "%d\n", cl_idx);
381 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
382 * there's no need for extra complication here
385 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
387 if (cl_idx >= LR_MAX_CLIENTS ||
388 OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
389 CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
392 if (test_and_set_bit(cl_idx, bitmap)) {
393 cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
398 if (test_and_set_bit(cl_idx, bitmap)) {
399 CERROR("MDS client %d: bit already set in bitmap!!\n",
405 CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
406 cl_idx, med->med_mcd->mcd_uuid);
408 med->med_lr_idx = cl_idx;
409 med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
410 (cl_idx * le16_to_cpu(msd->msd_client_size));
411 init_mutex(&med->med_mcd_lock);
413 LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
416 rc = mdt_write_last_rcvd(ctx, mdt, mcd,
417 &med->med_lr_off, NULL);
418 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
419 cl_idx, med->med_lr_off, sizeof(*mcd));
424 int mdt_client_free(const struct lu_context *ctx,
425 struct mdt_device *mdt,
426 struct mdt_export_data *med)
428 struct mdt_client_data *mcd = med->med_mcd;
436 CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
437 med->med_lr_idx, med->med_lr_off);
439 off = med->med_lr_off;
441 /* Don't clear med_lr_idx here as it is likely also unset. At worst
442 * we leak a client slot that will be cleaned on the next recovery. */
444 CERROR("client idx %d has offset %lld\n",
445 med->med_lr_idx, off);
446 GOTO(free, rc = -EINVAL);
449 /* Clear the bit _after_ zeroing out the client so we don't
450 race with mdt_client_add and zero out new clients.*/
451 if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
452 CERROR("MDT client %u: bit already clear in bitmap!!\n",
457 mutex_down(&med->med_mcd_lock);
458 memset(mcd, 0, sizeof *mcd);
459 rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
460 mutex_up(&med->med_mcd_lock);
462 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
463 "zeroing out client idx %u in %s rc %d\n",
464 med->med_lr_idx, LAST_RCVD, rc);
466 if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
467 CERROR("MDS client %u: bit already clear in bitmap!!\n",
472 /* Make sure the server's last_transno is up to date. Do this
473 * after the client is freed so we know all the client's
474 * transactions have been committed. */
475 mdt_update_server_data(ctx, mdt);
485 * last_rcvd & last_committed update callbacks
487 static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
488 struct dt_device *dt,
491 struct mdt_device *mdt = mti->mti_mdt;
492 struct ptlrpc_request *req = mdt_info_req(mti);
493 struct mdt_export_data *med;
494 struct mdt_client_data *mcd;
497 __s32 rc = th->th_result;
501 LASSERT(req->rq_export);
503 med = &req->rq_export->exp_mdt_data;
506 /* if the export has already been failed, we have no last_rcvd slot */
507 if (req->rq_export->exp_failed) {
508 CWARN("commit transaction for disconnected client %s: rc %d\n",
509 req->rq_export->exp_client_uuid.uuid, rc);
515 off = med->med_lr_off;
516 mutex_down(&med->med_mcd_lock);
517 mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno);
518 mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
519 mcd->mcd_last_result = cpu_to_le32(rc);
520 /* XXX: how to pass op_data here? */
521 //mcd->mcd_last_data = cpu_to_le32(op_data);
524 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
527 err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th);
529 mutex_up(&med->med_mcd_lock);
533 extern struct lu_context_key mdt_txn_key;
534 extern struct lu_context_key mdt_thread_key;
537 MDT_TXN_LAST_RCVD_CREDITS = 3
540 /* add credits for last_rcvd update */
541 static int mdt_txn_start_cb(const struct lu_context *ctx,
542 struct dt_device *dev,
543 struct txn_param *param, void *cookie)
545 param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
549 /* Update last_rcvd records with latests transaction data */
550 static int mdt_txn_stop_cb(const struct lu_context *ctx,
551 struct dt_device *dev,
552 struct thandle *txn, void *cookie)
554 struct mdt_device *mdt = cookie;
555 struct mdt_txn_info *txi;
556 struct mdt_thread_info *mti;
558 /* transno in two contexts - for commit_cb and for thread */
559 txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
560 mti = lu_context_key_get(ctx, &mdt_thread_key);
562 /*TODO: checks for recovery cases, see mds_finish_transno */
563 spin_lock(&mdt->mdt_transno_lock);
564 if (txn->th_result != 0) {
565 if (mti->mti_transno != 0) {
566 CERROR("Replay transno "LPU64" failed: rc %i\n",
567 mti->mti_transno, txn->th_result);
568 mti->mti_transno = 0;
570 } else if (mti->mti_transno == 0) {
571 mti->mti_transno = ++ mdt->mdt_last_transno;
574 if (mti->mti_transno > mdt->mdt_last_transno)
575 mdt->mdt_last_transno = mti->mti_transno;
577 /* save transno for the commit callback */
578 txi->txi_transno = mti->mti_transno;
579 spin_unlock(&mdt->mdt_transno_lock);
581 return 0;//mdt_update_last_rcvd(mti, dev, txn);
584 /* commit callback, need to update last_commited value */
585 static int mdt_txn_commit_cb(const struct lu_context *ctx,
586 struct dt_device *dev,
587 struct thandle *txn, void *cookie)
589 struct mdt_device *mdt = cookie;
590 struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
591 struct mdt_txn_info *txi;
593 txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
595 /* copy of obd_transno_commit_cb() but with locking */
596 spin_lock(&mdt->mdt_transno_lock);
597 if (txi->txi_transno > obd->obd_last_committed) {
598 obd->obd_last_committed = txi->txi_transno;
599 spin_unlock(&mdt->mdt_transno_lock);
600 ptlrpc_commit_replies (obd);
602 spin_unlock(&mdt->mdt_transno_lock);
604 CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
605 obd->obd_name, txi->txi_transno);
610 int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt)
612 struct lu_fid last_fid;
613 struct dt_object *last;
617 /* prepare transactions callbacks */
618 mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
619 mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
620 mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
621 mdt->mdt_txn_cb.dtc_cookie = mdt;
623 dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
625 last = dt_store_open(ctx, mdt->mdt_bottom,
626 LAST_RCVD, &last_fid);
628 mdt->mdt_last_rcvd = last;
629 rc = mdt_init_server_data(ctx, mdt);
631 lu_object_put(ctx, &last->do_lu);
632 mdt->mdt_last_rcvd = NULL;
636 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
643 void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
645 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
647 /* remove transaction callback */
648 dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
650 class_disconnect_exports(obd); /* cleans up client info too */
652 if (mdt->mdt_last_rcvd)
653 lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
654 mdt->mdt_last_rcvd = NULL;
657 /* reconstruction code */
658 static inline void mdt_req_from_mcd(struct ptlrpc_request *req,
659 struct mdt_client_data *mcd)
661 DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
662 mcd->mcd_last_transno, mcd->mcd_last_result);
663 req->rq_transno = mcd->mcd_last_transno;
664 req->rq_status = mcd->mcd_last_result;
665 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
666 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
667 //mds_steal_ack_locks(req);
670 static void mdt_reconstruct_generic(struct mdt_thread_info *mti)
672 struct ptlrpc_request *req = mdt_info_req(mti);
673 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
675 return mdt_req_from_mcd(req, med->med_mcd);
678 static void mdt_reconstruct_create(struct mdt_thread_info *mti)
680 struct ptlrpc_request *req = mdt_info_req(mti);
681 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
682 struct mdt_device *mdt = mti->mti_mdt;
683 struct mdt_object *child;
684 struct mdt_body *body;
687 mdt_req_from_mcd(req, med->med_mcd);
691 /* if no error, so child was created with requested fid */
692 child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
693 LASSERT(!IS_ERR(child));
695 body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
696 rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
698 if (rc == -EREMOTE) {
699 /* object was created on remote server */
701 body->valid |= OBD_MD_MDS;
703 mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
704 mti->mti_rr.rr_fid2);
705 mdt_object_put(mti->mti_ctxt, child);
708 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
710 struct ptlrpc_request *req = mdt_info_req(mti);
711 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
712 struct mdt_device *mdt = mti->mti_mdt;
713 struct mdt_object *obj;
714 struct mdt_body *body;
716 mdt_req_from_mcd(req, med->med_mcd);
720 body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
721 obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
722 LASSERT(!IS_ERR(obj));
723 mo_attr_get(mti->mti_ctxt, mdt_object_child(obj), &mti->mti_attr);
724 mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
725 mti->mti_rr.rr_fid1);
727 /* Don't return OST-specific attributes if we didn't just set them */
729 if (rec->ur_iattr.ia_valid & ATTR_SIZE)
730 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
731 if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
732 body->valid |= OBD_MD_FLMTIME;
733 if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
734 body->valid |= OBD_MD_FLATIME;
736 mdt_object_put(mti->mti_ctxt, obj);
739 static void mdt_reconstruct_open(struct mdt_thread_info *mti)
741 struct ptlrpc_request *req = mdt_info_req(mti);
742 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
744 /*TODO: after open recovery task will be finished */
746 mdt_req_from_mcd(req, med->med_mcd);
749 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
751 static mdt_reconstructor reconstructors[REINT_MAX] = {
752 [REINT_SETATTR] = mdt_reconstruct_setattr,
753 [REINT_CREATE] = mdt_reconstruct_create,
754 [REINT_LINK] = mdt_reconstruct_generic,
755 [REINT_UNLINK] = mdt_reconstruct_generic,
756 [REINT_RENAME] = mdt_reconstruct_generic,
757 [REINT_OPEN] = mdt_reconstruct_open
760 void mdt_reconstruct(struct mdt_thread_info *mti)
763 reconstructors[mti->mti_rr.rr_opcode](mti);