Whamcloud - gitweb
added some open recovery prototype code.
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_recovery.c
5  *  Lustre Metadata Target (mdt) recovery-related methods
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *   Author; Pershin Mike <tappro@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 static int mdt_update_server_data(const struct lu_context *ctx,
37                                   struct mdt_device *mdt);
38
39 /* last_rcvd handling */
40 static inline int mdt_read_last_rcvd_header (const struct lu_context *ctx,
41                                              struct mdt_device *mdt,
42                                              struct mdt_server_data *msd)
43 {
44         loff_t off = 0;
45         int rc;
46
47         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx,
48                                                        mdt->mdt_last_rcvd,
49                                                        msd, sizeof(*msd),
50                                                        &off);
51         if (rc == sizeof(*msd))
52                 rc = 0;
53         else if (rc >= 0)
54                 rc = -EFAULT;
55         return rc;
56 }
57
58 static inline int mdt_write_last_rcvd_header(const struct lu_context *ctx,
59                                              struct mdt_device *mdt,
60                                              struct mdt_server_data *msd,
61                                              struct thandle *th)
62 {
63         loff_t off = 0;
64         int rc;
65
66         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
67                                                         mdt->mdt_last_rcvd,
68                                                         msd, sizeof(*msd),
69                                                         &off, th);
70         if (rc == sizeof(*msd))
71                 rc = 0;
72         else if (rc >= 0)
73                 rc = -EFAULT;
74         return rc;
75 }
76
77 static inline int mdt_read_last_rcvd(const struct lu_context *ctx,
78                                      struct mdt_device *mdt,
79                                      struct mdt_client_data *mcd, loff_t *off)
80 {
81         int rc;
82
83         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx, mdt->mdt_last_rcvd,
84                                                        mcd, sizeof(*mcd), off);
85         if (rc == sizeof(*mcd))
86                 rc = 0;
87         else if (rc >= 0)
88                 rc = -EFAULT;
89         return rc;
90 }
91
92 static inline int mdt_write_last_rcvd(const struct lu_context *ctx,
93                                       struct mdt_device *mdt,
94                                       struct mdt_client_data *mcd,
95                                       loff_t *off, struct thandle *th)
96 {
97         int rc;
98
99         rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
100                                                         mdt->mdt_last_rcvd,
101                                                         mcd, sizeof(*mcd),
102                                                         off, th);
103         if (rc == sizeof(*mcd))
104                 rc = 0;
105         else if (rc >= 0)
106                 rc = -EFAULT;
107         return rc;
108 }
109
110 static int mdt_init_clients_data(const struct lu_context *ctx,
111                                 struct mdt_device *mdt,
112                                 unsigned long last_size)
113 {
114         struct mdt_server_data *msd = &mdt->mdt_msd;
115         struct mdt_client_data *mcd;
116         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
117         loff_t off = 0;
118         int cl_idx;
119         int rc = 0;
120         ENTRY;
121
122         /* When we do a clean MDS shutdown, we save the last_transno into
123          * the header.  If we find clients with higher last_transno values
124          * then those clients may need recovery done. */
125
126         OBD_ALLOC_PTR(mcd);
127         if (!mcd)
128                 RETURN(rc = -ENOMEM);
129
130         for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
131              off < last_size; cl_idx++) {
132                 __u64 last_transno;
133                 struct obd_export *exp;
134                 struct mdt_export_data *med;
135
136                 off = le32_to_cpu(msd->msd_client_start) +
137                         cl_idx * le16_to_cpu(msd->msd_client_size);
138
139                 rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off);
140                 if (rc) {
141                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
142                                LAST_RCVD, cl_idx, off, rc);
143                         break; /* read error shouldn't cause startup to fail */
144                 }
145
146                 if (mcd->mcd_uuid[0] == '\0') {
147                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
148                                cl_idx);
149                         continue;
150                 }
151
152                 last_transno = mcd_last_transno(mcd);
153
154                 /* These exports are cleaned up by mdt_obd_disconnect(), so
155                  * they need to be set up like real exports as
156                  * mdt_obd_connect() does.
157                  */
158                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
159                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
160                        last_transno, le64_to_cpu(msd->msd_last_transno),
161                        mcd_last_xid(mcd));
162
163                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
164                 if (IS_ERR(exp))
165                         GOTO(err_client, rc = PTR_ERR(exp));
166
167                 med = &exp->exp_mdt_data;
168                 med->med_mcd = mcd;
169                 rc = mdt_client_add(ctx, mdt, med, cl_idx);
170                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
171
172                 exp->exp_replay_needed = 1;
173                 exp->exp_connecting = 0;
174                 obd->obd_recoverable_clients++;
175                 obd->obd_max_recoverable_clients++;
176                 class_export_put(exp);
177
178                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
179                        cl_idx, last_transno);
180
181                 spin_lock(&mdt->mdt_transno_lock);
182                 if (last_transno > mdt->mdt_last_transno)
183                         mdt->mdt_last_transno = last_transno;
184                 spin_unlock(&mdt->mdt_transno_lock);
185         }
186
187 err_client:
188         OBD_FREE_PTR(mcd);
189
190         RETURN(rc);
191 }
192
193 static int mdt_init_server_data(const struct lu_context *ctx,
194                                 struct mdt_device *mdt)
195 {
196         struct mdt_server_data *msd = &mdt->mdt_msd;
197         struct mdt_client_data *mcd = NULL;
198         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
199         struct mdt_thread_info *mti;
200         struct dt_object       *obj;
201         struct lu_attr         *la;
202         unsigned long last_rcvd_size = 0;
203         __u64 mount_count;
204         int rc = 0;
205         ENTRY;
206
207         /* ensure padding in the struct is the correct size */
208         LASSERT(offsetof(struct mdt_server_data, msd_padding) +
209                 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
210         LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
211                 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
212
213         mti = lu_context_key_get(ctx, &mdt_thread_key);
214         LASSERT(mti != NULL);
215         la = &mti->mti_attr.ma_attr;
216
217         obj = mdt->mdt_last_rcvd;
218         obj->do_ops->do_read_lock(ctx, obj);
219         rc = obj->do_ops->do_attr_get(ctx, mdt->mdt_last_rcvd, la);
220         obj->do_ops->do_read_unlock(ctx, obj);
221         if (rc)
222                 RETURN(rc);
223
224         last_rcvd_size = la->la_size;
225
226         if (last_rcvd_size == 0) {
227                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
228
229                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
230                        sizeof(msd->msd_uuid));
231                 msd->msd_last_transno = 0;
232                 msd->msd_mount_count = 0;
233                 msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
234                 msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
235                 msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
236                 msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
237                 msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
238                                                        OBD_INCOMPAT_COMMON_LR);
239         } else {
240                 rc = mdt_read_last_rcvd_header(ctx, mdt, msd);
241                 if (rc) {
242                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
243                         GOTO(out, rc);
244                 }
245                 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
246                         LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
247                                        " disk %s. Were the /dev/ assignments "
248                                        "rearranged?\n",
249                                        obd->obd_uuid.uuid, msd->msd_uuid);
250                         GOTO(out, rc = -EINVAL);
251                 }
252         }
253         mount_count = le64_to_cpu(msd->msd_mount_count);
254
255         if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
256                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
257                        obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
258                        ~MDT_INCOMPAT_SUPP);
259                 GOTO(out, rc = -EINVAL);
260         }
261         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
262                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
263                        obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
264                        ~MDT_ROCOMPAT_SUPP);
265                 /* Do something like remount filesystem read-only */
266                 GOTO(out, rc = -EINVAL);
267         }
268         if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
269                 CDEBUG(D_WARNING, "using old last_rcvd format\n");
270                 msd->msd_mount_count = msd->msd_last_transno;
271                 msd->msd_last_transno = msd->msd_unused;
272                 /* If we update the last_rcvd, we can never go back to
273                    an old install, so leave this in the old format for now.
274                 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
275                 */
276         }
277         msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
278
279         spin_lock(&mdt->mdt_transno_lock);
280         mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
281         spin_unlock(&mdt->mdt_transno_lock);
282         CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
283         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
284                obd->obd_name, mdt->mdt_last_transno);
285         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
286                obd->obd_name, mount_count + 1);
287         CDEBUG(D_INODE, "%s: server data size: %u\n",
288                obd->obd_name, le32_to_cpu(msd->msd_server_size));
289         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
290                obd->obd_name, le32_to_cpu(msd->msd_client_start));
291         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
292                obd->obd_name, le32_to_cpu(msd->msd_client_size));
293         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
294                obd->obd_name, last_rcvd_size);
295         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
296                last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
297                (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
298                 le16_to_cpu(msd->msd_client_size));
299         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
300
301         if (!msd->msd_server_size || !msd->msd_client_start ||
302             !msd->msd_client_size) {
303                 CERROR("Bad last_rcvd contents!\n");
304                 GOTO(out, rc = -EINVAL);
305         }
306
307         rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size);
308         if (rc)
309                 GOTO(err_client, rc);
310
311         spin_lock(&mdt->mdt_transno_lock);
312         /* obd_last_committed is used for compatibility
313          * with other lustre recovery code */
314         obd->obd_last_committed = mdt->mdt_last_transno;
315         spin_unlock(&mdt->mdt_transno_lock);
316
317         if (obd->obd_recoverable_clients) {
318                 CWARN("RECOVERY: service %s, %d recoverable clients, "
319                       "last_transno "LPU64"\n", obd->obd_name,
320                       obd->obd_recoverable_clients, mdt->mdt_last_transno);
321                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
322                 obd->obd_recovering = 1;
323                 obd->obd_recovery_start = CURRENT_SECONDS;
324                 /* Only used for lprocfs_status */
325                 obd->obd_recovery_end = obd->obd_recovery_start +
326                         OBD_RECOVERY_TIMEOUT;
327         }
328
329         mdt->mdt_mount_count++;
330         msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
331
332         /* save it, so mount count and last_transno is current */
333         rc = mdt_update_server_data(ctx, mdt);
334         if (rc)
335                 GOTO(err_client, rc);
336
337         RETURN(0);
338
339 err_client:
340         class_disconnect_exports(obd);
341 out:
342         return rc;
343 }
344
345 static int mdt_update_server_data(const struct lu_context *ctx,
346                                   struct mdt_device *mdt)
347 {
348         struct mdt_server_data *msd = &mdt->mdt_msd;
349         int rc = 0;
350         ENTRY;
351
352         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
353                 mdt->mdt_mount_count, mdt->mdt_last_transno);
354
355         spin_lock(&mdt->mdt_transno_lock);
356         msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
357         spin_unlock(&mdt->mdt_transno_lock);
358
359         rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL);
360         RETURN(rc);
361 }
362
363 /* Add client data to the MDS.  We use a bitmap to locate a free space
364  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
365  * Otherwise, we just have to read the data from the last_rcvd file and
366  * we know its offset.
367  *
368  * It should not be possible to fail adding an existing client - otherwise
369  * mdt_init_server_data() callsite needs to be fixed.
370  */
371 int mdt_client_add(const struct lu_context *ctx,
372                    struct mdt_device *mdt,
373                    struct mdt_export_data *med, int cl_idx)
374 {
375         unsigned long *bitmap = mdt->mdt_client_bitmap;
376         struct mdt_client_data *mcd = med->med_mcd;
377         struct mdt_server_data *msd = &mdt->mdt_msd;
378         int new_client = (cl_idx == -1);
379         int rc = 0;
380         ENTRY;
381
382         LASSERT(bitmap != NULL);
383         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
384
385         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
386          * there's no need for extra complication here
387          */
388         if (new_client) {
389                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
390         repeat:
391                 if (cl_idx >= LR_MAX_CLIENTS ||
392                     MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
393                         CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
394                         return -EOVERFLOW;
395                 }
396                 if (test_and_set_bit(cl_idx, bitmap)) {
397                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
398                                                     cl_idx);
399                         goto repeat;
400                 }
401         } else {
402                 if (test_and_set_bit(cl_idx, bitmap)) {
403                         CERROR("MDS client %d: bit already set in bitmap!!\n",
404                                cl_idx);
405                         LBUG();
406                 }
407         }
408
409         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
410                cl_idx, med->med_mcd->mcd_uuid);
411
412         med->med_lr_idx = cl_idx;
413         med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
414                           (cl_idx * le16_to_cpu(msd->msd_client_size));
415         init_mutex(&med->med_mcd_lock);
416
417         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
418
419         if (new_client) {
420                 rc = mdt_write_last_rcvd(ctx, mdt, mcd,
421                                          &med->med_lr_off, NULL);
422                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
423                        cl_idx, med->med_lr_off, sizeof(*mcd));
424         }
425         RETURN(rc);
426 }
427
428 int mdt_client_free(const struct lu_context *ctx,
429                     struct mdt_device *mdt,
430                     struct mdt_export_data *med)
431 {
432         struct mdt_client_data *mcd  = med->med_mcd;
433         int rc = 0;
434         loff_t off;
435         ENTRY;
436
437         if (!mcd)
438                 RETURN(0);
439
440         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
441                med->med_lr_idx, med->med_lr_off);
442
443         off = med->med_lr_off;
444
445         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
446          * we leak a client slot that will be cleaned on the next recovery. */
447         if (off <= 0) {
448                 CERROR("client idx %d has offset %lld\n",
449                         med->med_lr_idx, off);
450                 GOTO(free, rc = -EINVAL);
451         }
452
453         /* Clear the bit _after_ zeroing out the client so we don't
454            race with mdt_client_add and zero out new clients.*/
455         if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
456                 CERROR("MDT client %u: bit already clear in bitmap!!\n",
457                        med->med_lr_idx);
458                 LBUG();
459         }
460
461         mutex_down(&med->med_mcd_lock);
462         memset(mcd, 0, sizeof *mcd);
463         rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
464         mutex_up(&med->med_mcd_lock);
465
466         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
467                "zeroing out client idx %u in %s rc %d\n",
468                med->med_lr_idx, LAST_RCVD, rc);
469
470         if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
471                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
472                        med->med_lr_idx);
473                 LBUG();
474         }
475
476         /* Make sure the server's last_transno is up to date. Do this
477          * after the client is freed so we know all the client's
478          * transactions have been committed. */
479         mdt_update_server_data(ctx, mdt);
480
481         EXIT;
482 free:
483         OBD_FREE_PTR(mcd);
484         med->med_mcd = NULL;
485         return 0;
486 }
487
488 /*
489  * last_rcvd & last_committed update callbacks
490  */
491 static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
492                                 struct dt_device *dt,
493                                 struct thandle *th)
494 {
495         struct mdt_device *mdt = mti->mti_mdt;
496         struct ptlrpc_request *req = mdt_info_req(mti);
497         struct mdt_export_data *med;
498         struct mdt_client_data *mcd;
499         loff_t off;
500         int err;
501         __s32 rc = th->th_result;
502
503         ENTRY;
504         LASSERT(req);
505         LASSERT(req->rq_export);
506         LASSERT(mdt);
507         med = &req->rq_export->exp_mdt_data;
508         LASSERT(med);
509         mcd = med->med_mcd;
510         /* if the export has already been failed, we have no last_rcvd slot */
511         if (req->rq_export->exp_failed) {
512                 CWARN("commit transaction for disconnected client %s: rc %d\n",
513                       req->rq_export->exp_client_uuid.uuid, rc);
514                 if (rc == 0)
515                         rc = -ENOTCONN;
516                 RETURN(rc);
517         }
518
519         off = med->med_lr_off;
520         mutex_down(&med->med_mcd_lock);
521         if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
522                 mcd->mcd_last_close_transno = cpu_to_le64(mti->mti_transno);
523                 mcd->mcd_last_close_xid = cpu_to_le64(req->rq_xid);
524                 mcd->mcd_last_close_result = cpu_to_le32(rc);
525         } else {
526                 mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno);
527                 mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
528                 mcd->mcd_last_result = cpu_to_le32(rc);
529                 /*XXX: save intent_disposition in mdt_thread_info?
530                  * also there is bug - intent_dispostion is __u64,
531                  * see struct ldlm_reply->lock_policy_res1; */
532                  mcd->mcd_last_data = cpu_to_le32(mti->mti_opdata);
533         }
534         if (off <= 0) {
535                 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
536                 err = -EINVAL;
537         } else {
538                 err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th);
539         }
540         mutex_up(&med->med_mcd_lock);
541         RETURN(err);
542 }
543
544 extern struct lu_context_key mdt_txn_key;
545 extern struct lu_context_key mdt_thread_key;
546
547 enum {
548         MDT_TXN_LAST_RCVD_CREDITS = 3
549 };
550
551 /* add credits for last_rcvd update */
552 static int mdt_txn_start_cb(const struct lu_context *ctx,
553                             struct txn_param *param, void *cookie)
554 {
555         param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
556         return 0;
557 }
558
559 /* Update last_rcvd records with latests transaction data */
560 static int mdt_txn_stop_cb(const struct lu_context *ctx,
561                            struct thandle *txn, void *cookie)
562 {
563         struct mdt_device *mdt = cookie;
564         struct mdt_txn_info *txi;
565         struct mdt_thread_info *mti;
566
567         /* transno in two contexts - for commit_cb and for thread */
568         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
569         mti = lu_context_key_get(ctx, &mdt_thread_key);
570
571         /*TODO: checks for recovery cases, see mds_finish_transno */
572         spin_lock(&mdt->mdt_transno_lock);
573         if (txn->th_result != 0) {
574                 if (mti->mti_transno != 0) {
575                         CERROR("Replay transno "LPU64" failed: rc %i\n",
576                                mti->mti_transno, txn->th_result);
577                         mti->mti_transno = 0;
578                 }
579         } else if (mti->mti_transno == 0) {
580                 mti->mti_transno = ++ mdt->mdt_last_transno;
581         } else {
582                 /* replay */
583                 if (mti->mti_transno > mdt->mdt_last_transno)
584                         mdt->mdt_last_transno = mti->mti_transno;
585         }
586         /* save transno for the commit callback */
587         txi->txi_transno = mti->mti_transno;
588         spin_unlock(&mdt->mdt_transno_lock);
589
590         return 0;//mdt_update_last_rcvd(mti, dev, txn);
591 }
592
593 /* commit callback, need to update last_commited value */
594 static int mdt_txn_commit_cb(const struct lu_context *ctxt, 
595                              struct thandle *txn, void *cookie)
596 {
597         struct mdt_device *mdt = cookie;
598         struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
599         struct mdt_txn_info *txi;
600
601         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
602
603         /* copy of obd_transno_commit_cb() but with locking */
604         spin_lock(&mdt->mdt_transno_lock);
605         if (txi->txi_transno > obd->obd_last_committed) {
606                 obd->obd_last_committed = txi->txi_transno;
607                 spin_unlock(&mdt->mdt_transno_lock);
608                 ptlrpc_commit_replies(obd);
609         } else
610                 spin_unlock(&mdt->mdt_transno_lock);
611
612         CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
613                obd->obd_name, txi->txi_transno);
614
615         return 0;
616 }
617
618 int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt)
619 {
620         struct lu_fid last_fid;
621         struct dt_object *last;
622         int rc = 0;
623         ENTRY;
624
625         /* prepare transactions callbacks */
626         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
627         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
628         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
629         mdt->mdt_txn_cb.dtc_cookie = mdt;
630
631         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
632
633         last = dt_store_open(ctx, mdt->mdt_bottom,
634                              LAST_RCVD, &last_fid);
635         if(!IS_ERR(last)) {
636                 mdt->mdt_last_rcvd = last;
637                 rc = mdt_init_server_data(ctx, mdt);
638                 if (rc) {
639                         lu_object_put(ctx, &last->do_lu);
640                         mdt->mdt_last_rcvd = NULL;
641                 }
642         } else {
643                 rc = PTR_ERR(last);
644                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
645         }
646
647         RETURN (rc);
648 }
649
650
651 void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
652 {
653         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
654
655         /* remove transaction callback */
656         dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
657
658         class_disconnect_exports(obd); /* cleans up client info too */
659
660         if (mdt->mdt_last_rcvd)
661                 lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
662         mdt->mdt_last_rcvd = NULL;
663 }
664
665 /* reconstruction code */
666 void mdt_req_from_mcd(struct ptlrpc_request *req,
667                       struct mdt_client_data *mcd)
668 {
669         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
670                   mcd->mcd_last_transno, mcd->mcd_last_result);
671         req->rq_transno = mcd->mcd_last_transno;
672         req->rq_status = mcd->mcd_last_result;
673         lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
674         lustre_msg_set_status(req->rq_repmsg, req->rq_status);
675         //mds_steal_ack_locks(req);
676 }
677
678 static void mdt_reconstruct_generic(struct mdt_thread_info *mti)
679 {
680         struct ptlrpc_request *req = mdt_info_req(mti);
681         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
682
683         return mdt_req_from_mcd(req, med->med_mcd);
684 }
685
686 static void mdt_reconstruct_create(struct mdt_thread_info *mti)
687 {
688         struct ptlrpc_request  *req = mdt_info_req(mti);
689         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
690         struct mdt_device *mdt = mti->mti_mdt;
691         struct mdt_object *child;
692         struct mdt_body *body;
693         int rc;
694
695         mdt_req_from_mcd(req, med->med_mcd);
696         if (req->rq_status)
697                 return;
698
699         /* if no error, so child was created with requested fid */
700         child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
701         LASSERT(!IS_ERR(child));
702
703         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
704         rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
705                          &mti->mti_attr);
706         if (rc == -EREMOTE) {
707                 /* object was created on remote server */
708                 req->rq_status = rc;
709                 body->valid |= OBD_MD_MDS;
710         }
711         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
712                            mti->mti_rr.rr_fid2);
713         mdt_object_put(mti->mti_ctxt, child);
714 }
715
716 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
717 {
718         struct ptlrpc_request  *req = mdt_info_req(mti);
719         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
720         struct mdt_device *mdt = mti->mti_mdt;
721         struct mdt_object *obj;
722         struct mdt_body *body;
723
724         mdt_req_from_mcd(req, med->med_mcd);
725         if (req->rq_status)
726                 return;
727
728         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
729         obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
730         LASSERT(!IS_ERR(obj));
731         mo_attr_get(mti->mti_ctxt, mdt_object_child(obj), &mti->mti_attr);
732         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
733                            mti->mti_rr.rr_fid1);
734
735         /* Don't return OST-specific attributes if we didn't just set them */
736 /*
737         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
738                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
739         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
740                 body->valid |= OBD_MD_FLMTIME;
741         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
742                 body->valid |= OBD_MD_FLATIME;
743 */
744         mdt_object_put(mti->mti_ctxt, obj);
745 }
746
747 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
748
749 static mdt_reconstructor reconstructors[REINT_MAX] = {
750         [REINT_SETATTR]  = mdt_reconstruct_setattr,
751         [REINT_CREATE] = mdt_reconstruct_create,
752         [REINT_LINK] = mdt_reconstruct_generic,
753         [REINT_UNLINK] = mdt_reconstruct_generic,
754         [REINT_RENAME] = mdt_reconstruct_generic,
755         [REINT_OPEN] = mdt_reconstruct_open
756 };
757
758 void mdt_reconstruct(struct mdt_thread_info *mti)
759 {
760         ENTRY;
761         reconstructors[mti->mti_rr.rr_opcode](mti);
762         EXIT;
763 }
764