Whamcloud - gitweb
LU-3534 mdt: move last_rcvd obj update to LOD
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
37  * These are common function to work with last_received file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int tgt_client_alloc(struct obd_export *exp)
65 {
66         ENTRY;
67         LASSERT(exp != exp->exp_obd->obd_self_export);
68
69         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
70         if (exp->exp_target_data.ted_lcd == NULL)
71                 RETURN(-ENOMEM);
72         /* Mark that slot is not yet valid, 0 doesn't work here */
73         exp->exp_target_data.ted_lr_idx = -1;
74         RETURN(0);
75 }
76 EXPORT_SYMBOL(tgt_client_alloc);
77
78 /**
79  * Free in-memory data for client slot related to export.
80  */
81 void tgt_client_free(struct obd_export *exp)
82 {
83         struct tg_export_data   *ted = &exp->exp_target_data;
84         struct lu_target        *lut = class_exp2tgt(exp);
85
86         LASSERT(exp != exp->exp_obd->obd_self_export);
87
88         OBD_FREE_PTR(ted->ted_lcd);
89         ted->ted_lcd = NULL;
90
91         /* Slot may be not yet assigned */
92         if (ted->ted_lr_idx < 0)
93                 return;
94         /* Clear bit when lcd is freed */
95         LASSERT(lut && lut->lut_client_bitmap);
96         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
97                 CERROR("%s: client %u bit already clear in bitmap\n",
98                        exp->exp_obd->obd_name, ted->ted_lr_idx);
99                 LBUG();
100         }
101 }
102 EXPORT_SYMBOL(tgt_client_free);
103
104 int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
105                          struct lsd_client_data *lcd, loff_t *off, int index)
106 {
107         struct tgt_thread_info  *tti = tgt_th_info(env);
108         int                      rc;
109
110         tti_buf_lcd(tti);
111         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
112         if (rc == 0) {
113                 check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
114                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
115                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
116                 lcd->lcd_last_close_result =
117                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
118         }
119
120         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
121                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
122                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
123                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
124                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
125                lcd->lcd_last_result, lcd->lcd_last_data,
126                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
127                lcd->lcd_last_close_result, rc);
128         return rc;
129 }
130
131 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
132                           struct lsd_client_data *lcd, loff_t *off,
133                           struct thandle *th)
134 {
135         struct tgt_thread_info *tti = tgt_th_info(env);
136         struct dt_object        *dto;
137
138         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
139         lcd->lcd_last_close_result =
140                 ptlrpc_status_hton(lcd->lcd_last_close_result);
141         lcd_cpu_to_le(lcd, &tti->tti_lcd);
142         tti_buf_lcd(tti);
143
144         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
145         return dt_record_write(env, dto, &tti->tti_buf, off, th);
146 }
147
148 /**
149  * Update client data in last_rcvd
150  */
151 static int tgt_client_data_update(const struct lu_env *env,
152                                   struct obd_export *exp)
153 {
154         struct tg_export_data   *ted = &exp->exp_target_data;
155         struct lu_target        *tgt = class_exp2tgt(exp);
156         struct tgt_thread_info  *tti = tgt_th_info(env);
157         struct thandle          *th;
158         int                      rc = 0;
159
160         ENTRY;
161
162         if (unlikely(tgt == NULL)) {
163                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
164                           class_exp2obd(exp)->obd_name);
165                 RETURN(-EINVAL);
166         }
167
168         th = dt_trans_create(env, tgt->lut_bottom);
169         if (IS_ERR(th))
170                 RETURN(PTR_ERR(th));
171
172         tti_buf_lcd(tti);
173         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
174                                      &tti->tti_buf,
175                                      ted->ted_lr_off, th);
176         if (rc)
177                 GOTO(out, rc);
178
179         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
180         if (rc)
181                 GOTO(out, rc);
182         /*
183          * Until this operations will be committed the sync is needed
184          * for this export. This should be done _after_ starting the
185          * transaction so that many connecting clients will not bring
186          * server down with lots of sync writes.
187          */
188         rc = tgt_new_client_cb_add(th, exp);
189         if (rc) {
190                 /* can't add callback, do sync now */
191                 th->th_sync = 1;
192         } else {
193                 spin_lock(&exp->exp_lock);
194                 exp->exp_need_sync = 1;
195                 spin_unlock(&exp->exp_lock);
196         }
197
198         tti->tti_off = ted->ted_lr_off;
199         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
200         EXIT;
201 out:
202         dt_trans_stop(env, tgt->lut_bottom, th);
203         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
204                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
205                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
206
207         return rc;
208 }
209
210 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
211 {
212         struct tgt_thread_info  *tti = tgt_th_info(env);
213         int                      rc;
214
215         tti->tti_off = 0;
216         tti_buf_lsd(tti);
217         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
218                             &tti->tti_off);
219         if (rc == 0)
220                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
221
222         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
223                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
224                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
225         return rc;
226 }
227
228 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
229                           struct thandle *th)
230 {
231         struct tgt_thread_info  *tti = tgt_th_info(env);
232         struct dt_object        *dto;
233         int                      rc;
234
235         ENTRY;
236
237         tti->tti_off = 0;
238         tti_buf_lsd(tti);
239         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
240
241         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
242         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
243
244         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
245                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
246                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
247
248         RETURN(rc);
249 }
250
251 /**
252  * Update server data in last_rcvd
253  */
254 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
255                            int sync)
256 {
257         struct tgt_thread_info  *tti = tgt_th_info(env);
258         struct thandle          *th;
259         int                      rc = 0;
260
261         ENTRY;
262
263         CDEBUG(D_SUPER,
264                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
265                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
266                tgt->lut_last_transno);
267
268         /* Always save latest transno to keep it fresh */
269         spin_lock(&tgt->lut_translock);
270         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
271         spin_unlock(&tgt->lut_translock);
272
273         th = dt_trans_create(env, tgt->lut_bottom);
274         if (IS_ERR(th))
275                 RETURN(PTR_ERR(th));
276
277         th->th_sync = sync;
278
279         tti_buf_lsd(tti);
280         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
281                                      &tti->tti_buf, tti->tti_off, th);
282         if (rc)
283                 GOTO(out, rc);
284
285         rc = dt_trans_start(env, tgt->lut_bottom, th);
286         if (rc)
287                 GOTO(out, rc);
288
289         rc = tgt_server_data_write(env, tgt, th);
290 out:
291         dt_trans_stop(env, tgt->lut_bottom, th);
292
293         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
294                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
295                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
296         RETURN(rc);
297 }
298 EXPORT_SYMBOL(tgt_server_data_update);
299
300 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
301                            loff_t size)
302 {
303         struct dt_object *dt = tgt->lut_last_rcvd;
304         struct thandle   *th;
305         struct lu_attr    attr;
306         int               rc;
307
308         ENTRY;
309
310         attr.la_size = size;
311         attr.la_valid = LA_SIZE;
312
313         th = dt_trans_create(env, tgt->lut_bottom);
314         if (IS_ERR(th))
315                 RETURN(PTR_ERR(th));
316         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
317         if (rc)
318                 GOTO(cleanup, rc);
319         rc = dt_declare_attr_set(env, dt, &attr, th);
320         if (rc)
321                 GOTO(cleanup, rc);
322         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
323         if (rc)
324                 GOTO(cleanup, rc);
325
326         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
327         if (rc == 0)
328                 rc = dt_attr_set(env, dt, &attr, th);
329
330 cleanup:
331         dt_trans_stop(env, tgt->lut_bottom, th);
332
333         RETURN(rc);
334 }
335
336 static void tgt_client_epoch_update(const struct lu_env *env,
337                                     struct obd_export *exp)
338 {
339         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
340         struct lu_target        *tgt = class_exp2tgt(exp);
341
342         LASSERT(tgt && tgt->lut_bottom);
343         /** VBR: set client last_epoch to current epoch */
344         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
345                 return;
346         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
347         tgt_client_data_update(env, exp);
348 }
349
350 /**
351  * Update boot epoch when recovery ends
352  */
353 void tgt_boot_epoch_update(struct lu_target *tgt)
354 {
355         struct lu_env            env;
356         struct ptlrpc_request   *req;
357         __u32                    start_epoch;
358         struct list_head         client_list;
359         int                      rc;
360
361         if (tgt->lut_obd->obd_stopping)
362                 return;
363
364         rc = lu_env_init(&env, LCT_DT_THREAD);
365         if (rc) {
366                 CERROR("%s: can't initialize environment: rc = %d\n",
367                         tgt->lut_obd->obd_name, rc);
368                 return;
369         }
370
371         spin_lock(&tgt->lut_translock);
372         start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
373         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
374         tgt->lut_lsd.lsd_start_epoch = start_epoch;
375         spin_unlock(&tgt->lut_translock);
376
377         INIT_LIST_HEAD(&client_list);
378         /**
379          * The recovery is not yet finished and final queue can still be updated
380          * with resend requests. Move final list to separate one for processing
381          */
382         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
383         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
384         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
385
386         /**
387          * go through list of exports participated in recovery and
388          * set new epoch for them
389          */
390         list_for_each_entry(req, &client_list, rq_list) {
391                 LASSERT(!req->rq_export->exp_delayed);
392                 if (!req->rq_export->exp_vbr_failed)
393                         tgt_client_epoch_update(&env, req->rq_export);
394         }
395         /** return list back at once */
396         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
397         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
398         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
399         /** update server epoch */
400         tgt_server_data_update(&env, tgt, 1);
401         lu_env_fini(&env);
402 }
403
/**
 * Commit callback data, used to update the last_committed values of the
 * target's obd and of the export once a transaction has committed.
 */
