1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mdt/mdt_recovery.c
5 * Lustre Metadata Target (mdt) recovery-related methods
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
9 * Author: Pershin Mike <tappro@clusterfs.com>
11 * This file is part of the Lustre file system, http://www.lustre.org
12 * Lustre is a trademark of Cluster File Systems, Inc.
14 * You may have signed or agreed to another license before downloading
15 * this software. If so, you are bound by the terms and conditions
16 * of that agreement, and the following does not apply to you. See the
17 * LICENSE file included with this distribution for more information.
19 * If you did not agree to a different license, then this copy of Lustre
20 * is open source software; you can redistribute it and/or modify it
21 * under the terms of version 2 of the GNU General Public License as
22 * published by the Free Software Foundation.
24 * In either case, Lustre is distributed in the hope that it will be
25 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "mdt_internal.h"
/* Forward declaration: mdt_update_server_data() is defined further down in
 * this file but is already called from mdt_init_server_data(). */
36 static int mdt_update_server_data(const struct lu_context *ctx,
37 struct mdt_device *mdt);
39 /* last_rcvd handling */
/*
 * Read the server header record of the last_rcvd object into @msd.
 *
 * The read goes through the dbo_read body operation of mdt->mdt_last_rcvd
 * and its raw result is compared with sizeof(*msd), so a full-size transfer
 * is the case that is singled out.
 *
 * NOTE(review): the remaining dbo_read arguments and the mapping of the raw
 * result onto the returned value are not visible in this listing; confirm
 * against the complete source.
 */
40 static inline int mdt_read_last_rcvd_header (const struct lu_context *ctx,
41 struct mdt_device *mdt,
42 struct mdt_server_data *msd)
47 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx,
51 if (rc == sizeof(*msd))
/*
 * Write the server header record @msd to the last_rcvd object.
 *
 * The write goes through the dbo_write body operation of mdt->mdt_last_rcvd
 * and its raw result is compared with sizeof(*msd).
 *
 * NOTE(review): a further parameter follows @msd but its declaration is not
 * visible in this listing; mdt_update_server_data() passes NULL for it.
 * The handling of the raw result is not visible either.
 */
58 static inline int mdt_write_last_rcvd_header(const struct lu_context *ctx,
59 struct mdt_device *mdt,
60 struct mdt_server_data *msd,
66 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
70 if (rc == sizeof(*msd))
/*
 * Read one per-client record from the last_rcvd object.
 *
 * @mcd receives sizeof(*mcd) bytes read at position *@off through the
 * dbo_read body operation of mdt->mdt_last_rcvd. The raw result is compared
 * with sizeof(*mcd), so a full-size transfer is the case that is singled out.
 *
 * NOTE(review): how the raw result is turned into the return value is not
 * visible in this listing.
 */
77 static inline int mdt_read_last_rcvd(const struct lu_context *ctx,
78 struct mdt_device *mdt,
79 struct mdt_client_data *mcd, loff_t *off)
83 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx, mdt->mdt_last_rcvd,
84 mcd, sizeof(*mcd), off);
85 if (rc == sizeof(*mcd))
/*
 * Write one per-client record @mcd to the last_rcvd object.
 *
 * The write goes through the dbo_write body operation of mdt->mdt_last_rcvd
 * and its raw result is compared with sizeof(*mcd). Callers in this file
 * pass either a transaction handle or NULL as @th.
 *
 * NOTE(review): the dbo_write argument list (including how @off and @th are
 * forwarded) and the return-code handling are not visible in this listing.
 */
