Whamcloud - gitweb
c7b1d21b80adbd7544240aad9d51f790d8b98867
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
37  * These are common function to work with last_received file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int tgt_client_alloc(struct obd_export *exp)
65 {
66         ENTRY;
67         LASSERT(exp != exp->exp_obd->obd_self_export);
68
69         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
70         if (exp->exp_target_data.ted_lcd == NULL)
71                 RETURN(-ENOMEM);
72         /* Mark that slot is not yet valid, 0 doesn't work here */
73         exp->exp_target_data.ted_lr_idx = -1;
74         RETURN(0);
75 }
76 EXPORT_SYMBOL(tgt_client_alloc);
77
78 /**
79  * Free in-memory data for client slot related to export.
80  */
81 void tgt_client_free(struct obd_export *exp)
82 {
83         struct tg_export_data   *ted = &exp->exp_target_data;
84         struct lu_target        *lut = class_exp2tgt(exp);
85
86         LASSERT(exp != exp->exp_obd->obd_self_export);
87
88         OBD_FREE_PTR(ted->ted_lcd);
89         ted->ted_lcd = NULL;
90
91         /* Slot may be not yet assigned */
92         if (ted->ted_lr_idx < 0)
93                 return;
94         /* Clear bit when lcd is freed */
95         LASSERT(lut->lut_client_bitmap);
96         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
97                 CERROR("%s: client %u bit already clear in bitmap\n",
98                        exp->exp_obd->obd_name, ted->ted_lr_idx);
99                 LBUG();
100         }
101 }
102 EXPORT_SYMBOL(tgt_client_free);
103
104 int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
105                          struct lsd_client_data *lcd, loff_t *off, int index)
106 {
107         struct tgt_thread_info  *tti = tgt_th_info(env);
108         int                      rc;
109
110         tti_buf_lcd(tti);
111         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
112         if (rc == 0) {
113                 check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
114                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
115                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
116                 lcd->lcd_last_close_result =
117                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
118         }
119
120         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
121                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
122                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
123                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
124                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
125                lcd->lcd_last_result, lcd->lcd_last_data,
126                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
127                lcd->lcd_last_close_result, rc);
128         return rc;
129 }
130 EXPORT_SYMBOL(tgt_client_data_read);
131
132 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
133                           struct lsd_client_data *lcd, loff_t *off,
134                           struct thandle *th)
135 {
136         struct tgt_thread_info *tti = tgt_th_info(env);
137
138         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
139         lcd->lcd_last_close_result =
140                 ptlrpc_status_hton(lcd->lcd_last_close_result);
141         lcd_cpu_to_le(lcd, &tti->tti_lcd);
142         tti_buf_lcd(tti);
143
144         return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
145 }
146 EXPORT_SYMBOL(tgt_client_data_write);
147
148 /**
149  * Update client data in last_rcvd
150  */
151 static int tgt_client_data_update(const struct lu_env *env,
152                                   struct obd_export *exp)
153 {
154         struct tg_export_data   *ted = &exp->exp_target_data;
155         struct lu_target        *tgt = class_exp2tgt(exp);
156         struct tgt_thread_info  *tti = tgt_th_info(env);
157         struct thandle          *th;
158         int                      rc = 0;
159
160         ENTRY;
161
162         th = dt_trans_create(env, tgt->lut_bottom);
163         if (IS_ERR(th))
164                 RETURN(PTR_ERR(th));
165
166         tti_buf_lcd(tti);
167         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
168                                      &tti->tti_buf,
169                                      ted->ted_lr_off, th);
170         if (rc)
171                 GOTO(out, rc);
172
173         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
174         if (rc)
175                 GOTO(out, rc);
176         /*
177          * Until this operations will be committed the sync is needed
178          * for this export. This should be done _after_ starting the
179          * transaction so that many connecting clients will not bring
180          * server down with lots of sync writes.
181          */
182         rc = tgt_new_client_cb_add(th, exp);
183         if (rc) {
184                 /* can't add callback, do sync now */
185                 th->th_sync = 1;
186         } else {
187                 spin_lock(&exp->exp_lock);
188                 exp->exp_need_sync = 1;
189                 spin_unlock(&exp->exp_lock);
190         }
191
192         tti->tti_off = ted->ted_lr_off;
193         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
194         EXIT;
195 out:
196         dt_trans_stop(env, tgt->lut_bottom, th);
197         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
198                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
199                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
200
201         return rc;
202 }
203
204 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
205 {
206         struct tgt_thread_info  *tti = tgt_th_info(env);
207         int                      rc;
208
209         tti->tti_off = 0;
210         tti_buf_lsd(tti);
211         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
212                             &tti->tti_off);
213         if (rc == 0)
214                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
215
216         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
217                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
218                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
219         return rc;
220 }
221 EXPORT_SYMBOL(tgt_server_data_read);
222
223 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
224                           struct thandle *th)
225 {
226         struct tgt_thread_info  *tti = tgt_th_info(env);
227         int                      rc;
228
229         ENTRY;
230
231         tti->tti_off = 0;
232         tti_buf_lsd(tti);
233         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
234
235         rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
236                              &tti->tti_off, th);
237
238         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
239                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
240                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
241
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(tgt_server_data_write);
245
246 /**
247  * Update server data in last_rcvd
248  */
249 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
250                            int sync)
251 {
252         struct tgt_thread_info  *tti = tgt_th_info(env);
253         struct thandle          *th;
254         int                      rc = 0;
255
256         ENTRY;
257
258         CDEBUG(D_SUPER,
259                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
260                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
261                tgt->lut_last_transno);
262
263         /* Always save latest transno to keep it fresh */
264         spin_lock(&tgt->lut_translock);
265         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
266         spin_unlock(&tgt->lut_translock);
267
268         th = dt_trans_create(env, tgt->lut_bottom);
269         if (IS_ERR(th))
270                 RETURN(PTR_ERR(th));
271
272         th->th_sync = sync;
273
274         tti_buf_lsd(tti);
275         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
276                                      &tti->tti_buf, tti->tti_off, th);
277         if (rc)
278                 GOTO(out, rc);
279
280         rc = dt_trans_start(env, tgt->lut_bottom, th);
281         if (rc)
282                 GOTO(out, rc);
283
284         rc = tgt_server_data_write(env, tgt, th);
285 out:
286         dt_trans_stop(env, tgt->lut_bottom, th);
287
288         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
289                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
290                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
291         RETURN(rc);
292 }
293 EXPORT_SYMBOL(tgt_server_data_update);
294
295 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
296                            loff_t size)
297 {
298         struct dt_object *dt = tgt->lut_last_rcvd;
299         struct thandle   *th;
300         struct lu_attr    attr;
301         int               rc;
302
303         ENTRY;
304
305         attr.la_size = size;
306         attr.la_valid = LA_SIZE;
307
308         th = dt_trans_create(env, tgt->lut_bottom);
309         if (IS_ERR(th))
310                 RETURN(PTR_ERR(th));
311         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
312         if (rc)
313                 GOTO(cleanup, rc);
314         rc = dt_declare_attr_set(env, dt, &attr, th);
315         if (rc)
316                 GOTO(cleanup, rc);
317         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
318         if (rc)
319                 GOTO(cleanup, rc);
320
321         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
322         if (rc == 0)
323                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
324
325 cleanup:
326         dt_trans_stop(env, tgt->lut_bottom, th);
327
328         RETURN(rc);
329 }
330 EXPORT_SYMBOL(tgt_truncate_last_rcvd);
331
332 static void tgt_client_epoch_update(const struct lu_env *env,
333                                     struct obd_export *exp)
334 {
335         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
336         struct lu_target        *tgt = class_exp2tgt(exp);
337
338         LASSERT(tgt->lut_bottom);
339         /** VBR: set client last_epoch to current epoch */
340         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
341                 return;
342         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
343         tgt_client_data_update(env, exp);
344 }
345
346 /**
347  * Update boot epoch when recovery ends
348  */
349 void tgt_boot_epoch_update(struct lu_target *tgt)
350 {
351         struct lu_env            env;
352         struct ptlrpc_request   *req;
353         __u32                    start_epoch;
354         struct list_head         client_list;
355         int                      rc;
356
357         if (tgt->lut_obd->obd_stopping)
358                 return;
359
360         rc = lu_env_init(&env, LCT_DT_THREAD);
361         if (rc) {
362                 CERROR("%s: can't initialize environment: rc = %d\n",
363                         tgt->lut_obd->obd_name, rc);
364                 return;
365         }
366
367         spin_lock(&tgt->lut_translock);
368         start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
369         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
370         tgt->lut_lsd.lsd_start_epoch = start_epoch;
371         spin_unlock(&tgt->lut_translock);
372
373         INIT_LIST_HEAD(&client_list);
374         /**
375          * The recovery is not yet finished and final queue can still be updated
376          * with resend requests. Move final list to separate one for processing
377          */
378         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
379         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
380         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
381
382         /**
383          * go through list of exports participated in recovery and
384          * set new epoch for them
385          */
386         list_for_each_entry(req, &client_list, rq_list) {
387                 LASSERT(!req->rq_export->exp_delayed);
388                 if (!req->rq_export->exp_vbr_failed)
389                         tgt_client_epoch_update(&env, req->rq_export);
390         }
391         /** return list back at once */
392         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
393         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
394         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
395         /** update server epoch */
396         tgt_server_data_update(&env, tgt, 1);
397         lu_env_fini(&env);
398 }
399 EXPORT_SYMBOL(tgt_boot_epoch_update);
400
/**
 * Commit callback data, used to update the last_committed values of the
 * obd and of the export once the transaction it is attached to commits.
 *
 * Allocated in tgt_last_commit_cb_add() and freed by the callback
 * tgt_cb_last_committed() (or by tgt_last_commit_cb_add() itself when the
 * callback cannot be registered).
 */