struct tgt_last_committed_callback {
        /* embedded commit callback; the container is recovered from it */
        struct dt_txn_commit_cb  llcc_cb;
        /* target whose obd_last_committed is advanced */
        struct lu_target        *llcc_tgt;
        /* export, held via class_export_cb_get() until the callback runs */
        struct obd_export       *llcc_exp;
        /* transaction number being committed */
        __u64                    llcc_transno;
};
413
414 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
415                                   struct dt_txn_commit_cb *cb, int err)
416 {
417         struct tgt_last_committed_callback *ccb;
418
419         ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
420
421         LASSERT(ccb->llcc_tgt != NULL);
422         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
423
424         spin_lock(&ccb->llcc_tgt->lut_translock);
425         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
426                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
427
428         LASSERT(ccb->llcc_exp);
429         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
430                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
431                 spin_unlock(&ccb->llcc_tgt->lut_translock);
432                 ptlrpc_commit_replies(ccb->llcc_exp);
433         } else {
434                 spin_unlock(&ccb->llcc_tgt->lut_translock);
435         }
436         class_export_cb_put(ccb->llcc_exp);
437         if (ccb->llcc_transno)
438                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
439                        ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
440         OBD_FREE_PTR(ccb);
441 }
442
443 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
444                            struct obd_export *exp, __u64 transno)
445 {
446         struct tgt_last_committed_callback      *ccb;
447         struct dt_txn_commit_cb                 *dcb;
448         int                                      rc;
449
450         OBD_ALLOC_PTR(ccb);
451         if (ccb == NULL)
452                 return -ENOMEM;
453
454         ccb->llcc_tgt = tgt;
455         ccb->llcc_exp = class_export_cb_get(exp);
456         ccb->llcc_transno = transno;
457
458         dcb = &ccb->llcc_cb;
459         dcb->dcb_func = tgt_cb_last_committed;
460         INIT_LIST_HEAD(&dcb->dcb_linkage);
461         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
462
463         rc = dt_trans_cb_add(th, dcb);
464         if (rc) {
465                 class_export_cb_put(exp);
466                 OBD_FREE_PTR(ccb);
467         }
468
469         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
470                 /* report failure to force synchronous operation */
471                 return -EPERM;
472
473         return rc;
474 }
475
/**
 * Commit callback data for the transaction that writes a client's
 * last_rcvd record.
 */
