Whamcloud - gitweb
- fixed really nasty bug with recovery. Sometimes server sent _two_ replies for one...
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_recovery.c
5  *  Lustre Metadata Target (mdt) recovery-related methods
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *   Author: Pershin Mike <tappro@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 static int mdt_server_data_update(const struct lu_context *ctx,
37                                   struct mdt_device *mdt);
38
39 /* TODO: maybe this pair should be defined in dt_object.c */
40 static int mdt_record_read(const struct lu_context *ctx,
41                            struct dt_object *dt, void *buf,
42                            size_t count, loff_t *pos)
43 {
44         int rc;
45
46         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
47
48         rc = dt->do_body_ops->dbo_read(ctx, dt, buf, count, pos);
49
50         if (rc == count)
51                 rc = 0;
52         else if (rc >= 0)
53                 rc = -EFAULT;
54         return rc;
55 }
56
57 static int mdt_record_write(const struct lu_context *ctx,
58                             struct dt_object *dt, const void *buf,
59                             size_t count, loff_t *pos, struct thandle *th)
60 {
61         int rc;
62
63         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
64         LASSERT(th != NULL);
65         rc = dt->do_body_ops->dbo_write(ctx, dt, buf, count, pos, th);
66         if (rc == count)
67                 rc = 0;
68         else if (rc >= 0)
69                 rc = -EFAULT;
70         return rc;
71 }
72 /* only one record write */
73
74 enum {
75         MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
76 };
77
78 static struct thandle* mdt_trans_start(const struct lu_context *ctx,
79                                        struct mdt_device *mdt, int credits)
80 {
81         struct mdt_thread_info *mti;
82         struct txn_param *p;
83
84         mti = lu_context_key_get(ctx, &mdt_thread_key);
85         p = &mti->mti_txn_param;
86         p->tp_credits = credits;
87         return mdt->mdt_bottom->dd_ops->dt_trans_start(ctx, mdt->mdt_bottom, p);
88 }
89
90 static void mdt_trans_stop(const struct lu_context *ctx,
91                            struct mdt_device *mdt, struct thandle *th)
92 {
93         mdt->mdt_bottom->dd_ops->dt_trans_stop(ctx, th);
94 }
95
96 /* last_rcvd handling */
97 static inline void msd_le_to_cpu(struct mdt_server_data *buf,
98                                  struct mdt_server_data *msd)
99 {
100         memcpy(msd->msd_uuid, buf->msd_uuid, sizeof (msd->msd_uuid));
101         msd->msd_last_transno     = le64_to_cpu(buf->msd_last_transno);
102         msd->msd_mount_count      = le64_to_cpu(buf->msd_mount_count);
103         msd->msd_feature_compat   = le32_to_cpu(buf->msd_feature_compat);
104         msd->msd_feature_rocompat = le32_to_cpu(buf->msd_feature_rocompat);
105         msd->msd_feature_incompat = le32_to_cpu(buf->msd_feature_incompat);
106         msd->msd_server_size      = le32_to_cpu(buf->msd_server_size);
107         msd->msd_client_start     = le32_to_cpu(buf->msd_client_start);
108         msd->msd_client_size      = le16_to_cpu(buf->msd_client_size);
109 }
110
111 static inline void msd_cpu_to_le(struct mdt_server_data *msd,
112                                  struct mdt_server_data *buf)
113 {
114         memcpy(buf->msd_uuid, msd->msd_uuid, sizeof (msd->msd_uuid));
115         buf->msd_last_transno     = cpu_to_le64(msd->msd_last_transno);
116         buf->msd_mount_count      = cpu_to_le64(msd->msd_mount_count);
117         buf->msd_feature_compat   = cpu_to_le32(msd->msd_feature_compat);
118         buf->msd_feature_rocompat = cpu_to_le32(msd->msd_feature_rocompat);
119         buf->msd_feature_incompat = cpu_to_le32(msd->msd_feature_incompat);
120         buf->msd_server_size      = cpu_to_le32(msd->msd_server_size);
121         buf->msd_client_start     = cpu_to_le32(msd->msd_client_start);
122         buf->msd_client_size      = cpu_to_le16(msd->msd_client_size);
123 }
124
125 static inline void mcd_le_to_cpu(struct mdt_client_data *buf,
126                                  struct mdt_client_data *mcd)
127 {
128         memcpy(mcd->mcd_uuid, buf->mcd_uuid, sizeof (mcd->mcd_uuid));
129         mcd->mcd_last_transno       = le64_to_cpu(buf->mcd_last_transno);
130         mcd->mcd_last_xid           = le64_to_cpu(buf->mcd_last_xid);
131         mcd->mcd_last_result        = le32_to_cpu(buf->mcd_last_result);
132         mcd->mcd_last_data          = le32_to_cpu(buf->mcd_last_data);
133         mcd->mcd_last_close_transno = le64_to_cpu(buf->mcd_last_close_transno);
134         mcd->mcd_last_close_xid     = le64_to_cpu(buf->mcd_last_close_xid);
135         mcd->mcd_last_close_result  = le32_to_cpu(buf->mcd_last_close_result);
136 }
137
138 static inline void mcd_cpu_to_le(struct mdt_client_data *mcd,
139                                  struct mdt_client_data *buf)
140 {
141         memcpy(buf->mcd_uuid, mcd->mcd_uuid, sizeof (mcd->mcd_uuid));
142         buf->mcd_last_transno       = cpu_to_le64(mcd->mcd_last_transno);
143         buf->mcd_last_xid           = cpu_to_le64(mcd->mcd_last_xid);
144         buf->mcd_last_result        = cpu_to_le32(mcd->mcd_last_result);
145         buf->mcd_last_data          = cpu_to_le32(mcd->mcd_last_data);
146         buf->mcd_last_close_transno = cpu_to_le64(mcd->mcd_last_close_transno);
147         buf->mcd_last_close_xid     = cpu_to_le64(mcd->mcd_last_close_xid);
148         buf->mcd_last_close_result  = cpu_to_le32(mcd->mcd_last_close_result);
149 }
150
151 static int mdt_last_rcvd_header_read(const struct lu_context *ctx,
152                                      struct mdt_device *mdt,
153                                      struct mdt_server_data *msd)
154 {
155         struct mdt_thread_info *mti;
156         struct mdt_server_data *tmp;
157         loff_t *off;
158         int rc;
159
160         mti = lu_context_key_get(ctx, &mdt_thread_key);
161         /* temporary stuff for read */
162         tmp = &mti->mti_msd;
163         off = &mti->mti_off;
164         *off = 0;
165         rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, 
166                              tmp, sizeof(*tmp), off);
167         if (rc == 0)
168                 msd_le_to_cpu(tmp, msd);
169
170         CDEBUG(D_INFO, "read last_rcvd header rc = %d:\n"
171                        "uuid = %s\n"
172                        "last_transno = "LPU64"\n",
173                         rc,
174                         msd->msd_uuid,
175                         msd->msd_last_transno);
176         return rc;
177 }
178
179 static int mdt_last_rcvd_header_write(const struct lu_context *ctx,
180                                       struct mdt_device *mdt,
181                                       struct mdt_server_data *msd)
182 {
183         struct mdt_thread_info *mti;
184         struct mdt_server_data *tmp;
185         struct thandle *th;
186         loff_t *off;
187         int rc;
188
189         mti = lu_context_key_get(ctx, &mdt_thread_key);
190
191         th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
192         if (IS_ERR(th))
193                 RETURN(PTR_ERR(th));
194
195         /* temporary stuff for read */
196         tmp = &mti->mti_msd;
197         off = &mti->mti_off;
198         *off = 0;
199         
200         msd_cpu_to_le(msd, tmp);
201
202         rc = mdt_record_write(ctx, mdt->mdt_last_rcvd, 
203                               tmp, sizeof(*tmp), off, th);
204
205         mdt_trans_stop(ctx, mdt, th);
206
207         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
208                        "uuid = %s\n"
209                        "last_transno = "LPU64"\n",
210                         rc,
211                         msd->msd_uuid,
212                         msd->msd_last_transno);
213         return rc;
214 }
215
216 static int mdt_last_rcvd_read(const struct lu_context *ctx,
217                               struct mdt_device *mdt,
218                               struct mdt_client_data *mcd, loff_t *off)
219 {
220         struct mdt_thread_info *mti;
221         struct mdt_client_data *tmp;
222         int rc;
223
224         mti = lu_context_key_get(ctx, &mdt_thread_key);
225         tmp = &mti->mti_mcd;
226         rc = mdt_record_read(ctx, mdt->mdt_last_rcvd, tmp, sizeof(*tmp), off);
227         if (rc == 0)
228                 mcd_le_to_cpu(tmp, mcd);
229
230         CDEBUG(D_INFO, "read mcd @%d rc = %d:\n"
231                        "uuid = %s\n"
232                        "last_transno = "LPU64"\n"
233                        "last_xid = "LPU64"\n"
234                        "last_result = %d\n"
235                        "last_data = %d\n"
236                        "last_close_transno = "LPU64"\n"
237                        "last_close_xid = "LPU64"\n"
238                        "last_close_result = %d\n",
239                         (int)*off - sizeof(*tmp),
240                         rc,
241                         mcd->mcd_uuid,
242                         mcd->mcd_last_transno,
243                         mcd->mcd_last_xid,
244                         mcd->mcd_last_result,
245                         mcd->mcd_last_data,
246                         mcd->mcd_last_close_transno,
247                         mcd->mcd_last_close_xid,
248                         mcd->mcd_last_close_result);
249
250         return rc;
251 }
252
253 static int mdt_last_rcvd_write(const struct lu_context *ctx,
254                                struct mdt_device *mdt,
255                                struct mdt_client_data *mcd,
256                                loff_t *off, struct thandle *th)
257 {
258         struct mdt_thread_info *mti;
259         struct mdt_client_data *tmp;
260         int rc;
261
262         LASSERT(th != NULL);
263         mti = lu_context_key_get(ctx, &mdt_thread_key);
264         tmp = &mti->mti_mcd;
265
266         mcd_cpu_to_le(mcd, tmp);
267
268         rc = mdt_record_write(ctx, mdt->mdt_last_rcvd,
269                               tmp, sizeof(*tmp), off, th);
270
271         CDEBUG(D_INFO, "write mcd @%d rc = %d:\n"
272                        "uuid = %s\n"
273                        "last_transno = "LPU64"\n"
274                        "last_xid = "LPU64"\n"
275                        "last_result = %d\n"
276                        "last_data = %d\n"
277                        "last_close_transno = "LPU64"\n"
278                        "last_close_xid = "LPU64"\n"
279                        "last_close_result = %d\n",
280                         (int)*off - sizeof(*tmp),
281                         rc,
282                         mcd->mcd_uuid,
283                         mcd->mcd_last_transno,
284                         mcd->mcd_last_xid,
285                         mcd->mcd_last_result,
286                         mcd->mcd_last_data,
287                         mcd->mcd_last_close_transno,
288                         mcd->mcd_last_close_xid,
289                         mcd->mcd_last_close_result);
290         return rc;
291 }
292
293
294 static int mdt_clients_data_init(const struct lu_context *ctx,
295                                  struct mdt_device *mdt,
296                                  unsigned long last_size)
297 {
298         struct mdt_server_data *msd = &mdt->mdt_msd;
299         struct mdt_client_data *mcd = NULL;
300         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
301         loff_t off;
302         int cl_idx;
303         int rc = 0;
304         ENTRY;
305
306         /* When we do a clean MDS shutdown, we save the last_transno into
307          * the header.  If we find clients with higher last_transno values
308          * then those clients may need recovery done. */
309
310         for (cl_idx = 0, off = msd->msd_client_start;
311              off < last_size; cl_idx++) {
312                 __u64 last_transno;
313                 struct obd_export *exp;
314                 struct mdt_export_data *med;
315                 
316                 if (!mcd) {
317                         OBD_ALLOC_PTR(mcd);
318                         if (!mcd)
319                                 RETURN(-ENOMEM);
320                 }
321
322                 off = msd->msd_client_start +
323                         cl_idx * msd->msd_client_size;
324
325                 rc = mdt_last_rcvd_read(ctx, mdt, mcd, &off);
326                 if (rc) {
327                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
328                                LAST_RCVD, cl_idx, off, rc);
329                         rc = 0;
330                         break; /* read error shouldn't cause startup to fail */
331                 }
332
333                 if (mcd->mcd_uuid[0] == '\0') {
334                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
335                                cl_idx);
336                         continue;
337                 }
338
339                 last_transno = mcd_last_transno(mcd);
340
341                 /* These exports are cleaned up by mdt_obd_disconnect(), so
342                  * they need to be set up like real exports as
343                  * mdt_obd_connect() does.
344                  */
345                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
346                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
347                        last_transno, msd->msd_last_transno,
348                        mcd_last_xid(mcd));
349
350                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
351                 if (IS_ERR(exp)) {
352                         rc = 0;
353                         continue;
354                         /* FIXME: Do we really want to return error? */
355                 }
356
357                 med = &exp->exp_mdt_data;
358                 med->med_mcd = mcd;
359                 rc = mdt_client_add(ctx, mdt, med, cl_idx);
360                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
361                 mcd = NULL;
362                 exp->exp_replay_needed = 1;
363                 exp->exp_connecting = 0;
364                 obd->obd_recoverable_clients++;
365                 obd->obd_max_recoverable_clients++;
366                 class_export_put(exp);
367
368                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
369                        cl_idx, last_transno);
370                 /* protect __u64 value update */
371                 spin_lock(&mdt->mdt_transno_lock);
372                 mdt->mdt_last_transno = max(last_transno,
373                                             mdt->mdt_last_transno);
374                 spin_unlock(&mdt->mdt_transno_lock);
375         }
376
377         if (mcd)
378                 OBD_FREE_PTR(mcd);
379         RETURN(rc);
380 }
381
382 static int mdt_server_data_init(const struct lu_context *ctx,
383                                 struct mdt_device *mdt)
384 {
385         struct mdt_server_data *msd = &mdt->mdt_msd;
386         struct mdt_client_data *mcd = NULL;
387         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
388         struct mdt_thread_info *mti;
389         struct dt_object       *obj;
390         struct lu_attr         *la;
391         unsigned long last_rcvd_size;
392         __u64 mount_count;
393         int rc;
394         ENTRY;
395
396         /* ensure padding in the struct is the correct size */
397         CLASSERT(offsetof(struct mdt_server_data, msd_padding) +
398                 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
399         CLASSERT(offsetof(struct mdt_client_data, mcd_padding) +
400                 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
401
402         mti = lu_context_key_get(ctx, &mdt_thread_key);
403         LASSERT(mti != NULL);
404         la = &mti->mti_attr.ma_attr;
405
406         obj = mdt->mdt_last_rcvd;
407         obj->do_ops->do_read_lock(ctx, obj);
408         rc = obj->do_ops->do_attr_get(ctx, mdt->mdt_last_rcvd, la);
409         obj->do_ops->do_read_unlock(ctx, obj);
410         if (rc)
411                 RETURN(rc);
412
413         last_rcvd_size = (unsigned long)la->la_size;
414         
415         if (last_rcvd_size == 0) {
416                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
417
418                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
419                        sizeof(msd->msd_uuid));
420                 msd->msd_last_transno = 0;
421                 msd->msd_mount_count = 0;
422                 msd->msd_server_size = LR_SERVER_SIZE;
423                 msd->msd_client_start = LR_CLIENT_START;
424                 msd->msd_client_size = LR_CLIENT_SIZE;
425                 msd->msd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
426                 msd->msd_feature_incompat = OBD_INCOMPAT_MDT |
427                                                        OBD_INCOMPAT_COMMON_LR;
428         } else {
429                 LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
430                 rc = mdt_last_rcvd_header_read(ctx, mdt, msd);
431                 if (rc) {
432                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
433                         GOTO(out, rc);
434                 }
435                 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
436                         LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
437                                        " disk %s. Were the /dev/ assignments "
438                                        "rearranged?\n",
439                                        obd->obd_uuid.uuid, msd->msd_uuid);
440                         GOTO(out, rc = -EINVAL);
441                 }
442         }
443         mount_count = msd->msd_mount_count;
444 #if 0
445         if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
446                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
447                        obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
448                        ~MDT_INCOMPAT_SUPP);
449                 GOTO(out, rc = -EINVAL);
450         }
451         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
452                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
453                        obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
454                        ~MDT_ROCOMPAT_SUPP);
455                 /* Do something like remount filesystem read-only */
456                 GOTO(out, rc = -EINVAL);
457         }
458         if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
459                 CDEBUG(D_WARNING, "using old last_rcvd format\n");
460                 msd->msd_mount_count = msd->msd_last_transno;
461                 msd->msd_last_transno = msd->msd_unused;
462                 /* If we update the last_rcvd, we can never go back to
463                    an old install, so leave this in the old format for now.
464                 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
465                 */
466         }
467 #endif
468         msd->msd_feature_compat = OBD_COMPAT_MDT;
469
470         spin_lock(&mdt->mdt_transno_lock);
471         mdt->mdt_last_transno = msd->msd_last_transno;
472         spin_unlock(&mdt->mdt_transno_lock);
473
474         CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
475         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
476                obd->obd_name, mdt->mdt_last_transno);
477         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
478                obd->obd_name, mount_count + 1);
479         CDEBUG(D_INODE, "%s: server data size: %u\n",
480                obd->obd_name, msd->msd_server_size);
481         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
482                obd->obd_name, msd->msd_client_start);
483         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
484                obd->obd_name, msd->msd_client_size);
485         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
486                obd->obd_name, last_rcvd_size);
487         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
488                last_rcvd_size <= msd->msd_client_start ? 0 :
489                (last_rcvd_size - msd->msd_client_start) /
490                 msd->msd_client_size);
491         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
492
493         if (!msd->msd_server_size || !msd->msd_client_start ||
494             !msd->msd_client_size) {
495                 CERROR("Bad last_rcvd contents!\n");
496                 GOTO(out, rc = -EINVAL);
497         }
498
499         rc = mdt_clients_data_init(ctx, mdt, last_rcvd_size);
500         if (rc)
501                 GOTO(err_client, rc);
502
503         spin_lock(&mdt->mdt_transno_lock);
504         /* obd_last_committed is used for compatibility
505          * with other lustre recovery code */
506         obd->obd_last_committed = mdt->mdt_last_transno;
507         spin_unlock(&mdt->mdt_transno_lock);
508
509         if (obd->obd_recoverable_clients) {
510                 CWARN("RECOVERY: service %s, %d recoverable clients, "
511                       "last_transno "LPU64"\n", obd->obd_name,
512                       obd->obd_recoverable_clients, mdt->mdt_last_transno);
513                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
514                 obd->obd_recovering = 1;
515                 obd->obd_recovery_start = CURRENT_SECONDS;
516                 /* Only used for lprocfs_status */
517                 obd->obd_recovery_end = obd->obd_recovery_start +
518                         OBD_RECOVERY_TIMEOUT;
519         }
520
521         mdt->mdt_mount_count++;
522         msd->msd_mount_count = mdt->mdt_mount_count;
523
524         /* save it, so mount count and last_transno is current */
525         rc = mdt_server_data_update(ctx, mdt);
526         if (rc)
527                 GOTO(err_client, rc);
528
529         RETURN(0);
530
531 err_client:
532         class_disconnect_exports(obd);
533 out:
534         return rc;
535 }
536
537 static int mdt_server_data_update(const struct lu_context *ctx,
538                                   struct mdt_device *mdt)
539 {
540         struct mdt_server_data *msd = &mdt->mdt_msd;
541         int rc;
542         ENTRY;
543
544         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
545                 mdt->mdt_mount_count, mdt->mdt_last_transno);
546
547         spin_lock(&mdt->mdt_transno_lock);
548         msd->msd_last_transno = mdt->mdt_last_transno;
549         spin_unlock(&mdt->mdt_transno_lock);
550
551         rc = mdt_last_rcvd_header_write(ctx, mdt, msd);
552         RETURN(rc);
553 }
554
555 int mdt_client_new(const struct lu_context *ctx,
556                    struct mdt_device *mdt,
557                    struct mdt_export_data *med)
558 {
559         unsigned long *bitmap = mdt->mdt_client_bitmap;
560         struct mdt_client_data *mcd = med->med_mcd;
561         struct mdt_server_data *msd = &mdt->mdt_msd;
562         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
563         struct mdt_thread_info *mti;
564         struct thandle *th;
565         loff_t off;
566         int rc = 0;
567         int cl_idx;
568         ENTRY;
569
570         LASSERT(bitmap != NULL);
571         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
572                 RETURN(0);
573         mti = lu_context_key_get(ctx, &mdt_thread_key);
574         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
575          * there's no need for extra complication here
576          */
577         spin_lock(&mdt->mdt_client_bitmap_lock);
578         cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
579         if (cl_idx >= LR_MAX_CLIENTS ||
580             MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
581                 CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
582                 spin_unlock(&mdt->mdt_client_bitmap_lock);
583                 RETURN(-EOVERFLOW);
584         }
585         set_bit(cl_idx, bitmap);
586         spin_unlock(&mdt->mdt_client_bitmap_lock);
587
588         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
589                cl_idx, med->med_mcd->mcd_uuid);
590
591         med->med_lr_idx = cl_idx;
592         med->med_lr_off = msd->msd_client_start +
593                           (cl_idx * msd->msd_client_size);
594         init_mutex(&med->med_mcd_lock);
595
596         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
597         /* write new client data */
598         off = med->med_lr_off;
599         th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
600         if (IS_ERR(th))
601                 RETURN(PTR_ERR(th));
602         
603         rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
604         CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
605                cl_idx, med->med_lr_off, sizeof(*mcd));
606         mdt_trans_stop(ctx, mdt, th);
607
608         RETURN(rc);
609 }
610
611 /* Add client data to the MDS.  We use a bitmap to locate a free space
612  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
613  * Otherwise, we just have to read the data from the last_rcvd file and
614  * we know its offset.
615  *
616  * It should not be possible to fail adding an existing client - otherwise
617  * mdt_init_server_data() callsite needs to be fixed.
618  */
619 int mdt_client_add(const struct lu_context *ctx,
620                    struct mdt_device *mdt,
621                    struct mdt_export_data *med, int cl_idx)
622 {
623         unsigned long *bitmap = mdt->mdt_client_bitmap;
624         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
625         struct mdt_server_data *msd = &mdt->mdt_msd;
626         int rc = 0;
627         ENTRY;
628
629         LASSERT(bitmap != NULL);
630         LASSERTF(cl_idx >= 0, "%d\n", cl_idx);
631
632         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
633                 RETURN(0);
634
635         spin_lock(&mdt->mdt_client_bitmap_lock);
636         if (test_and_set_bit(cl_idx, bitmap)) {
637                 CERROR("MDS client %d: bit already set in bitmap!!\n",
638                        cl_idx);
639                 LBUG();
640         }
641         spin_unlock(&mdt->mdt_client_bitmap_lock);
642
643         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
644                cl_idx, med->med_mcd->mcd_uuid);
645
646         med->med_lr_idx = cl_idx;
647         med->med_lr_off = msd->msd_client_start +
648                           (cl_idx * msd->msd_client_size);
649         init_mutex(&med->med_mcd_lock);
650
651         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
652
653         RETURN(rc);
654 }
655
656 int mdt_client_del(const struct lu_context *ctx,
657                    struct mdt_device *mdt,
658                    struct mdt_export_data *med)
659 {
660         struct mdt_client_data *mcd = med->med_mcd;
661         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
662         struct thandle *th;
663         loff_t off;
664         int rc = 0;
665         ENTRY;
666
667         if (!mcd)
668                 RETURN(0);
669
670         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
671         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
672                 GOTO(free, 0);
673
674         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
675                med->med_lr_idx, med->med_lr_off);
676
677         off = med->med_lr_off;
678
679         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
680          * we leak a client slot that will be cleaned on the next recovery. */
681         if (off <= 0) {
682                 CERROR("client idx %d has offset %lld\n",
683                         med->med_lr_idx, off);
684                 GOTO(free, rc = -EINVAL);
685         }
686
687         /* Clear the bit _after_ zeroing out the client so we don't
688            race with mdt_client_add and zero out new clients.*/
689         if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
690                 CERROR("MDT client %u: bit already clear in bitmap!!\n",
691                        med->med_lr_idx);
692                 LBUG();
693         }
694
695         th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
696         if (IS_ERR(th))
697                 GOTO(free, rc = PTR_ERR(th));
698
699         mutex_down(&med->med_mcd_lock);
700         memset(mcd, 0, sizeof *mcd);
701         
702         rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
703         mutex_up(&med->med_mcd_lock);
704         mdt_trans_stop(ctx, mdt, th);
705         
706         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
707                         "zeroing out client idx %u in %s rc %d\n",
708                         med->med_lr_idx, LAST_RCVD, rc);
709        
710         spin_lock(&mdt->mdt_client_bitmap_lock);
711         clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
712         spin_unlock(&mdt->mdt_client_bitmap_lock);
713         /* Make sure the server's last_transno is up to date. Do this
714          * after the client is freed so we know all the client's
715          * transactions have been committed. */
716         mdt_server_data_update(ctx, mdt);
717
718         EXIT;
719 free:
720         OBD_FREE_PTR(mcd);
721         med->med_mcd = NULL;
722         return 0;
723 }
724
725 /*
726  * last_rcvd & last_committed update callbacks
727  */
728 static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
729                                 struct thandle *th)
730 {
731         struct mdt_device *mdt = mti->mti_mdt;
732         struct ptlrpc_request *req = mdt_info_req(mti);
733         struct mdt_export_data *med;
734         struct mdt_client_data *mcd;
735         loff_t off;
736         int err;
737         __s32 rc = th->th_result;
738
739         ENTRY;
740         LASSERT(req);
741         LASSERT(req->rq_export);
742         LASSERT(mdt);
743         med = &req->rq_export->exp_mdt_data;
744         LASSERT(med);
745         mcd = med->med_mcd;
746         /* if the export has already been failed, we have no last_rcvd slot */
747         if (req->rq_export->exp_failed) {
748                 CWARN("commit transaction for disconnected client %s: rc %d\n",
749                       req->rq_export->exp_client_uuid.uuid, rc);
750                 if (rc == 0)
751                         rc = -ENOTCONN;
752                 RETURN(rc);
753         }
754
755         off = med->med_lr_off;
756         mutex_down(&med->med_mcd_lock);
757         if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
758                 mcd->mcd_last_close_transno = mti->mti_transno;
759                 mcd->mcd_last_close_xid = req->rq_xid;
760                 mcd->mcd_last_close_result = rc;
761         } else {
762                 mcd->mcd_last_transno = mti->mti_transno;
763                 mcd->mcd_last_xid = req->rq_xid;
764                 mcd->mcd_last_result = rc;
765                 /*XXX: save intent_disposition in mdt_thread_info?
766                  * also there is bug - intent_dispostion is __u64,
767                  * see struct ldlm_reply->lock_policy_res1; */
768                  mcd->mcd_last_data = mti->mti_opdata;
769         }
770         if (off <= 0) {
771                 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
772                 err = -EINVAL;
773         } else {
774                 err = mdt_last_rcvd_write(mti->mti_ctxt, mdt, mcd, &off, th);
775         }
776         mutex_up(&med->med_mcd_lock);
777         RETURN(err);
778 }
779
780 extern struct lu_context_key mdt_txn_key;
781 extern struct lu_context_key mdt_thread_key;
782
783 /* add credits for last_rcvd update */
784 static int mdt_txn_start_cb(const struct lu_context *ctx,
785                             struct txn_param *param, void *cookie)
786 {
787         param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
788         return 0;
789 }
790
791 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
792 {
793         return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
794 }
795
796 /* Update last_rcvd records with latests transaction data */
797 static int mdt_txn_stop_cb(const struct lu_context *ctx,
798                            struct thandle *txn, void *cookie)
799 {
800         struct mdt_device *mdt = cookie;
801         struct mdt_txn_info *txi;
802         struct mdt_thread_info *mti;
803         struct ptlrpc_request *req;
804                 
805         /* transno in two contexts - for commit_cb and for thread */
806         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
807         mti = lu_context_key_get(ctx, &mdt_thread_key);
808         req = mdt_info_req(mti);
809
810         if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
811                 txi->txi_transno = 0;
812                 return 0;
813         }
814
815         if (mti->mti_has_trans) {
816                 CWARN("More than one transaction "LPU64"\n", mti->mti_transno);
817                 return 0;
818         }
819
820         mti->mti_has_trans = 1;
821         /*TODO: checks for recovery cases, see mds_finish_transno */
822         spin_lock(&mdt->mdt_transno_lock);
823         if (txn->th_result != 0) {
824                 if (mti->mti_transno != 0) {
825                         CERROR("Replay transno "LPU64" failed: rc %i\n",
826                                mti->mti_transno, txn->th_result);
827                         mti->mti_transno = 0;
828                 }
829         } else if (mti->mti_transno == 0) {
830                 mti->mti_transno = ++ mdt->mdt_last_transno;
831         } else {
832                 /* should be replay */
833                 if (mti->mti_transno > mdt->mdt_last_transno)
834                         mdt->mdt_last_transno = mti->mti_transno;
835         }
836
837         /* sometimes the reply message has not been successfully packed */
838         LASSERT(req != NULL && req->rq_repmsg != NULL);
839
840         /* filling reply data */
841         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
842                mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
843
844         req->rq_transno = mti->mti_transno;
845         lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
846         target_committed_to_req(req);
847         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
848         /* save transno for the commit callback */
849         txi->txi_transno = mti->mti_transno;
850         spin_unlock(&mdt->mdt_transno_lock);
851
852         return mdt_last_rcvd_update(mti, txn);
853 }
854
855 /* commit callback, need to update last_commited value */
856 static int mdt_txn_commit_cb(const struct lu_context *ctx, 
857                              struct thandle *txn, void *cookie)
858 {
859         struct mdt_device *mdt = cookie;
860         struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
861         struct mdt_txn_info *txi;
862
863         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
864
865         /* copy of obd_transno_commit_cb() but with locking */
866         spin_lock(&mdt->mdt_transno_lock);
867         if (txi->txi_transno > obd->obd_last_committed) {
868                 obd->obd_last_committed = txi->txi_transno;
869                 spin_unlock(&mdt->mdt_transno_lock);
870                 ptlrpc_commit_replies(obd);
871         } else
872                 spin_unlock(&mdt->mdt_transno_lock);
873
874         CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
875                obd->obd_name, txi->txi_transno);
876
877         return 0;
878 }
879
880 int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt,
881                  struct obd_device *obd)
882 {
883         struct lu_fid last_fid;
884         struct dt_object *last;
885         int rc = 0;
886         ENTRY;
887
888         /* prepare transactions callbacks */
889         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
890         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
891         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
892         mdt->mdt_txn_cb.dtc_cookie = mdt;
893
894         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
895
896         last = dt_store_open(ctx, mdt->mdt_bottom,
897                              LAST_RCVD, &last_fid);
898         if(!IS_ERR(last)) {
899                 mdt->mdt_last_rcvd = last;
900                 rc = mdt_server_data_init(ctx, mdt);
901                 if (rc) {
902                         lu_object_put(ctx, &last->do_lu);
903                         mdt->mdt_last_rcvd = NULL;
904                 }
905         } else {
906                 rc = PTR_ERR(last);
907                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
908         }
909
910         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
911         obd->obd_lvfs_ctxt.pwdmnt = current->fs->pwdmnt;
912         obd->obd_lvfs_ctxt.pwd = current->fs->pwd;
913         obd->obd_lvfs_ctxt.fs = get_ds();
914
915         RETURN (rc);
916 }
917
918
919 void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
920 {
921         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
922
923         /* remove transaction callback */
924         dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
925
926         class_disconnect_exports(obd); /* cleans up client info too */
927         if (mdt->mdt_last_rcvd)
928                 lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
929         mdt->mdt_last_rcvd = NULL;
930 }
931
932 /* reconstruction code */
933 void mdt_req_from_mcd(struct ptlrpc_request *req,
934                       struct mdt_client_data *mcd)
935 {
936         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
937                   mcd->mcd_last_transno, mcd->mcd_last_result);
938
939         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
940                 req->rq_transno = mcd->mcd_last_close_transno;
941                 req->rq_status = mcd->mcd_last_close_result;
942                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
943                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
944         } else {
945                 req->rq_transno = mcd->mcd_last_transno;
946                 req->rq_status = mcd->mcd_last_result;
947                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
948                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
949         }
950         //mds_steal_ack_locks(req);
951 }
952
953 static void mdt_reconstruct_generic(struct mdt_thread_info *mti,
954                                     struct mdt_lock_handle *lhc)
955 {
956         struct ptlrpc_request *req = mdt_info_req(mti);
957         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
958
959         return mdt_req_from_mcd(req, med->med_mcd);
960 }
961
962 static void mdt_reconstruct_create(struct mdt_thread_info *mti,
963                                    struct mdt_lock_handle *lhc)
964 {
965         struct ptlrpc_request  *req = mdt_info_req(mti);
966         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
967         struct mdt_device *mdt = mti->mti_mdt;
968         struct mdt_object *child;
969         struct mdt_body *body;
970         int rc;
971
972         mdt_req_from_mcd(req, med->med_mcd);
973         if (req->rq_status)
974                 return;
975
976         /* if no error, so child was created with requested fid */
977         child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
978         LASSERT(!IS_ERR(child));
979
980         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
981         rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
982                          &mti->mti_attr, &mti->mti_uc);
983         if (rc == -EREMOTE) {
984                 /* object was created on remote server */
985                 req->rq_status = rc;
986                 body->valid |= OBD_MD_MDS;
987         }
988         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(child));
989         mdt_body_reverse_idmap(mti, body);
990         mdt_object_put(mti->mti_ctxt, child);
991 }
992
993 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
994                                     struct mdt_lock_handle *lhc)
995 {
996         struct ptlrpc_request  *req = mdt_info_req(mti);
997         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
998         struct mdt_device *mdt = mti->mti_mdt;
999         struct mdt_object *obj;
1000         struct mdt_body *body;
1001
1002         mdt_req_from_mcd(req, med->med_mcd);
1003         if (req->rq_status)
1004                 return;
1005
1006         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
1007         obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
1008         LASSERT(!IS_ERR(obj));
1009         mo_attr_get(mti->mti_ctxt, mdt_object_child(obj),
1010                     &mti->mti_attr, &mti->mti_uc);
1011         mdt_pack_attr2body(body, &mti->mti_attr.ma_attr, mdt_object_fid(obj));
1012         mdt_body_reverse_idmap(mti, body);
1013
1014         /* Don't return OST-specific attributes if we didn't just set them */
1015 /*
1016         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
1017                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
1018         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
1019                 body->valid |= OBD_MD_FLMTIME;
1020         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
1021                 body->valid |= OBD_MD_FLATIME;
1022 */
1023         mdt_object_put(mti->mti_ctxt, obj);
1024 }
1025
1026 static void mdt_reconstruct_with_shrink(struct mdt_thread_info *mti,
1027                                         struct mdt_lock_handle *lhc)
1028 {
1029         mdt_reconstruct_generic(mti, lhc);
1030         mdt_shrink_reply(mti, REPLY_REC_OFF + 1);
1031 }
1032
1033 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,
1034                                   struct mdt_lock_handle *lhc);
1035
1036 static mdt_reconstructor reconstructors[REINT_MAX] = {
1037         [REINT_SETATTR]  = mdt_reconstruct_setattr,
1038         [REINT_CREATE]   = mdt_reconstruct_create,
1039         [REINT_LINK]     = mdt_reconstruct_generic,
1040         [REINT_UNLINK]   = mdt_reconstruct_with_shrink,
1041         [REINT_RENAME]   = mdt_reconstruct_with_shrink,
1042         [REINT_OPEN]     = mdt_reconstruct_open
1043 };
1044
1045 void mdt_reconstruct(struct mdt_thread_info *mti,
1046                      struct mdt_lock_handle *lhc)
1047 {
1048         ENTRY;
1049         reconstructors[mti->mti_rr.rr_opcode](mti, lhc);
1050         EXIT;
1051 }
1052