92 static inline int mdt_write_last_rcvd(const struct lu_context *ctx,
93 struct mdt_device *mdt,
94 struct mdt_client_data *mcd,
95 loff_t *off, struct thandle *th)
99 rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
103 if (rc == sizeof(*mcd))
/*
 * Scan the per-client area of last_rcvd and recreate an export for every
 * client slot that is in use.
 *
 * Slots are visited by index: the offset of slot cl_idx is
 * msd_client_start + cl_idx * msd_client_size, and the scan stops once that
 * offset reaches @last_size (the size of the last_rcvd object).
 *
 * For each slot:
 *  - the record is read with mdt_read_last_rcvd(); a read error is logged
 *    and ends the scan without failing startup;
 *  - a record whose mcd_uuid starts with a NUL byte is treated as a zeroed
 *    (unused) slot;
 *  - otherwise an export is created with class_new_export(), registered
 *    with mdt_client_add() at the same index (asserted to succeed), flagged
 *    with exp_replay_needed, counted in obd_recoverable_clients and
 *    obd_max_recoverable_clients, and released with class_export_put();
 *  - mdt->mdt_last_transno is raised, under mdt_transno_lock, to the
 *    last_transno of the client when that value is larger.
 *
 * A failure of class_new_export() jumps to the err_client label with
 * PTR_ERR(exp); -ENOMEM is returned from an early check.
 *
 * NOTE(review): the allocation of mcd, the conditions guarding the early
 * returns and the body of the err_client path are not visible in this
 * listing; confirm against the complete source.
 */