struct tgt_last_committed_callback {
        /* callback descriptor registered with dt_trans_cb_add() */
        struct dt_txn_commit_cb  llcc_cb;
        /* target the transaction belongs to */
        struct lu_target        *llcc_tgt;
        /* export, taken with class_export_cb_get() and released with
         * class_export_cb_put() */
        struct obd_export       *llcc_exp;
        /* transaction number reported as committed */
        __u64                    llcc_transno;
};
410
411 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
412                                   struct dt_txn_commit_cb *cb, int err)
413 {
414         struct tgt_last_committed_callback *ccb;
415
416         ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
417
418         LASSERT(ccb->llcc_tgt != NULL);
419         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
420
421         spin_lock(&ccb->llcc_tgt->lut_translock);
422         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
423                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
424
425         LASSERT(ccb->llcc_exp);
426         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
427                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
428                 spin_unlock(&ccb->llcc_tgt->lut_translock);
429                 ptlrpc_commit_replies(ccb->llcc_exp);
430         } else {
431                 spin_unlock(&ccb->llcc_tgt->lut_translock);
432         }
433         class_export_cb_put(ccb->llcc_exp);
434         if (ccb->llcc_transno)
435                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
436                        ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
437         OBD_FREE_PTR(ccb);
438 }
439
440 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
441                            struct obd_export *exp, __u64 transno)
442 {
443         struct tgt_last_committed_callback      *ccb;
444         struct dt_txn_commit_cb                 *dcb;
445         int                                      rc;
446
447         OBD_ALLOC_PTR(ccb);
448         if (ccb == NULL)
449                 return -ENOMEM;
450
451         ccb->llcc_tgt = tgt;
452         ccb->llcc_exp = class_export_cb_get(exp);
453         ccb->llcc_transno = transno;
454
455         dcb = &ccb->llcc_cb;
456         dcb->dcb_func = tgt_cb_last_committed;
457         INIT_LIST_HEAD(&dcb->dcb_linkage);
458         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
459
460         rc = dt_trans_cb_add(th, dcb);
461         if (rc) {
462                 class_export_cb_put(exp);
463                 OBD_FREE_PTR(ccb);
464         }
465
466         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
467                 /* report failure to force synchronous operation */
468                 return -EPERM;
469
470         return rc;
471 }
472 EXPORT_SYMBOL(tgt_last_commit_cb_add);
473
/**
 * Commit callback data for the transaction that writes a client record
 * via tgt_client_data_update(). Allocated in tgt_new_client_cb_add() and
 * freed by tgt_cb_new_client().
 */
