Whamcloud - gitweb
LU-6047 lustre: remove Size on MDS support
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
 * These are common functions for working with the last_rcvd file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int tgt_client_alloc(struct obd_export *exp)
65 {
66         ENTRY;
67         LASSERT(exp != exp->exp_obd->obd_self_export);
68
69         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
70         if (exp->exp_target_data.ted_lcd == NULL)
71                 RETURN(-ENOMEM);
72         /* Mark that slot is not yet valid, 0 doesn't work here */
73         exp->exp_target_data.ted_lr_idx = -1;
74         RETURN(0);
75 }
76 EXPORT_SYMBOL(tgt_client_alloc);
77
78 /**
79  * Free in-memory data for client slot related to export.
80  */
81 void tgt_client_free(struct obd_export *exp)
82 {
83         struct tg_export_data   *ted = &exp->exp_target_data;
84         struct lu_target        *lut = class_exp2tgt(exp);
85
86         LASSERT(exp != exp->exp_obd->obd_self_export);
87
88         OBD_FREE_PTR(ted->ted_lcd);
89         ted->ted_lcd = NULL;
90
91         /* Slot may be not yet assigned */
92         if (ted->ted_lr_idx < 0)
93                 return;
94         /* Clear bit when lcd is freed */
95         LASSERT(lut->lut_client_bitmap);
96         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
97                 CERROR("%s: client %u bit already clear in bitmap\n",
98                        exp->exp_obd->obd_name, ted->ted_lr_idx);
99                 LBUG();
100         }
101 }
102 EXPORT_SYMBOL(tgt_client_free);
103
104 int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
105                          struct lsd_client_data *lcd, loff_t *off, int index)
106 {
107         struct tgt_thread_info  *tti = tgt_th_info(env);
108         int                      rc;
109
110         tti_buf_lcd(tti);
111         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
112         if (rc == 0) {
113                 check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
114                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
115                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
116                 lcd->lcd_last_close_result =
117                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
118         }
119
120         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
121                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
122                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
123                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
124                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
125                lcd->lcd_last_result, lcd->lcd_last_data,
126                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
127                lcd->lcd_last_close_result, rc);
128         return rc;
129 }
130
131 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
132                           struct lsd_client_data *lcd, loff_t *off,
133                           struct thandle *th)
134 {
135         struct tgt_thread_info *tti = tgt_th_info(env);
136
137         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
138         lcd->lcd_last_close_result =
139                 ptlrpc_status_hton(lcd->lcd_last_close_result);
140         lcd_cpu_to_le(lcd, &tti->tti_lcd);
141         tti_buf_lcd(tti);
142
143         return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
144 }
145
146 /**
147  * Update client data in last_rcvd
148  */
149 static int tgt_client_data_update(const struct lu_env *env,
150                                   struct obd_export *exp)
151 {
152         struct tg_export_data   *ted = &exp->exp_target_data;
153         struct lu_target        *tgt = class_exp2tgt(exp);
154         struct tgt_thread_info  *tti = tgt_th_info(env);
155         struct thandle          *th;
156         int                      rc = 0;
157
158         ENTRY;
159
160         th = dt_trans_create(env, tgt->lut_bottom);
161         if (IS_ERR(th))
162                 RETURN(PTR_ERR(th));
163
164         tti_buf_lcd(tti);
165         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
166                                      &tti->tti_buf,
167                                      ted->ted_lr_off, th);
168         if (rc)
169                 GOTO(out, rc);
170
171         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
172         if (rc)
173                 GOTO(out, rc);
174         /*
175          * Until this operations will be committed the sync is needed
176          * for this export. This should be done _after_ starting the
177          * transaction so that many connecting clients will not bring
178          * server down with lots of sync writes.
179          */
180         rc = tgt_new_client_cb_add(th, exp);
181         if (rc) {
182                 /* can't add callback, do sync now */
183                 th->th_sync = 1;
184         } else {
185                 spin_lock(&exp->exp_lock);
186                 exp->exp_need_sync = 1;
187                 spin_unlock(&exp->exp_lock);
188         }
189
190         tti->tti_off = ted->ted_lr_off;
191         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
192         EXIT;
193 out:
194         dt_trans_stop(env, tgt->lut_bottom, th);
195         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
196                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
197                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
198
199         return rc;
200 }
201
202 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
203 {
204         struct tgt_thread_info  *tti = tgt_th_info(env);
205         int                      rc;
206
207         tti->tti_off = 0;
208         tti_buf_lsd(tti);
209         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
210                             &tti->tti_off);
211         if (rc == 0)
212                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
213
214         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
215                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
216                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
217         return rc;
218 }
219
220 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
221                           struct thandle *th)
222 {
223         struct tgt_thread_info  *tti = tgt_th_info(env);
224         int                      rc;
225
226         ENTRY;
227
228         tti->tti_off = 0;
229         tti_buf_lsd(tti);
230         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
231
232         rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
233                              &tti->tti_off, th);
234
235         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
236                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
237                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
238
239         RETURN(rc);
240 }
241
242 /**
243  * Update server data in last_rcvd
244  */
245 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
246                            int sync)
247 {
248         struct tgt_thread_info  *tti = tgt_th_info(env);
249         struct thandle          *th;
250         int                      rc = 0;
251
252         ENTRY;
253
254         CDEBUG(D_SUPER,
255                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
256                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
257                tgt->lut_last_transno);
258
259         /* Always save latest transno to keep it fresh */
260         spin_lock(&tgt->lut_translock);
261         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
262         spin_unlock(&tgt->lut_translock);
263
264         th = dt_trans_create(env, tgt->lut_bottom);
265         if (IS_ERR(th))
266                 RETURN(PTR_ERR(th));
267
268         th->th_sync = sync;
269
270         tti_buf_lsd(tti);
271         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
272                                      &tti->tti_buf, tti->tti_off, th);
273         if (rc)
274                 GOTO(out, rc);
275
276         rc = dt_trans_start(env, tgt->lut_bottom, th);
277         if (rc)
278                 GOTO(out, rc);
279
280         rc = tgt_server_data_write(env, tgt, th);
281 out:
282         dt_trans_stop(env, tgt->lut_bottom, th);
283
284         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
285                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
286                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
287         RETURN(rc);
288 }
289 EXPORT_SYMBOL(tgt_server_data_update);
290
291 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
292                            loff_t size)
293 {
294         struct dt_object *dt = tgt->lut_last_rcvd;
295         struct thandle   *th;
296         struct lu_attr    attr;
297         int               rc;
298
299         ENTRY;
300
301         attr.la_size = size;
302         attr.la_valid = LA_SIZE;
303
304         th = dt_trans_create(env, tgt->lut_bottom);
305         if (IS_ERR(th))
306                 RETURN(PTR_ERR(th));
307         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
308         if (rc)
309                 GOTO(cleanup, rc);
310         rc = dt_declare_attr_set(env, dt, &attr, th);
311         if (rc)
312                 GOTO(cleanup, rc);
313         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
314         if (rc)
315                 GOTO(cleanup, rc);
316
317         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
318         if (rc == 0)
319                 rc = dt_attr_set(env, dt, &attr, th);
320
321 cleanup:
322         dt_trans_stop(env, tgt->lut_bottom, th);
323
324         RETURN(rc);
325 }
326
327 static void tgt_client_epoch_update(const struct lu_env *env,
328                                     struct obd_export *exp)
329 {
330         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
331         struct lu_target        *tgt = class_exp2tgt(exp);
332
333         LASSERT(tgt->lut_bottom);
334         /** VBR: set client last_epoch to current epoch */
335         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
336                 return;
337         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
338         tgt_client_data_update(env, exp);
339 }
340
341 /**
342  * Update boot epoch when recovery ends
343  */
344 void tgt_boot_epoch_update(struct lu_target *tgt)
345 {
346         struct lu_env            env;
347         struct ptlrpc_request   *req;
348         __u32                    start_epoch;
349         struct list_head         client_list;
350         int                      rc;
351
352         if (tgt->lut_obd->obd_stopping)
353                 return;
354
355         rc = lu_env_init(&env, LCT_DT_THREAD);
356         if (rc) {
357                 CERROR("%s: can't initialize environment: rc = %d\n",
358                         tgt->lut_obd->obd_name, rc);
359                 return;
360         }
361
362         spin_lock(&tgt->lut_translock);
363         start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
364         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
365         tgt->lut_lsd.lsd_start_epoch = start_epoch;
366         spin_unlock(&tgt->lut_translock);
367
368         INIT_LIST_HEAD(&client_list);
369         /**
370          * The recovery is not yet finished and final queue can still be updated
371          * with resend requests. Move final list to separate one for processing
372          */
373         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
374         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
375         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
376
377         /**
378          * go through list of exports participated in recovery and
379          * set new epoch for them
380          */
381         list_for_each_entry(req, &client_list, rq_list) {
382                 LASSERT(!req->rq_export->exp_delayed);
383                 if (!req->rq_export->exp_vbr_failed)
384                         tgt_client_epoch_update(&env, req->rq_export);
385         }
386         /** return list back at once */
387         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
388         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
389         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
390         /** update server epoch */
391         tgt_server_data_update(&env, tgt, 1);
392         lu_env_fini(&env);
393 }
394
/**
 * Context of the commit callback that advances the last_committed values
 * of the target's obd and of the export once a transaction has committed.
 * Allocated in tgt_last_commit_cb_add() and freed by the callback itself.
 */