110 static int mdt_init_clients_data(const struct lu_context *ctx,
111 struct mdt_device *mdt,
112 unsigned long last_size)
114 struct mdt_server_data *msd = &mdt->mdt_msd;
115 struct mdt_client_data *mcd;
116 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
122 /* When we do a clean MDS shutdown, we save the last_transno into
123 * the header. If we find clients with higher last_transno values
124 * then those clients may need recovery done. */
128 RETURN(rc = -ENOMEM);
130 for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
131 off < last_size; cl_idx++) {
133 struct obd_export *exp;
134 struct mdt_export_data *med;
136 off = le32_to_cpu(msd->msd_client_start) +
137 cl_idx * le16_to_cpu(msd->msd_client_size);
139 rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off);
141 CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
142 LAST_RCVD, cl_idx, off, rc);
143 break; /* read error shouldn't cause startup to fail */
146 if (mcd->mcd_uuid[0] == '\0') {
147 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
152 last_transno = mcd_last_transno(mcd);
154 /* These exports are cleaned up by mdt_obd_disconnect(), so
155 * they need to be set up like real exports as
156 * mdt_obd_connect() does.
158 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
159 " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
160 last_transno, le64_to_cpu(msd->msd_last_transno),
163 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
165 GOTO(err_client, rc = PTR_ERR(exp));
167 med = &exp->exp_mdt_data;
169 rc = mdt_client_add(ctx, mdt, med, cl_idx);
170 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
172 exp->exp_replay_needed = 1;
173 exp->exp_connecting = 0;
174 obd->obd_recoverable_clients++;
175 obd->obd_max_recoverable_clients++;
176 class_export_put(exp);
178 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
179 cl_idx, last_transno);
181 spin_lock(&mdt->mdt_transno_lock);
182 if (last_transno > mdt->mdt_last_transno)
183 mdt->mdt_last_transno = last_transno;
184 spin_unlock(&mdt->mdt_transno_lock);
/*
 * Load (or create) the server header of last_rcvd and prepare recovery
 * state for the device.
 *
 * Visible steps, in order:
 *  1. Assert that the padding members place the end of mdt_server_data and
 *     mdt_client_data at LR_SERVER_SIZE and LR_CLIENT_SIZE.
 *  2. Fetch the per-thread mdt_thread_info from @ctx and use its attribute
 *     buffer to obtain the size of the last_rcvd object under its read lock.
 *  3. Size zero means a new disk: the in-memory header is initialised with
 *     the obd UUID, zero transno and mount count, the LR_* layout constants
 *     and the feature flags.
 *  4. Otherwise the header is read from disk and checked: the stored UUID
 *     must match the obd UUID, and unsupported incompat or rocompat feature
 *     bits are rejected with -EINVAL. A header without
 *     OBD_INCOMPAT_COMMON_LR is treated as the old format and its
 *     mount-count and transno fields are remapped in memory only.
 *  5. msd_feature_compat is set, mdt_last_transno is seeded from the header
 *     under mdt_transno_lock, and the header is dumped at D_INODE level.
 *  6. Zero msd_server_size, msd_client_start or msd_client_size is rejected
 *     with -EINVAL.
 *  7. mdt_init_clients_data() recreates exports for recorded clients, then
 *     obd_last_committed is set from mdt_last_transno. If any recoverable
 *     client was found the obd is put into recovery (next transno, start
 *     and end time).
 *  8. The mount count is incremented and the header is written back with
 *     mdt_update_server_data().
 *
 * Failures in steps 7 and 8 jump to err_client; class_disconnect_exports()
 * is called on the error path.
 *
 * NOTE(review): the debug dump divides by msd_client_size before the zero
 * check of step 6 is reached; a corrupt header with a zero client size and
 * a file larger than msd_client_start would divide by zero there if the
 * arguments are evaluated. Consider validating first.
 * NOTE(review): the exact branch structure, the out and err_client labels
 * and the point where mdt->mdt_mount_count is loaded from mount_count are
 * not visible in this listing; confirm against the complete source.
 */
193 static int mdt_init_server_data(const struct lu_context *ctx,
194 struct mdt_device *mdt)
196 struct mdt_server_data *msd = &mdt->mdt_msd;
197 struct mdt_client_data *mcd = NULL;
198 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
199 struct mdt_thread_info *mti;
200 struct dt_object *obj;
202 unsigned long last_rcvd_size = 0;
207 /* ensure padding in the struct is the correct size */
208 LASSERT(offsetof(struct mdt_server_data, msd_padding) +
209 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
210 LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
211 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
213 mti = lu_context_key_get(ctx, &mdt_thread_key);
214 LASSERT(mti != NULL);
215 la = &mti->mti_attr.ma_attr;
217 obj = mdt->mdt_last_rcvd;
218 obj->do_ops->do_read_lock(ctx, obj);
219 rc = obj->do_ops->do_attr_get(ctx, mdt->mdt_last_rcvd, la);
220 obj->do_ops->do_read_unlock(ctx, obj);
224 last_rcvd_size = la->la_size;
226 if (last_rcvd_size == 0) {
227 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
229 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
230 sizeof(msd->msd_uuid));
231 msd->msd_last_transno = 0;
232 msd->msd_mount_count = 0;
233 msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
234 msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
235 msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
236 msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
237 msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
238 OBD_INCOMPAT_COMMON_LR);
240 rc = mdt_read_last_rcvd_header(ctx, mdt, msd);
242 CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
245 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
246 LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
247 " disk %s. Were the /dev/ assignments "
249 obd->obd_uuid.uuid, msd->msd_uuid);
250 GOTO(out, rc = -EINVAL);
253 mount_count = le64_to_cpu(msd->msd_mount_count);
255 if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
256 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
257 obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
259 GOTO(out, rc = -EINVAL);
261 if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
262 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
263 obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
265 /* Do something like remount filesystem read-only */
266 GOTO(out, rc = -EINVAL);
268 if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
269 CDEBUG(D_WARNING, "using old last_rcvd format\n");
270 msd->msd_mount_count = msd->msd_last_transno;
271 msd->msd_last_transno = msd->msd_unused;
272 /* If we update the last_rcvd, we can never go back to
273 an old install, so leave this in the old format for now.
274 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
277 msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
279 spin_lock(&mdt->mdt_transno_lock);
280 mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
281 spin_unlock(&mdt->mdt_transno_lock);
282 CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
283 CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
284 obd->obd_name, mdt->mdt_last_transno);
285 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
286 obd->obd_name, mount_count + 1);
287 CDEBUG(D_INODE, "%s: server data size: %u\n",
288 obd->obd_name, le32_to_cpu(msd->msd_server_size));
289 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
290 obd->obd_name, le32_to_cpu(msd->msd_client_start));
291 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
292 obd->obd_name, le32_to_cpu(msd->msd_client_size));
293 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
294 obd->obd_name, last_rcvd_size);
295 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
296 last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
297 (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
298 le16_to_cpu(msd->msd_client_size));
299 CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
301 if (!msd->msd_server_size || !msd->msd_client_start ||
302 !msd->msd_client_size) {
303 CERROR("Bad last_rcvd contents!\n");
304 GOTO(out, rc = -EINVAL);
307 rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size);
309 GOTO(err_client, rc);
311 spin_lock(&mdt->mdt_transno_lock);
312 /* obd_last_committed is used for compatibility
313 * with other lustre recovery code */
314 obd->obd_last_committed = mdt->mdt_last_transno;
315 spin_unlock(&mdt->mdt_transno_lock);
317 if (obd->obd_recoverable_clients) {
318 CWARN("RECOVERY: service %s, %d recoverable clients, "
319 "last_transno "LPU64"\n", obd->obd_name,
320 obd->obd_recoverable_clients, mdt->mdt_last_transno);
321 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
322 obd->obd_recovering = 1;
323 obd->obd_recovery_start = CURRENT_SECONDS;
324 /* Only used for lprocfs_status */
325 obd->obd_recovery_end = obd->obd_recovery_start +
326 OBD_RECOVERY_TIMEOUT;
329 mdt->mdt_mount_count++;
330 msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
332 /* save it, so mount count and last_transno is current */
333 rc = mdt_update_server_data(ctx, mdt);
335 GOTO(err_client, rc);
340 class_disconnect_exports(obd);
/*
 * Refresh the on-disk server header of last_rcvd.
 *
 * The current mdt_last_transno is copied, in little-endian form and under
 * mdt_transno_lock, into the in-memory header mdt->mdt_msd, which is then
 * written out with mdt_write_last_rcvd_header() using NULL as the last
 * argument.
 *
 * The debug message reads mdt_mount_count and mdt_last_transno before the
 * lock is taken, so the logged transno may differ from the stored one.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably the result of the write is returned.
 */
345 static int mdt_update_server_data(const struct lu_context *ctx,
346 struct mdt_device *mdt)
348 struct mdt_server_data *msd = &mdt->mdt_msd;
352 CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
353 mdt->mdt_mount_count, mdt->mdt_last_transno);
355 spin_lock(&mdt->mdt_transno_lock);
356 msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
357 spin_unlock(&mdt->mdt_transno_lock);
359 rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL);
363 /* Add client data to the MDS. We use a bitmap to locate a free space
364 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
365 * Otherwise, we just have to read the data from the last_rcvd file and
366 * we know its offset.
368 * It should not be possible to fail adding an existing client - otherwise
369 * mdt_init_server_data() callsite needs to be fixed.
/*
 * Reserve a last_rcvd slot for the client described by @med.
 *
 * @cl_idx is -1 for a new client and the known slot index otherwise;
 * values below -1 trip an assertion, as does a NULL client bitmap.
 *
 * New client: the first zero bit below LR_MAX_CLIENTS is looked up in
 * mdt_client_bitmap. Running out of slots (or the OBD_FAIL_MDS_CLIENT_ADD
 * fail-check firing) is reported as an error. If test_and_set_bit() finds
 * the bit already taken, the search continues with find_next_zero_bit().
 * Existing client: the bit for @cl_idx is set and an error is logged when
 * it was already set.
 *
 * Afterwards med_lr_idx and med_lr_off are filled in (offset is
 * msd_client_start + cl_idx * msd_client_size, asserted to be positive),
 * med_mcd_lock is initialised, and the client record is written to
 * last_rcvd with a NULL transaction handle.
 *
 * NOTE(review): the branch conditions, error return codes and the
 * condition guarding the write are not visible in this listing.
 * NOTE(review): the final debug message prints sizeof(*mcd) with a
 * conversion meant for unsigned int; that looks mismatched where size_t is
 * wider than int - confirm.
 */
