Whamcloud - gitweb
LU-2145 target: move target code to the separate directory
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
37  * These are common function to work with last_received file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int lut_client_alloc(struct obd_export *exp)
65 {
66         LASSERT(exp != exp->exp_obd->obd_self_export);
67
68         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
69         if (exp->exp_target_data.ted_lcd == NULL)
70                 RETURN(-ENOMEM);
71         /* Mark that slot is not yet valid, 0 doesn't work here */
72         exp->exp_target_data.ted_lr_idx = -1;
73         RETURN(0);
74 }
75 EXPORT_SYMBOL(lut_client_alloc);
76
77 /**
78  * Free in-memory data for client slot related to export.
79  */
80 void lut_client_free(struct obd_export *exp)
81 {
82         struct tg_export_data *ted = &exp->exp_target_data;
83         struct lu_target *lut = class_exp2tgt(exp);
84
85         LASSERT(exp != exp->exp_obd->obd_self_export);
86
87         OBD_FREE_PTR(ted->ted_lcd);
88         ted->ted_lcd = NULL;
89
90         /* Slot may be not yet assigned */
91         if (ted->ted_lr_idx < 0)
92                 return;
93         /* Clear bit when lcd is freed */
94         LASSERT(lut->lut_client_bitmap);
95         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
96                 CERROR("%s: client %u bit already clear in bitmap\n",
97                        exp->exp_obd->obd_name, ted->ted_lr_idx);
98                 LBUG();
99         }
100 }
101 EXPORT_SYMBOL(lut_client_free);
102
103 int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
104                          struct lsd_client_data *lcd, loff_t *off, int index)
105 {
106         struct tgt_thread_info *tti = tgt_th_info(env);
107         int rc;
108
109         tti_buf_lcd(tti);
110         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
111         if (rc == 0) {
112                 check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
113                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
114         }
115
116         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
117                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
118                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
119                "last_close_result = %u, rc = %d\n", tg->lut_obd->obd_name,
120                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
121                lcd->lcd_last_result, lcd->lcd_last_data,
122                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
123                lcd->lcd_last_close_result, rc);
124         return rc;
125 }
126 EXPORT_SYMBOL(lut_client_data_read);
127
128 int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
129                           struct lsd_client_data *lcd, loff_t *off,
130                           struct thandle *th)
131 {
132         struct tgt_thread_info *tti = tgt_th_info(env);
133
134         lcd_cpu_to_le(lcd, &tti->tti_lcd);
135         tti_buf_lcd(tti);
136
137         return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
138 }
139 EXPORT_SYMBOL(lut_client_data_write);
140
141 /**
142  * Update client data in last_rcvd
143  */
144 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
145 {
146         struct tg_export_data *ted = &exp->exp_target_data;
147         struct lu_target      *tg = class_exp2tgt(exp);
148         struct tgt_thread_info *tti = tgt_th_info(env);
149         struct thandle        *th;
150         int                    rc = 0;
151
152         ENTRY;
153
154         th = dt_trans_create(env, tg->lut_bottom);
155         if (IS_ERR(th))
156                 RETURN(PTR_ERR(th));
157
158         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
159                                      sizeof(struct lsd_client_data),
160                                      ted->ted_lr_off, th);
161         if (rc)
162                 GOTO(out, rc);
163
164         rc = dt_trans_start_local(env, tg->lut_bottom, th);
165         if (rc)
166                 GOTO(out, rc);
167         /*
168          * Until this operations will be committed the sync is needed
169          * for this export. This should be done _after_ starting the
170          * transaction so that many connecting clients will not bring
171          * server down with lots of sync writes.
172          */
173         rc = lut_new_client_cb_add(th, exp);
174         if (rc) {
175                 /* can't add callback, do sync now */
176                 th->th_sync = 1;
177         } else {
178                 cfs_spin_lock(&exp->exp_lock);
179                 exp->exp_need_sync = 1;
180                 cfs_spin_unlock(&exp->exp_lock);
181         }
182
183         tti->tti_off = ted->ted_lr_off;
184         rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
185         EXIT;
186 out:
187         dt_trans_stop(env, tg->lut_bottom, th);
188         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
189                "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
190                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
191
192         return rc;
193 }
194
195 int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
196 {
197         struct tgt_thread_info *tti = tgt_th_info(env);
198         int rc;
199
200         tti->tti_off = 0;
201         tti_buf_lsd(tti);
202         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
203         if (rc == 0)
204                 lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
205
206         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
207                "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
208                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
209         return rc;
210 }
211 EXPORT_SYMBOL(lut_server_data_read);
212
213 int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
214                           struct thandle *th)
215 {
216         struct tgt_thread_info *tti = tgt_th_info(env);
217         int rc;
218         ENTRY;
219
220         tti->tti_off = 0;
221         tti_buf_lsd(tti);
222         lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
223
224         rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
225                              &tti->tti_off, th);
226
227         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
228                "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
229                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
230
231         RETURN(rc);
232 }
233 EXPORT_SYMBOL(lut_server_data_write);
234
235 /**
236  * Update server data in last_rcvd
237  */
238 int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
239                            int sync)
240 {
241         struct tgt_thread_info *tti = tgt_th_info(env);
242         struct thandle  *th;
243         int                 rc = 0;
244
245         ENTRY;
246
247         CDEBUG(D_SUPER,
248                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
249                tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
250                tg->lut_last_transno);
251
252         /* Always save latest transno to keep it fresh */
253         cfs_spin_lock(&tg->lut_translock);
254         tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
255         cfs_spin_unlock(&tg->lut_translock);
256
257         th = dt_trans_create(env, tg->lut_bottom);
258         if (IS_ERR(th))
259                 RETURN(PTR_ERR(th));
260
261         th->th_sync = sync;
262
263         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
264                                      sizeof(struct lr_server_data),
265                                      tti->tti_off, th);
266         if (rc)
267                 GOTO(out, rc);
268
269         rc = dt_trans_start(env, tg->lut_bottom, th);
270         if (rc)
271                 GOTO(out, rc);
272
273         rc = lut_server_data_write(env, tg, th);
274 out:
275         dt_trans_stop(env, tg->lut_bottom, th);
276
277         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
278                "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
279                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
280         RETURN(rc);
281 }
282 EXPORT_SYMBOL(lut_server_data_update);
283
284 int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
285                            loff_t size)
286 {
287         struct dt_object *dt = tg->lut_last_rcvd;
288         struct thandle   *th;
289         struct lu_attr    attr;
290         int               rc;
291
292         ENTRY;
293
294         attr.la_size = size;
295         attr.la_valid = LA_SIZE;
296
297         th = dt_trans_create(env, tg->lut_bottom);
298         if (IS_ERR(th))
299                 RETURN(PTR_ERR(th));
300         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
301         if (rc)
302                 GOTO(cleanup, rc);
303         rc = dt_declare_attr_set(env, dt, &attr, th);
304         if (rc)
305                 GOTO(cleanup, rc);
306         rc = dt_trans_start_local(env, tg->lut_bottom, th);
307         if (rc)
308                 GOTO(cleanup, rc);
309
310         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
311         if (rc == 0)
312                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
313
314 cleanup:
315         dt_trans_stop(env, tg->lut_bottom, th);
316
317         RETURN(rc);
318 }
319 EXPORT_SYMBOL(lut_truncate_last_rcvd);
320
321 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
322 {
323         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
324         struct lu_target *lut = class_exp2tgt(exp);
325
326         LASSERT(lut->lut_bottom);
327         /** VBR: set client last_epoch to current epoch */
328         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
329                 return;
330         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
331         lut_client_data_update(env, exp);
332 }
333
334 /**
335  * Update boot epoch when recovery ends
336  */
337 void lut_boot_epoch_update(struct lu_target *lut)
338 {
339         struct lu_env env;
340         struct ptlrpc_request *req;
341         __u32 start_epoch;
342         cfs_list_t client_list;
343         int rc;
344
345         if (lut->lut_obd->obd_stopping)
346                 return;
347
348         rc = lu_env_init(&env, LCT_DT_THREAD);
349         if (rc) {
350                 CERROR("Can't initialize environment rc=%d\n", rc);
351                 return;
352         }
353
354         cfs_spin_lock(&lut->lut_translock);
355         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
356         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
357         lut->lut_lsd.lsd_start_epoch = start_epoch;
358         cfs_spin_unlock(&lut->lut_translock);
359
360         CFS_INIT_LIST_HEAD(&client_list);
361         /**
362          * The recovery is not yet finished and final queue can still be updated
363          * with resend requests. Move final list to separate one for processing
364          */
365         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
366         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
367         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
368
369         /**
370          * go through list of exports participated in recovery and
371          * set new epoch for them
372          */
373         cfs_list_for_each_entry(req, &client_list, rq_list) {
374                 LASSERT(!req->rq_export->exp_delayed);
375                 if (!req->rq_export->exp_vbr_failed)
376                         lut_client_epoch_update(&env, req->rq_export);
377         }
378         /** return list back at once */
379         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
380         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
381         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
382         /** update server epoch */
383         lut_server_data_update(&env, lut, 1);
384         lu_env_fini(&env);
385 }
386 EXPORT_SYMBOL(lut_boot_epoch_update);
387
388 /**
389  * commit callback, need to update last_commited value
390  */
391 struct lut_last_committed_callback {
392         struct dt_txn_commit_cb llcc_cb;
393         struct lu_target       *llcc_lut;
394         struct obd_export      *llcc_exp;
395         __u64                   llcc_transno;
396 };
397
398 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
399                            struct dt_txn_commit_cb *cb, int err)
400 {
401         struct lut_last_committed_callback *ccb;
402
403         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
404
405         LASSERT(ccb->llcc_lut != NULL);
406         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
407
408         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
409         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
410                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
411
412         LASSERT(ccb->llcc_exp);
413         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
414                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
415                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
416                 ptlrpc_commit_replies(ccb->llcc_exp);
417         } else {
418                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
419         }
420         class_export_cb_put(ccb->llcc_exp);
421         if (ccb->llcc_transno)
422                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
423                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
424         OBD_FREE_PTR(ccb);
425 }
426
427 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
428                            struct obd_export *exp, __u64 transno)
429 {
430         struct lut_last_committed_callback *ccb;
431         struct dt_txn_commit_cb            *dcb;
432         int                                rc;
433
434         OBD_ALLOC_PTR(ccb);
435         if (ccb == NULL)
436                 return -ENOMEM;
437
438         ccb->llcc_lut     = lut;
439         ccb->llcc_exp     = class_export_cb_get(exp);
440         ccb->llcc_transno = transno;
441
442         dcb            = &ccb->llcc_cb;
443         dcb->dcb_func  = lut_cb_last_committed;
444         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
445         strncpy(dcb->dcb_name, "lut_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
446         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
447
448         rc = dt_trans_cb_add(th, dcb);
449         if (rc) {
450                 class_export_cb_put(exp);
451                 OBD_FREE_PTR(ccb);
452         }
453
454         if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
455                 /* report failure to force synchronous operation */
456                 return -EPERM;
457
458         return rc;
459 }
460 EXPORT_SYMBOL(lut_last_commit_cb_add);
461
462 struct lut_new_client_callback {
463         struct dt_txn_commit_cb lncc_cb;
464         struct obd_export      *lncc_exp;
465 };
466
467 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
468                        struct dt_txn_commit_cb *cb, int err)
469 {
470         struct lut_new_client_callback *ccb;
471
472         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
473
474         LASSERT(ccb->lncc_exp->exp_obd);
475
476         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
477                ccb->lncc_exp->exp_obd->obd_name,
478                ccb->lncc_exp->exp_client_uuid.uuid);
479
480         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
481         ccb->lncc_exp->exp_need_sync = 0;
482         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
483         class_export_cb_put(ccb->lncc_exp);
484
485         OBD_FREE_PTR(ccb);
486 }
487
488 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
489 {
490         struct lut_new_client_callback *ccb;
491         struct dt_txn_commit_cb        *dcb;
492         int                            rc;
493
494         OBD_ALLOC_PTR(ccb);
495         if (ccb == NULL)
496                 return -ENOMEM;
497
498         ccb->lncc_exp  = class_export_cb_get(exp);
499
500         dcb            = &ccb->lncc_cb;
501         dcb->dcb_func  = lut_cb_new_client;
502         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
503         strncpy(dcb->dcb_name, "lut_cb_new_client", MAX_COMMIT_CB_STR_LEN);
504         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
505
506         rc = dt_trans_cb_add(th, dcb);
507         if (rc) {
508                 class_export_cb_put(exp);
509                 OBD_FREE_PTR(ccb);
510         }
511         return rc;
512 }
513
514 /**
515  * Add new client to the last_rcvd upon new connection.
516  *
517  * We use a bitmap to locate a free space in the last_rcvd file and initialize
518  * tg_export_data.
519  */
520 int lut_client_new(const struct lu_env *env, struct obd_export *exp)
521 {
522         struct tg_export_data *ted = &exp->exp_target_data;
523         struct lu_target *tg = class_exp2tgt(exp);
524         int rc = 0, idx;
525
526         ENTRY;
527
528         LASSERT(tg->lut_client_bitmap != NULL);
529         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
530                 RETURN(0);
531
532         cfs_mutex_init(&ted->ted_lcd_lock);
533
534         if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
535                 RETURN(0);
536
537         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
538          * there's no need for extra complication here
539          */
540         idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
541 repeat:
542         if (idx >= LR_MAX_CLIENTS ||
543             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
544                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
545                        tg->lut_obd->obd_name,  idx);
546                 RETURN(-EOVERFLOW);
547         }
548         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
549                 idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
550                                              LR_MAX_CLIENTS, idx);
551                 goto repeat;
552         }
553
554         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
555                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
556
557         ted->ted_lr_idx = idx;
558         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
559                           idx * tg->lut_lsd.lsd_client_size;
560
561         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
562
563         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
564                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
565                ted->ted_lcd->lcd_uuid);
566
567         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
568                 RETURN(-ENOSPC);
569
570         rc = lut_client_data_update(env, exp);
571         if (rc)
572                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
573                        tg->lut_obd->obd_name, idx, rc);
574
575         RETURN(rc);
576 }
577 EXPORT_SYMBOL(lut_client_new);
578
579 /* Add client data to the MDS.  We use a bitmap to locate a free space
580  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
581  * Otherwise, we just have to read the data from the last_rcvd file and
582  * we know its offset.
583  *
584  * It should not be possible to fail adding an existing client - otherwise
585  * mdt_init_server_data() callsite needs to be fixed.
586  */
587 int lut_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
588 {
589         struct tg_export_data *ted = &exp->exp_target_data;
590         struct lu_target *tg = class_exp2tgt(exp);
591
592         ENTRY;
593
594         LASSERT(tg->lut_client_bitmap != NULL);
595         LASSERTF(idx >= 0, "%d\n", idx);
596
597         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid) ||
598             (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
599                 RETURN(0);
600
601         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
602                 CERROR("%s: client %d: bit already set in bitmap!!\n",
603                        tg->lut_obd->obd_name,  idx);
604                 LBUG();
605         }
606
607         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
608                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
609
610         ted->ted_lr_idx = idx;
611         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
612                           idx * tg->lut_lsd.lsd_client_size;
613
614         cfs_mutex_init(&ted->ted_lcd_lock);
615
616         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
617
618         RETURN(0);
619 }
620 EXPORT_SYMBOL(lut_client_add);
621
622 int lut_client_del(const struct lu_env *env, struct obd_export *exp)
623 {
624         struct tg_export_data *ted = &exp->exp_target_data;
625         struct lu_target      *tg = class_exp2tgt(exp);
626         int                    rc;
627
628         ENTRY;
629
630         LASSERT(ted->ted_lcd);
631
632         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
633         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
634                     (char *)tg->lut_obd->obd_uuid.uuid) ||
635             (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
636                 RETURN(0);
637
638         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
639                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
640                ted->ted_lcd->lcd_uuid);
641
642         /* Clear the bit _after_ zeroing out the client so we don't
643            race with filter_client_add and zero out new clients.*/
644         if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
645                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
646                        tg->lut_obd->obd_name, ted->ted_lr_idx);
647                 LBUG();
648         }
649
650         /* Do not erase record for recoverable client. */
651         if (exp->exp_flags & OBD_OPT_FAILOVER)
652                 RETURN(0);
653
654         /* Make sure the server's last_transno is up to date.
655          * This should be done before zeroing client slot so last_transno will
656          * be in server data or in client data in case of failure */
657         rc = lut_server_data_update(env, tg, 0);
658         if (rc != 0) {
659                 CERROR("%s: failed to update server data, skip client %s "
660                        "zeroing, rc %d\n", tg->lut_obd->obd_name,
661                        ted->ted_lcd->lcd_uuid, rc);
662                 RETURN(rc);
663         }
664
665         cfs_mutex_lock(&ted->ted_lcd_lock);
666         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
667         rc = lut_client_data_update(env, exp);
668         cfs_mutex_unlock(&ted->ted_lcd_lock);
669
670         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
671                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
672                tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
673                ted->ted_lr_idx, ted->ted_lr_off, rc);
674         RETURN(rc);
675 }
676 EXPORT_SYMBOL(lut_client_del);