struct tgt_last_committed_callback {
        /* callback descriptor handed to dt_trans_cb_add(); the callback
         * recovers the enclosing structure from it via container_of0() */
        struct dt_txn_commit_cb  llcc_cb;
        /* target whose obd_last_committed is advanced */
        struct lu_target        *llcc_tgt;
        /* export whose exp_last_committed is advanced; holds a callback
         * reference taken with class_export_cb_get() */
        struct obd_export       *llcc_exp;
        /* transaction number that this callback reports as committed */
        __u64                    llcc_transno;
};
404
405 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
406                                   struct dt_txn_commit_cb *cb, int err)
407 {
408         struct tgt_last_committed_callback *ccb;
409
410         ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
411
412         LASSERT(ccb->llcc_tgt != NULL);
413         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
414
415         spin_lock(&ccb->llcc_tgt->lut_translock);
416         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
417                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
418
419         LASSERT(ccb->llcc_exp);
420         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
421                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
422                 spin_unlock(&ccb->llcc_tgt->lut_translock);
423                 ptlrpc_commit_replies(ccb->llcc_exp);
424         } else {
425                 spin_unlock(&ccb->llcc_tgt->lut_translock);
426         }
427         class_export_cb_put(ccb->llcc_exp);
428         if (ccb->llcc_transno)
429                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
430                        ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
431         OBD_FREE_PTR(ccb);
432 }
433
434 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
435                            struct obd_export *exp, __u64 transno)
436 {
437         struct tgt_last_committed_callback      *ccb;
438         struct dt_txn_commit_cb                 *dcb;
439         int                                      rc;
440
441         OBD_ALLOC_PTR(ccb);
442         if (ccb == NULL)
443                 return -ENOMEM;
444
445         ccb->llcc_tgt = tgt;
446         ccb->llcc_exp = class_export_cb_get(exp);
447         ccb->llcc_transno = transno;
448
449         dcb = &ccb->llcc_cb;
450         dcb->dcb_func = tgt_cb_last_committed;
451         INIT_LIST_HEAD(&dcb->dcb_linkage);
452         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
453
454         rc = dt_trans_cb_add(th, dcb);
455         if (rc) {
456                 class_export_cb_put(exp);
457                 OBD_FREE_PTR(ccb);
458         }
459
460         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
461                 /* report failure to force synchronous operation */
462                 return -EPERM;
463
464         return rc;
465 }
466
/*
 * Context of the commit callback attached to the transaction that stores a
 * client record. Allocated in tgt_new_client_cb_add() and freed by the
 * callback itself.
 */
