Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_recovery.c
5  *  Lustre Metadata Target (mdt) recovery-related methods
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *   Author: Pershin Mike <tappro@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 static int mdt_server_data_update(const struct lu_env *env,
37                                   struct mdt_device *mdt);
38
39 struct lu_buf *mdt_buf(const struct lu_env *env, void *area, ssize_t len)
40 {
41         struct lu_buf *buf;
42         struct mdt_thread_info *mti;
43
44         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
45         buf = &mti->mti_buf;
46         buf->lb_buf = area;
47         buf->lb_len = len;
48         return buf;
49 }
50
51 const struct lu_buf *mdt_buf_const(const struct lu_env *env,
52                                    const void *area, ssize_t len)
53 {
54         struct lu_buf *buf;
55         struct mdt_thread_info *mti;
56
57         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
58         buf = &mti->mti_buf;
59
60         buf->lb_buf = (void *)area;
61         buf->lb_len = len;
62         return buf;
63 }
64
65 int mdt_record_read(const struct lu_env *env,
66                     struct dt_object *dt, struct lu_buf *buf, loff_t *pos)
67 {
68         int rc;
69
70         LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
71
72         rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
73
74         if (rc == buf->lb_len)
75                 rc = 0;
76         else if (rc >= 0)
77                 rc = -EFAULT;
78         return rc;
79 }
80
81 int mdt_record_write(const struct lu_env *env,
82                      struct dt_object *dt, const struct lu_buf *buf,
83                      loff_t *pos, struct thandle *th)
84 {
85         int rc;
86
87         LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
88         LASSERT(th != NULL);
89         rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA);
90         if (rc == buf->lb_len)
91                 rc = 0;
92         else if (rc >= 0)
93                 rc = -EFAULT;
94         return rc;
95 }
96 /* only one record write */
97
98 enum {
99         MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
100 };
101
102 struct thandle* mdt_trans_start(const struct lu_env *env,
103                                 struct mdt_device *mdt, int credits)
104 {
105         struct mdt_thread_info *mti;
106         struct txn_param *p;
107
108         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
109         p = &mti->mti_txn_param;
110         txn_param_init(p, credits);
111
112         /* export can require sync operations */
113         if (mti->mti_exp != NULL)
114                 p->tp_sync = mti->mti_exp->exp_need_sync;
115
116         return mdt->mdt_bottom->dd_ops->dt_trans_start(env, mdt->mdt_bottom, p);
117 }
118
119 void mdt_trans_stop(const struct lu_env *env,
120                     struct mdt_device *mdt, struct thandle *th)
121 {
122         mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th);
123 }
124
125 /* last_rcvd handling */
126 static inline void msd_le_to_cpu(struct mdt_server_data *buf,
127                                  struct mdt_server_data *msd)
128 {
129         memcpy(msd->msd_uuid, buf->msd_uuid, sizeof (msd->msd_uuid));
130         msd->msd_last_transno     = le64_to_cpu(buf->msd_last_transno);
131         msd->msd_mount_count      = le64_to_cpu(buf->msd_mount_count);
132         msd->msd_feature_compat   = le32_to_cpu(buf->msd_feature_compat);
133         msd->msd_feature_rocompat = le32_to_cpu(buf->msd_feature_rocompat);
134         msd->msd_feature_incompat = le32_to_cpu(buf->msd_feature_incompat);
135         msd->msd_server_size      = le32_to_cpu(buf->msd_server_size);
136         msd->msd_client_start     = le32_to_cpu(buf->msd_client_start);
137         msd->msd_client_size      = le16_to_cpu(buf->msd_client_size);
138 }
139
140 static inline void msd_cpu_to_le(struct mdt_server_data *msd,
141                                  struct mdt_server_data *buf)
142 {
143         memcpy(buf->msd_uuid, msd->msd_uuid, sizeof (msd->msd_uuid));
144         buf->msd_last_transno     = cpu_to_le64(msd->msd_last_transno);
145         buf->msd_mount_count      = cpu_to_le64(msd->msd_mount_count);
146         buf->msd_feature_compat   = cpu_to_le32(msd->msd_feature_compat);
147         buf->msd_feature_rocompat = cpu_to_le32(msd->msd_feature_rocompat);
148         buf->msd_feature_incompat = cpu_to_le32(msd->msd_feature_incompat);
149         buf->msd_server_size      = cpu_to_le32(msd->msd_server_size);
150         buf->msd_client_start     = cpu_to_le32(msd->msd_client_start);
151         buf->msd_client_size      = cpu_to_le16(msd->msd_client_size);
152 }
153
154 static inline void mcd_le_to_cpu(struct mdt_client_data *buf,
155                                  struct mdt_client_data *mcd)
156 {
157         memcpy(mcd->mcd_uuid, buf->mcd_uuid, sizeof (mcd->mcd_uuid));
158         mcd->mcd_last_transno       = le64_to_cpu(buf->mcd_last_transno);
159         mcd->mcd_last_xid           = le64_to_cpu(buf->mcd_last_xid);
160         mcd->mcd_last_result        = le32_to_cpu(buf->mcd_last_result);
161         mcd->mcd_last_data          = le32_to_cpu(buf->mcd_last_data);
162         mcd->mcd_last_close_transno = le64_to_cpu(buf->mcd_last_close_transno);
163         mcd->mcd_last_close_xid     = le64_to_cpu(buf->mcd_last_close_xid);
164         mcd->mcd_last_close_result  = le32_to_cpu(buf->mcd_last_close_result);
165 }
166
167 static inline void mcd_cpu_to_le(struct mdt_client_data *mcd,
168                                  struct mdt_client_data *buf)
169 {
170         memcpy(buf->mcd_uuid, mcd->mcd_uuid, sizeof (mcd->mcd_uuid));
171         buf->mcd_last_transno       = cpu_to_le64(mcd->mcd_last_transno);
172         buf->mcd_last_xid           = cpu_to_le64(mcd->mcd_last_xid);
173         buf->mcd_last_result        = cpu_to_le32(mcd->mcd_last_result);
174         buf->mcd_last_data          = cpu_to_le32(mcd->mcd_last_data);
175         buf->mcd_last_close_transno = cpu_to_le64(mcd->mcd_last_close_transno);
176         buf->mcd_last_close_xid     = cpu_to_le64(mcd->mcd_last_close_xid);
177         buf->mcd_last_close_result  = cpu_to_le32(mcd->mcd_last_close_result);
178 }
179
180 static int mdt_last_rcvd_header_read(const struct lu_env *env,
181                                      struct mdt_device *mdt,
182                                      struct mdt_server_data *msd)
183 {
184         struct mdt_thread_info *mti;
185         struct mdt_server_data *tmp;
186         loff_t *off;
187         int rc;
188
189         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
190         /* temporary stuff for read */
191         tmp = &mti->mti_msd;
192         off = &mti->mti_off;
193         *off = 0;
194         rc = mdt_record_read(env, mdt->mdt_last_rcvd,
195                              mdt_buf(env, tmp, sizeof(*tmp)), off);
196         if (rc == 0)
197                 msd_le_to_cpu(tmp, msd);
198
199         CDEBUG(D_INFO, "read last_rcvd header rc = %d:\n"
200                        "uuid = %s\n"
201                        "last_transno = "LPU64"\n",
202                         rc,
203                         msd->msd_uuid,
204                         msd->msd_last_transno);
205         return rc;
206 }
207
208 static int mdt_last_rcvd_header_write(const struct lu_env *env,
209                                       struct mdt_device *mdt,
210                                       struct mdt_server_data *msd)
211 {
212         struct mdt_thread_info *mti;
213         struct mdt_server_data *tmp;
214         struct thandle *th;
215         loff_t *off;
216         int rc;
217         ENTRY;
218
219         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
220
221         th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
222         if (IS_ERR(th))
223                 RETURN(PTR_ERR(th));
224
225         /* temporary stuff for read */
226         tmp = &mti->mti_msd;
227         off = &mti->mti_off;
228         *off = 0;
229
230         msd_cpu_to_le(msd, tmp);
231
232         rc = mdt_record_write(env, mdt->mdt_last_rcvd,
233                               mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
234
235         mdt_trans_stop(env, mdt, th);
236
237         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
238                "uuid = %s\nlast_transno = "LPU64"\n",
239                rc, msd->msd_uuid, msd->msd_last_transno);
240         
241         RETURN(rc);
242 }
243
244 static int mdt_last_rcvd_read(const struct lu_env *env,
245                               struct mdt_device *mdt,
246                               struct mdt_client_data *mcd, loff_t *off)
247 {
248         struct mdt_thread_info *mti;
249         struct mdt_client_data *tmp;
250         int rc;
251
252         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
253         tmp = &mti->mti_mcd;
254         rc = mdt_record_read(env, mdt->mdt_last_rcvd,
255                              mdt_buf(env, tmp, sizeof(*tmp)), off);
256         if (rc == 0)
257                 mcd_le_to_cpu(tmp, mcd);
258
259         CDEBUG(D_INFO, "read mcd @%d rc = %d:\n"
260                        "uuid = %s\n"
261                        "last_transno = "LPU64"\n"
262                        "last_xid = "LPU64"\n"
263                        "last_result = %d\n"
264                        "last_data = %d\n"
265                        "last_close_transno = "LPU64"\n"
266                        "last_close_xid = "LPU64"\n"
267                        "last_close_result = %d\n",
268                         (int)*off - sizeof(*tmp),
269                         rc,
270                         mcd->mcd_uuid,
271                         mcd->mcd_last_transno,
272                         mcd->mcd_last_xid,
273                         mcd->mcd_last_result,
274                         mcd->mcd_last_data,
275                         mcd->mcd_last_close_transno,
276                         mcd->mcd_last_close_xid,
277                         mcd->mcd_last_close_result);
278         return rc;
279 }
280
281 static int mdt_last_rcvd_write(const struct lu_env *env,
282                                struct mdt_device *mdt,
283                                struct mdt_client_data *mcd,
284                                loff_t *off, struct thandle *th)
285 {
286         struct mdt_thread_info *mti;
287         struct mdt_client_data *tmp;
288         int rc;
289
290         LASSERT(th != NULL);
291         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
292         tmp = &mti->mti_mcd;
293
294         mcd_cpu_to_le(mcd, tmp);
295
296         rc = mdt_record_write(env, mdt->mdt_last_rcvd,
297                               mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
298
299         CDEBUG(D_INFO, "write mcd @%d rc = %d:\n"
300                        "uuid = %s\n"
301                        "last_transno = "LPU64"\n"
302                        "last_xid = "LPU64"\n"
303                        "last_result = %d\n"
304                        "last_data = %d\n"
305                        "last_close_transno = "LPU64"\n"
306                        "last_close_xid = "LPU64"\n"
307                        "last_close_result = %d\n",
308                         (int)*off - sizeof(*tmp),
309                         rc,
310                         mcd->mcd_uuid,
311                         mcd->mcd_last_transno,
312                         mcd->mcd_last_xid,
313                         mcd->mcd_last_result,
314                         mcd->mcd_last_data,
315                         mcd->mcd_last_close_transno,
316                         mcd->mcd_last_close_xid,
317                         mcd->mcd_last_close_result);
318         return rc;
319 }
320
321
322 static int mdt_clients_data_init(const struct lu_env *env,
323                                  struct mdt_device *mdt,
324                                  unsigned long last_size)
325 {
326         struct mdt_server_data *msd = &mdt->mdt_msd;
327         struct mdt_client_data *mcd = NULL;
328         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
329         loff_t off;
330         int cl_idx;
331         int rc = 0;
332         ENTRY;
333
334         /* When we do a clean MDS shutdown, we save the last_transno into
335          * the header.  If we find clients with higher last_transno values
336          * then those clients may need recovery done. */
337         LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0);
338         for (cl_idx = 0, off = msd->msd_client_start;
339              off < last_size; cl_idx++) {
340                 __u64 last_transno;
341                 struct obd_export *exp;
342
343                 if (!mcd) {
344                         OBD_ALLOC_PTR(mcd);
345                         if (!mcd)
346                                 RETURN(-ENOMEM);
347                 }
348
349                 off = msd->msd_client_start +
350                         cl_idx * msd->msd_client_size;
351
352                 rc = mdt_last_rcvd_read(env, mdt, mcd, &off);
353                 if (rc) {
354                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
355                                LAST_RCVD, cl_idx, off, rc);
356                         rc = 0;
357                         break; /* read error shouldn't cause startup to fail */
358                 }
359
360                 if (mcd->mcd_uuid[0] == '\0') {
361                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
362                                cl_idx);
363                         continue;
364                 }
365
366                 last_transno = mcd_last_transno(mcd);
367
368                 /* These exports are cleaned up by mdt_obd_disconnect(), so
369                  * they need to be set up like real exports as
370                  * mdt_obd_connect() does.
371                  */
372                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
373                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
374                        last_transno, msd->msd_last_transno,
375                        mcd_last_xid(mcd));
376
377                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
378                 if (IS_ERR(exp)) {
379                         if (PTR_ERR(exp) == -EALREADY) {
380                                 /* export already exists, zero out this one */
381                                 mcd->mcd_uuid[0] = '\0'; 
382                         } else
383                                 GOTO(err_client, rc = PTR_ERR(exp));
384                 } else {
385                         struct mdt_thread_info *mti;                        
386                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
387                         LASSERT(mti != NULL);
388                         mti->mti_exp = exp;
389                         exp->exp_mdt_data.med_mcd = mcd;
390                         rc = mdt_client_add(env, mdt, cl_idx);
391                         /* can't fail existing */
392                         LASSERTF(rc == 0, "rc = %d\n", rc); 
393                         mcd = NULL;
394                         spin_lock(&exp->exp_lock);
395                         exp->exp_connecting = 0;
396                         exp->exp_in_recovery = 0;
397                         spin_unlock(&exp->exp_lock);
398                         obd->obd_max_recoverable_clients++;
399                         class_export_put(exp);
400                 }
401
402                 CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n",
403                        cl_idx, last_transno);
404                 /* protect __u64 value update */
405                 spin_lock(&mdt->mdt_transno_lock);
406                 mdt->mdt_last_transno = max(last_transno,
407                                             mdt->mdt_last_transno);
408                 spin_unlock(&mdt->mdt_transno_lock);
409         }
410
411 err_client:
412         if (mcd)
413                 OBD_FREE_PTR(mcd);
414         RETURN(rc);
415 }
416
417 static int mdt_server_data_init(const struct lu_env *env,
418                                 struct mdt_device *mdt)
419 {
420         struct mdt_server_data *msd = &mdt->mdt_msd;
421         struct mdt_client_data *mcd = NULL;
422         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
423         struct mdt_thread_info *mti;
424         struct dt_object       *obj;
425         struct lu_attr         *la;
426         unsigned long last_rcvd_size;
427         __u64 mount_count;
428         int rc;
429         ENTRY;
430
431         /* ensure padding in the struct is the correct size */
432         CLASSERT(offsetof(struct mdt_server_data, msd_padding) +
433                 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
434         CLASSERT(offsetof(struct mdt_client_data, mcd_padding) +
435                 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
436
437         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
438         LASSERT(mti != NULL);
439         la = &mti->mti_attr.ma_attr;
440
441         obj = mdt->mdt_last_rcvd;
442         rc = obj->do_ops->do_attr_get(env, mdt->mdt_last_rcvd, la, BYPASS_CAPA);
443         if (rc)
444                 RETURN(rc);
445
446         last_rcvd_size = (unsigned long)la->la_size;
447
448         if (last_rcvd_size == 0) {
449                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
450
451                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
452                        sizeof(msd->msd_uuid));
453                 msd->msd_last_transno = 0;
454                 msd->msd_mount_count = 0;
455                 msd->msd_server_size = LR_SERVER_SIZE;
456                 msd->msd_client_start = LR_CLIENT_START;
457                 msd->msd_client_size = LR_CLIENT_SIZE;
458                 msd->msd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
459                 msd->msd_feature_incompat = OBD_INCOMPAT_MDT |
460                                                        OBD_INCOMPAT_COMMON_LR;
461         } else {
462                 LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
463                 rc = mdt_last_rcvd_header_read(env, mdt, msd);
464                 if (rc) {
465                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
466                         GOTO(out, rc);
467                 }
468                 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
469                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
470                                            "the wrong disk %s. Were the /dev/ "
471                                            "assignments rearranged?\n",
472                                            obd->obd_uuid.uuid, msd->msd_uuid);
473                         GOTO(out, rc = -EINVAL);
474                 }
475         }
476         mount_count = msd->msd_mount_count;
477
478         msd->msd_feature_compat = OBD_COMPAT_MDT;
479
480         spin_lock(&mdt->mdt_transno_lock);
481         mdt->mdt_last_transno = msd->msd_last_transno;
482         spin_unlock(&mdt->mdt_transno_lock);
483
484         CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
485         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
486                obd->obd_name, mdt->mdt_last_transno);
487         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
488                obd->obd_name, mount_count + 1);
489         CDEBUG(D_INODE, "%s: server data size: %u\n",
490                obd->obd_name, msd->msd_server_size);
491         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
492                obd->obd_name, msd->msd_client_start);
493         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
494                obd->obd_name, msd->msd_client_size);
495         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
496                obd->obd_name, last_rcvd_size);
497         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
498                last_rcvd_size <= msd->msd_client_start ? 0 :
499                (last_rcvd_size - msd->msd_client_start) /
500                 msd->msd_client_size);
501         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
502
503         if (!msd->msd_server_size || !msd->msd_client_start ||
504             !msd->msd_client_size) {
505                 CERROR("Bad last_rcvd contents!\n");
506                 GOTO(out, rc = -EINVAL);
507         }
508
509         rc = mdt_clients_data_init(env, mdt, last_rcvd_size);
510         if (rc)
511                 GOTO(err_client, rc);
512
513         spin_lock(&mdt->mdt_transno_lock);
514         /* obd_last_committed is used for compatibility
515          * with other lustre recovery code */
516         obd->obd_last_committed = mdt->mdt_last_transno;
517         spin_unlock(&mdt->mdt_transno_lock);
518
519         mdt->mdt_mount_count++;
520         msd->msd_mount_count = mdt->mdt_mount_count;
521
522         /* save it, so mount count and last_transno is current */
523         rc = mdt_server_data_update(env, mdt);
524         if (rc)
525                 GOTO(err_client, rc);
526
527         RETURN(0);
528
529 err_client:
530         target_recovery_fini(obd);
531 out:
532         return rc;
533 }
534
535 static int mdt_server_data_update(const struct lu_env *env,
536                                   struct mdt_device *mdt)
537 {
538         struct mdt_server_data *msd = &mdt->mdt_msd;
539         int rc = 0;
540         ENTRY;
541
542         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
543                mdt->mdt_mount_count, mdt->mdt_last_transno);
544
545         spin_lock(&mdt->mdt_transno_lock);
546         msd->msd_last_transno = mdt->mdt_last_transno;
547         spin_unlock(&mdt->mdt_transno_lock);
548
549         /*
550          * This may be called from difficult reply handler and
551          * mdt->mdt_last_rcvd may be NULL that time.
552          */
553         if (mdt->mdt_last_rcvd != NULL)
554                 rc = mdt_last_rcvd_header_write(env, mdt, msd);
555         RETURN(rc);
556 }
557
558 void mdt_cb_new_client(const struct mdt_device *mdt, __u64 transno,
559                                   void *data, int err) 
560 {
561         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
562         
563         target_client_add_cb(obd, transno, data, err);
564 }
565
566 int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
567 {
568         unsigned long *bitmap = mdt->mdt_client_bitmap;
569         struct mdt_thread_info *mti;
570         struct mdt_export_data *med;
571         struct mdt_client_data *mcd;
572         struct mdt_server_data *msd = &mdt->mdt_msd;
573         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
574         struct thandle *th;
575         loff_t off;
576         int rc;
577         int cl_idx;
578         ENTRY;
579
580         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
581         LASSERT(mti != NULL);
582
583         med = &mti->mti_exp->exp_mdt_data;
584         mcd = med->med_mcd;
585
586         LASSERT(bitmap != NULL);
587         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
588                 RETURN(0);
589
590         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
591          * there's no need for extra complication here
592          */
593         spin_lock(&mdt->mdt_client_bitmap_lock);
594         cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
595         if (cl_idx >= LR_MAX_CLIENTS ||
596             MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
597                 CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
598                        cl_idx);
599                 spin_unlock(&mdt->mdt_client_bitmap_lock);
600                 RETURN(-EOVERFLOW);
601         }
602         set_bit(cl_idx, bitmap);
603         spin_unlock(&mdt->mdt_client_bitmap_lock);
604
605         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
606                cl_idx, med->med_mcd->mcd_uuid);
607
608         med->med_lr_idx = cl_idx;
609         med->med_lr_off = msd->msd_client_start +
610                           (cl_idx * msd->msd_client_size);
611         init_mutex(&med->med_mcd_lock);
612
613         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
614         /* write new client data */
615         off = med->med_lr_off;
616         th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
617         if (IS_ERR(th))
618                 RETURN(PTR_ERR(th));
619
620         /* until this operations will be committed the sync is needed for this
621          * export */
622         mdt_trans_add_cb(th, mdt_cb_new_client, mti->mti_exp);
623         spin_lock(&mti->mti_exp->exp_lock);
624         mti->mti_exp->exp_need_sync = 1;
625         spin_unlock(&mti->mti_exp->exp_lock);
626
627         rc = mdt_last_rcvd_write(env, mdt, mcd, &off, th);
628         CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
629                cl_idx, med->med_lr_off, sizeof(*mcd));
630         mdt_trans_stop(env, mdt, th);
631
632         RETURN(rc);
633 }
634
635 /* Add client data to the MDS.  We use a bitmap to locate a free space
636  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
637  * Otherwise, we just have to read the data from the last_rcvd file and
638  * we know its offset.
639  *
640  * It should not be possible to fail adding an existing client - otherwise
641  * mdt_init_server_data() callsite needs to be fixed.
642  */
643 int mdt_client_add(const struct lu_env *env,
644                    struct mdt_device *mdt, int cl_idx)
645 {
646         struct mdt_thread_info *mti;
647         struct mdt_export_data *med;
648         unsigned long *bitmap = mdt->mdt_client_bitmap;
649         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
650         struct mdt_server_data *msd = &mdt->mdt_msd;
651         int rc = 0;
652         ENTRY;
653
654         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
655         LASSERT(mti != NULL);
656
657         med = &mti->mti_exp->exp_mdt_data;
658
659         LASSERT(bitmap != NULL);
660         LASSERTF(cl_idx >= 0, "%d\n", cl_idx);
661
662         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
663                 RETURN(0);
664
665         spin_lock(&mdt->mdt_client_bitmap_lock);
666         if (test_and_set_bit(cl_idx, bitmap)) {
667                 CERROR("MDS client %d: bit already set in bitmap!!\n",
668                        cl_idx);
669                 LBUG();
670         }
671         spin_unlock(&mdt->mdt_client_bitmap_lock);
672
673         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
674                cl_idx, med->med_mcd->mcd_uuid);
675
676         med->med_lr_idx = cl_idx;
677         med->med_lr_off = msd->msd_client_start +
678                           (cl_idx * msd->msd_client_size);
679         init_mutex(&med->med_mcd_lock);
680
681         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
682
683         RETURN(rc);
684 }
685
686 int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
687 {
688         struct mdt_thread_info *mti;
689         struct mdt_export_data *med;
690         struct mdt_client_data *mcd;
691         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
692         struct thandle *th;
693         loff_t off;
694         int rc = 0;
695         ENTRY;
696
697         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
698         LASSERT(mti != NULL);
699
700         med = &mti->mti_exp->exp_mdt_data;
701         mcd = med->med_mcd;
702         if (!mcd)
703                 RETURN(0);
704
705         /* XXX: If mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
706         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
707                 GOTO(free, 0);
708
709         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
710                med->med_lr_idx, med->med_lr_off);
711
712         off = med->med_lr_off;
713
714         /*
715          * Don't clear med_lr_idx here as it is likely also unset.  At worst we
716          * leak a client slot that will be cleaned on the next recovery.
717          */
718         if (off <= 0) {
719                 CERROR("client idx %d has offset %lld\n",
720                         med->med_lr_idx, off);
721                 GOTO(free, rc = -EINVAL);
722         }
723
724         /*
725          * Clear the bit _after_ zeroing out the client so we don't race with
726          * mdt_client_add and zero out new clients.
727          */
728         if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
729                 CERROR("MDT client %u: bit already clear in bitmap!!\n",
730                        med->med_lr_idx);
731                 LBUG();
732         }
733
734         /*
735          * This may be called from difficult reply handler path and
736          * mdt->mdt_last_rcvd may be NULL that time.
737          */
738         if (mdt->mdt_last_rcvd != NULL) {
739                 th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
740                 if (IS_ERR(th))
741                         GOTO(free, rc = PTR_ERR(th));
742
743                 mutex_down(&med->med_mcd_lock);
744                 memset(mcd, 0, sizeof *mcd);
745
746                 rc = mdt_last_rcvd_write(env, mdt, mcd, &off, th);
747                 mutex_up(&med->med_mcd_lock);
748                 mdt_trans_stop(env, mdt, th);
749         }
750
751         CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
752                "%s rc %d\n",  med->med_lr_idx, LAST_RCVD, rc);
753
754         spin_lock(&mdt->mdt_client_bitmap_lock);
755         clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
756         spin_unlock(&mdt->mdt_client_bitmap_lock);
757         
758         /*
759          * Make sure the server's last_transno is up to date. Do this after the
760          * client is freed so we know all the client's transactions have been
761          * committed.
762          */
763         mdt_server_data_update(env, mdt);
764         EXIT;
765 free:
766         OBD_FREE_PTR(mcd);
767         med->med_mcd = NULL;
768         return 0;
769 }
770
771 /*
772  * last_rcvd & last_committed update callbacks
773  */
774 static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
775                                 struct thandle *th)
776 {
777         struct mdt_device *mdt = mti->mti_mdt;
778         struct ptlrpc_request *req = mdt_info_req(mti);
779         struct mdt_export_data *med;
780         struct mdt_client_data *mcd;
781         loff_t off;
782         int err;
783         __s32 rc = th->th_result;
784         __u64 *transno_p;
785
786         ENTRY;
787         LASSERT(req);
788         LASSERT(req->rq_export);
789         LASSERT(mdt);
790         med = &req->rq_export->exp_mdt_data;
791         LASSERT(med);
792         mcd = med->med_mcd;
793         /* if the export has already been failed, we have no last_rcvd slot */
794         if (req->rq_export->exp_failed) {
795                 CWARN("commit transaction for disconnected client %s: rc %d\n",
796                       req->rq_export->exp_client_uuid.uuid, rc);
797                 if (rc == 0)
798                         rc = -ENOTCONN;
799                 RETURN(rc);
800         }
801
802         off = med->med_lr_off;
803         mutex_down(&med->med_mcd_lock);
804         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
805             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
806                 transno_p = &mcd->mcd_last_close_transno;
807                 mcd->mcd_last_close_xid = req->rq_xid;
808                 mcd->mcd_last_close_result = rc;
809         } else {
810                 transno_p = &mcd->mcd_last_transno;
811                 mcd->mcd_last_xid = req->rq_xid;
812                 mcd->mcd_last_result = rc;
813                 /*XXX: save intent_disposition in mdt_thread_info?
814                  * also there is bug - intent_dispostion is __u64,
815                  * see struct ldlm_reply->lock_policy_res1; */
816                  mcd->mcd_last_data = mti->mti_opdata;
817         }
818
819         /*
820          * When we store zero transno in mcd we can lost last transno value
821          * because mcd contains 0, but msd is not yet written
822          * The server data should be updated also if the latest 
823          * transno is rewritten by zero. See the bug 11125 for details.
824          */
825         if (mti->mti_transno == 0 && 
826             *transno_p == mdt->mdt_last_transno)
827                 mdt_server_data_update(mti->mti_env, mdt);
828
829         *transno_p = mti->mti_transno;
830
831         if (off <= 0) {
832                 CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
833                 err = -EINVAL;
834         } else {
835                 err = mdt_last_rcvd_write(mti->mti_env, mdt, mcd, &off, th);
836         }
837         mutex_up(&med->med_mcd_lock);
838         RETURN(err);
839 }
840
841 extern struct lu_context_key mdt_thread_key;
842
843 /* add credits for last_rcvd update */
844 static int mdt_txn_start_cb(const struct lu_env *env,
845                             struct txn_param *param, void *cookie)
846 {
847         param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
848         return 0;
849 }
850
851 /* Update last_rcvd records with latests transaction data */
852 static int mdt_txn_stop_cb(const struct lu_env *env,
853                            struct thandle *txn, void *cookie)
854 {
855         struct mdt_device *mdt = cookie;
856         struct mdt_txn_info *txi;
857         struct mdt_thread_info *mti;
858         struct ptlrpc_request *req;
859
860         /* transno in two contexts - for commit_cb and for thread */
861         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
862         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
863         req = mdt_info_req(mti);
864
865         if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
866                 txi->txi_transno = 0;
867                 mti->mti_no_need_trans = 0;
868                 return 0;
869         }
870
871         if (mti->mti_has_trans) {
872                 /* XXX: currently there are allowed cases, but the wrong cases
873                  * are also possible, so better check is needed here */
874                 CDEBUG(D_INFO, "More than one transaction "LPU64"\n", mti->mti_transno);
875                 return 0;
876         }
877
878         mti->mti_has_trans = 1;
879         spin_lock(&mdt->mdt_transno_lock);
880         if (txn->th_result != 0) {
881                 if (mti->mti_transno != 0) {
882                         CERROR("Replay transno "LPU64" failed: rc %i\n",
883                                mti->mti_transno, txn->th_result);
884                         mti->mti_transno = 0;
885                 }
886         } else if (mti->mti_transno == 0) {
887                 mti->mti_transno = ++ mdt->mdt_last_transno;
888         } else {
889                 /* should be replay */
890                 if (mti->mti_transno > mdt->mdt_last_transno)
891                         mdt->mdt_last_transno = mti->mti_transno;
892         }
893
894         /* sometimes the reply message has not been successfully packed */
895         LASSERT(req != NULL && req->rq_repmsg != NULL);
896
897         /* filling reply data */
898         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
899                mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
900
901         req->rq_transno = mti->mti_transno;
902         lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
903         lustre_msg_set_last_xid(req->rq_repmsg,
904                          mcd_last_xid(req->rq_export->exp_mdt_data.med_mcd));
905         /* save transno for the commit callback */
906         txi->txi_transno = mti->mti_transno;
907         spin_unlock(&mdt->mdt_transno_lock);
908
909         return mdt_last_rcvd_update(mti, txn);
910 }
911
912 /* commit callback, need to update last_commited value */
913 static int mdt_txn_commit_cb(const struct lu_env *env,
914                              struct thandle *txn, void *cookie)
915 {
916         struct mdt_device *mdt = cookie;
917         struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
918         struct mdt_txn_info *txi;
919         int i;
920
921         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
922
923         /* copy of obd_transno_commit_cb() but with locking */
924         spin_lock(&mdt->mdt_transno_lock);
925         if (txi->txi_transno > obd->obd_last_committed) {
926                 obd->obd_last_committed = txi->txi_transno;
927                 spin_unlock(&mdt->mdt_transno_lock);
928                 ptlrpc_commit_replies(obd);
929         } else
930                 spin_unlock(&mdt->mdt_transno_lock);
931
932         if (txi->txi_transno)
933                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
934                        obd->obd_name, txi->txi_transno);
935
936         /* iterate through all additional callbacks */
937         for (i = 0; i < txi->txi_cb_count; i++) {
938                 txi->txi_cb[i].mdt_cb_func(mdt, txi->txi_transno,
939                                            txi->txi_cb[i].mdt_cb_data, 0);
940         }
941         return 0;
942 }
943
944 int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
945                  struct obd_device *obd)
946 {
947         struct lu_fid fid;
948         struct dt_object *o;
949         int rc = 0;
950         ENTRY;
951
952         OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
953
954         /* prepare transactions callbacks */
955         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
956         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
957         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
958         mdt->mdt_txn_cb.dtc_cookie = mdt;
959         INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
960
961         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
962
963         o = dt_store_open(env, mdt->mdt_bottom, LAST_RCVD, &fid);
964         if (!IS_ERR(o)) {
965                 mdt->mdt_last_rcvd = o;
966                 rc = mdt_server_data_init(env, mdt);
967                 if (rc)
968                         GOTO(put_last_rcvd, rc);
969         } else {
970                 rc = PTR_ERR(o);
971                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
972                 RETURN(rc);
973         }
974
975         o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid);
976         if (!IS_ERR(o)) {
977                 mdt->mdt_ck_obj = o;
978                 rc = mdt_capa_keys_init(env, mdt);
979                 if (rc)
980                         GOTO(put_ck_object, rc);
981         } else {
982                 rc = PTR_ERR(o);
983                 CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
984                 GOTO(put_last_rcvd, rc);
985         }
986         RETURN(0);
987
988 put_ck_object:
989         lu_object_put(env, &o->do_lu);
990         mdt->mdt_ck_obj = NULL;
991 put_last_rcvd:
992         lu_object_put(env, &mdt->mdt_last_rcvd->do_lu);
993         mdt->mdt_last_rcvd = NULL;
994         return rc;
995 }
996
997
998 void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
999 {
1000         ENTRY;
1001
1002         /* Remove transaction callback */
1003         dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
1004         if (mdt->mdt_last_rcvd)
1005                 lu_object_put(env, &mdt->mdt_last_rcvd->do_lu);
1006         mdt->mdt_last_rcvd = NULL;
1007         if (mdt->mdt_ck_obj)
1008                 lu_object_put(env, &mdt->mdt_ck_obj->do_lu);
1009         mdt->mdt_ck_obj = NULL;
1010         EXIT;
1011 }
1012
1013 /* reconstruction code */
1014 extern void mds_steal_ack_locks(struct ptlrpc_request *req);
1015 void mdt_req_from_mcd(struct ptlrpc_request *req,
1016                       struct mdt_client_data *mcd)
1017 {
1018         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
1019                   mcd->mcd_last_transno, mcd->mcd_last_result);
1020
1021         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
1022             lustre_msg_get_opc(req->rq_repmsg) == MDS_DONE_WRITING) {
1023                 req->rq_transno = mcd->mcd_last_close_transno;
1024                 req->rq_status = mcd->mcd_last_close_result;
1025                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
1026                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
1027         } else {
1028                 req->rq_transno = mcd->mcd_last_transno;
1029                 req->rq_status = mcd->mcd_last_result;
1030                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
1031                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
1032         }
1033         mds_steal_ack_locks(req);
1034 }
1035
1036 static void mdt_reconstruct_generic(struct mdt_thread_info *mti,
1037                                     struct mdt_lock_handle *lhc)
1038 {
1039         struct ptlrpc_request *req = mdt_info_req(mti);
1040         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
1041
1042         return mdt_req_from_mcd(req, med->med_mcd);
1043 }
1044
1045 static void mdt_reconstruct_create(struct mdt_thread_info *mti,
1046                                    struct mdt_lock_handle *lhc)
1047 {
1048         struct ptlrpc_request  *req = mdt_info_req(mti);
1049         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
1050         struct mdt_device *mdt = mti->mti_mdt;
1051         struct mdt_object *child;
1052         struct mdt_body *body;
1053         int rc;
1054
1055         mdt_req_from_mcd(req, med->med_mcd);
1056         if (req->rq_status)
1057                 return;
1058
1059         /* if no error, so child was created with requested fid */
1060         child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2);
1061         LASSERT(!IS_ERR(child));
1062
1063         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
1064         rc = mo_attr_get(mti->mti_env, mdt_object_child(child), &mti->mti_attr);
1065         if (rc == -EREMOTE) {
1066                 /* object was created on remote server */
1067                 req->rq_status = rc;
1068                 body->valid |= OBD_MD_MDS;
1069         }
1070         mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr, mdt_object_fid(child));
1071         mdt_object_put(mti->mti_env, child);
1072 }
1073
1074 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
1075                                     struct mdt_lock_handle *lhc)
1076 {
1077         struct ptlrpc_request  *req = mdt_info_req(mti);
1078         struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
1079         struct mdt_device *mdt = mti->mti_mdt;
1080         struct mdt_object *obj;
1081         struct mdt_body *body;
1082
1083         mdt_req_from_mcd(req, med->med_mcd);
1084         if (req->rq_status)
1085                 return;
1086
1087         body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
1088         obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1);
1089         LASSERT(!IS_ERR(obj));
1090         mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
1091         mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr, mdt_object_fid(obj));
1092
1093         /* Don't return OST-specific attributes if we didn't just set them */
1094 /*
1095         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
1096                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
1097         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
1098                 body->valid |= OBD_MD_FLMTIME;
1099         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
1100                 body->valid |= OBD_MD_FLATIME;
1101 */
1102         mdt_object_put(mti->mti_env, obj);
1103 }
1104
1105 static void mdt_reconstruct_with_shrink(struct mdt_thread_info *mti,
1106                                         struct mdt_lock_handle *lhc)
1107 {
1108         mdt_reconstruct_generic(mti, lhc);
1109         mdt_shrink_reply(mti);
1110 }
1111
1112 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,
1113                                   struct mdt_lock_handle *lhc);
1114
1115 static mdt_reconstructor reconstructors[REINT_MAX] = {
1116         [REINT_SETATTR]  = mdt_reconstruct_setattr,
1117         [REINT_CREATE]   = mdt_reconstruct_create,
1118         [REINT_LINK]     = mdt_reconstruct_generic,
1119         [REINT_UNLINK]   = mdt_reconstruct_with_shrink,
1120         [REINT_RENAME]   = mdt_reconstruct_with_shrink,
1121         [REINT_OPEN]     = mdt_reconstruct_open
1122 };
1123
1124 void mdt_reconstruct(struct mdt_thread_info *mti,
1125                      struct mdt_lock_handle *lhc)
1126 {
1127         ENTRY;
1128         reconstructors[mti->mti_rr.rr_opcode](mti, lhc);
1129         EXIT;
1130 }
1131