struct tgt_new_client_callback {
        /* callback descriptor registered with dt_trans_cb_add() */
        struct dt_txn_commit_cb  lncc_cb;
        /* export, taken with class_export_cb_get() and released with
         * class_export_cb_put() */
        struct obd_export       *lncc_exp;
};
478
479 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
480                               struct dt_txn_commit_cb *cb, int err)
481 {
482         struct tgt_new_client_callback *ccb;
483
484         ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
485
486         LASSERT(ccb->lncc_exp->exp_obd);
487
488         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
489                ccb->lncc_exp->exp_obd->obd_name,
490                ccb->lncc_exp->exp_client_uuid.uuid);
491
492         spin_lock(&ccb->lncc_exp->exp_lock);
493         /* XXX: Currently, we use per-export based sync/async policy for
494          *      the update via OUT RPC, it is coarse-grained policy, and
495          *      will be changed as per-request based by DNE II patches. */
496         if (!ccb->lncc_exp->exp_keep_sync)
497                 ccb->lncc_exp->exp_need_sync = 0;
498
499         spin_unlock(&ccb->lncc_exp->exp_lock);
500         class_export_cb_put(ccb->lncc_exp);
501
502         OBD_FREE_PTR(ccb);
503 }
504
505 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
506 {
507         struct tgt_new_client_callback  *ccb;
508         struct dt_txn_commit_cb         *dcb;
509         int                              rc;
510
511         OBD_ALLOC_PTR(ccb);
512         if (ccb == NULL)
513                 return -ENOMEM;
514
515         ccb->lncc_exp = class_export_cb_get(exp);
516
517         dcb = &ccb->lncc_cb;
518         dcb->dcb_func = tgt_cb_new_client;
519         INIT_LIST_HEAD(&dcb->dcb_linkage);
520         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
521
522         rc = dt_trans_cb_add(th, dcb);
523         if (rc) {
524                 class_export_cb_put(exp);
525                 OBD_FREE_PTR(ccb);
526         }
527         return rc;
528 }
529
530 /**
531  * Add new client to the last_rcvd upon new connection.
532  *
533  * We use a bitmap to locate a free space in the last_rcvd file and initialize
534  * tg_export_data.
535  */
536 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
537 {
538         struct tg_export_data   *ted = &exp->exp_target_data;
539         struct lu_target        *tgt = class_exp2tgt(exp);
540         int                      rc = 0, idx;
541
542         ENTRY;
543
544         LASSERT(tgt->lut_client_bitmap != NULL);
545         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
546                 RETURN(0);
547
548         mutex_init(&ted->ted_lcd_lock);
549
550         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
551                 RETURN(0);
552
553         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
554          * there's no need for extra complication here
555          */
556         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
557 repeat:
558         if (idx >= LR_MAX_CLIENTS ||
559             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
560                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
561                        tgt->lut_obd->obd_name,  idx);
562                 RETURN(-EOVERFLOW);
563         }
564         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
565                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
566                                              LR_MAX_CLIENTS, idx);
567                 goto repeat;
568         }
569
570         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
571                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
572
573         ted->ted_lr_idx = idx;
574         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
575                           idx * tgt->lut_lsd.lsd_client_size;
576
577         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
578
579         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
580                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
581                ted->ted_lcd->lcd_uuid);
582
583         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
584                 RETURN(-ENOSPC);
585
586         rc = tgt_client_data_update(env, exp);
587         if (rc)
588                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
589                        tgt->lut_obd->obd_name, idx, rc);
590
591         RETURN(rc);
592 }
593 EXPORT_SYMBOL(tgt_client_new);
594
595 /* Add client data to the MDS.  We use a bitmap to locate a free space
596  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
597  * Otherwise, we just have to read the data from the last_rcvd file and
598  * we know its offset.
599  *
600  * It should not be possible to fail adding an existing client - otherwise
601  * mdt_init_server_data() callsite needs to be fixed.
602  */
603 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
604 {
605         struct tg_export_data   *ted = &exp->exp_target_data;
606         struct lu_target        *tgt = class_exp2tgt(exp);
607
608         ENTRY;
609
610         LASSERT(tgt->lut_client_bitmap != NULL);
611         LASSERTF(idx >= 0, "%d\n", idx);
612
613         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
614             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
615                 RETURN(0);
616
617         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
618                 CERROR("%s: client %d: bit already set in bitmap!!\n",
619                        tgt->lut_obd->obd_name,  idx);
620                 LBUG();
621         }
622
623         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
624                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
625
626         ted->ted_lr_idx = idx;
627         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
628                           idx * tgt->lut_lsd.lsd_client_size;
629
630         mutex_init(&ted->ted_lcd_lock);
631
632         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
633
634         RETURN(0);
635 }
636 EXPORT_SYMBOL(tgt_client_add);
637
638 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
639 {
640         struct tg_export_data   *ted = &exp->exp_target_data;
641         struct lu_target        *tgt = class_exp2tgt(exp);
642         int                      rc;
643
644         ENTRY;
645
646         LASSERT(ted->ted_lcd);
647
648         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
649         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
650                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
651             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
652                 RETURN(0);
653
654         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
655                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
656                ted->ted_lcd->lcd_uuid);
657
658         /* Clear the bit _after_ zeroing out the client so we don't
659            race with filter_client_add and zero out new clients.*/
660         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
661                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
662                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
663                 LBUG();
664         }
665
666         /* Do not erase record for recoverable client. */
667         if (exp->exp_flags & OBD_OPT_FAILOVER)
668                 RETURN(0);
669
670         /* Make sure the server's last_transno is up to date.
671          * This should be done before zeroing client slot so last_transno will
672          * be in server data or in client data in case of failure */
673         rc = tgt_server_data_update(env, tgt, 0);
674         if (rc != 0) {
675                 CERROR("%s: failed to update server data, skip client %s "
676                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
677                        ted->ted_lcd->lcd_uuid, rc);
678                 RETURN(rc);
679         }
680
681         mutex_lock(&ted->ted_lcd_lock);
682         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
683         rc = tgt_client_data_update(env, exp);
684         mutex_unlock(&ted->ted_lcd_lock);
685
686         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
687                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
688                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
689                ted->ted_lr_idx, ted->ted_lr_off, rc);
690         RETURN(rc);
691 }
692 EXPORT_SYMBOL(tgt_client_del);
693
/*
 * last_rcvd & last_committed update callbacks
 */