struct tgt_new_client_callback {
        /* callback descriptor handed to dt_trans_cb_add(); the callback
         * recovers the enclosing structure from it via container_of0() */
        struct dt_txn_commit_cb  lncc_cb;
        /* export whose exp_need_sync flag the callback may clear; holds a
         * callback reference taken with class_export_cb_get() */
        struct obd_export       *lncc_exp;
};
471
472 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
473                               struct dt_txn_commit_cb *cb, int err)
474 {
475         struct tgt_new_client_callback *ccb;
476
477         ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
478
479         LASSERT(ccb->lncc_exp->exp_obd);
480
481         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
482                ccb->lncc_exp->exp_obd->obd_name,
483                ccb->lncc_exp->exp_client_uuid.uuid);
484
485         spin_lock(&ccb->lncc_exp->exp_lock);
486         /* XXX: Currently, we use per-export based sync/async policy for
487          *      the update via OUT RPC, it is coarse-grained policy, and
488          *      will be changed as per-request based by DNE II patches. */
489         if (!ccb->lncc_exp->exp_keep_sync)
490                 ccb->lncc_exp->exp_need_sync = 0;
491
492         spin_unlock(&ccb->lncc_exp->exp_lock);
493         class_export_cb_put(ccb->lncc_exp);
494
495         OBD_FREE_PTR(ccb);
496 }
497
498 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
499 {
500         struct tgt_new_client_callback  *ccb;
501         struct dt_txn_commit_cb         *dcb;
502         int                              rc;
503
504         OBD_ALLOC_PTR(ccb);
505         if (ccb == NULL)
506                 return -ENOMEM;
507
508         ccb->lncc_exp = class_export_cb_get(exp);
509
510         dcb = &ccb->lncc_cb;
511         dcb->dcb_func = tgt_cb_new_client;
512         INIT_LIST_HEAD(&dcb->dcb_linkage);
513         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
514
515         rc = dt_trans_cb_add(th, dcb);
516         if (rc) {
517                 class_export_cb_put(exp);
518                 OBD_FREE_PTR(ccb);
519         }
520         return rc;
521 }
522
523 /**
524  * Add new client to the last_rcvd upon new connection.
525  *
526  * We use a bitmap to locate a free space in the last_rcvd file and initialize
527  * tg_export_data.
528  */
529 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
530 {
531         struct tg_export_data   *ted = &exp->exp_target_data;
532         struct lu_target        *tgt = class_exp2tgt(exp);
533         int                      rc = 0, idx;
534
535         ENTRY;
536
537         LASSERT(tgt->lut_client_bitmap != NULL);
538         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
539                 RETURN(0);
540
541         mutex_init(&ted->ted_lcd_lock);
542
543         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
544                 RETURN(0);
545
546         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
547          * there's no need for extra complication here
548          */
549         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
550 repeat:
551         if (idx >= LR_MAX_CLIENTS ||
552             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
553                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
554                        tgt->lut_obd->obd_name,  idx);
555                 RETURN(-EOVERFLOW);
556         }
557         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
558                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
559                                              LR_MAX_CLIENTS, idx);
560                 goto repeat;
561         }
562
563         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
564                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
565
566         ted->ted_lr_idx = idx;
567         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
568                           idx * tgt->lut_lsd.lsd_client_size;
569
570         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
571
572         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
573                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
574                ted->ted_lcd->lcd_uuid);
575
576         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
577                 RETURN(-ENOSPC);
578
579         rc = tgt_client_data_update(env, exp);
580         if (rc)
581                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
582                        tgt->lut_obd->obd_name, idx, rc);
583
584         RETURN(rc);
585 }
586 EXPORT_SYMBOL(tgt_client_new);
587
588 /* Add client data to the MDS.  We use a bitmap to locate a free space
589  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
590  * Otherwise, we just have to read the data from the last_rcvd file and
591  * we know its offset.
592  *
593  * It should not be possible to fail adding an existing client - otherwise
594  * mdt_init_server_data() callsite needs to be fixed.
595  */
596 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
597 {
598         struct tg_export_data   *ted = &exp->exp_target_data;
599         struct lu_target        *tgt = class_exp2tgt(exp);
600
601         ENTRY;
602
603         LASSERT(tgt->lut_client_bitmap != NULL);
604         LASSERTF(idx >= 0, "%d\n", idx);
605
606         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
607             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
608                 RETURN(0);
609
610         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
611                 CERROR("%s: client %d: bit already set in bitmap!!\n",
612                        tgt->lut_obd->obd_name,  idx);
613                 LBUG();
614         }
615
616         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
617                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
618
619         ted->ted_lr_idx = idx;
620         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
621                           idx * tgt->lut_lsd.lsd_client_size;
622
623         mutex_init(&ted->ted_lcd_lock);
624
625         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
626
627         RETURN(0);
628 }
629
630 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
631 {
632         struct tg_export_data   *ted = &exp->exp_target_data;
633         struct lu_target        *tgt = class_exp2tgt(exp);
634         int                      rc;
635
636         ENTRY;
637
638         LASSERT(ted->ted_lcd);
639
640         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
641         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
642                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
643             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
644                 RETURN(0);
645
646         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
647                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
648                ted->ted_lcd->lcd_uuid);
649
650         /* Clear the bit _after_ zeroing out the client so we don't
651            race with filter_client_add and zero out new clients.*/
652         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
653                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
654                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
655                 LBUG();
656         }
657
658         /* Do not erase record for recoverable client. */
659         if (exp->exp_flags & OBD_OPT_FAILOVER)
660                 RETURN(0);
661
662         /* Make sure the server's last_transno is up to date.
663          * This should be done before zeroing client slot so last_transno will
664          * be in server data or in client data in case of failure */
665         rc = tgt_server_data_update(env, tgt, 0);
666         if (rc != 0) {
667                 CERROR("%s: failed to update server data, skip client %s "
668                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
669                        ted->ted_lcd->lcd_uuid, rc);
670                 RETURN(rc);
671         }
672
673         mutex_lock(&ted->ted_lcd_lock);
674         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
675         rc = tgt_client_data_update(env, exp);
676         mutex_unlock(&ted->ted_lcd_lock);
677
678         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
679                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
680                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
681                ted->ted_lr_idx, ted->ted_lr_off, rc);
682         RETURN(rc);
683 }
684 EXPORT_SYMBOL(tgt_client_del);
685
/*
 * last_rcvd & last_committed update callbacks
 *
 * Assign (or, for a replay, take over) the transaction number of request
 * \a req and record the outcome of the transaction \a th in the export's
 * client record.
 *
 * Steps, in order:
 *  - pick the transno under lut_translock and keep lut_last_transno ahead
 *    of it;
 *  - VBR: store the transno as the version of \a obj when the transaction
 *    succeeded and \a obj is not NULL;
 *  - put the transno into the request and its reply message;
 *  - attach the last_committed commit callback, or make the transaction
 *    synchronous if that is not possible;
 *  - update the in-memory client record under ted_lcd_lock and, except for
 *    lightweight clients, write it to the client's last_rcvd slot;
 *  - for lightweight clients write the server data instead when the transno
 *    moved lsd_last_transno forward.
 *
 * \a opdata is stored (truncated to 32 bits) in lcd_last_data for non-close
 * requests.
 *
 * Returns 0 on success, -EINVAL when a non-lightweight client has a zero
 * slot offset, -EOVERFLOW when a replay would overwrite a bigger transno,
 * or the error of the record write.
 */
