Whamcloud - gitweb
LU-3467 ofd: use unified handler for OST requests
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
37  * These are common function to work with last_received file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int tgt_client_alloc(struct obd_export *exp)
65 {
66         ENTRY;
67         LASSERT(exp != exp->exp_obd->obd_self_export);
68
69         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
70         if (exp->exp_target_data.ted_lcd == NULL)
71                 RETURN(-ENOMEM);
72         /* Mark that slot is not yet valid, 0 doesn't work here */
73         exp->exp_target_data.ted_lr_idx = -1;
74         RETURN(0);
75 }
76 EXPORT_SYMBOL(tgt_client_alloc);
77
78 /**
79  * Free in-memory data for client slot related to export.
80  */
81 void tgt_client_free(struct obd_export *exp)
82 {
83         struct tg_export_data   *ted = &exp->exp_target_data;
84         struct lu_target        *lut = class_exp2tgt(exp);
85
86         LASSERT(exp != exp->exp_obd->obd_self_export);
87
88         OBD_FREE_PTR(ted->ted_lcd);
89         ted->ted_lcd = NULL;
90
91         /* Slot may be not yet assigned */
92         if (ted->ted_lr_idx < 0)
93                 return;
94         /* Clear bit when lcd is freed */
95         LASSERT(lut->lut_client_bitmap);
96         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
97                 CERROR("%s: client %u bit already clear in bitmap\n",
98                        exp->exp_obd->obd_name, ted->ted_lr_idx);
99                 LBUG();
100         }
101 }
102 EXPORT_SYMBOL(tgt_client_free);
103
104 int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
105                          struct lsd_client_data *lcd, loff_t *off, int index)
106 {
107         struct tgt_thread_info  *tti = tgt_th_info(env);
108         int                      rc;
109
110         tti_buf_lcd(tti);
111         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
112         if (rc == 0) {
113                 check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
114                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
115                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
116                 lcd->lcd_last_close_result =
117                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
118         }
119
120         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
121                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
122                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
123                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
124                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
125                lcd->lcd_last_result, lcd->lcd_last_data,
126                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
127                lcd->lcd_last_close_result, rc);
128         return rc;
129 }
130 EXPORT_SYMBOL(tgt_client_data_read);
131
132 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
133                           struct lsd_client_data *lcd, loff_t *off,
134                           struct thandle *th)
135 {
136         struct tgt_thread_info *tti = tgt_th_info(env);
137
138         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
139         lcd->lcd_last_close_result =
140                 ptlrpc_status_hton(lcd->lcd_last_close_result);
141         lcd_cpu_to_le(lcd, &tti->tti_lcd);
142         tti_buf_lcd(tti);
143
144         return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
145 }
146 EXPORT_SYMBOL(tgt_client_data_write);
147
148 /**
149  * Update client data in last_rcvd
150  */
151 int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
152 {
153         struct tg_export_data   *ted = &exp->exp_target_data;
154         struct lu_target        *tgt = class_exp2tgt(exp);
155         struct tgt_thread_info  *tti = tgt_th_info(env);
156         struct thandle          *th;
157         int                      rc = 0;
158
159         ENTRY;
160
161         th = dt_trans_create(env, tgt->lut_bottom);
162         if (IS_ERR(th))
163                 RETURN(PTR_ERR(th));
164
165         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
166                                      sizeof(struct lsd_client_data),
167                                      ted->ted_lr_off, th);
168         if (rc)
169                 GOTO(out, rc);
170
171         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
172         if (rc)
173                 GOTO(out, rc);
174         /*
175          * Until this operations will be committed the sync is needed
176          * for this export. This should be done _after_ starting the
177          * transaction so that many connecting clients will not bring
178          * server down with lots of sync writes.
179          */
180         rc = tgt_new_client_cb_add(th, exp);
181         if (rc) {
182                 /* can't add callback, do sync now */
183                 th->th_sync = 1;
184         } else {
185                 spin_lock(&exp->exp_lock);
186                 exp->exp_need_sync = 1;
187                 spin_unlock(&exp->exp_lock);
188         }
189
190         tti->tti_off = ted->ted_lr_off;
191         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
192         EXIT;
193 out:
194         dt_trans_stop(env, tgt->lut_bottom, th);
195         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
196                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
197                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
198
199         return rc;
200 }
201
202 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
203 {
204         struct tgt_thread_info  *tti = tgt_th_info(env);
205         int                      rc;
206
207         tti->tti_off = 0;
208         tti_buf_lsd(tti);
209         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
210                             &tti->tti_off);
211         if (rc == 0)
212                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
213
214         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
215                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
216                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
217         return rc;
218 }
219 EXPORT_SYMBOL(tgt_server_data_read);
220
221 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
222                           struct thandle *th)
223 {
224         struct tgt_thread_info  *tti = tgt_th_info(env);
225         int                      rc;
226
227         ENTRY;
228
229         tti->tti_off = 0;
230         tti_buf_lsd(tti);
231         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
232
233         rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
234                              &tti->tti_off, th);
235
236         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
237                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
238                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
239
240         RETURN(rc);
241 }
242 EXPORT_SYMBOL(tgt_server_data_write);
243
244 /**
245  * Update server data in last_rcvd
246  */
247 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
248                            int sync)
249 {
250         struct tgt_thread_info  *tti = tgt_th_info(env);
251         struct thandle          *th;
252         int                      rc = 0;
253
254         ENTRY;
255
256         CDEBUG(D_SUPER,
257                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
258                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
259                tgt->lut_last_transno);
260
261         /* Always save latest transno to keep it fresh */
262         spin_lock(&tgt->lut_translock);
263         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
264         spin_unlock(&tgt->lut_translock);
265
266         th = dt_trans_create(env, tgt->lut_bottom);
267         if (IS_ERR(th))
268                 RETURN(PTR_ERR(th));
269
270         th->th_sync = sync;
271
272         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
273                                      sizeof(struct lr_server_data),
274                                      tti->tti_off, th);
275         if (rc)
276                 GOTO(out, rc);
277
278         rc = dt_trans_start(env, tgt->lut_bottom, th);
279         if (rc)
280                 GOTO(out, rc);
281
282         rc = tgt_server_data_write(env, tgt, th);
283 out:
284         dt_trans_stop(env, tgt->lut_bottom, th);
285
286         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
287                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
288                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
289         RETURN(rc);
290 }
291 EXPORT_SYMBOL(tgt_server_data_update);
292
293 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
294                            loff_t size)
295 {
296         struct dt_object *dt = tgt->lut_last_rcvd;
297         struct thandle   *th;
298         struct lu_attr    attr;
299         int               rc;
300
301         ENTRY;
302
303         attr.la_size = size;
304         attr.la_valid = LA_SIZE;
305
306         th = dt_trans_create(env, tgt->lut_bottom);
307         if (IS_ERR(th))
308                 RETURN(PTR_ERR(th));
309         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
310         if (rc)
311                 GOTO(cleanup, rc);
312         rc = dt_declare_attr_set(env, dt, &attr, th);
313         if (rc)
314                 GOTO(cleanup, rc);
315         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
316         if (rc)
317                 GOTO(cleanup, rc);
318
319         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
320         if (rc == 0)
321                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
322
323 cleanup:
324         dt_trans_stop(env, tgt->lut_bottom, th);
325
326         RETURN(rc);
327 }
328 EXPORT_SYMBOL(tgt_truncate_last_rcvd);
329
330 void tgt_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
331 {
332         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
333         struct lu_target        *tgt = class_exp2tgt(exp);
334
335         LASSERT(tgt->lut_bottom);
336         /** VBR: set client last_epoch to current epoch */
337         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
338                 return;
339         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
340         tgt_client_data_update(env, exp);
341 }
342
343 /**
344  * Update boot epoch when recovery ends
345  */
346 void tgt_boot_epoch_update(struct lu_target *tgt)
347 {
348         struct lu_env            env;
349         struct ptlrpc_request   *req;
350         __u32                    start_epoch;
351         cfs_list_t               client_list;
352         int                      rc;
353
354         if (tgt->lut_obd->obd_stopping)
355                 return;
356
357         rc = lu_env_init(&env, LCT_DT_THREAD);
358         if (rc) {
359                 CERROR("%s: can't initialize environment: rc = %d\n",
360                         tgt->lut_obd->obd_name, rc);
361                 return;
362         }
363
364         spin_lock(&tgt->lut_translock);
365         start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
366         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
367         tgt->lut_lsd.lsd_start_epoch = start_epoch;
368         spin_unlock(&tgt->lut_translock);
369
370         CFS_INIT_LIST_HEAD(&client_list);
371         /**
372          * The recovery is not yet finished and final queue can still be updated
373          * with resend requests. Move final list to separate one for processing
374          */
375         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
376         cfs_list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
377         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
378
379         /**
380          * go through list of exports participated in recovery and
381          * set new epoch for them
382          */
383         cfs_list_for_each_entry(req, &client_list, rq_list) {
384                 LASSERT(!req->rq_export->exp_delayed);
385                 if (!req->rq_export->exp_vbr_failed)
386                         tgt_client_epoch_update(&env, req->rq_export);
387         }
388         /** return list back at once */
389         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
390         cfs_list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
391         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
392         /** update server epoch */
393         tgt_server_data_update(&env, tgt, 1);
394         lu_env_fini(&env);
395 }
396 EXPORT_SYMBOL(tgt_boot_epoch_update);
397
398 /**
399  * commit callback, need to update last_commited value
400  */
401 struct tgt_last_committed_callback {
402         struct dt_txn_commit_cb  llcc_cb;
403         struct lu_target        *llcc_tgt;
404         struct obd_export       *llcc_exp;
405         __u64                    llcc_transno;
406 };
407
408 void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
409                            struct dt_txn_commit_cb *cb, int err)
410 {
411         struct tgt_last_committed_callback *ccb;
412
413         ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
414
415         LASSERT(ccb->llcc_tgt != NULL);
416         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
417
418         spin_lock(&ccb->llcc_tgt->lut_translock);
419         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
420                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
421
422         LASSERT(ccb->llcc_exp);
423         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
424                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
425                 spin_unlock(&ccb->llcc_tgt->lut_translock);
426                 ptlrpc_commit_replies(ccb->llcc_exp);
427         } else {
428                 spin_unlock(&ccb->llcc_tgt->lut_translock);
429         }
430         class_export_cb_put(ccb->llcc_exp);
431         if (ccb->llcc_transno)
432                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
433                        ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
434         OBD_FREE_PTR(ccb);
435 }
436
437 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
438                            struct obd_export *exp, __u64 transno)
439 {
440         struct tgt_last_committed_callback      *ccb;
441         struct dt_txn_commit_cb                 *dcb;
442         int                                      rc;
443
444         OBD_ALLOC_PTR(ccb);
445         if (ccb == NULL)
446                 return -ENOMEM;
447
448         ccb->llcc_tgt = tgt;
449         ccb->llcc_exp = class_export_cb_get(exp);
450         ccb->llcc_transno = transno;
451
452         dcb = &ccb->llcc_cb;
453         dcb->dcb_func = tgt_cb_last_committed;
454         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
455         strncpy(dcb->dcb_name, "tgt_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
456         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
457
458         rc = dt_trans_cb_add(th, dcb);
459         if (rc) {
460                 class_export_cb_put(exp);
461                 OBD_FREE_PTR(ccb);
462         }
463
464         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
465                 /* report failure to force synchronous operation */
466                 return -EPERM;
467
468         return rc;
469 }
470 EXPORT_SYMBOL(tgt_last_commit_cb_add);
471
472 struct tgt_new_client_callback {
473         struct dt_txn_commit_cb  lncc_cb;
474         struct obd_export       *lncc_exp;
475 };
476
477 void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
478                        struct dt_txn_commit_cb *cb, int err)
479 {
480         struct tgt_new_client_callback *ccb;
481
482         ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
483
484         LASSERT(ccb->lncc_exp->exp_obd);
485
486         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
487                ccb->lncc_exp->exp_obd->obd_name,
488                ccb->lncc_exp->exp_client_uuid.uuid);
489
490         spin_lock(&ccb->lncc_exp->exp_lock);
491         ccb->lncc_exp->exp_need_sync = 0;
492         spin_unlock(&ccb->lncc_exp->exp_lock);
493         class_export_cb_put(ccb->lncc_exp);
494
495         OBD_FREE_PTR(ccb);
496 }
497
498 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
499 {
500         struct tgt_new_client_callback  *ccb;
501         struct dt_txn_commit_cb         *dcb;
502         int                              rc;
503
504         OBD_ALLOC_PTR(ccb);
505         if (ccb == NULL)
506                 return -ENOMEM;
507
508         ccb->lncc_exp = class_export_cb_get(exp);
509
510         dcb = &ccb->lncc_cb;
511         dcb->dcb_func = tgt_cb_new_client;
512         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
513         strncpy(dcb->dcb_name, "tgt_cb_new_client", MAX_COMMIT_CB_STR_LEN);
514         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
515
516         rc = dt_trans_cb_add(th, dcb);
517         if (rc) {
518                 class_export_cb_put(exp);
519                 OBD_FREE_PTR(ccb);
520         }
521         return rc;
522 }
523
524 /**
525  * Add new client to the last_rcvd upon new connection.
526  *
527  * We use a bitmap to locate a free space in the last_rcvd file and initialize
528  * tg_export_data.
529  */
530 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
531 {
532         struct tg_export_data   *ted = &exp->exp_target_data;
533         struct lu_target        *tgt = class_exp2tgt(exp);
534         int                      rc = 0, idx;
535
536         ENTRY;
537
538         LASSERT(tgt->lut_client_bitmap != NULL);
539         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
540                 RETURN(0);
541
542         mutex_init(&ted->ted_lcd_lock);
543
544         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
545                 RETURN(0);
546
547         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
548          * there's no need for extra complication here
549          */
550         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
551 repeat:
552         if (idx >= LR_MAX_CLIENTS ||
553             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
554                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
555                        tgt->lut_obd->obd_name,  idx);
556                 RETURN(-EOVERFLOW);
557         }
558         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
559                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
560                                              LR_MAX_CLIENTS, idx);
561                 goto repeat;
562         }
563
564         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
565                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
566
567         ted->ted_lr_idx = idx;
568         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
569                           idx * tgt->lut_lsd.lsd_client_size;
570
571         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
572
573         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
574                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
575                ted->ted_lcd->lcd_uuid);
576
577         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
578                 RETURN(-ENOSPC);
579
580         rc = tgt_client_data_update(env, exp);
581         if (rc)
582                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
583                        tgt->lut_obd->obd_name, idx, rc);
584
585         RETURN(rc);
586 }
587 EXPORT_SYMBOL(tgt_client_new);
588
589 /* Add client data to the MDS.  We use a bitmap to locate a free space
590  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
591  * Otherwise, we just have to read the data from the last_rcvd file and
592  * we know its offset.
593  *
594  * It should not be possible to fail adding an existing client - otherwise
595  * mdt_init_server_data() callsite needs to be fixed.
596  */
597 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
598 {
599         struct tg_export_data   *ted = &exp->exp_target_data;
600         struct lu_target        *tgt = class_exp2tgt(exp);
601
602         ENTRY;
603
604         LASSERT(tgt->lut_client_bitmap != NULL);
605         LASSERTF(idx >= 0, "%d\n", idx);
606
607         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
608             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
609                 RETURN(0);
610
611         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
612                 CERROR("%s: client %d: bit already set in bitmap!!\n",
613                        tgt->lut_obd->obd_name,  idx);
614                 LBUG();
615         }
616
617         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
618                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
619
620         ted->ted_lr_idx = idx;
621         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
622                           idx * tgt->lut_lsd.lsd_client_size;
623
624         mutex_init(&ted->ted_lcd_lock);
625
626         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
627
628         RETURN(0);
629 }
630 EXPORT_SYMBOL(tgt_client_add);
631
632 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
633 {
634         struct tg_export_data   *ted = &exp->exp_target_data;
635         struct lu_target        *tgt = class_exp2tgt(exp);
636         int                      rc;
637
638         ENTRY;
639
640         LASSERT(ted->ted_lcd);
641
642         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
643         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
644                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
645             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
646                 RETURN(0);
647
648         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
649                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
650                ted->ted_lcd->lcd_uuid);
651
652         /* Clear the bit _after_ zeroing out the client so we don't
653            race with filter_client_add and zero out new clients.*/
654         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
655                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
656                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
657                 LBUG();
658         }
659
660         /* Do not erase record for recoverable client. */
661         if (exp->exp_flags & OBD_OPT_FAILOVER)
662                 RETURN(0);
663
664         /* Make sure the server's last_transno is up to date.
665          * This should be done before zeroing client slot so last_transno will
666          * be in server data or in client data in case of failure */
667         rc = tgt_server_data_update(env, tgt, 0);
668         if (rc != 0) {
669                 CERROR("%s: failed to update server data, skip client %s "
670                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
671                        ted->ted_lcd->lcd_uuid, rc);
672                 RETURN(rc);
673         }
674
675         mutex_lock(&ted->ted_lcd_lock);
676         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
677         rc = tgt_client_data_update(env, exp);
678         mutex_unlock(&ted->ted_lcd_lock);
679
680         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
681                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
682                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
683                ted->ted_lr_idx, ted->ted_lr_off, rc);
684         RETURN(rc);
685 }
686 EXPORT_SYMBOL(tgt_client_del);
687
688 /*
689  * last_rcvd & last_committed update callbacks
690  */
691 int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
692                          struct dt_object *obj, __u64 opdata,
693                          struct thandle *th, struct ptlrpc_request *req)
694 {
695         struct tgt_thread_info  *tti = tgt_th_info(env);
696         struct tgt_session_info *tsi = tgt_ses_info(env);
697         struct tg_export_data   *ted;
698         __u64                   *transno_p;
699         int                      rc = 0;
700         bool                     lw_client, update = false;
701
702         ENTRY;
703
704         if (tsi->tsi_has_trans) {
705                 /* XXX: currently there are allowed cases, but the wrong cases
706                  * are also possible, so better check is needed here */
707                 CDEBUG(D_INFO, "More than one transaction "LPU64"\n",
708                        tti->tti_transno);
709                 return 0;
710         }
711
712         tsi->tsi_has_trans = 1;
713         /* that can be OUT target and we need tgt_session_info */
714         if (req == NULL) {
715                 req = tgt_ses_req(tsi);
716                 if (req == NULL) /* echo client case */
717                         RETURN(0);
718         }
719
720         ted = &req->rq_export->exp_target_data;
721
722         lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
723         if (ted->ted_lr_idx < 0 && !lw_client)
724                 /* ofd connect may cause transaction before export has
725                  * last_rcvd slot */
726                 RETURN(0);
727
728         tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
729
730         spin_lock(&tgt->lut_translock);
731         if (th->th_result != 0) {
732                 if (tti->tti_transno != 0) {
733                         CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
734                                tgt_name(tgt), tti->tti_transno, th->th_result);
735                 }
736         } else if (tti->tti_transno == 0) {
737                 tti->tti_transno = ++tgt->lut_last_transno;
738         } else {
739                 /* should be replay */
740                 if (tti->tti_transno > tgt->lut_last_transno)
741                         tgt->lut_last_transno = tti->tti_transno;
742         }
743         spin_unlock(&tgt->lut_translock);
744
745         /** VBR: set new versions */
746         if (th->th_result == 0 && obj != NULL)
747                 dt_version_set(env, obj, tti->tti_transno, th);
748
749         /* filling reply data */
750         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
751                tti->tti_transno, tgt->lut_obd->obd_last_committed);
752
753         req->rq_transno = tti->tti_transno;
754         lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
755
756         /* if can't add callback, do sync write */
757         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
758                                                 tti->tti_transno);
759
760         if (lw_client) {
761                 /* All operations performed by LW clients are synchronous and
762                  * we store the committed transno in the last_rcvd header */
763                 spin_lock(&tgt->lut_translock);
764                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
765                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
766                         update = true;
767                 }
768                 spin_unlock(&tgt->lut_translock);
769                 /* Although lightweight (LW) connections have no slot in
770                  * last_rcvd, we still want to maintain the in-memory
771                  * lsd_client_data structure in order to properly handle reply
772                  * reconstruction. */
773         } else if (ted->ted_lr_off == 0) {
774                 CERROR("%s: client idx %d has offset %lld\n",
775                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
776                 RETURN(-EINVAL);
777         }
778
779         /* if the export has already been disconnected, we have no last_rcvd
780          * slot, update server data with latest transno then */
781         if (ted->ted_lcd == NULL) {
782                 CWARN("commit transaction for disconnected client %s: rc %d\n",
783                       req->rq_export->exp_client_uuid.uuid, rc);
784                 GOTO(srv_update, rc = 0);
785         }
786
787         mutex_lock(&ted->ted_lcd_lock);
788         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
789         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
790             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
791                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
792                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
793                 ted->ted_lcd->lcd_last_close_result = th->th_result;
794         } else {
795                 /* VBR: save versions in last_rcvd for reconstruct. */
796                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
797
798                 if (pre_versions) {
799                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
800                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
801                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
802                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
803                 }
804                 transno_p = &ted->ted_lcd->lcd_last_transno;
805                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
806                 ted->ted_lcd->lcd_last_result = th->th_result;
807                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
808                  * see struct ldlm_reply->lock_policy_res1; */
809                 ted->ted_lcd->lcd_last_data = opdata;
810         }
811
812         /* Update transno in slot only if non-zero number, i.e. no errors */
813         if (likely(tti->tti_transno != 0)) {
814                 if (*transno_p > tti->tti_transno) {
815                         CERROR("%s: trying to overwrite bigger transno:"
816                                "on-disk: "LPU64", new: "LPU64" replay: %d. "
817                                "see LU-617.\n", tgt_name(tgt), *transno_p,
818                                tti->tti_transno, req_is_replay(req));
819                         if (req_is_replay(req)) {
820                                 spin_lock(&req->rq_export->exp_lock);
821                                 req->rq_export->exp_vbr_failed = 1;
822                                 spin_unlock(&req->rq_export->exp_lock);
823                         }
824                         mutex_unlock(&ted->ted_lcd_lock);
825                         RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
826                 }
827                 *transno_p = tti->tti_transno;
828         }
829
830         if (!lw_client) {
831                 tti->tti_off = ted->ted_lr_off;
832                 rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
833                 if (rc < 0) {
834                         mutex_unlock(&ted->ted_lcd_lock);
835                         RETURN(rc);
836                 }
837         }
838         mutex_unlock(&ted->ted_lcd_lock);
839         EXIT;
840 srv_update:
841         if (update)
842                 rc = tgt_server_data_write(env, tgt, th);
843         return rc;
844 }
845 EXPORT_SYMBOL(tgt_last_rcvd_update);
846
847 /*
848  * last_rcvd update for echo client simulation.
849  * It updates last_rcvd client slot and version of object in
850  * simple way but with all locks to simulate all drawbacks
851  */
852 int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
853                               struct dt_object *obj, struct thandle *th,
854                               struct obd_export *exp)
855 {
856         struct tgt_thread_info  *tti = tgt_th_info(env);
857         struct tg_export_data   *ted = &exp->exp_target_data;
858         int                      rc = 0;
859
860         ENTRY;
861
862         tti->tti_transno = 0;
863
864         spin_lock(&tgt->lut_translock);
865         if (th->th_result == 0)
866                 tti->tti_transno = ++tgt->lut_last_transno;
867         spin_unlock(&tgt->lut_translock);
868
869         /** VBR: set new versions */
870         if (th->th_result == 0 && obj != NULL)
871                 dt_version_set(env, obj, tti->tti_transno, th);
872
873         /* if can't add callback, do sync write */
874         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
875                                                 tti->tti_transno);
876
877         LASSERT(ted->ted_lr_off > 0);
878
879         mutex_lock(&ted->ted_lcd_lock);
880         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
881         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
882         ted->ted_lcd->lcd_last_result = th->th_result;
883
884         tti->tti_off = ted->ted_lr_off;
885         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
886         mutex_unlock(&ted->ted_lcd_lock);
887         RETURN(rc);
888 }
889 EXPORT_SYMBOL(tgt_last_rcvd_update_echo);