/**
 * Record the outcome of the request \a req in last_rcvd as part of the
 * transaction \a th.
 *
 * The function
 *  - picks the transaction number: a new one (++lut_last_transno) when the
 *    request carries none and the transaction succeeded, otherwise the one
 *    from the request (replay), bumping lut_last_transno if needed;
 *  - stores that number as the version of \a obj (if given and the
 *    transaction succeeded), in req->rq_transno and in the reply message;
 *  - registers the last_committed commit callback, forcing a synchronous
 *    transaction when that fails;
 *  - updates the export's in-memory client data under ted_lcd_lock and,
 *    for non-lightweight clients, writes it to the client slot;
 *  - for lightweight clients, writes the server data when
 *    lsd_last_transno was advanced.
 *
 * \param[in] env       execution environment
 * \param[in] tgt       target owning last_rcvd
 * \param[in] obj       object whose version is set, may be NULL
 * \param[in] opdata    value stored in lcd_last_data (truncated to __u32)
 * \param[in] th        running transaction
 * \param[in] req       request being processed
 *
 * \retval 0            on success, or when the export has no slot yet
 * \retval -EINVAL      non-lightweight client with zero slot offset
 * \retval -EOVERFLOW   a replay would overwrite a bigger transno
 * \retval negative     error from the client or server data write
 */