371 int mdt_client_add(const struct lu_context *ctx,
372 struct mdt_device *mdt,
373 struct mdt_export_data *med, int cl_idx)
375 unsigned long *bitmap = mdt->mdt_client_bitmap;
376 struct mdt_client_data *mcd = med->med_mcd;
377 struct mdt_server_data *msd = &mdt->mdt_msd;
378 int new_client = (cl_idx == -1);
382 LASSERT(bitmap != NULL);
383 LASSERTF(cl_idx > -2, "%d\n", cl_idx);
385 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
386 * there's no need for extra complication here
389 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
391 if (cl_idx >= LR_MAX_CLIENTS ||
392 MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
393 CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
396 if (test_and_set_bit(cl_idx, bitmap)) {
397 cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
402 if (test_and_set_bit(cl_idx, bitmap)) {
403 CERROR("MDS client %d: bit already set in bitmap!!\n",
409 CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
410 cl_idx, med->med_mcd->mcd_uuid);
412 med->med_lr_idx = cl_idx;
413 med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
414 (cl_idx * le16_to_cpu(msd->msd_client_size));
415 init_mutex(&med->med_mcd_lock);
417 LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
420 rc = mdt_write_last_rcvd(ctx, mdt, mcd,
421 &med->med_lr_off, NULL);
422 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
423 cl_idx, med->med_lr_off, sizeof(*mcd));
/*
 * Release the last_rcvd slot that belongs to the client of @med.
 *
 * Visible steps, in order:
 *  - an invalid slot offset is logged and leaves through the free label
 *    with -EINVAL, without touching med_lr_idx;
 *  - the slot bit must currently be set in mdt_client_bitmap, otherwise an
 *    error is logged;
 *  - under med_mcd_lock the in-memory record is zeroed and written to
 *    last_rcvd at the slot offset with a NULL transaction handle;
 *  - only after that write is the bitmap bit cleared, so a concurrent
 *    mdt_client_add() cannot take the slot and have it zeroed;
 *  - finally the server header is refreshed with mdt_update_server_data().
 *
 * NOTE(review): the result of mdt_update_server_data() is ignored here.
 * NOTE(review): the condition that classifies the offset as invalid, the
 * free label and the return statement are not visible in this listing.
 */
428 int mdt_client_free(const struct lu_context *ctx,
429 struct mdt_device *mdt,
430 struct mdt_export_data *med)
432 struct mdt_client_data *mcd = med->med_mcd;
440 CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
441 med->med_lr_idx, med->med_lr_off);
443 off = med->med_lr_off;
445 /* Don't clear med_lr_idx here as it is likely also unset. At worst
446 * we leak a client slot that will be cleaned on the next recovery. */
448 CERROR("client idx %d has offset %lld\n",
449 med->med_lr_idx, off);
450 GOTO(free, rc = -EINVAL);
453 /* Clear the bit _after_ zeroing out the client so we don't
454 race with mdt_client_add and zero out new clients.*/
455 if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
456 CERROR("MDT client %u: bit already clear in bitmap!!\n",
461 mutex_down(&med->med_mcd_lock);
462 memset(mcd, 0, sizeof *mcd);
463 rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
464 mutex_up(&med->med_mcd_lock);
466 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
467 "zeroing out client idx %u in %s rc %d\n",
468 med->med_lr_idx, LAST_RCVD, rc);
470 if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
471 CERROR("MDS client %u: bit already clear in bitmap!!\n",
476 /* Make sure the server's last_transno is up to date. Do this
477 * after the client is freed so we know all the client's
478 * transactions have been committed. */
479 mdt_update_server_data(ctx, mdt);
489 * last_rcvd & last_committed update callbacks
/*
 * Record the outcome of the current request in the last_rcvd slot of the
 * client and write that slot as part of transaction th.
 *
 * The request must have an export. If the export has already failed there
 * is no slot to update and only a warning is logged. Otherwise, under
 * med_mcd_lock, the transno, xid and result are stored in little-endian
 * form: into the mcd_last_close_* fields for an MDS_CLOSE request, and into
 * the mcd_last_* fields (plus mcd_last_data from mti_opdata). The record
 * is then written at med_lr_off with mdt_write_last_rcvd().
 *
 * NOTE(review): the declaration of the th parameter, the assignment of mcd,
 * the else between the two field groups, the offset sanity condition and
 * the return path are not visible in this listing; confirm against the
 * complete source. The only reference to this function visible here is the
 * commented-out call in mdt_txn_stop_cb().
 */
491 static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
492 struct dt_device *dt,
495 struct mdt_device *mdt = mti->mti_mdt;
496 struct ptlrpc_request *req = mdt_info_req(mti);
497 struct mdt_export_data *med;
498 struct mdt_client_data *mcd;
501 __s32 rc = th->th_result;
505 LASSERT(req->rq_export);
507 med = &req->rq_export->exp_mdt_data;
510 /* if the export has already been failed, we have no last_rcvd slot */
511 if (req->rq_export->exp_failed) {
512 CWARN("commit transaction for disconnected client %s: rc %d\n",
513 req->rq_export->exp_client_uuid.uuid, rc);
519 off = med->med_lr_off;
520 mutex_down(&med->med_mcd_lock);
521 if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
522 mcd->mcd_last_close_transno = cpu_to_le64(mti->mti_transno);
523 mcd->mcd_last_close_xid = cpu_to_le64(req->rq_xid);
524 mcd->mcd_last_close_result = cpu_to_le32(rc);
526 mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno);
527 mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
528 mcd->mcd_last_result = cpu_to_le32(rc);
529 /*XXX: save intent_disposition in mdt_thread_info?
530 * also there is bug - intent_disposition is __u64,
531 * see struct ldlm_reply->lock_policy_res1; */
532 mcd->mcd_last_data = cpu_to_le32(mti->mti_opdata);
535 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
538 err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th);
540 mutex_up(&med->med_mcd_lock);
544 extern struct lu_context_key mdt_txn_key;
545 extern struct lu_context_key mdt_thread_key;
/* Number of extra transaction credits that mdt_txn_start_cb() adds to
 * tp_credits for the last_rcvd update. */
548 MDT_TXN_LAST_RCVD_CREDITS = 3
551 /* add credits for last_rcvd update */
/*
 * Transaction-start callback (installed as dtc_txn_start by mdt_fs_setup()).
 * It enlarges param->tp_credits by MDT_TXN_LAST_RCVD_CREDITS; @ctx and
 * @cookie are not used in the visible lines.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
552 static int mdt_txn_start_cb(const struct lu_context *ctx,
553 struct txn_param *param, void *cookie)
555 param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
559 /* Update last_rcvd records with latest transaction data */
/*
 * Transaction-stop callback (installed as dtc_txn_stop by mdt_fs_setup());
 * @cookie is the mdt_device.
 *
 * Under mdt_transno_lock it settles the transaction number of the request:
 *  - a failed transaction (th_result != 0) that carried a transno is
 *    logged as a failed replay and its transno is reset to 0;
 *  - a successful transaction without a transno gets the next value of
 *    mdt_last_transno;
 *  - a transno larger than mdt_last_transno raises mdt_last_transno.
 * The resulting value is copied into the mdt_txn_info stored in the context
 * of the transaction so that the commit callback can read it.
 *
 * The function always returns 0; the call to mdt_update_last_rcvd() is
 * commented out, so no last_rcvd record is written from here.
 *
 * NOTE(review): the else that presumably precedes the last comparison is
 * not visible in this listing.
 */
560 static int mdt_txn_stop_cb(const struct lu_context *ctx,
561 struct thandle *txn, void *cookie)
563 struct mdt_device *mdt = cookie;
564 struct mdt_txn_info *txi;
565 struct mdt_thread_info *mti;
567 /* transno in two contexts - for commit_cb and for thread */
568 txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
569 mti = lu_context_key_get(ctx, &mdt_thread_key);
571 /*TODO: checks for recovery cases, see mds_finish_transno */
572 spin_lock(&mdt->mdt_transno_lock);
573 if (txn->th_result != 0) {
574 if (mti->mti_transno != 0) {
575 CERROR("Replay transno "LPU64" failed: rc %i\n",
576 mti->mti_transno, txn->th_result);
577 mti->mti_transno = 0;
579 } else if (mti->mti_transno == 0) {
580 mti->mti_transno = ++ mdt->mdt_last_transno;
583 if (mti->mti_transno > mdt->mdt_last_transno)
584 mdt->mdt_last_transno = mti->mti_transno;
586 /* save transno for the commit callback */
587 txi->txi_transno = mti->mti_transno;
588 spin_unlock(&mdt->mdt_transno_lock);
590 return 0;//mdt_update_last_rcvd(mti, dev, txn);
593 /* commit callback, need to update last_committed value */
/*
 * Transaction-commit callback (installed as dtc_txn_commit by
 * mdt_fs_setup()); @cookie is the mdt_device.
 *
 * The transno saved by the stop callback is read from the context of the
 * transaction. Under mdt_transno_lock, if it is newer than
 * obd_last_committed, obd_last_committed is advanced; the lock is dropped
 * before ptlrpc_commit_replies() is called. The committed transno is then
 * logged at D_HA level.
 *
 * NOTE(review): the else branch around the second unlock and the return
 * statement are not visible in this listing.
 */
594 static int mdt_txn_commit_cb(const struct lu_context *ctxt,
595 struct thandle *txn, void *cookie)
597 struct mdt_device *mdt = cookie;
598 struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
599 struct mdt_txn_info *txi;
601 txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
603 /* copy of obd_transno_commit_cb() but with locking */
604 spin_lock(&mdt->mdt_transno_lock);
605 if (txi->txi_transno > obd->obd_last_committed) {
606 obd->obd_last_committed = txi->txi_transno;
607 spin_unlock(&mdt->mdt_transno_lock);
608 ptlrpc_commit_replies(obd);
610 spin_unlock(&mdt->mdt_transno_lock);
612 CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
613 obd->obd_name, txi->txi_transno);
/*
 * Set up the persistent recovery state of the device.
 *
 * The three transaction callbacks of this file are stored in mdt_txn_cb
 * (with the device itself as cookie) and registered on mdt_bottom. The
 * LAST_RCVD object is then opened with dt_store_open(), kept in
 * mdt->mdt_last_rcvd, and mdt_init_server_data() is run on it. On one of
 * the paths the object reference is dropped again and mdt_last_rcvd is
 * reset to NULL; a failure to open the object is logged.
 *
 * NOTE(review): the conditions selecting the cleanup and error branches
 * and the return statement are not visible in this listing. Whether the
 * callback registration is undone on failure cannot be told from here.
 */
618 int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt)
620 struct lu_fid last_fid;
621 struct dt_object *last;
625 /* prepare transactions callbacks */
626 mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
627 mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
628 mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
629 mdt->mdt_txn_cb.dtc_cookie = mdt;
631 dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
633 last = dt_store_open(ctx, mdt->mdt_bottom,
634 LAST_RCVD, &last_fid);
636 mdt->mdt_last_rcvd = last;
637 rc = mdt_init_server_data(ctx, mdt);
639 lu_object_put(ctx, &last->do_lu);
640 mdt->mdt_last_rcvd = NULL;
644 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
/*
 * Undo mdt_fs_setup(): unregister the transaction callbacks from
 * mdt_bottom, disconnect all exports of the obd, drop the reference on the
 * last_rcvd object if one is held, and reset mdt_last_rcvd to NULL.
 */
651 void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
653 struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
655 /* remove transaction callback */
656 dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
658 class_disconnect_exports(obd); /* cleans up client info too */
660 if (mdt->mdt_last_rcvd)
661 lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
662 mdt->mdt_last_rcvd = NULL;
665 /* reconstruction code */
/*
 * Restore the reply state of @req from the saved client record @mcd:
 * mcd_last_transno and mcd_last_result are copied into rq_transno and
 * rq_status and mirrored into the reply message. The call that would take
 * over ack locks is commented out.
 *
 * NOTE(review): mdt_update_last_rcvd() stores these fields with
 * cpu_to_le64() and cpu_to_le32(), but they are read here without the
 * matching le64_to_cpu() and le32_to_cpu(). That looks wrong on big-endian
 * hosts - confirm and convert.
 */
666 void mdt_req_from_mcd(struct ptlrpc_request *req,
667 struct mdt_client_data *mcd)
669 DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
670 mcd->mcd_last_transno, mcd->mcd_last_result);
671 req->rq_transno = mcd->mcd_last_transno;
672 req->rq_status = mcd->mcd_last_result;
673 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
674 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
675 //mds_steal_ack_locks(req);
/*
 * Reconstructor for operations that need nothing beyond the saved transno
 * and status: it forwards to mdt_req_from_mcd() with the client record of
 * the export.
 *
 * NOTE(review): the return statement carries a void expression inside a
 * void function; ISO C does not allow that, although common compilers
 * accept it. A plain call would be the portable form.
 */
678 static void mdt_reconstruct_generic(struct mdt_thread_info *mti)
680 struct ptlrpc_request *req = mdt_info_req(mti);
681 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
683 return mdt_req_from_mcd(req, med->med_mcd);
/*
 * Reconstruct the reply of an already executed create request.
 *
 * After the saved transno and status are restored with mdt_req_from_mcd(),
 * the created child is looked up by rr_fid2 (the lookup is asserted to
 * succeed), its attributes are fetched with mo_attr_get() and packed into
 * the MDT body of the reply. A result of -EREMOTE marks the object as
 * living on a remote server by setting OBD_MD_MDS in body->valid. The
 * child reference is dropped at the end.
 *
 * NOTE(review): the check that skips reconstruction for a failed create,
 * the remaining mo_attr_get() arguments and the exact branch structure
 * around the packing call are not visible in this listing.
 */
686 static void mdt_reconstruct_create(struct mdt_thread_info *mti)
688 struct ptlrpc_request *req = mdt_info_req(mti);
689 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
690 struct mdt_device *mdt = mti->mti_mdt;
691 struct mdt_object *child;
692 struct mdt_body *body;
695 mdt_req_from_mcd(req, med->med_mcd);
699 /* if no error, so child was created with requested fid */
700 child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
701 LASSERT(!IS_ERR(child));
703 body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
704 rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
706 if (rc == -EREMOTE) {
707 /* object was created on remote server */
709 body->valid |= OBD_MD_MDS;
711 mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
712 mti->mti_rr.rr_fid2);
713 mdt_object_put(mti->mti_ctxt, child);
/*
 * Reconstruct the reply of an already executed setattr request.
 *
 * After the saved transno and status are restored with mdt_req_from_mcd(),
 * the object is looked up by rr_fid1 (the lookup is asserted to succeed),
 * its current attributes are fetched and packed into the MDT body of the
 * reply, and the object reference is dropped.
 *
 * NOTE(review): the result of mo_attr_get() is not checked before the
 * attributes are packed.
 * NOTE(review): the statements that test rec->ur_iattr.ia_valid refer to a
 * variable with no visible declaration; presumably they sit inside a
 * disabled preprocessor section that is not shown in this listing -
 * confirm against the complete source.
 */
716 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
718 struct ptlrpc_request *req = mdt_info_req(mti);
719 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
720 struct mdt_device *mdt = mti->mti_mdt;
721 struct mdt_object *obj;
722 struct mdt_body *body;
724 mdt_req_from_mcd(req, med->med_mcd);
728 body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
729 obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
730 LASSERT(!IS_ERR(obj));
731 mo_attr_get(mti->mti_ctxt, mdt_object_child(obj), &mti->mti_attr);
732 mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
733 mti->mti_rr.rr_fid1);
735 /* Don't return OST-specific attributes if we didn't just set them */
737 if (rec->ur_iattr.ia_valid & ATTR_SIZE)
738 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
739 if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
740 body->valid |= OBD_MD_FLMTIME;
741 if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
742 body->valid |= OBD_MD_FLATIME;
744 mdt_object_put(mti->mti_ctxt, obj);
/*
 * Reply reconstructors, one per reint opcode. The table has REINT_MAX
 * entries and is indexed by the REINT_* opcode; opcodes without an
 * initializer are left as NULL by the designated-initializer rules.
 * mdt_reconstruct_open is not defined in the part of the file shown here.
 */
747 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
749 static mdt_reconstructor reconstructors[REINT_MAX] = {
750 [REINT_SETATTR] = mdt_reconstruct_setattr,
751 [REINT_CREATE] = mdt_reconstruct_create,
752 [REINT_LINK] = mdt_reconstruct_generic,
753 [REINT_UNLINK] = mdt_reconstruct_generic,
754 [REINT_RENAME] = mdt_reconstruct_generic,
755 [REINT_OPEN] = mdt_reconstruct_open
/*
 * Entry point of reply reconstruction: dispatch to the reconstructor
 * registered for the reint opcode of the request (mti->mti_rr.rr_opcode).
 *
 * NOTE(review): no range check of the opcode and no NULL check of the
 * selected table entry are visible in this listing; an opcode outside the
 * table or without a registered handler would be fatal - confirm that the
 * caller validates it.
 */
758 void mdt_reconstruct(struct mdt_thread_info *mti)
761 reconstructors[mti->mti_rr.rr_opcode](mti);