Whamcloud - gitweb
LU-4788 lfsck: replace cfs_list_t with list_head
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
37  * These are common function to work with last_received file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int tgt_client_alloc(struct obd_export *exp)
65 {
66         ENTRY;
67         LASSERT(exp != exp->exp_obd->obd_self_export);
68
69         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
70         if (exp->exp_target_data.ted_lcd == NULL)
71                 RETURN(-ENOMEM);
72         /* Mark that slot is not yet valid, 0 doesn't work here */
73         exp->exp_target_data.ted_lr_idx = -1;
74         RETURN(0);
75 }
76 EXPORT_SYMBOL(tgt_client_alloc);
77
78 /**
79  * Free in-memory data for client slot related to export.
80  */
81 void tgt_client_free(struct obd_export *exp)
82 {
83         struct tg_export_data   *ted = &exp->exp_target_data;
84         struct lu_target        *lut = class_exp2tgt(exp);
85
86         LASSERT(exp != exp->exp_obd->obd_self_export);
87
88         OBD_FREE_PTR(ted->ted_lcd);
89         ted->ted_lcd = NULL;
90
91         /* Slot may be not yet assigned */
92         if (ted->ted_lr_idx < 0)
93                 return;
94         /* Clear bit when lcd is freed */
95         LASSERT(lut->lut_client_bitmap);
96         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
97                 CERROR("%s: client %u bit already clear in bitmap\n",
98                        exp->exp_obd->obd_name, ted->ted_lr_idx);
99                 LBUG();
100         }
101 }
102 EXPORT_SYMBOL(tgt_client_free);
103
104 int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
105                          struct lsd_client_data *lcd, loff_t *off, int index)
106 {
107         struct tgt_thread_info  *tti = tgt_th_info(env);
108         int                      rc;
109
110         tti_buf_lcd(tti);
111         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
112         if (rc == 0) {
113                 check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
114                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
115                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
116                 lcd->lcd_last_close_result =
117                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
118         }
119
120         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
121                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
122                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
123                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
124                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
125                lcd->lcd_last_result, lcd->lcd_last_data,
126                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
127                lcd->lcd_last_close_result, rc);
128         return rc;
129 }
130 EXPORT_SYMBOL(tgt_client_data_read);
131
132 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
133                           struct lsd_client_data *lcd, loff_t *off,
134                           struct thandle *th)
135 {
136         struct tgt_thread_info *tti = tgt_th_info(env);
137
138         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
139         lcd->lcd_last_close_result =
140                 ptlrpc_status_hton(lcd->lcd_last_close_result);
141         lcd_cpu_to_le(lcd, &tti->tti_lcd);
142         tti_buf_lcd(tti);
143
144         return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
145 }
146 EXPORT_SYMBOL(tgt_client_data_write);
147
148 /**
149  * Update client data in last_rcvd
150  */
151 int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
152 {
153         struct tg_export_data   *ted = &exp->exp_target_data;
154         struct lu_target        *tgt = class_exp2tgt(exp);
155         struct tgt_thread_info  *tti = tgt_th_info(env);
156         struct thandle          *th;
157         int                      rc = 0;
158
159         ENTRY;
160
161         th = dt_trans_create(env, tgt->lut_bottom);
162         if (IS_ERR(th))
163                 RETURN(PTR_ERR(th));
164
165         tti_buf_lcd(tti);
166         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
167                                      &tti->tti_buf,
168                                      ted->ted_lr_off, th);
169         if (rc)
170                 GOTO(out, rc);
171
172         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
173         if (rc)
174                 GOTO(out, rc);
175         /*
176          * Until this operations will be committed the sync is needed
177          * for this export. This should be done _after_ starting the
178          * transaction so that many connecting clients will not bring
179          * server down with lots of sync writes.
180          */
181         rc = tgt_new_client_cb_add(th, exp);
182         if (rc) {
183                 /* can't add callback, do sync now */
184                 th->th_sync = 1;
185         } else {
186                 spin_lock(&exp->exp_lock);
187                 exp->exp_need_sync = 1;
188                 spin_unlock(&exp->exp_lock);
189         }
190
191         tti->tti_off = ted->ted_lr_off;
192         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
193         EXIT;
194 out:
195         dt_trans_stop(env, tgt->lut_bottom, th);
196         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
197                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
198                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
199
200         return rc;
201 }
202
203 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
204 {
205         struct tgt_thread_info  *tti = tgt_th_info(env);
206         int                      rc;
207
208         tti->tti_off = 0;
209         tti_buf_lsd(tti);
210         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
211                             &tti->tti_off);
212         if (rc == 0)
213                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
214
215         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
216                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
217                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
218         return rc;
219 }
220 EXPORT_SYMBOL(tgt_server_data_read);
221
222 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
223                           struct thandle *th)
224 {
225         struct tgt_thread_info  *tti = tgt_th_info(env);
226         int                      rc;
227
228         ENTRY;
229
230         tti->tti_off = 0;
231         tti_buf_lsd(tti);
232         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
233
234         rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
235                              &tti->tti_off, th);
236
237         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
238                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
239                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
240
241         RETURN(rc);
242 }
243 EXPORT_SYMBOL(tgt_server_data_write);
244
245 /**
246  * Update server data in last_rcvd
247  */
248 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
249                            int sync)
250 {
251         struct tgt_thread_info  *tti = tgt_th_info(env);
252         struct thandle          *th;
253         int                      rc = 0;
254
255         ENTRY;
256
257         CDEBUG(D_SUPER,
258                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
259                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
260                tgt->lut_last_transno);
261
262         /* Always save latest transno to keep it fresh */
263         spin_lock(&tgt->lut_translock);
264         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
265         spin_unlock(&tgt->lut_translock);
266
267         th = dt_trans_create(env, tgt->lut_bottom);
268         if (IS_ERR(th))
269                 RETURN(PTR_ERR(th));
270
271         th->th_sync = sync;
272
273         tti_buf_lsd(tti);
274         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
275                                      &tti->tti_buf, tti->tti_off, th);
276         if (rc)
277                 GOTO(out, rc);
278
279         rc = dt_trans_start(env, tgt->lut_bottom, th);
280         if (rc)
281                 GOTO(out, rc);
282
283         rc = tgt_server_data_write(env, tgt, th);
284 out:
285         dt_trans_stop(env, tgt->lut_bottom, th);
286
287         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
288                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
289                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
290         RETURN(rc);
291 }
292 EXPORT_SYMBOL(tgt_server_data_update);
293
294 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
295                            loff_t size)
296 {
297         struct dt_object *dt = tgt->lut_last_rcvd;
298         struct thandle   *th;
299         struct lu_attr    attr;
300         int               rc;
301
302         ENTRY;
303
304         attr.la_size = size;
305         attr.la_valid = LA_SIZE;
306
307         th = dt_trans_create(env, tgt->lut_bottom);
308         if (IS_ERR(th))
309                 RETURN(PTR_ERR(th));
310         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
311         if (rc)
312                 GOTO(cleanup, rc);
313         rc = dt_declare_attr_set(env, dt, &attr, th);
314         if (rc)
315                 GOTO(cleanup, rc);
316         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
317         if (rc)
318                 GOTO(cleanup, rc);
319
320         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
321         if (rc == 0)
322                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
323
324 cleanup:
325         dt_trans_stop(env, tgt->lut_bottom, th);
326
327         RETURN(rc);
328 }
329 EXPORT_SYMBOL(tgt_truncate_last_rcvd);
330
331 void tgt_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
332 {
333         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
334         struct lu_target        *tgt = class_exp2tgt(exp);
335
336         LASSERT(tgt->lut_bottom);
337         /** VBR: set client last_epoch to current epoch */
338         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
339                 return;
340         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
341         tgt_client_data_update(env, exp);
342 }
343
344 /**
345  * Update boot epoch when recovery ends
346  */
347 void tgt_boot_epoch_update(struct lu_target *tgt)
348 {
349         struct lu_env            env;
350         struct ptlrpc_request   *req;
351         __u32                    start_epoch;
352         struct list_head         client_list;
353         int                      rc;
354
355         if (tgt->lut_obd->obd_stopping)
356                 return;
357
358         rc = lu_env_init(&env, LCT_DT_THREAD);
359         if (rc) {
360                 CERROR("%s: can't initialize environment: rc = %d\n",
361                         tgt->lut_obd->obd_name, rc);
362                 return;
363         }
364
365         spin_lock(&tgt->lut_translock);
366         start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
367         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
368         tgt->lut_lsd.lsd_start_epoch = start_epoch;
369         spin_unlock(&tgt->lut_translock);
370
371         CFS_INIT_LIST_HEAD(&client_list);
372         /**
373          * The recovery is not yet finished and final queue can still be updated
374          * with resend requests. Move final list to separate one for processing
375          */
376         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
377         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
378         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
379
380         /**
381          * go through list of exports participated in recovery and
382          * set new epoch for them
383          */
384         list_for_each_entry(req, &client_list, rq_list) {
385                 LASSERT(!req->rq_export->exp_delayed);
386                 if (!req->rq_export->exp_vbr_failed)
387                         tgt_client_epoch_update(&env, req->rq_export);
388         }
389         /** return list back at once */
390         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
391         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
392         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
393         /** update server epoch */
394         tgt_server_data_update(&env, tgt, 1);
395         lu_env_fini(&env);
396 }
397 EXPORT_SYMBOL(tgt_boot_epoch_update);
398
399 /**
400  * commit callback, need to update last_commited value
401  */
402 struct tgt_last_committed_callback {
403         struct dt_txn_commit_cb  llcc_cb;
404         struct lu_target        *llcc_tgt;
405         struct obd_export       *llcc_exp;
406         __u64                    llcc_transno;
407 };
408
409 void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
410                            struct dt_txn_commit_cb *cb, int err)
411 {
412         struct tgt_last_committed_callback *ccb;
413
414         ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
415
416         LASSERT(ccb->llcc_tgt != NULL);
417         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
418
419         spin_lock(&ccb->llcc_tgt->lut_translock);
420         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
421                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
422
423         LASSERT(ccb->llcc_exp);
424         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
425                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
426                 spin_unlock(&ccb->llcc_tgt->lut_translock);
427                 ptlrpc_commit_replies(ccb->llcc_exp);
428         } else {
429                 spin_unlock(&ccb->llcc_tgt->lut_translock);
430         }
431         class_export_cb_put(ccb->llcc_exp);
432         if (ccb->llcc_transno)
433                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
434                        ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
435         OBD_FREE_PTR(ccb);
436 }
437
438 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
439                            struct obd_export *exp, __u64 transno)
440 {
441         struct tgt_last_committed_callback      *ccb;
442         struct dt_txn_commit_cb                 *dcb;
443         int                                      rc;
444
445         OBD_ALLOC_PTR(ccb);
446         if (ccb == NULL)
447                 return -ENOMEM;
448
449         ccb->llcc_tgt = tgt;
450         ccb->llcc_exp = class_export_cb_get(exp);
451         ccb->llcc_transno = transno;
452
453         dcb = &ccb->llcc_cb;
454         dcb->dcb_func = tgt_cb_last_committed;
455         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
456         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
457
458         rc = dt_trans_cb_add(th, dcb);
459         if (rc) {
460                 class_export_cb_put(exp);
461                 OBD_FREE_PTR(ccb);
462         }
463
464         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
465                 /* report failure to force synchronous operation */
466                 return -EPERM;
467
468         return rc;
469 }
470 EXPORT_SYMBOL(tgt_last_commit_cb_add);
471
472 struct tgt_new_client_callback {
473         struct dt_txn_commit_cb  lncc_cb;
474         struct obd_export       *lncc_exp;
475 };
476
477 void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
478                        struct dt_txn_commit_cb *cb, int err)
479 {
480         struct tgt_new_client_callback *ccb;
481
482         ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
483
484         LASSERT(ccb->lncc_exp->exp_obd);
485
486         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
487                ccb->lncc_exp->exp_obd->obd_name,
488                ccb->lncc_exp->exp_client_uuid.uuid);
489
490         spin_lock(&ccb->lncc_exp->exp_lock);
491         /* XXX: Currently, we use per-export based sync/async policy for
492          *      the update via OUT RPC, it is coarse-grained policy, and
493          *      will be changed as per-request based by DNE II patches. */
494         if (!ccb->lncc_exp->exp_keep_sync)
495                 ccb->lncc_exp->exp_need_sync = 0;
496
497         spin_unlock(&ccb->lncc_exp->exp_lock);
498         class_export_cb_put(ccb->lncc_exp);
499
500         OBD_FREE_PTR(ccb);
501 }
502
503 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
504 {
505         struct tgt_new_client_callback  *ccb;
506         struct dt_txn_commit_cb         *dcb;
507         int                              rc;
508
509         OBD_ALLOC_PTR(ccb);
510         if (ccb == NULL)
511                 return -ENOMEM;
512
513         ccb->lncc_exp = class_export_cb_get(exp);
514
515         dcb = &ccb->lncc_cb;
516         dcb->dcb_func = tgt_cb_new_client;
517         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
518         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
519
520         rc = dt_trans_cb_add(th, dcb);
521         if (rc) {
522                 class_export_cb_put(exp);
523                 OBD_FREE_PTR(ccb);
524         }
525         return rc;
526 }
527
528 /**
529  * Add new client to the last_rcvd upon new connection.
530  *
531  * We use a bitmap to locate a free space in the last_rcvd file and initialize
532  * tg_export_data.
533  */
534 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
535 {
536         struct tg_export_data   *ted = &exp->exp_target_data;
537         struct lu_target        *tgt = class_exp2tgt(exp);
538         int                      rc = 0, idx;
539
540         ENTRY;
541
542         LASSERT(tgt->lut_client_bitmap != NULL);
543         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
544                 RETURN(0);
545
546         mutex_init(&ted->ted_lcd_lock);
547
548         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
549                 RETURN(0);
550
551         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
552          * there's no need for extra complication here
553          */
554         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
555 repeat:
556         if (idx >= LR_MAX_CLIENTS ||
557             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
558                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
559                        tgt->lut_obd->obd_name,  idx);
560                 RETURN(-EOVERFLOW);
561         }
562         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
563                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
564                                              LR_MAX_CLIENTS, idx);
565                 goto repeat;
566         }
567
568         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
569                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
570
571         ted->ted_lr_idx = idx;
572         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
573                           idx * tgt->lut_lsd.lsd_client_size;
574
575         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
576
577         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
578                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
579                ted->ted_lcd->lcd_uuid);
580
581         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
582                 RETURN(-ENOSPC);
583
584         rc = tgt_client_data_update(env, exp);
585         if (rc)
586                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
587                        tgt->lut_obd->obd_name, idx, rc);
588
589         RETURN(rc);
590 }
591 EXPORT_SYMBOL(tgt_client_new);
592
593 /* Add client data to the MDS.  We use a bitmap to locate a free space
594  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
595  * Otherwise, we just have to read the data from the last_rcvd file and
596  * we know its offset.
597  *
598  * It should not be possible to fail adding an existing client - otherwise
599  * mdt_init_server_data() callsite needs to be fixed.
600  */
601 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
602 {
603         struct tg_export_data   *ted = &exp->exp_target_data;
604         struct lu_target        *tgt = class_exp2tgt(exp);
605
606         ENTRY;
607
608         LASSERT(tgt->lut_client_bitmap != NULL);
609         LASSERTF(idx >= 0, "%d\n", idx);
610
611         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
612             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
613                 RETURN(0);
614
615         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
616                 CERROR("%s: client %d: bit already set in bitmap!!\n",
617                        tgt->lut_obd->obd_name,  idx);
618                 LBUG();
619         }
620
621         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
622                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
623
624         ted->ted_lr_idx = idx;
625         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
626                           idx * tgt->lut_lsd.lsd_client_size;
627
628         mutex_init(&ted->ted_lcd_lock);
629
630         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
631
632         RETURN(0);
633 }
634 EXPORT_SYMBOL(tgt_client_add);
635
636 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
637 {
638         struct tg_export_data   *ted = &exp->exp_target_data;
639         struct lu_target        *tgt = class_exp2tgt(exp);
640         int                      rc;
641
642         ENTRY;
643
644         LASSERT(ted->ted_lcd);
645
646         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
647         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
648                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
649             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
650                 RETURN(0);
651
652         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
653                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
654                ted->ted_lcd->lcd_uuid);
655
656         /* Clear the bit _after_ zeroing out the client so we don't
657            race with filter_client_add and zero out new clients.*/
658         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
659                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
660                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
661                 LBUG();
662         }
663
664         /* Do not erase record for recoverable client. */
665         if (exp->exp_flags & OBD_OPT_FAILOVER)
666                 RETURN(0);
667
668         /* Make sure the server's last_transno is up to date.
669          * This should be done before zeroing client slot so last_transno will
670          * be in server data or in client data in case of failure */
671         rc = tgt_server_data_update(env, tgt, 0);
672         if (rc != 0) {
673                 CERROR("%s: failed to update server data, skip client %s "
674                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
675                        ted->ted_lcd->lcd_uuid, rc);
676                 RETURN(rc);
677         }
678
679         mutex_lock(&ted->ted_lcd_lock);
680         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
681         rc = tgt_client_data_update(env, exp);
682         mutex_unlock(&ted->ted_lcd_lock);
683
684         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
685                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
686                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
687                ted->ted_lr_idx, ted->ted_lr_off, rc);
688         RETURN(rc);
689 }
690 EXPORT_SYMBOL(tgt_client_del);
691
692 /*
693  * last_rcvd & last_committed update callbacks
694  */
695 int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
696                          struct dt_object *obj, __u64 opdata,
697                          struct thandle *th, struct ptlrpc_request *req)
698 {
699         struct tgt_thread_info  *tti = tgt_th_info(env);
700         struct tg_export_data   *ted;
701         __u64                   *transno_p;
702         int                      rc = 0;
703         bool                     lw_client, update = false;
704
705         ENTRY;
706
707         ted = &req->rq_export->exp_target_data;
708
709         lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
710         if (ted->ted_lr_idx < 0 && !lw_client)
711                 /* ofd connect may cause transaction before export has
712                  * last_rcvd slot */
713                 RETURN(0);
714
715         tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
716
717         spin_lock(&tgt->lut_translock);
718         if (th->th_result != 0) {
719                 if (tti->tti_transno != 0) {
720                         CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
721                                tgt_name(tgt), tti->tti_transno, th->th_result);
722                 }
723         } else if (tti->tti_transno == 0) {
724                 tti->tti_transno = ++tgt->lut_last_transno;
725         } else {
726                 /* should be replay */
727                 if (tti->tti_transno > tgt->lut_last_transno)
728                         tgt->lut_last_transno = tti->tti_transno;
729         }
730         spin_unlock(&tgt->lut_translock);
731
732         /** VBR: set new versions */
733         if (th->th_result == 0 && obj != NULL)
734                 dt_version_set(env, obj, tti->tti_transno, th);
735
736         /* filling reply data */
737         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
738                tti->tti_transno, tgt->lut_obd->obd_last_committed);
739
740         req->rq_transno = tti->tti_transno;
741         lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
742
743         /* if can't add callback, do sync write */
744         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
745                                                 tti->tti_transno);
746
747         if (lw_client) {
748                 /* All operations performed by LW clients are synchronous and
749                  * we store the committed transno in the last_rcvd header */
750                 spin_lock(&tgt->lut_translock);
751                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
752                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
753                         update = true;
754                 }
755                 spin_unlock(&tgt->lut_translock);
756                 /* Although lightweight (LW) connections have no slot in
757                  * last_rcvd, we still want to maintain the in-memory
758                  * lsd_client_data structure in order to properly handle reply
759                  * reconstruction. */
760         } else if (ted->ted_lr_off == 0) {
761                 CERROR("%s: client idx %d has offset %lld\n",
762                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
763                 RETURN(-EINVAL);
764         }
765
766         /* if the export has already been disconnected, we have no last_rcvd
767          * slot, update server data with latest transno then */
768         if (ted->ted_lcd == NULL) {
769                 CWARN("commit transaction for disconnected client %s: rc %d\n",
770                       req->rq_export->exp_client_uuid.uuid, rc);
771                 GOTO(srv_update, rc = 0);
772         }
773
774         mutex_lock(&ted->ted_lcd_lock);
775         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
776         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
777             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
778                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
779                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
780                 ted->ted_lcd->lcd_last_close_result = th->th_result;
781         } else {
782                 /* VBR: save versions in last_rcvd for reconstruct. */
783                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
784
785                 if (pre_versions) {
786                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
787                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
788                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
789                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
790                 }
791                 transno_p = &ted->ted_lcd->lcd_last_transno;
792                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
793                 ted->ted_lcd->lcd_last_result = th->th_result;
794                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
795                  * see struct ldlm_reply->lock_policy_res1; */
796                 ted->ted_lcd->lcd_last_data = opdata;
797         }
798
799         /* Update transno in slot only if non-zero number, i.e. no errors */
800         if (likely(tti->tti_transno != 0)) {
801                 if (*transno_p > tti->tti_transno &&
802                     !tgt->lut_no_reconstruct) {
803                         CERROR("%s: trying to overwrite bigger transno:"
804                                "on-disk: "LPU64", new: "LPU64" replay: %d. "
805                                "see LU-617.\n", tgt_name(tgt), *transno_p,
806                                tti->tti_transno, req_is_replay(req));
807                         if (req_is_replay(req)) {
808                                 spin_lock(&req->rq_export->exp_lock);
809                                 req->rq_export->exp_vbr_failed = 1;
810                                 spin_unlock(&req->rq_export->exp_lock);
811                         }
812                         mutex_unlock(&ted->ted_lcd_lock);
813                         RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
814                 }
815                 *transno_p = tti->tti_transno;
816         }
817
818         if (!lw_client) {
819                 tti->tti_off = ted->ted_lr_off;
820                 rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
821                 if (rc < 0) {
822                         mutex_unlock(&ted->ted_lcd_lock);
823                         RETURN(rc);
824                 }
825         }
826         mutex_unlock(&ted->ted_lcd_lock);
827         EXIT;
828 srv_update:
829         if (update)
830                 rc = tgt_server_data_write(env, tgt, th);
831         return rc;
832 }
833
834 /*
835  * last_rcvd update for echo client simulation.
836  * It updates last_rcvd client slot and version of object in
837  * simple way but with all locks to simulate all drawbacks
838  */
839 int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
840                               struct dt_object *obj, struct thandle *th,
841                               struct obd_export *exp)
842 {
843         struct tgt_thread_info  *tti = tgt_th_info(env);
844         struct tg_export_data   *ted = &exp->exp_target_data;
845         int                      rc = 0;
846
847         ENTRY;
848
849         tti->tti_transno = 0;
850
851         spin_lock(&tgt->lut_translock);
852         if (th->th_result == 0)
853                 tti->tti_transno = ++tgt->lut_last_transno;
854         spin_unlock(&tgt->lut_translock);
855
856         /** VBR: set new versions */
857         if (th->th_result == 0 && obj != NULL)
858                 dt_version_set(env, obj, tti->tti_transno, th);
859
860         /* if can't add callback, do sync write */
861         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
862                                                 tti->tti_transno);
863
864         LASSERT(ted->ted_lr_off > 0);
865
866         mutex_lock(&ted->ted_lcd_lock);
867         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
868         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
869         ted->ted_lcd->lcd_last_result = th->th_result;
870
871         tti->tti_off = ted->ted_lr_off;
872         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
873         mutex_unlock(&ted->ted_lcd_lock);
874         RETURN(rc);
875 }
876
877 int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
878                           unsigned long last_size)
879 {
880         struct obd_device       *obd = tgt->lut_obd;
881         struct lr_server_data   *lsd = &tgt->lut_lsd;
882         struct lsd_client_data  *lcd = NULL;
883         struct tg_export_data   *ted;
884         int                      cl_idx;
885         int                      rc = 0;
886         loff_t                   off = lsd->lsd_client_start;
887
888         ENTRY;
889
890         CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
891                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
892
893         OBD_ALLOC_PTR(lcd);
894         if (lcd == NULL)
895                 RETURN(-ENOMEM);
896
897         for (cl_idx = 0; off < last_size; cl_idx++) {
898                 struct obd_export       *exp;
899                 __u64                    last_transno;
900
901                 /* Don't assume off is incremented properly by
902                  * read_record(), in case sizeof(*lcd)
903                  * isn't the same as fsd->lsd_client_size.  */
904                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
905                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
906                 if (rc) {
907                         CERROR("%s: error reading last_rcvd %s idx %d off "
908                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
909                                cl_idx, off, rc);
910                         rc = 0;
911                         break; /* read error shouldn't cause startup to fail */
912                 }
913
914                 if (lcd->lcd_uuid[0] == '\0') {
915                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
916                                cl_idx);
917                         continue;
918                 }
919
920                 last_transno = lcd_last_transno(lcd);
921
922                 /* These exports are cleaned up by disconnect, so they
923                  * need to be set up like real exports as connect does.
924                  */
925                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
926                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
927                        last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
928
929                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
930                 if (IS_ERR(exp)) {
931                         if (PTR_ERR(exp) == -EALREADY) {
932                                 /* export already exists, zero out this one */
933                                 CERROR("%s: Duplicate export %s!\n",
934                                        tgt_name(tgt), lcd->lcd_uuid);
935                                 continue;
936                         }
937                         GOTO(err_out, rc = PTR_ERR(exp));
938                 }
939
940                 ted = &exp->exp_target_data;
941                 *ted->ted_lcd = *lcd;
942
943                 rc = tgt_client_add(env, exp, cl_idx);
944                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
945                 /* VBR: set export last committed version */
946                 exp->exp_last_committed = last_transno;
947                 spin_lock(&exp->exp_lock);
948                 exp->exp_connecting = 0;
949                 exp->exp_in_recovery = 0;
950                 spin_unlock(&exp->exp_lock);
951                 obd->obd_max_recoverable_clients++;
952                 class_export_put(exp);
953
954                 /* Need to check last_rcvd even for duplicated exports. */
955                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
956                        cl_idx, last_transno);
957
958                 spin_lock(&tgt->lut_translock);
959                 tgt->lut_last_transno = max(last_transno,
960                                             tgt->lut_last_transno);
961                 spin_unlock(&tgt->lut_translock);
962         }
963
964 err_out:
965         OBD_FREE_PTR(lcd);
966         RETURN(rc);
967 }
968
969 struct server_compat_data {
970         __u32 rocompat;
971         __u32 incompat;
972         __u32 rocinit;
973         __u32 incinit;
974 };
975
976 static struct server_compat_data tgt_scd[] = {
977         [LDD_F_SV_TYPE_MDT] = {
978                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
979                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
980                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
981                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
982                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
983                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
984                            OBD_INCOMPAT_MULTI_OI,
985         },
986         [LDD_F_SV_TYPE_OST] = {
987                 .rocompat = 0,
988                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
989                             OBD_INCOMPAT_FID,
990                 .rocinit = 0,
991                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
992         }
993 };
994
995 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
996 {
997         struct tgt_thread_info          *tti = tgt_th_info(env);
998         struct lr_server_data           *lsd = &tgt->lut_lsd;
999         unsigned long                    last_rcvd_size;
1000         __u32                            index;
1001         int                              rc, type;
1002
1003         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr, BYPASS_CAPA);
1004         if (rc)
1005                 RETURN(rc);
1006
1007         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1008
1009         /* ensure padding in the struct is the correct size */
1010         CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
1011                  sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
1012
1013         rc = server_name2index(tgt_name(tgt), &index, NULL);
1014         if (rc < 0) {
1015                 CERROR("%s: Can not get index from name: rc = %d\n",
1016                        tgt_name(tgt), rc);
1017                 RETURN(rc);
1018         }
1019         /* server_name2index() returns type */
1020         type = rc;
1021         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1022                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1023                 RETURN(-EINVAL);
1024         }
1025
1026         /* last_rcvd on OST doesn't provide reconstruct support because there
1027          * may be up to 8 in-flight write requests per single slot in
1028          * last_rcvd client data
1029          */
1030         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1031
1032         if (last_rcvd_size == 0) {
1033                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1034
1035                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1036                        sizeof(lsd->lsd_uuid));
1037                 lsd->lsd_last_transno = 0;
1038                 lsd->lsd_mount_count = 0;
1039                 lsd->lsd_server_size = LR_SERVER_SIZE;
1040                 lsd->lsd_client_start = LR_CLIENT_START;
1041                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1042                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1043                 lsd->lsd_osd_index = index;
1044                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1045                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1046         } else {
1047                 rc = tgt_server_data_read(env, tgt);
1048                 if (rc) {
1049                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1050                                tgt_name(tgt), rc);
1051                         RETURN(rc);
1052                 }
1053                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1054                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
1055                                            "using the wrong disk %s. Were the"
1056                                            " /dev/ assignments rearranged?\n",
1057                                            tgt->lut_obd->obd_uuid.uuid,
1058                                            lsd->lsd_uuid);
1059                         RETURN(-EINVAL);
1060                 }
1061
1062                 if (lsd->lsd_osd_index != index) {
1063                         LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
1064                                            "is different with the index %d in"
1065                                            "config log, It might be disk"
1066                                            "corruption!\n", tgt_name(tgt),
1067                                            lsd->lsd_osd_index, index);
1068                         RETURN(-EINVAL);
1069                 }
1070         }
1071
1072         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1073                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1074                        tgt_name(tgt),
1075                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1076                 RETURN(-EINVAL);
1077         }
1078
1079         if (type == LDD_F_SV_TYPE_MDT)
1080                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1081
1082         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1083                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1084                        tgt_name(tgt),
1085                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1086                 RETURN(-EINVAL);
1087         }
1088         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1089         if (type == LDD_F_SV_TYPE_MDT &&
1090             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1091                 if (last_rcvd_size > lsd->lsd_client_start) {
1092                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1093                                       "remove all clients for interop needs\n",
1094                                       tgt_name(tgt));
1095                         rc = tgt_truncate_last_rcvd(env, tgt,
1096                                                     lsd->lsd_client_start);
1097                         if (rc)
1098                                 RETURN(rc);
1099                         last_rcvd_size = lsd->lsd_client_start;
1100                 }
1101                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1102                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1103         }
1104
1105         spin_lock(&tgt->lut_translock);
1106         tgt->lut_last_transno = lsd->lsd_last_transno;
1107         spin_unlock(&tgt->lut_translock);
1108
1109         lsd->lsd_mount_count++;
1110
1111         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1112         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
1113                tgt_name(tgt), tgt->lut_last_transno);
1114         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
1115                tgt_name(tgt), lsd->lsd_mount_count);
1116         CDEBUG(D_INODE, "%s: server data size: %u\n",
1117                tgt_name(tgt), lsd->lsd_server_size);
1118         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1119                tgt_name(tgt), lsd->lsd_client_start);
1120         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1121                tgt_name(tgt), lsd->lsd_client_size);
1122         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1123                tgt_name(tgt), last_rcvd_size);
1124         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1125                tgt_name(tgt), lsd->lsd_subdir_count);
1126         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1127                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1128                (last_rcvd_size - lsd->lsd_client_start) /
1129                 lsd->lsd_client_size);
1130         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1131
1132         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1133             lsd->lsd_client_size == 0) {
1134                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1135                 RETURN(-EINVAL);
1136         }
1137
1138         if (!tgt->lut_obd->obd_replayable)
1139                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1140
1141         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1142         if (rc < 0)
1143                 GOTO(err_client, rc);
1144
1145         spin_lock(&tgt->lut_translock);
1146         /* obd_last_committed is used for compatibility
1147          * with other lustre recovery code */
1148         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1149         spin_unlock(&tgt->lut_translock);
1150
1151         tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
1152         tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
1153
1154         /* save it, so mount count and last_transno is current */
1155         rc = tgt_server_data_update(env, tgt, 0);
1156         if (rc < 0)
1157                 GOTO(err_client, rc);
1158
1159         RETURN(0);
1160
1161 err_client:
1162         class_disconnect_exports(tgt->lut_obd);
1163         return rc;
1164 }
1165
1166 /* add credits for last_rcvd update */
1167 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1168                      void *cookie)
1169 {
1170         struct lu_target        *tgt = cookie;
1171         struct tgt_session_info *tsi;
1172         struct tgt_thread_info  *tti = tgt_th_info(env);
1173         int                      rc;
1174
1175         /* if there is no session, then this transaction is not result of
1176          * request processing but some local operation */
1177         if (env->le_ses == NULL)
1178                 return 0;
1179
1180         LASSERT(tgt->lut_last_rcvd);
1181         tsi = tgt_ses_info(env);
1182         /* OFD may start transaction without export assigned */
1183         if (tsi->tsi_exp == NULL)
1184                 return 0;
1185
1186         tti_buf_lcd(tti);
1187         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
1188                                      &tti->tti_buf,
1189                                      tsi->tsi_exp->exp_target_data.ted_lr_off,
1190                                      th);
1191         if (rc)
1192                 return rc;
1193
1194         tti_buf_lsd(tti);
1195         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
1196                                      &tti->tti_buf, 0, th);
1197         if (rc)
1198                 return rc;
1199
1200         if (tsi->tsi_vbr_obj != NULL &&
1201             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
1202                 rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
1203
1204         return rc;
1205 }
1206
1207 /* Update last_rcvd records with latests transaction data */
1208 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
1209                     void *cookie)
1210 {
1211         struct lu_target        *tgt = cookie;
1212         struct tgt_session_info *tsi;
1213         struct tgt_thread_info  *tti = tgt_th_info(env);
1214         struct dt_object        *obj = NULL;
1215         int                      rc;
1216         bool                     echo_client;
1217
1218         if (env->le_ses == NULL)
1219                 return 0;
1220
1221         tsi = tgt_ses_info(env);
1222         /* OFD may start transaction without export assigned */
1223         if (tsi->tsi_exp == NULL)
1224                 return 0;
1225
1226         echo_client = (tgt_ses_req(tsi) == NULL);
1227
1228         if (tti->tti_has_trans && !echo_client) {
1229                 if (tti->tti_mult_trans == 0) {
1230                         CDEBUG(D_HA, "More than one transaction "LPU64"\n",
1231                                tti->tti_transno);
1232                         RETURN(0);
1233                 }
1234                 /* we need another transno to be assigned */
1235                 tti->tti_transno = 0;
1236         } else if (th->th_result == 0) {
1237                 tti->tti_has_trans = 1;
1238         }
1239
1240         if (tsi->tsi_vbr_obj != NULL &&
1241             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1242                 obj = tsi->tsi_vbr_obj;
1243         }
1244
1245         if (unlikely(echo_client)) /* echo client special case */
1246                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
1247                                                tsi->tsi_exp);
1248         else
1249                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
1250                                           tgt_ses_req(tsi));
1251         return rc;
1252 }