struct tgt_new_client_callback {
        /* embedded commit callback; the container is recovered from it */
        struct dt_txn_commit_cb  lncc_cb;
        /* export, held via class_export_cb_get() until the callback runs */
        struct obd_export       *lncc_exp;
};
480
481 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
482                               struct dt_txn_commit_cb *cb, int err)
483 {
484         struct tgt_new_client_callback *ccb;
485
486         ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
487
488         LASSERT(ccb->lncc_exp->exp_obd);
489
490         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
491                ccb->lncc_exp->exp_obd->obd_name,
492                ccb->lncc_exp->exp_client_uuid.uuid);
493
494         spin_lock(&ccb->lncc_exp->exp_lock);
495         /* XXX: Currently, we use per-export based sync/async policy for
496          *      the update via OUT RPC, it is coarse-grained policy, and
497          *      will be changed as per-request based by DNE II patches. */
498         if (!ccb->lncc_exp->exp_keep_sync)
499                 ccb->lncc_exp->exp_need_sync = 0;
500
501         spin_unlock(&ccb->lncc_exp->exp_lock);
502         class_export_cb_put(ccb->lncc_exp);
503
504         OBD_FREE_PTR(ccb);
505 }
506
507 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
508 {
509         struct tgt_new_client_callback  *ccb;
510         struct dt_txn_commit_cb         *dcb;
511         int                              rc;
512
513         OBD_ALLOC_PTR(ccb);
514         if (ccb == NULL)
515                 return -ENOMEM;
516
517         ccb->lncc_exp = class_export_cb_get(exp);
518
519         dcb = &ccb->lncc_cb;
520         dcb->dcb_func = tgt_cb_new_client;
521         INIT_LIST_HEAD(&dcb->dcb_linkage);
522         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
523
524         rc = dt_trans_cb_add(th, dcb);
525         if (rc) {
526                 class_export_cb_put(exp);
527                 OBD_FREE_PTR(ccb);
528         }
529         return rc;
530 }
531
532 /**
533  * Add new client to the last_rcvd upon new connection.
534  *
535  * We use a bitmap to locate a free space in the last_rcvd file and initialize
536  * tg_export_data.
537  */
538 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
539 {
540         struct tg_export_data   *ted = &exp->exp_target_data;
541         struct lu_target        *tgt = class_exp2tgt(exp);
542         int                      rc = 0, idx;
543
544         ENTRY;
545
546         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
547         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
548                 RETURN(0);
549
550         mutex_init(&ted->ted_lcd_lock);
551
552         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
553                 RETURN(0);
554
555         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
556          * there's no need for extra complication here
557          */
558         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
559 repeat:
560         if (idx >= LR_MAX_CLIENTS ||
561             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
562                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
563                        tgt->lut_obd->obd_name,  idx);
564                 RETURN(-EOVERFLOW);
565         }
566         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
567                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
568                                              LR_MAX_CLIENTS, idx);
569                 goto repeat;
570         }
571
572         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
573                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
574
575         ted->ted_lr_idx = idx;
576         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
577                           idx * tgt->lut_lsd.lsd_client_size;
578
579         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
580
581         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
582                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
583                ted->ted_lcd->lcd_uuid);
584
585         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
586                 RETURN(-ENOSPC);
587
588         rc = tgt_client_data_update(env, exp);
589         if (rc)
590                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
591                        tgt->lut_obd->obd_name, idx, rc);
592
593         RETURN(rc);
594 }
595 EXPORT_SYMBOL(tgt_client_new);
596
597 /* Add client data to the MDS.  We use a bitmap to locate a free space
598  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
599  * Otherwise, we just have to read the data from the last_rcvd file and
600  * we know its offset.
601  *
602  * It should not be possible to fail adding an existing client - otherwise
603  * mdt_init_server_data() callsite needs to be fixed.
604  */
605 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
606 {
607         struct tg_export_data   *ted = &exp->exp_target_data;
608         struct lu_target        *tgt = class_exp2tgt(exp);
609
610         ENTRY;
611
612         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
613         LASSERTF(idx >= 0, "%d\n", idx);
614
615         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
616             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
617                 RETURN(0);
618
619         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
620                 CERROR("%s: client %d: bit already set in bitmap!!\n",
621                        tgt->lut_obd->obd_name,  idx);
622                 LBUG();
623         }
624
625         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
626                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
627
628         ted->ted_lr_idx = idx;
629         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
630                           idx * tgt->lut_lsd.lsd_client_size;
631
632         mutex_init(&ted->ted_lcd_lock);
633
634         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
635
636         RETURN(0);
637 }
638
639 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
640 {
641         struct tg_export_data   *ted = &exp->exp_target_data;
642         struct lu_target        *tgt = class_exp2tgt(exp);
643         int                      rc;
644
645         ENTRY;
646
647         LASSERT(ted->ted_lcd);
648
649         if (unlikely(tgt == NULL)) {
650                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
651                        class_exp2obd(exp)->obd_name);
652                 RETURN(-EINVAL);
653         }
654
655         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
656         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
657                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
658             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
659                 RETURN(0);
660
661         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
662                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
663                ted->ted_lcd->lcd_uuid);
664
665         /* Clear the bit _after_ zeroing out the client so we don't
666            race with filter_client_add and zero out new clients.*/
667         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
668                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
669                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
670                 LBUG();
671         }
672
673         /* Do not erase record for recoverable client. */
674         if (exp->exp_flags & OBD_OPT_FAILOVER)
675                 RETURN(0);
676
677         /* Make sure the server's last_transno is up to date.
678          * This should be done before zeroing client slot so last_transno will
679          * be in server data or in client data in case of failure */
680         rc = tgt_server_data_update(env, tgt, 0);
681         if (rc != 0) {
682                 CERROR("%s: failed to update server data, skip client %s "
683                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
684                        ted->ted_lcd->lcd_uuid, rc);
685                 RETURN(rc);
686         }
687
688         mutex_lock(&ted->ted_lcd_lock);
689         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
690         rc = tgt_client_data_update(env, exp);
691         mutex_unlock(&ted->ted_lcd_lock);
692
693         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
694                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
695                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
696                ted->ted_lr_idx, ted->ted_lr_off, rc);
697         RETURN(rc);
698 }
699 EXPORT_SYMBOL(tgt_client_del);
700
701 /*
702  * last_rcvd & last_committed update callbacks
703  */
704 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
705                                 struct dt_object *obj, __u64 opdata,
706                                 struct thandle *th, struct ptlrpc_request *req)
707 {
708         struct tgt_thread_info  *tti = tgt_th_info(env);
709         struct tg_export_data   *ted;
710         __u64                   *transno_p;
711         int                      rc = 0;
712         bool                     lw_client, update = false;
713
714         ENTRY;
715
716         ted = &req->rq_export->exp_target_data;
717
718         lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
719         if (ted->ted_lr_idx < 0 && !lw_client)
720                 /* ofd connect may cause transaction before export has
721                  * last_rcvd slot */
722                 RETURN(0);
723
724         tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
725
726         spin_lock(&tgt->lut_translock);
727         if (th->th_result != 0) {
728                 if (tti->tti_transno != 0) {
729                         CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
730                                tgt_name(tgt), tti->tti_transno, th->th_result);
731                 }
732         } else if (tti->tti_transno == 0) {
733                 tti->tti_transno = ++tgt->lut_last_transno;
734         } else {
735                 /* should be replay */
736                 if (tti->tti_transno > tgt->lut_last_transno)
737                         tgt->lut_last_transno = tti->tti_transno;
738         }
739         spin_unlock(&tgt->lut_translock);
740
741         /** VBR: set new versions */
742         if (th->th_result == 0 && obj != NULL) {
743                 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
744                 dt_version_set(env, dto, tti->tti_transno, th);
745         }
746
747         /* filling reply data */
748         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
749                tti->tti_transno, tgt->lut_obd->obd_last_committed);
750
751         req->rq_transno = tti->tti_transno;
752         lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
753
754         /* if can't add callback, do sync write */
755         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
756                                                 tti->tti_transno);
757
758         if (lw_client) {
759                 /* All operations performed by LW clients are synchronous and
760                  * we store the committed transno in the last_rcvd header */
761                 spin_lock(&tgt->lut_translock);
762                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
763                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
764                         update = true;
765                 }
766                 spin_unlock(&tgt->lut_translock);
767                 /* Although lightweight (LW) connections have no slot in
768                  * last_rcvd, we still want to maintain the in-memory
769                  * lsd_client_data structure in order to properly handle reply
770                  * reconstruction. */
771         } else if (ted->ted_lr_off == 0) {
772                 CERROR("%s: client idx %d has offset %lld\n",
773                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
774                 RETURN(-EINVAL);
775         }
776
777         /* if the export has already been disconnected, we have no last_rcvd
778          * slot, update server data with latest transno then */
779         if (ted->ted_lcd == NULL) {
780                 CWARN("commit transaction for disconnected client %s: rc %d\n",
781                       req->rq_export->exp_client_uuid.uuid, rc);
782                 GOTO(srv_update, rc = 0);
783         }
784
785         mutex_lock(&ted->ted_lcd_lock);
786         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
787         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
788                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
789                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
790                 ted->ted_lcd->lcd_last_close_result = th->th_result;
791         } else {
792                 /* VBR: save versions in last_rcvd for reconstruct. */
793                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
794
795                 if (pre_versions) {
796                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
797                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
798                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
799                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
800                 }
801                 transno_p = &ted->ted_lcd->lcd_last_transno;
802                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
803                 ted->ted_lcd->lcd_last_result = th->th_result;
804                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
805                  * see struct ldlm_reply->lock_policy_res1; */
806                 ted->ted_lcd->lcd_last_data = opdata;
807         }
808
809         /* Update transno in slot only if non-zero number, i.e. no errors */
810         if (likely(tti->tti_transno != 0)) {
811                 if (*transno_p > tti->tti_transno &&
812                     !tgt->lut_no_reconstruct) {
813                         CERROR("%s: trying to overwrite bigger transno:"
814                                "on-disk: "LPU64", new: "LPU64" replay: %d. "
815                                "see LU-617.\n", tgt_name(tgt), *transno_p,
816                                tti->tti_transno, req_is_replay(req));
817                         if (req_is_replay(req)) {
818                                 spin_lock(&req->rq_export->exp_lock);
819                                 req->rq_export->exp_vbr_failed = 1;
820                                 spin_unlock(&req->rq_export->exp_lock);
821                         }
822                         mutex_unlock(&ted->ted_lcd_lock);
823                         RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
824                 }
825                 *transno_p = tti->tti_transno;
826         }
827
828         if (!lw_client) {
829                 tti->tti_off = ted->ted_lr_off;
830                 rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
831                 if (rc < 0) {
832                         mutex_unlock(&ted->ted_lcd_lock);
833                         RETURN(rc);
834                 }
835         }
836         mutex_unlock(&ted->ted_lcd_lock);
837         EXIT;
838 srv_update:
839         if (update)
840                 rc = tgt_server_data_write(env, tgt, th);
841         return rc;
842 }
843
844 /*
845  * last_rcvd update for echo client simulation.
846  * It updates last_rcvd client slot and version of object in
847  * simple way but with all locks to simulate all drawbacks
848  */
849 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
850                                      struct lu_target *tgt,
851                                      struct dt_object *obj,
852                                      struct thandle *th,
853                                      struct obd_export *exp)
854 {
855         struct tgt_thread_info  *tti = tgt_th_info(env);
856         struct tg_export_data   *ted = &exp->exp_target_data;
857         int                      rc = 0;
858
859         ENTRY;
860
861         tti->tti_transno = 0;
862
863         spin_lock(&tgt->lut_translock);
864         if (th->th_result == 0)
865                 tti->tti_transno = ++tgt->lut_last_transno;
866         spin_unlock(&tgt->lut_translock);
867
868         /** VBR: set new versions */
869         if (th->th_result == 0 && obj != NULL)
870                 dt_version_set(env, obj, tti->tti_transno, th);
871
872         /* if can't add callback, do sync write */
873         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
874                                                 tti->tti_transno);
875
876         LASSERT(ted->ted_lr_off > 0);
877
878         mutex_lock(&ted->ted_lcd_lock);
879         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
880         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
881         ted->ted_lcd->lcd_last_result = th->th_result;
882
883         tti->tti_off = ted->ted_lr_off;
884         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
885         mutex_unlock(&ted->ted_lcd_lock);
886         RETURN(rc);
887 }
888
889 static int tgt_clients_data_init(const struct lu_env *env,
890                                  struct lu_target *tgt,
891                                  unsigned long last_size)
892 {
893         struct obd_device       *obd = tgt->lut_obd;
894         struct lr_server_data   *lsd = &tgt->lut_lsd;
895         struct lsd_client_data  *lcd = NULL;
896         struct tg_export_data   *ted;
897         int                      cl_idx;
898         int                      rc = 0;
899         loff_t                   off = lsd->lsd_client_start;
900
901         ENTRY;
902
903         CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
904                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
905
906         OBD_ALLOC_PTR(lcd);
907         if (lcd == NULL)
908                 RETURN(-ENOMEM);
909
910         for (cl_idx = 0; off < last_size; cl_idx++) {
911                 struct obd_export       *exp;
912                 __u64                    last_transno;
913
914                 /* Don't assume off is incremented properly by
915                  * read_record(), in case sizeof(*lcd)
916                  * isn't the same as fsd->lsd_client_size.  */
917                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
918                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
919                 if (rc) {
920                         CERROR("%s: error reading last_rcvd %s idx %d off "
921                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
922                                cl_idx, off, rc);
923                         rc = 0;
924                         break; /* read error shouldn't cause startup to fail */
925                 }
926
927                 if (lcd->lcd_uuid[0] == '\0') {
928                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
929                                cl_idx);
930                         continue;
931                 }
932
933                 last_transno = lcd_last_transno(lcd);
934
935                 /* These exports are cleaned up by disconnect, so they
936                  * need to be set up like real exports as connect does.
937                  */
938                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
939                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
940                        last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
941
942                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
943                 if (IS_ERR(exp)) {
944                         if (PTR_ERR(exp) == -EALREADY) {
945                                 /* export already exists, zero out this one */
946                                 CERROR("%s: Duplicate export %s!\n",
947                                        tgt_name(tgt), lcd->lcd_uuid);
948                                 continue;
949                         }
950                         GOTO(err_out, rc = PTR_ERR(exp));
951                 }
952
953                 ted = &exp->exp_target_data;
954                 *ted->ted_lcd = *lcd;
955
956                 rc = tgt_client_add(env, exp, cl_idx);
957                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
958                 /* VBR: set export last committed version */
959                 exp->exp_last_committed = last_transno;
960                 spin_lock(&exp->exp_lock);
961                 exp->exp_connecting = 0;
962                 exp->exp_in_recovery = 0;
963                 spin_unlock(&exp->exp_lock);
964                 obd->obd_max_recoverable_clients++;
965                 class_export_put(exp);
966
967                 /* Need to check last_rcvd even for duplicated exports. */
968                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
969                        cl_idx, last_transno);
970
971                 spin_lock(&tgt->lut_translock);
972                 tgt->lut_last_transno = max(last_transno,
973                                             tgt->lut_last_transno);
974                 spin_unlock(&tgt->lut_translock);
975         }
976
977 err_out:
978         OBD_FREE_PTR(lcd);
979         RETURN(rc);
980 }
981
982 struct server_compat_data {
983         __u32 rocompat;
984         __u32 incompat;
985         __u32 rocinit;
986         __u32 incinit;
987 };
988
989 static struct server_compat_data tgt_scd[] = {
990         [LDD_F_SV_TYPE_MDT] = {
991                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
992                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
993                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
994                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
995                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
996                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
997                            OBD_INCOMPAT_MULTI_OI,
998         },
999         [LDD_F_SV_TYPE_OST] = {
1000                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1001                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1002                             OBD_INCOMPAT_FID,
1003                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1004                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1005         }
1006 };
1007
1008 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1009 {
1010         struct tgt_thread_info          *tti = tgt_th_info(env);
1011         struct lr_server_data           *lsd = &tgt->lut_lsd;
1012         unsigned long                    last_rcvd_size;
1013         __u32                            index;
1014         int                              rc, type;
1015
1016         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1017         if (rc)
1018                 RETURN(rc);
1019
1020         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1021
1022         /* ensure padding in the struct is the correct size */
1023         CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
1024                  sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
1025
1026         rc = server_name2index(tgt_name(tgt), &index, NULL);
1027         if (rc < 0) {
1028                 CERROR("%s: Can not get index from name: rc = %d\n",
1029                        tgt_name(tgt), rc);
1030                 RETURN(rc);
1031         }
1032         /* server_name2index() returns type */
1033         type = rc;
1034         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1035                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1036                 RETURN(-EINVAL);
1037         }
1038
1039         /* last_rcvd on OST doesn't provide reconstruct support because there
1040          * may be up to 8 in-flight write requests per single slot in
1041          * last_rcvd client data
1042          */
1043         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1044
1045         if (last_rcvd_size == 0) {
1046                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1047
1048                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1049                        sizeof(lsd->lsd_uuid));
1050                 lsd->lsd_last_transno = 0;
1051                 lsd->lsd_mount_count = 0;
1052                 lsd->lsd_server_size = LR_SERVER_SIZE;
1053                 lsd->lsd_client_start = LR_CLIENT_START;
1054                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1055                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1056                 lsd->lsd_osd_index = index;
1057                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1058                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1059         } else {
1060                 rc = tgt_server_data_read(env, tgt);
1061                 if (rc) {
1062                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1063                                tgt_name(tgt), rc);
1064                         RETURN(rc);
1065                 }
1066                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1067                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
1068                                            "using the wrong disk %s. Were the"
1069                                            " /dev/ assignments rearranged?\n",
1070                                            tgt->lut_obd->obd_uuid.uuid,
1071                                            lsd->lsd_uuid);
1072                         RETURN(-EINVAL);
1073                 }
1074
1075                 if (lsd->lsd_osd_index != index) {
1076                         LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
1077                                            "is different with the index %d in"
1078                                            "config log, It might be disk"
1079                                            "corruption!\n", tgt_name(tgt),
1080                                            lsd->lsd_osd_index, index);
1081                         RETURN(-EINVAL);
1082                 }
1083         }
1084
1085         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1086                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1087                        tgt_name(tgt),
1088                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1089                 RETURN(-EINVAL);
1090         }
1091
1092         if (type == LDD_F_SV_TYPE_MDT)
1093                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1094
1095         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1096                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1097                        tgt_name(tgt),
1098                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1099                 RETURN(-EINVAL);
1100         }
1101         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1102         if (type == LDD_F_SV_TYPE_MDT &&
1103             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1104                 if (last_rcvd_size > lsd->lsd_client_start) {
1105                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1106                                       "remove all clients for interop needs\n",
1107                                       tgt_name(tgt));
1108                         rc = tgt_truncate_last_rcvd(env, tgt,
1109                                                     lsd->lsd_client_start);
1110                         if (rc)
1111                                 RETURN(rc);
1112                         last_rcvd_size = lsd->lsd_client_start;
1113                 }
1114                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1115                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1116         }
1117
1118         spin_lock(&tgt->lut_translock);
1119         tgt->lut_last_transno = lsd->lsd_last_transno;
1120         spin_unlock(&tgt->lut_translock);
1121
1122         lsd->lsd_mount_count++;
1123
1124         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1125         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
1126                tgt_name(tgt), tgt->lut_last_transno);
1127         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
1128                tgt_name(tgt), lsd->lsd_mount_count);
1129         CDEBUG(D_INODE, "%s: server data size: %u\n",
1130                tgt_name(tgt), lsd->lsd_server_size);
1131         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1132                tgt_name(tgt), lsd->lsd_client_start);
1133         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1134                tgt_name(tgt), lsd->lsd_client_size);
1135         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1136                tgt_name(tgt), last_rcvd_size);
1137         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1138                tgt_name(tgt), lsd->lsd_subdir_count);
1139         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1140                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1141                (last_rcvd_size - lsd->lsd_client_start) /
1142                 lsd->lsd_client_size);
1143         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1144
1145         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1146             lsd->lsd_client_size == 0) {
1147                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1148                 RETURN(-EINVAL);
1149         }
1150
1151         if (!tgt->lut_obd->obd_replayable)
1152                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1153
1154         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1155         if (rc < 0)
1156                 GOTO(err_client, rc);
1157
1158         spin_lock(&tgt->lut_translock);
1159         /* obd_last_committed is used for compatibility
1160          * with other lustre recovery code */
1161         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1162         spin_unlock(&tgt->lut_translock);
1163
1164         tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
1165         tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
1166
1167         /* save it, so mount count and last_transno is current */
1168         rc = tgt_server_data_update(env, tgt, 0);
1169         if (rc < 0)
1170                 GOTO(err_client, rc);
1171
1172         RETURN(0);
1173
1174 err_client:
1175         class_disconnect_exports(tgt->lut_obd);
1176         return rc;
1177 }
1178
1179 /* add credits for last_rcvd update */
1180 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1181                      void *cookie)
1182 {
1183         struct lu_target        *tgt = cookie;
1184         struct tgt_session_info *tsi;
1185         struct tgt_thread_info  *tti = tgt_th_info(env);
1186         struct dt_object        *dto;
1187         int                      rc;
1188
1189         /* if there is no session, then this transaction is not result of
1190          * request processing but some local operation */
1191         if (env->le_ses == NULL)
1192                 return 0;
1193
1194         LASSERT(tgt->lut_last_rcvd);
1195         tsi = tgt_ses_info(env);
1196         /* OFD may start transaction without export assigned */
1197         if (tsi->tsi_exp == NULL)
1198                 return 0;
1199
1200         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
1201         tti_buf_lcd(tti);
1202
1203         rc = dt_declare_record_write(env, dto, &tti->tti_buf,
1204                                      tsi->tsi_exp->exp_target_data.ted_lr_off,
1205                                      th);
1206         if (rc)
1207                 return rc;
1208
1209         tti_buf_lsd(tti);
1210         rc = dt_declare_record_write(env, dto, &tti->tti_buf, 0, th);
1211         if (rc)
1212                 return rc;
1213
1214         if (tsi->tsi_vbr_obj != NULL &&
1215             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1216                 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
1217                 rc = dt_declare_version_set(env, dto, th);
1218         }
1219
1220         return rc;
1221 }
1222
1223 /* Update last_rcvd records with latests transaction data */
1224 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
1225                     void *cookie)
1226 {
1227         struct lu_target        *tgt = cookie;
1228         struct tgt_session_info *tsi;
1229         struct tgt_thread_info  *tti = tgt_th_info(env);
1230         struct dt_object        *obj = NULL;
1231         int                      rc;
1232         bool                     echo_client;
1233
1234         if (env->le_ses == NULL)
1235                 return 0;
1236
1237         tsi = tgt_ses_info(env);
1238         /* OFD may start transaction without export assigned */
1239         if (tsi->tsi_exp == NULL)
1240                 return 0;
1241
1242         echo_client = (tgt_ses_req(tsi) == NULL);
1243
1244         if (tti->tti_has_trans && !echo_client) {
1245                 if (tti->tti_mult_trans == 0) {
1246                         CDEBUG(D_HA, "More than one transaction "LPU64"\n",
1247                                tti->tti_transno);
1248                         RETURN(0);
1249                 }
1250                 /* we need another transno to be assigned */
1251                 tti->tti_transno = 0;
1252         } else if (th->th_result == 0) {
1253                 tti->tti_has_trans = 1;
1254         }
1255
1256         if (tsi->tsi_vbr_obj != NULL &&
1257             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1258                 obj = tsi->tsi_vbr_obj;
1259         }
1260
1261         if (unlikely(echo_client)) /* echo client special case */
1262                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
1263                                                tsi->tsi_exp);
1264         else
1265                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
1266                                           tgt_ses_req(tsi));
1267         return rc;
1268 }