static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                                struct dt_object *obj, __u64 opdata,
                                struct thandle *th, struct ptlrpc_request *req)
{
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct tg_export_data   *ted;
        __u64                   *transno_p;
        int                      rc = 0;
        bool                     lw_client, update = false;

        ENTRY;

        ted = &req->rq_export->exp_target_data;

        lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
        if (ted->ted_lr_idx < 0 && !lw_client)
                /* ofd connect may cause transaction before export has
                 * last_rcvd slot */
                RETURN(0);

        /* a non-zero transno in the request is handled as a replay below */
        tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);

        spin_lock(&tgt->lut_translock);
        if (th->th_result != 0) {
                /* failed transaction: no transno is assigned; only a
                 * failed replay is worth reporting */
                if (tti->tti_transno != 0) {
                        CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
                               tgt_name(tgt), tti->tti_transno, th->th_result);
                }
        } else if (tti->tti_transno == 0) {
                /* new request: take the next transaction number */
                tti->tti_transno = ++tgt->lut_last_transno;
        } else {
                /* should be replay */
                if (tti->tti_transno > tgt->lut_last_transno)
                        tgt->lut_last_transno = tti->tti_transno;
        }
        spin_unlock(&tgt->lut_translock);

        /** VBR: set new versions */
        if (th->th_result == 0 && obj != NULL)
                dt_version_set(env, obj, tti->tti_transno, th);

        /* filling reply data */
        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
               tti->tti_transno, tgt->lut_obd->obd_last_committed);

        req->rq_transno = tti->tti_transno;
        lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);

        /* if can't add callback, do sync write */
        th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
                                                tti->tti_transno);

        if (lw_client) {
                /* All operations performed by LW clients are synchronous and
                 * we store the committed transno in the last_rcvd header */
                spin_lock(&tgt->lut_translock);
                if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
                        tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
                        /* server data is written at srv_update below */
                        update = true;
                }
                spin_unlock(&tgt->lut_translock);
                /* Although lightweight (LW) connections have no slot in
                 * last_rcvd, we still want to maintain the in-memory
                 * lsd_client_data structure in order to properly handle reply
                 * reconstruction. */
        } else if (ted->ted_lr_off == 0) {
                CERROR("%s: client idx %d has offset %lld\n",
                       tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
                RETURN(-EINVAL);
        }

        /* if the export has already been disconnected, we have no last_rcvd
         * slot, update server data with latest transno then */
        if (ted->ted_lcd == NULL) {
                CWARN("commit transaction for disconnected client %s: rc %d\n",
                      req->rq_export->exp_client_uuid.uuid, rc);
                GOTO(srv_update, rc = 0);
        }

        mutex_lock(&ted->ted_lcd_lock);
        /* a zero transno is only possible for a failed transaction */
        LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
                /* close requests use their own set of fields in the record */
                transno_p = &ted->ted_lcd->lcd_last_close_transno;
                ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
                ted->ted_lcd->lcd_last_close_result = th->th_result;
        } else {
                /* VBR: save versions in last_rcvd for reconstruct. */
                __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);

                if (pre_versions) {
                        ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
                        ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
                        ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
                        ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
                }
                transno_p = &ted->ted_lcd->lcd_last_transno;
                ted->ted_lcd->lcd_last_xid = req->rq_xid;
                ted->ted_lcd->lcd_last_result = th->th_result;
                /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
                 * see struct ldlm_reply->lock_policy_res1; */
                ted->ted_lcd->lcd_last_data = opdata;
        }

        /* Update transno in slot only if non-zero number, i.e. no errors */
        if (likely(tti->tti_transno != 0)) {
                if (*transno_p > tti->tti_transno &&
                    !tgt->lut_no_reconstruct) {
                        CERROR("%s: trying to overwrite bigger transno:"
                               "on-disk: "LPU64", new: "LPU64" replay: %d. "
                               "see LU-617.\n", tgt_name(tgt), *transno_p,
                               tti->tti_transno, req_is_replay(req));
                        /* for a replay, mark the export as failed for VBR
                         * and report the error; otherwise the record is
                         * left unwritten and 0 is returned */
                        if (req_is_replay(req)) {
                                spin_lock(&req->rq_export->exp_lock);
                                req->rq_export->exp_vbr_failed = 1;
                                spin_unlock(&req->rq_export->exp_lock);
                        }
                        mutex_unlock(&ted->ted_lcd_lock);
                        RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
                }
                *transno_p = tti->tti_transno;
        }

        /* lightweight clients have no slot in last_rcvd to write to */
        if (!lw_client) {
                tti->tti_off = ted->ted_lr_off;
                rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
                if (rc < 0) {
                        mutex_unlock(&ted->ted_lcd_lock);
                        RETURN(rc);
                }
        }
        mutex_unlock(&ted->ted_lcd_lock);
        EXIT;