static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                                struct dt_object *obj, __u64 opdata,
                                struct thandle *th, struct ptlrpc_request *req)
{
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct tg_export_data   *ted;
        __u64                   *transno_p;
        int                      rc = 0;
        bool                     lw_client, update = false;

        ENTRY;

        ted = &req->rq_export->exp_target_data;

        lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
        if (ted->ted_lr_idx < 0 && !lw_client)
                /* ofd connect may cause transaction before export has
                 * last_rcvd slot */
                RETURN(0);

        /* non-zero only if the client supplied a transno, i.e. a replay */
        tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);

        spin_lock(&tgt->lut_translock);
        if (th->th_result != 0) {
                /* failed transaction: no transno is assigned */
                if (tti->tti_transno != 0) {
                        CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
                               tgt_name(tgt), tti->tti_transno, th->th_result);
                }
        } else if (tti->tti_transno == 0) {
                /* new request: take the next transno */
                tti->tti_transno = ++tgt->lut_last_transno;
        } else {
                /* should be replay */
                if (tti->tti_transno > tgt->lut_last_transno)
                        tgt->lut_last_transno = tti->tti_transno;
        }
        spin_unlock(&tgt->lut_translock);

        /** VBR: set new versions */
        if (th->th_result == 0 && obj != NULL)
                dt_version_set(env, obj, tti->tti_transno, th);

        /* filling reply data */
        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
               tti->tti_transno, tgt->lut_obd->obd_last_committed);

        req->rq_transno = tti->tti_transno;
        lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);

        /* if can't add callback, do sync write */
        th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
                                                tti->tti_transno);

        if (lw_client) {
                /* All operations performed by LW clients are synchronous and
                 * we store the committed transno in the last_rcvd header */
                spin_lock(&tgt->lut_translock);
                if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
                        tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
                        update = true;
                }
                spin_unlock(&tgt->lut_translock);
                /* Although lightweight (LW) connections have no slot in
                 * last_rcvd, we still want to maintain the in-memory
                 * lsd_client_data structure in order to properly handle reply
                 * reconstruction. */
        } else if (ted->ted_lr_off == 0) {
                CERROR("%s: client idx %d has offset %lld\n",
                       tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
                RETURN(-EINVAL);
        }

        /* if the export has already been disconnected, we have no last_rcvd
         * slot, update server data with latest transno then */
        if (ted->ted_lcd == NULL) {
                /* NOTE(review): rc is still 0 here, so the message always
                 * prints "rc 0" */
                CWARN("commit transaction for disconnected client %s: rc %d\n",
                      req->rq_export->exp_client_uuid.uuid, rc);
                GOTO(srv_update, rc = 0);
        }

        mutex_lock(&ted->ted_lcd_lock);
        LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
        /* close-type requests are tracked in their own set of lcd fields */
        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
            lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
                transno_p = &ted->ted_lcd->lcd_last_close_transno;
                ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
                ted->ted_lcd->lcd_last_close_result = th->th_result;
        } else {
                /* VBR: save versions in last_rcvd for reconstruct. */
                __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);

                if (pre_versions) {
                        ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
                        ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
                        ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
                        ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
                }
                transno_p = &ted->ted_lcd->lcd_last_transno;
                ted->ted_lcd->lcd_last_xid = req->rq_xid;
                ted->ted_lcd->lcd_last_result = th->th_result;
                /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
                 * see struct ldlm_reply->lock_policy_res1; */
                ted->ted_lcd->lcd_last_data = opdata;
        }

        /* Update transno in slot only if non-zero number, i.e. no errors */
        if (likely(tti->tti_transno != 0)) {
                if (*transno_p > tti->tti_transno &&
                    !tgt->lut_no_reconstruct) {
                        CERROR("%s: trying to overwrite bigger transno:"
                               "on-disk: "LPU64", new: "LPU64" replay: %d. "
                               "see LU-617.\n", tgt_name(tgt), *transno_p,
                               tti->tti_transno, req_is_replay(req));
                        if (req_is_replay(req)) {
                                spin_lock(&req->rq_export->exp_lock);
                                req->rq_export->exp_vbr_failed = 1;
                                spin_unlock(&req->rq_export->exp_lock);
                        }
                        mutex_unlock(&ted->ted_lcd_lock);
                        /* NB: this exit skips the srv_update step below */
                        RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
                }
                *transno_p = tti->tti_transno;
        }

        /* lightweight clients have no on-disk slot to write */
        if (!lw_client) {
                tti->tti_off = ted->ted_lr_off;
                rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
                if (rc < 0) {
                        mutex_unlock(&ted->ted_lcd_lock);
                        RETURN(rc);
                }
        }
        mutex_unlock(&ted->ted_lcd_lock);
        EXIT;
