Whamcloud - gitweb
- merge with 1_5,some fixes.
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_recovery.c
5  *  Lustre Metadata Target (mdt) recovery-related methods
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *   Author; Pershin Mike <tappro@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 static int mdt_update_server_data(const struct lu_context *ctx,
37                                   struct mdt_device *mdt);
38
39 /* last_rcvd handling */
40 static inline int mdt_read_last_rcvd_header (const struct lu_context *ctx,
41                                              struct mdt_device *mdt,
42                                              struct mdt_server_data *msd)
43 {
44         loff_t off = 0;
45         int rc;
46
47         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx,
48                                                        mdt->mdt_last_rcvd,
49                                                        msd, sizeof(*msd),
50                                                        &off);
51         if (rc == sizeof(*msd))
52                 rc = 0;
53         else if (rc >= 0)
54                 rc = -EFAULT;
55         return rc;
56 }
57
58 static inline int mdt_write_last_rcvd_header(const struct lu_context *ctx, 
59                                              struct mdt_device *mdt,
60                                              struct mdt_server_data *msd,
61                                              struct thandle *th)
62 {
63         loff_t off = 0;
64         int rc;
65
66         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
67                                                         mdt->mdt_last_rcvd,
68                                                         msd, sizeof(*msd),
69                                                         &off, th);
70         if (rc == sizeof(*msd))
71                 rc = 0;
72         else if (rc >= 0)
73                 rc = -EFAULT;
74         return rc;
75 }
76
77 static inline int mdt_read_last_rcvd(const struct lu_context *ctx,
78                                      struct mdt_device *mdt,
79                                      struct mdt_client_data *mcd, loff_t *off)
80
81         int rc;
82
83         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx, mdt->mdt_last_rcvd,
84                                                        mcd, sizeof(*mcd), off);
85         if (rc == sizeof(*mcd))
86                 rc = 0;
87         else if (rc >= 0)
88                 rc = -EFAULT;
89         return rc;
90 }
91
92 static inline int mdt_write_last_rcvd(const struct lu_context *ctx, 
93                                       struct mdt_device *mdt,
94                                       struct mdt_client_data *mcd,
95                                       loff_t *off, struct thandle *th)
96 {
97         int rc;
98
99         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
100                                                         mdt->mdt_last_rcvd,
101                                                         mcd, sizeof(*mcd),
102                                                         off, th);
103         if (rc == sizeof(*mcd))
104                 rc = 0;
105         else if (rc >= 0)
106                 rc = -EFAULT;
107         return rc;
108 }
109
110 static int mdt_init_clients_data(const struct lu_context *ctx,
111                                 struct mdt_device *mdt,
112                                 unsigned long last_size)
113 {
114         struct mdt_server_data *msd = &mdt->mdt_msd;
115         struct mdt_client_data *mcd;
116         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
117         loff_t off = 0;
118         int cl_idx;
119         int rc = 0;
120         ENTRY;
121
122         /* When we do a clean MDS shutdown, we save the last_transno into
123          * the header.  If we find clients with higher last_transno values
124          * then those clients may need recovery done. */
125
126         OBD_ALLOC_PTR(mcd);
127         if (!mcd)
128                 RETURN(rc = -ENOMEM);
129
130         for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
131              off < last_size; cl_idx++) {
132                 __u64 last_transno;
133                 struct obd_export *exp;
134                 struct mdt_export_data *med;
135
136                 off = le32_to_cpu(msd->msd_client_start) +
137                         cl_idx * le16_to_cpu(msd->msd_client_size);
138
139                 rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off);
140                 if (rc) {
141                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
142                                LAST_RCVD, cl_idx, off, rc);
143                         break; /* read error shouldn't cause startup to fail */
144                 }
145
146                 if (mcd->mcd_uuid[0] == '\0') {
147                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
148                                cl_idx);
149                         continue;
150                 }
151
152                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
153
154                 /* These exports are cleaned up by mdt_obd_disconnect(), so
155                  * they need to be set up like real exports as
156                  * mdt_obd_connect() does.
157                  */
158                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
159                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
160                        last_transno, le64_to_cpu(msd->msd_last_transno),
161                        le64_to_cpu(mcd->mcd_last_xid));
162
163                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
164                 if (IS_ERR(exp))
165                         GOTO(err_client, rc = PTR_ERR(exp));
166
167                 med = &exp->exp_mdt_data;
168                 med->med_mcd = mcd;
169                 rc = mdt_client_add(ctx, mdt, med, cl_idx);
170                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
171
172                 exp->exp_replay_needed = 1;
173                 exp->exp_connecting = 0;
174                 obd->obd_recoverable_clients++;
175                 obd->obd_max_recoverable_clients++;
176                 class_export_put(exp);
177
178                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
179                        cl_idx, last_transno);
180
181                 spin_lock(&mdt->mdt_transno_lock);
182                 if (last_transno > mdt->mdt_last_transno)
183                         mdt->mdt_last_transno = last_transno;
184                 spin_unlock(&mdt->mdt_transno_lock);
185         }
186
187 err_client:
188         OBD_FREE_PTR(mcd);
189
190         RETURN(rc);
191 }
192
193 static int mdt_init_server_data(const struct lu_context *ctx,
194                                 struct mdt_device *mdt)
195 {
196         struct mdt_server_data *msd = &mdt->mdt_msd;
197         struct mdt_client_data *mcd = NULL;
198         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
199         struct mdt_thread_info *mti;
200         struct lu_attr         *la;
201         unsigned long last_rcvd_size = 0;
202         __u64 mount_count;
203         int rc = 0;
204         ENTRY;
205
206         /* ensure padding in the struct is the correct size */
207         LASSERT(offsetof(struct mdt_server_data, msd_padding) +
208                 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
209         LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
210                 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
211         
212         mti = lu_context_key_get(ctx, &mdt_thread_key);
213         LASSERT(mti != NULL);
214         la = &mti->mti_attr.ma_attr;
215
216         rc = mdt->mdt_last_rcvd->do_ops->do_attr_get(ctx,
217                                                      mdt->mdt_last_rcvd, la);
218         if (rc)
219                 RETURN(rc);
220
221         last_rcvd_size = la->la_size;
222
223         if (last_rcvd_size == 0) {
224                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
225
226                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
227                        sizeof(msd->msd_uuid));
228                 msd->msd_last_transno = 0;
229                 msd->msd_mount_count = 0;
230                 msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
231                 msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
232                 msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
233                 msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
234                 msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
235                                                        OBD_INCOMPAT_COMMON_LR);
236         } else {
237                 rc = mdt_read_last_rcvd_header(mti->mti_ctxt, mdt, msd);
238                 if (rc) {
239                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
240                         GOTO(out, rc);
241                 }
242                 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
243                         LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
244                                        " disk %s. Were the /dev/ assignments "
245                                        "rearranged?\n",
246                                        obd->obd_uuid.uuid, msd->msd_uuid);
247                         GOTO(out, rc = -EINVAL);
248                 }
249         }
250         mount_count = le64_to_cpu(msd->msd_mount_count);
251
252         if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
253                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
254                        obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
255                        ~MDT_INCOMPAT_SUPP);
256                 GOTO(out, rc = -EINVAL);
257         }
258         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
259                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
260                        obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
261                        ~MDT_ROCOMPAT_SUPP);
262                 /* Do something like remount filesystem read-only */
263                 GOTO(out, rc = -EINVAL);
264         }
265         if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
266                 CDEBUG(D_WARNING, "using old last_rcvd format\n");
267                 msd->msd_mount_count = msd->msd_last_transno;
268                 msd->msd_last_transno = msd->msd_unused;
269                 /* If we update the last_rcvd, we can never go back to
270                    an old install, so leave this in the old format for now.
271                 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
272                 */
273         }
274         msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
275
276         spin_lock(&mdt->mdt_transno_lock);
277         mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
278         spin_unlock(&mdt->mdt_transno_lock);
279
280         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
281                obd->obd_name, mdt->mdt_last_transno);
282         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
283                obd->obd_name, mount_count + 1);
284         CDEBUG(D_INODE, "%s: server data size: %u\n",
285                obd->obd_name, le32_to_cpu(msd->msd_server_size));
286         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
287                obd->obd_name, le32_to_cpu(msd->msd_client_start));
288         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
289                obd->obd_name, le32_to_cpu(msd->msd_client_size));
290         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
291                obd->obd_name, last_rcvd_size);
292         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
293                last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
294                (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
295                 le16_to_cpu(msd->msd_client_size));
296
297         if (!msd->msd_server_size || !msd->msd_client_start ||
298             !msd->msd_client_size) {
299                 CERROR("Bad last_rcvd contents!\n");
300                 GOTO(out, rc = -EINVAL);
301         }
302
303         rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size);
304         if (rc)
305                 GOTO(err_client, rc);
306
307         spin_lock(&mdt->mdt_transno_lock);
308         /* obd_last_committed is used for compatibility
309          * with other lustre recovery code */
310         obd->obd_last_committed = mdt->mdt_last_transno;
311         spin_unlock(&mdt->mdt_transno_lock);
312
313         if (obd->obd_recoverable_clients) {
314                 CWARN("RECOVERY: service %s, %d recoverable clients, "
315                       "last_transno "LPU64"\n", obd->obd_name,
316                       obd->obd_recoverable_clients, mdt->mdt_last_transno);
317                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
318                 obd->obd_recovering = 1;
319                 obd->obd_recovery_start = CURRENT_SECONDS;
320                 /* Only used for lprocfs_status */
321                 obd->obd_recovery_end = obd->obd_recovery_start +
322                         OBD_RECOVERY_TIMEOUT;
323         }
324
325         mdt->mdt_mount_count++;
326         msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
327
328         /* save it, so mount count and last_transno is current */
329         rc = mdt_update_server_data(ctx, mdt);
330         if (rc)
331                 GOTO(err_client, rc);
332
333         RETURN(0);
334
335 err_client:
336         class_disconnect_exports(obd);
337 out:
338         return rc;
339 }
340
341 static int mdt_update_server_data(const struct lu_context *ctx,
342                                   struct mdt_device *mdt)
343 {
344         struct mdt_server_data *msd = &mdt->mdt_msd;
345         int rc = 0;
346         ENTRY;
347
348         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
349                 mdt->mdt_mount_count, mdt->mdt_last_transno);
350
351         spin_lock(&mdt->mdt_transno_lock);
352         msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
353         spin_unlock(&mdt->mdt_transno_lock);
354
355         rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL);
356         RETURN(rc);
357 }
358
359 /* Add client data to the MDS.  We use a bitmap to locate a free space
360  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
361  * Otherwise, we just have to read the data from the last_rcvd file and
362  * we know its offset.
363  *
364  * It should not be possible to fail adding an existing client - otherwise
365  * mdt_init_server_data() callsite needs to be fixed.
366  */
367 int mdt_client_add(const struct lu_context *ctx,
368                    struct mdt_device *mdt,
369                    struct mdt_export_data *med, int cl_idx)
370 {
371         unsigned long *bitmap = mdt->mdt_client_bitmap;
372         struct mdt_client_data *mcd = med->med_mcd;
373         struct mdt_server_data *msd = &mdt->mdt_msd;
374         int new_client = (cl_idx == -1);
375         int rc = 0;
376         ENTRY;
377
378         LASSERT(bitmap != NULL);
379         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
380
381         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
382          * there's no need for extra complication here
383          */
384         if (new_client) {
385                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
386         repeat:
387                 if (cl_idx >= LR_MAX_CLIENTS ||
388                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
389                         CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
390                         return -EOVERFLOW;
391                 }
392                 if (test_and_set_bit(cl_idx, bitmap)) {
393                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
394                                                     cl_idx);
395                         goto repeat;
396                 }
397         } else {
398                 if (test_and_set_bit(cl_idx, bitmap)) {
399                         CERROR("MDS client %d: bit already set in bitmap!!\n",
400                                cl_idx);
401                         LBUG();
402                 }
403         }
404
405         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
406                cl_idx, med->med_mcd->mcd_uuid);
407
408         med->med_lr_idx = cl_idx;
409         med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
410                           (cl_idx * le16_to_cpu(msd->msd_client_size));
411         init_mutex(&med->med_mcd_lock);
412
413         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
414
415         if (new_client) {
416                 rc = mdt_write_last_rcvd(ctx, mdt, mcd,
417                                          &med->med_lr_off, NULL);
418                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
419                        cl_idx, med->med_lr_off, sizeof(*mcd));
420         }
421         RETURN(rc);
422 }
423
424 int mdt_client_free(const struct lu_context *ctx,
425                     struct mdt_device *mdt,
426                     struct mdt_export_data *med)
427 {
428         struct mdt_client_data *mcd  = med->med_mcd;
429         int rc = 0;
430         loff_t off;
431         ENTRY;
432
433         if (!mcd)
434                 RETURN(0);
435
436         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
437                med->med_lr_idx, med->med_lr_off);
438
439         off = med->med_lr_off;
440
441         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
442          * we leak a client slot that will be cleaned on the next recovery. */
443         if (off <= 0) {
444                 CERROR("client idx %d has offset %lld\n",
445                         med->med_lr_idx, off);
446                 GOTO(free, rc = -EINVAL);
447         }
448
449         /* Clear the bit _after_ zeroing out the client so we don't
450            race with mdt_client_add and zero out new clients.*/
451         if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
452                 CERROR("MDT client %u: bit already clear in bitmap!!\n",
453                        med->med_lr_idx);
454                 LBUG();
455         }
456
457         mutex_down(&med->med_mcd_lock);
458         memset(mcd, 0, sizeof *mcd);
459         rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
460         mutex_up(&med->med_mcd_lock);
461
462         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
463                "zeroing out client idx %u in %s rc %d\n",
464                med->med_lr_idx, LAST_RCVD, rc);
465
466         if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
467                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
468                        med->med_lr_idx);
469                 LBUG();
470         }
471
472         /* Make sure the server's last_transno is up to date. Do this
473          * after the client is freed so we know all the client's
474          * transactions have been committed. */
475         mdt_update_server_data(ctx, mdt);
476
477         EXIT;
478 free:
479         OBD_FREE_PTR(mcd);
480         med->med_mcd = NULL;
481         return 0;
482 }
483
484 /*
485  * last_rcvd & last_committed update callbacks
486  */
487 static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
488                                 struct dt_device *dt,
489                                 struct thandle *th)
490 {
491         struct mdt_device *mdt = mti->mti_mdt;
492         struct ptlrpc_request *req = mdt_info_req(mti);
493         struct mdt_export_data *med;
494         struct mdt_client_data *mcd;
495         loff_t off;
496         int err;
497         __s32 rc = th->th_result;
498         
499         ENTRY;
500         LASSERT(req);
501         LASSERT(req->rq_export);
502         LASSERT(mdt);
503         med = &req->rq_export->exp_mdt_data;
504         LASSERT(med);
505         mcd = med->med_mcd;
506         /* if the export has already been failed, we have no last_rcvd slot */
507         if (req->rq_export->exp_failed) {
508                 CWARN("commit transaction for disconnected client %s: rc %d\n",
509                       req->rq_export->exp_client_uuid.uuid, rc);
510                 if (rc == 0)
511                         rc = -ENOTCONN;
512                 RETURN(rc);
513         }
514
515         off = med->med_lr_off;
516         mutex_down(&med->med_mcd_lock);
517         mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno);
518         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
519         mcd->mcd_last_result = cpu_to_le32(rc);
520         /* XXX: how to pass op_data here? */
521         //mcd->mcd_last_data = cpu_to_le32(op_data);
522         
523         if (off <= 0) {
524                 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
525                 err = -EINVAL;
526         } else {
527                 err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th);
528         }
529         mutex_up(&med->med_mcd_lock);
530         RETURN(err);
531 }
532
533 extern struct lu_context_key mdt_txn_key;
534 extern struct lu_context_key mdt_thread_key;
535
536 enum {
537         MDT_TXN_LAST_RCVD_CREDITS = 3
538 };
539
540 /* add credits for last_rcvd update */
541 static int mdt_txn_start_cb(const struct lu_context *ctx,
542                             struct dt_device *dev,
543                             struct txn_param *param, void *cookie)
544 {
545         param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
546         return 0;
547 }
548
549 /* Update last_rcvd records with latests transaction data */
550 static int mdt_txn_stop_cb(const struct lu_context *ctx,
551                            struct dt_device *dev,
552                            struct thandle *txn, void *cookie)
553 {
554         struct mdt_device *mdt = cookie;
555         struct mdt_txn_info *txi;
556         struct mdt_thread_info *mti;
557
558         /* transno in two contexts - for commit_cb and for thread */
559         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
560         mti = lu_context_key_get(ctx, &mdt_thread_key);
561
562         /*TODO: checks for recovery cases, see mds_finish_transno */
563         spin_lock(&mdt->mdt_transno_lock);
564         if (txn->th_result != 0) {
565                 if (mti->mti_transno != 0) {
566                         CERROR("Replay transno "LPU64" failed: rc %i\n",
567                                mti->mti_transno, txn->th_result);
568                         mti->mti_transno = 0;
569                 }
570         } else if (mti->mti_transno == 0) {
571                 mti->mti_transno = ++ mdt->mdt_last_transno;
572         } else {
573                 /* replay */
574                 if (mti->mti_transno > mdt->mdt_last_transno)
575                         mdt->mdt_last_transno = mti->mti_transno;
576         }
577         /* save transno for the commit callback */
578         txi->txi_transno = mti->mti_transno;
579         spin_unlock(&mdt->mdt_transno_lock);
580
581         return 0;//mdt_update_last_rcvd(mti, dev, txn);
582 }
583
584 /* commit callback, need to update last_commited value */
585 static int mdt_txn_commit_cb(const struct lu_context *ctx,
586                              struct dt_device *dev,
587                              struct thandle *txn, void *cookie)
588 {
589         struct mdt_device *mdt = cookie;
590         struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
591         struct mdt_txn_info *txi;
592
593         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
594
595         /* copy of obd_transno_commit_cb() but with locking */
596         spin_lock(&mdt->mdt_transno_lock);
597         if (txi->txi_transno > obd->obd_last_committed) {
598                 obd->obd_last_committed = txi->txi_transno;
599                 spin_unlock(&mdt->mdt_transno_lock);
600                 ptlrpc_commit_replies (obd);
601         } else 
602                 spin_unlock(&mdt->mdt_transno_lock);
603
604         CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
605                obd->obd_name, txi->txi_transno);
606
607         return 0;
608 }
609
610 int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt)
611 {
612         struct lu_fid last_fid;
613         struct dt_object *last;
614         int rc = 0;
615         ENTRY;
616
617         /* prepare transactions callbacks */
618         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
619         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
620         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
621         mdt->mdt_txn_cb.dtc_cookie = mdt;
622
623         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
624
625         last = dt_store_open(ctx, mdt->mdt_bottom,
626                              LAST_RCVD, &last_fid);
627         if(!IS_ERR(last)) {
628                 mdt->mdt_last_rcvd = last;
629                 rc = mdt_init_server_data(ctx, mdt);
630                 if (rc) {
631                         lu_object_put(ctx, &last->do_lu);
632                         mdt->mdt_last_rcvd = NULL;
633                 }
634         } else {
635                 rc = PTR_ERR(last);
636                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
637         }
638
639         RETURN (rc);
640 }
641
642
643 void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
644 {
645         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
646
647         /* remove transaction callback */
648         dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
649
650         class_disconnect_exports(obd); /* cleans up client info too */
651
652         if (mdt->mdt_last_rcvd)
653                 lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
654         mdt->mdt_last_rcvd = NULL;
655 }
656
657 /* reconstruction code */
658 static inline void mdt_req_from_mcd(struct ptlrpc_request *req,
659                                     struct mdt_client_data *mcd)
660 {
661         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
662                   mcd->mcd_last_transno, mcd->mcd_last_result);
663         req->rq_transno = mcd->mcd_last_transno;
664         req->rq_status = mcd->mcd_last_result;
665         lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
666         lustre_msg_set_status(req->rq_repmsg, req->rq_status);
667         //mds_steal_ack_locks(req);
668 }
669
670 static void mdt_reconstruct_generic(struct mdt_thread_info *mti)
671 {
672         struct ptlrpc_request *req = mdt_info_req(mti);
673         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
674
675         return mdt_req_from_mcd(req, med->med_mcd);
676 }
677
678 static void mdt_reconstruct_create(struct mdt_thread_info *mti)
679 {
680         struct ptlrpc_request  *req = mdt_info_req(mti);
681         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
682         struct mdt_device *mdt = mti->mti_mdt;
683         struct mdt_object *child;
684         struct mdt_body *body;
685         int rc;
686
687         mdt_req_from_mcd(req, med->med_mcd);
688         if (req->rq_status)
689                 return;
690
691         /* if no error, so child was created with requested fid */
692         child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
693         LASSERT(!IS_ERR(child));
694
695         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
696         rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
697                          &mti->mti_attr);
698         if (rc == -EREMOTE) {
699                 /* object was created on remote server */
700                 req->rq_status = rc;
701                 body->valid |= OBD_MD_MDS;
702         }
703         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
704                            mti->mti_rr.rr_fid2);
705         mdt_object_put(mti->mti_ctxt, child);
706 }
707
708 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
709 {
710         struct ptlrpc_request  *req = mdt_info_req(mti);
711         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
712         struct mdt_device *mdt = mti->mti_mdt;
713         struct mdt_object *obj;
714         struct mdt_body *body;
715         
716         mdt_req_from_mcd(req, med->med_mcd);
717         if (req->rq_status)
718                 return;
719         
720         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
721         obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
722         LASSERT(!IS_ERR(obj));
723         mo_attr_get(mti->mti_ctxt, mdt_object_child(obj), &mti->mti_attr);
724         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
725                            mti->mti_rr.rr_fid1);
726
727         /* Don't return OST-specific attributes if we didn't just set them */
728 /*
729         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
730                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
731         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
732                 body->valid |= OBD_MD_FLMTIME;
733         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
734                 body->valid |= OBD_MD_FLATIME;
735 */
736         mdt_object_put(mti->mti_ctxt, obj);
737 }
738
739 static void mdt_reconstruct_open(struct mdt_thread_info *mti)
740 {
741         struct ptlrpc_request *req = mdt_info_req(mti);
742         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
743
744         /*TODO: after open recovery task will be finished */
745
746         mdt_req_from_mcd(req, med->med_mcd);
747 }
748
749 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
750
751 static mdt_reconstructor reconstructors[REINT_MAX] = {
752         [REINT_SETATTR]  = mdt_reconstruct_setattr,
753         [REINT_CREATE] = mdt_reconstruct_create,
754         [REINT_LINK] = mdt_reconstruct_generic,
755         [REINT_UNLINK] = mdt_reconstruct_generic,
756         [REINT_RENAME] = mdt_reconstruct_generic,
757         [REINT_OPEN] = mdt_reconstruct_open
758 };
759
760 void mdt_reconstruct(struct mdt_thread_info *mti)
761 {
762         ENTRY;
763         reconstructors[mti->mti_rr.rr_opcode](mti);
764         EXIT;
765 }
766