srv_update:
        /* only set for lightweight clients that moved lsd_last_transno */
        if (update)
                rc = tgt_server_data_write(env, tgt, th);
        return rc;
}
826
827 /*
828  * last_rcvd update for echo client simulation.
829  * It updates last_rcvd client slot and version of object in
830  * simple way but with all locks to simulate all drawbacks
831  */
832 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
833                                      struct lu_target *tgt,
834                                      struct dt_object *obj,
835                                      struct thandle *th,
836                                      struct obd_export *exp)
837 {
838         struct tgt_thread_info  *tti = tgt_th_info(env);
839         struct tg_export_data   *ted = &exp->exp_target_data;
840         int                      rc = 0;
841
842         ENTRY;
843
844         tti->tti_transno = 0;
845
846         spin_lock(&tgt->lut_translock);
847         if (th->th_result == 0)
848                 tti->tti_transno = ++tgt->lut_last_transno;
849         spin_unlock(&tgt->lut_translock);
850
851         /** VBR: set new versions */
852         if (th->th_result == 0 && obj != NULL)
853                 dt_version_set(env, obj, tti->tti_transno, th);
854
855         /* if can't add callback, do sync write */
856         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
857                                                 tti->tti_transno);
858
859         LASSERT(ted->ted_lr_off > 0);
860
861         mutex_lock(&ted->ted_lcd_lock);
862         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
863         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
864         ted->ted_lcd->lcd_last_result = th->th_result;
865
866         tti->tti_off = ted->ted_lr_off;
867         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
868         mutex_unlock(&ted->ted_lcd_lock);
869         RETURN(rc);
870 }
871
872 static int tgt_clients_data_init(const struct lu_env *env,
873                                  struct lu_target *tgt,
874                                  unsigned long last_size)
875 {
876         struct obd_device       *obd = tgt->lut_obd;
877         struct lr_server_data   *lsd = &tgt->lut_lsd;
878         struct lsd_client_data  *lcd = NULL;
879         struct tg_export_data   *ted;
880         int                      cl_idx;
881         int                      rc = 0;
882         loff_t                   off = lsd->lsd_client_start;
883
884         ENTRY;
885
886         CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
887                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
888
889         OBD_ALLOC_PTR(lcd);
890         if (lcd == NULL)
891                 RETURN(-ENOMEM);
892
893         for (cl_idx = 0; off < last_size; cl_idx++) {
894                 struct obd_export       *exp;
895                 __u64                    last_transno;
896
897                 /* Don't assume off is incremented properly by
898                  * read_record(), in case sizeof(*lcd)
899                  * isn't the same as fsd->lsd_client_size.  */
900                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
901                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
902                 if (rc) {
903                         CERROR("%s: error reading last_rcvd %s idx %d off "
904                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
905                                cl_idx, off, rc);
906                         rc = 0;
907                         break; /* read error shouldn't cause startup to fail */
908                 }
909
910                 if (lcd->lcd_uuid[0] == '\0') {
911                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
912                                cl_idx);
913                         continue;
914                 }
915
916                 last_transno = lcd_last_transno(lcd);
917
918                 /* These exports are cleaned up by disconnect, so they
919                  * need to be set up like real exports as connect does.
920                  */
921                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
922                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
923                        last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
924
925                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
926                 if (IS_ERR(exp)) {
927                         if (PTR_ERR(exp) == -EALREADY) {
928                                 /* export already exists, zero out this one */
929                                 CERROR("%s: Duplicate export %s!\n",
930                                        tgt_name(tgt), lcd->lcd_uuid);
931                                 continue;
932                         }
933                         GOTO(err_out, rc = PTR_ERR(exp));
934                 }
935
936                 ted = &exp->exp_target_data;
937                 *ted->ted_lcd = *lcd;
938
939                 rc = tgt_client_add(env, exp, cl_idx);
940                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
941                 /* VBR: set export last committed version */
942                 exp->exp_last_committed = last_transno;
943                 spin_lock(&exp->exp_lock);
944                 exp->exp_connecting = 0;
945                 exp->exp_in_recovery = 0;
946                 spin_unlock(&exp->exp_lock);
947                 obd->obd_max_recoverable_clients++;
948                 class_export_put(exp);
949
950                 /* Need to check last_rcvd even for duplicated exports. */
951                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
952                        cl_idx, last_transno);
953
954                 spin_lock(&tgt->lut_translock);
955                 tgt->lut_last_transno = max(last_transno,
956                                             tgt->lut_last_transno);
957                 spin_unlock(&tgt->lut_translock);
958         }
959
960 err_out:
961         OBD_FREE_PTR(lcd);
962         RETURN(rc);
963 }
964
965 struct server_compat_data {
966         __u32 rocompat;
967         __u32 incompat;
968         __u32 rocinit;
969         __u32 incinit;
970 };
971
972 static struct server_compat_data tgt_scd[] = {
973         [LDD_F_SV_TYPE_MDT] = {
974                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
975                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
976                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
977                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
978                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
979                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
980                            OBD_INCOMPAT_MULTI_OI,
981         },
982         [LDD_F_SV_TYPE_OST] = {
983                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
984                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
985                             OBD_INCOMPAT_FID,
986                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
987                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
988         }
989 };
990
991 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
992 {
993         struct tgt_thread_info          *tti = tgt_th_info(env);
994         struct lr_server_data           *lsd = &tgt->lut_lsd;
995         unsigned long                    last_rcvd_size;
996         __u32                            index;
997         int                              rc, type;
998
999         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1000         if (rc)
1001                 RETURN(rc);
1002
1003         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1004
1005         /* ensure padding in the struct is the correct size */
1006         CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
1007                  sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
1008
1009         rc = server_name2index(tgt_name(tgt), &index, NULL);
1010         if (rc < 0) {
1011                 CERROR("%s: Can not get index from name: rc = %d\n",
1012                        tgt_name(tgt), rc);
1013                 RETURN(rc);
1014         }
1015         /* server_name2index() returns type */
1016         type = rc;
1017         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1018                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1019                 RETURN(-EINVAL);
1020         }
1021
1022         /* last_rcvd on OST doesn't provide reconstruct support because there
1023          * may be up to 8 in-flight write requests per single slot in
1024          * last_rcvd client data
1025          */
1026         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1027
1028         if (last_rcvd_size == 0) {
1029                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1030
1031                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1032                        sizeof(lsd->lsd_uuid));
1033                 lsd->lsd_last_transno = 0;
1034                 lsd->lsd_mount_count = 0;
1035                 lsd->lsd_server_size = LR_SERVER_SIZE;
1036                 lsd->lsd_client_start = LR_CLIENT_START;
1037                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1038                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1039                 lsd->lsd_osd_index = index;
1040                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1041                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1042         } else {
1043                 rc = tgt_server_data_read(env, tgt);
1044                 if (rc) {
1045                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1046                                tgt_name(tgt), rc);
1047                         RETURN(rc);
1048                 }
1049                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1050                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
1051                                            "using the wrong disk %s. Were the"
1052                                            " /dev/ assignments rearranged?\n",
1053                                            tgt->lut_obd->obd_uuid.uuid,
1054                                            lsd->lsd_uuid);
1055                         RETURN(-EINVAL);
1056                 }
1057
1058                 if (lsd->lsd_osd_index != index) {
1059                         LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
1060                                            "is different with the index %d in"
1061                                            "config log, It might be disk"
1062                                            "corruption!\n", tgt_name(tgt),
1063                                            lsd->lsd_osd_index, index);
1064                         RETURN(-EINVAL);
1065                 }
1066         }
1067
1068         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1069                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1070                        tgt_name(tgt),
1071                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1072                 RETURN(-EINVAL);
1073         }
1074
1075         if (type == LDD_F_SV_TYPE_MDT)
1076                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1077
1078         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1079                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1080                        tgt_name(tgt),
1081                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1082                 RETURN(-EINVAL);
1083         }
1084         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1085         if (type == LDD_F_SV_TYPE_MDT &&
1086             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1087                 if (last_rcvd_size > lsd->lsd_client_start) {
1088                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1089                                       "remove all clients for interop needs\n",
1090                                       tgt_name(tgt));
1091                         rc = tgt_truncate_last_rcvd(env, tgt,
1092                                                     lsd->lsd_client_start);
1093                         if (rc)
1094                                 RETURN(rc);
1095                         last_rcvd_size = lsd->lsd_client_start;
1096                 }
1097                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1098                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1099         }
1100
1101         spin_lock(&tgt->lut_translock);
1102         tgt->lut_last_transno = lsd->lsd_last_transno;
1103         spin_unlock(&tgt->lut_translock);
1104
1105         lsd->lsd_mount_count++;
1106
1107         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1108         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
1109                tgt_name(tgt), tgt->lut_last_transno);
1110         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
1111                tgt_name(tgt), lsd->lsd_mount_count);
1112         CDEBUG(D_INODE, "%s: server data size: %u\n",
1113                tgt_name(tgt), lsd->lsd_server_size);
1114         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1115                tgt_name(tgt), lsd->lsd_client_start);
1116         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1117                tgt_name(tgt), lsd->lsd_client_size);
1118         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1119                tgt_name(tgt), last_rcvd_size);
1120         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1121                tgt_name(tgt), lsd->lsd_subdir_count);
1122         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1123                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1124                (last_rcvd_size - lsd->lsd_client_start) /
1125                 lsd->lsd_client_size);
1126         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1127
1128         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1129             lsd->lsd_client_size == 0) {
1130                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1131                 RETURN(-EINVAL);
1132         }
1133
1134         if (!tgt->lut_obd->obd_replayable)
1135                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1136
1137         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1138         if (rc < 0)
1139                 GOTO(err_client, rc);
1140
1141         spin_lock(&tgt->lut_translock);
1142         /* obd_last_committed is used for compatibility
1143          * with other lustre recovery code */
1144         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1145         spin_unlock(&tgt->lut_translock);
1146
1147         tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
1148         tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
1149
1150         /* save it, so mount count and last_transno is current */
1151         rc = tgt_server_data_update(env, tgt, 0);
1152         if (rc < 0)
1153                 GOTO(err_client, rc);
1154
1155         RETURN(0);
1156
1157 err_client:
1158         class_disconnect_exports(tgt->lut_obd);
1159         return rc;
1160 }
1161
1162 /* add credits for last_rcvd update */
1163 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1164                      void *cookie)
1165 {
1166         struct lu_target        *tgt = cookie;
1167         struct tgt_session_info *tsi;
1168         struct tgt_thread_info  *tti = tgt_th_info(env);
1169         int                      rc;
1170
1171         /* if there is no session, then this transaction is not result of
1172          * request processing but some local operation */
1173         if (env->le_ses == NULL)
1174                 return 0;
1175
1176         LASSERT(tgt->lut_last_rcvd);
1177         tsi = tgt_ses_info(env);
1178         /* OFD may start transaction without export assigned */
1179         if (tsi->tsi_exp == NULL)
1180                 return 0;
1181
1182         tti_buf_lcd(tti);
1183         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
1184                                      &tti->tti_buf,
1185                                      tsi->tsi_exp->exp_target_data.ted_lr_off,
1186                                      th);
1187         if (rc)
1188                 return rc;
1189
1190         tti_buf_lsd(tti);
1191         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
1192                                      &tti->tti_buf, 0, th);
1193         if (rc)
1194                 return rc;
1195
1196         if (tsi->tsi_vbr_obj != NULL &&
1197             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
1198                 rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
1199
1200         return rc;
1201 }
1202
1203 /* Update last_rcvd records with latests transaction data */
1204 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
1205                     void *cookie)
1206 {
1207         struct lu_target        *tgt = cookie;
1208         struct tgt_session_info *tsi;
1209         struct tgt_thread_info  *tti = tgt_th_info(env);
1210         struct dt_object        *obj = NULL;
1211         int                      rc;
1212         bool                     echo_client;
1213
1214         if (env->le_ses == NULL)
1215                 return 0;
1216
1217         tsi = tgt_ses_info(env);
1218         /* OFD may start transaction without export assigned */
1219         if (tsi->tsi_exp == NULL)
1220                 return 0;
1221
1222         echo_client = (tgt_ses_req(tsi) == NULL);
1223
1224         if (tti->tti_has_trans && !echo_client) {
1225                 if (tti->tti_mult_trans == 0) {
1226                         CDEBUG(D_HA, "More than one transaction "LPU64"\n",
1227                                tti->tti_transno);
1228                         RETURN(0);
1229                 }
1230                 /* we need another transno to be assigned */
1231                 tti->tti_transno = 0;
1232         } else if (th->th_result == 0) {
1233                 tti->tti_has_trans = 1;
1234         }
1235
1236         if (tsi->tsi_vbr_obj != NULL &&
1237             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1238                 obj = tsi->tsi_vbr_obj;
1239         }
1240
1241         if (unlikely(echo_client)) /* echo client special case */
1242                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
1243                                                tsi->tsi_exp);
1244         else
1245                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
1246                                           tgt_ses_req(tsi));
1247         return rc;
1248 }