srv_update:
        /* only set for LW clients that advanced lsd_last_transno */
        if (update)
                rc = tgt_server_data_write(env, tgt, th);
        return rc;
}
835
836 /*
837  * last_rcvd update for echo client simulation.
838  * It updates last_rcvd client slot and version of object in
839  * simple way but with all locks to simulate all drawbacks
840  */
841 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
842                                      struct lu_target *tgt,
843                                      struct dt_object *obj,
844                                      struct thandle *th,
845                                      struct obd_export *exp)
846 {
847         struct tgt_thread_info  *tti = tgt_th_info(env);
848         struct tg_export_data   *ted = &exp->exp_target_data;
849         int                      rc = 0;
850
851         ENTRY;
852
853         tti->tti_transno = 0;
854
855         spin_lock(&tgt->lut_translock);
856         if (th->th_result == 0)
857                 tti->tti_transno = ++tgt->lut_last_transno;
858         spin_unlock(&tgt->lut_translock);
859
860         /** VBR: set new versions */
861         if (th->th_result == 0 && obj != NULL)
862                 dt_version_set(env, obj, tti->tti_transno, th);
863
864         /* if can't add callback, do sync write */
865         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
866                                                 tti->tti_transno);
867
868         LASSERT(ted->ted_lr_off > 0);
869
870         mutex_lock(&ted->ted_lcd_lock);
871         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
872         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
873         ted->ted_lcd->lcd_last_result = th->th_result;
874
875         tti->tti_off = ted->ted_lr_off;
876         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
877         mutex_unlock(&ted->ted_lcd_lock);
878         RETURN(rc);
879 }
880
881 static int tgt_clients_data_init(const struct lu_env *env,
882                                  struct lu_target *tgt,
883                                  unsigned long last_size)
884 {
885         struct obd_device       *obd = tgt->lut_obd;
886         struct lr_server_data   *lsd = &tgt->lut_lsd;
887         struct lsd_client_data  *lcd = NULL;
888         struct tg_export_data   *ted;
889         int                      cl_idx;
890         int                      rc = 0;
891         loff_t                   off = lsd->lsd_client_start;
892
893         ENTRY;
894
895         CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
896                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
897
898         OBD_ALLOC_PTR(lcd);
899         if (lcd == NULL)
900                 RETURN(-ENOMEM);
901
902         for (cl_idx = 0; off < last_size; cl_idx++) {
903                 struct obd_export       *exp;
904                 __u64                    last_transno;
905
906                 /* Don't assume off is incremented properly by
907                  * read_record(), in case sizeof(*lcd)
908                  * isn't the same as fsd->lsd_client_size.  */
909                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
910                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
911                 if (rc) {
912                         CERROR("%s: error reading last_rcvd %s idx %d off "
913                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
914                                cl_idx, off, rc);
915                         rc = 0;
916                         break; /* read error shouldn't cause startup to fail */
917                 }
918
919                 if (lcd->lcd_uuid[0] == '\0') {
920                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
921                                cl_idx);
922                         continue;
923                 }
924
925                 last_transno = lcd_last_transno(lcd);
926
927                 /* These exports are cleaned up by disconnect, so they
928                  * need to be set up like real exports as connect does.
929                  */
930                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
931                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
932                        last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
933
934                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
935                 if (IS_ERR(exp)) {
936                         if (PTR_ERR(exp) == -EALREADY) {
937                                 /* export already exists, zero out this one */
938                                 CERROR("%s: Duplicate export %s!\n",
939                                        tgt_name(tgt), lcd->lcd_uuid);
940                                 continue;
941                         }
942                         GOTO(err_out, rc = PTR_ERR(exp));
943                 }
944
945                 ted = &exp->exp_target_data;
946                 *ted->ted_lcd = *lcd;
947
948                 rc = tgt_client_add(env, exp, cl_idx);
949                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
950                 /* VBR: set export last committed version */
951                 exp->exp_last_committed = last_transno;
952                 spin_lock(&exp->exp_lock);
953                 exp->exp_connecting = 0;
954                 exp->exp_in_recovery = 0;
955                 spin_unlock(&exp->exp_lock);
956                 obd->obd_max_recoverable_clients++;
957                 class_export_put(exp);
958
959                 /* Need to check last_rcvd even for duplicated exports. */
960                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
961                        cl_idx, last_transno);
962
963                 spin_lock(&tgt->lut_translock);
964                 tgt->lut_last_transno = max(last_transno,
965                                             tgt->lut_last_transno);
966                 spin_unlock(&tgt->lut_translock);
967         }
968
969 err_out:
970         OBD_FREE_PTR(lcd);
971         RETURN(rc);
972 }
973
974 struct server_compat_data {
975         __u32 rocompat;
976         __u32 incompat;
977         __u32 rocinit;
978         __u32 incinit;
979 };
980
981 static struct server_compat_data tgt_scd[] = {
982         [LDD_F_SV_TYPE_MDT] = {
983                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
984                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
985                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
986                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
987                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
988                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
989                            OBD_INCOMPAT_MULTI_OI,
990         },
991         [LDD_F_SV_TYPE_OST] = {
992                 .rocompat = 0,
993                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
994                             OBD_INCOMPAT_FID,
995                 .rocinit = 0,
996                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
997         }
998 };
999
1000 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1001 {
1002         struct tgt_thread_info          *tti = tgt_th_info(env);
1003         struct lr_server_data           *lsd = &tgt->lut_lsd;
1004         unsigned long                    last_rcvd_size;
1005         __u32                            index;
1006         int                              rc, type;
1007
1008         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr, BYPASS_CAPA);
1009         if (rc)
1010                 RETURN(rc);
1011
1012         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1013
1014         /* ensure padding in the struct is the correct size */
1015         CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
1016                  sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
1017
1018         rc = server_name2index(tgt_name(tgt), &index, NULL);
1019         if (rc < 0) {
1020                 CERROR("%s: Can not get index from name: rc = %d\n",
1021                        tgt_name(tgt), rc);
1022                 RETURN(rc);
1023         }
1024         /* server_name2index() returns type */
1025         type = rc;
1026         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1027                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1028                 RETURN(-EINVAL);
1029         }
1030
1031         /* last_rcvd on OST doesn't provide reconstruct support because there
1032          * may be up to 8 in-flight write requests per single slot in
1033          * last_rcvd client data
1034          */
1035         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1036
1037         if (last_rcvd_size == 0) {
1038                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1039
1040                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1041                        sizeof(lsd->lsd_uuid));
1042                 lsd->lsd_last_transno = 0;
1043                 lsd->lsd_mount_count = 0;
1044                 lsd->lsd_server_size = LR_SERVER_SIZE;
1045                 lsd->lsd_client_start = LR_CLIENT_START;
1046                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1047                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1048                 lsd->lsd_osd_index = index;
1049                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1050                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1051         } else {
1052                 rc = tgt_server_data_read(env, tgt);
1053                 if (rc) {
1054                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1055                                tgt_name(tgt), rc);
1056                         RETURN(rc);
1057                 }
1058                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1059                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
1060                                            "using the wrong disk %s. Were the"
1061                                            " /dev/ assignments rearranged?\n",
1062                                            tgt->lut_obd->obd_uuid.uuid,
1063                                            lsd->lsd_uuid);
1064                         RETURN(-EINVAL);
1065                 }
1066
1067                 if (lsd->lsd_osd_index != index) {
1068                         LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
1069                                            "is different with the index %d in"
1070                                            "config log, It might be disk"
1071                                            "corruption!\n", tgt_name(tgt),
1072                                            lsd->lsd_osd_index, index);
1073                         RETURN(-EINVAL);
1074                 }
1075         }
1076
1077         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1078                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1079                        tgt_name(tgt),
1080                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1081                 RETURN(-EINVAL);
1082         }
1083
1084         if (type == LDD_F_SV_TYPE_MDT)
1085                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1086
1087         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1088                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1089                        tgt_name(tgt),
1090                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1091                 RETURN(-EINVAL);
1092         }
1093         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1094         if (type == LDD_F_SV_TYPE_MDT &&
1095             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1096                 if (last_rcvd_size > lsd->lsd_client_start) {
1097                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1098                                       "remove all clients for interop needs\n",
1099                                       tgt_name(tgt));
1100                         rc = tgt_truncate_last_rcvd(env, tgt,
1101                                                     lsd->lsd_client_start);
1102                         if (rc)
1103                                 RETURN(rc);
1104                         last_rcvd_size = lsd->lsd_client_start;
1105                 }
1106                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1107                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1108         }
1109
1110         spin_lock(&tgt->lut_translock);
1111         tgt->lut_last_transno = lsd->lsd_last_transno;
1112         spin_unlock(&tgt->lut_translock);
1113
1114         lsd->lsd_mount_count++;
1115
1116         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1117         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
1118                tgt_name(tgt), tgt->lut_last_transno);
1119         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
1120                tgt_name(tgt), lsd->lsd_mount_count);
1121         CDEBUG(D_INODE, "%s: server data size: %u\n",
1122                tgt_name(tgt), lsd->lsd_server_size);
1123         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1124                tgt_name(tgt), lsd->lsd_client_start);
1125         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1126                tgt_name(tgt), lsd->lsd_client_size);
1127         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1128                tgt_name(tgt), last_rcvd_size);
1129         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1130                tgt_name(tgt), lsd->lsd_subdir_count);
1131         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1132                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1133                (last_rcvd_size - lsd->lsd_client_start) /
1134                 lsd->lsd_client_size);
1135         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1136
1137         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1138             lsd->lsd_client_size == 0) {
1139                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1140                 RETURN(-EINVAL);
1141         }
1142
1143         if (!tgt->lut_obd->obd_replayable)
1144                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1145
1146         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1147         if (rc < 0)
1148                 GOTO(err_client, rc);
1149
1150         spin_lock(&tgt->lut_translock);
1151         /* obd_last_committed is used for compatibility
1152          * with other lustre recovery code */
1153         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1154         spin_unlock(&tgt->lut_translock);
1155
1156         tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
1157         tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
1158
1159         /* save it, so mount count and last_transno is current */
1160         rc = tgt_server_data_update(env, tgt, 0);
1161         if (rc < 0)
1162                 GOTO(err_client, rc);
1163
1164         RETURN(0);
1165
1166 err_client:
1167         class_disconnect_exports(tgt->lut_obd);
1168         return rc;
1169 }
1170
1171 /* add credits for last_rcvd update */
1172 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1173                      void *cookie)
1174 {
1175         struct lu_target        *tgt = cookie;
1176         struct tgt_session_info *tsi;
1177         struct tgt_thread_info  *tti = tgt_th_info(env);
1178         int                      rc;
1179
1180         /* if there is no session, then this transaction is not result of
1181          * request processing but some local operation */
1182         if (env->le_ses == NULL)
1183                 return 0;
1184
1185         LASSERT(tgt->lut_last_rcvd);
1186         tsi = tgt_ses_info(env);
1187         /* OFD may start transaction without export assigned */
1188         if (tsi->tsi_exp == NULL)
1189                 return 0;
1190
1191         tti_buf_lcd(tti);
1192         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
1193                                      &tti->tti_buf,
1194                                      tsi->tsi_exp->exp_target_data.ted_lr_off,
1195                                      th);
1196         if (rc)
1197                 return rc;
1198
1199         tti_buf_lsd(tti);
1200         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
1201                                      &tti->tti_buf, 0, th);
1202         if (rc)
1203                 return rc;
1204
1205         if (tsi->tsi_vbr_obj != NULL &&
1206             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
1207                 rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
1208
1209         return rc;
1210 }
1211
1212 /* Update last_rcvd records with latests transaction data */
1213 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
1214                     void *cookie)
1215 {
1216         struct lu_target        *tgt = cookie;
1217         struct tgt_session_info *tsi;
1218         struct tgt_thread_info  *tti = tgt_th_info(env);
1219         struct dt_object        *obj = NULL;
1220         int                      rc;
1221         bool                     echo_client;
1222
1223         if (env->le_ses == NULL)
1224                 return 0;
1225
1226         tsi = tgt_ses_info(env);
1227         /* OFD may start transaction without export assigned */
1228         if (tsi->tsi_exp == NULL)
1229                 return 0;
1230
1231         echo_client = (tgt_ses_req(tsi) == NULL);
1232
1233         if (tti->tti_has_trans && !echo_client) {
1234                 if (tti->tti_mult_trans == 0) {
1235                         CDEBUG(D_HA, "More than one transaction "LPU64"\n",
1236                                tti->tti_transno);
1237                         RETURN(0);
1238                 }
1239                 /* we need another transno to be assigned */
1240                 tti->tti_transno = 0;
1241         } else if (th->th_result == 0) {
1242                 tti->tti_has_trans = 1;
1243         }
1244
1245         if (tsi->tsi_vbr_obj != NULL &&
1246             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1247                 obj = tsi->tsi_vbr_obj;
1248         }
1249
1250         if (unlikely(echo_client)) /* echo client special case */
1251                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
1252                                                tsi->tsi_exp);
1253         else
1254                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
1255                                           tgt_ses_req(tsi));
1256         return rc;
1257 }