Whamcloud - gitweb
LU-1818 quota: en/disable quota enforcement via conf_param
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Common Target
37  * These are common function for MDT and OST recovery-related functionality
38  *
39  *   Author: Mikhail Pershin <tappro@sun.com>
40  */
41
42 #include <obd.h>
43 #include <lustre_fsfilt.h>
44 #include <obd_class.h>
45 #include <lustre_fid.h>
46
47 /**
48  * Common data shared by tg-level handlers. This is allocated per-thread to
49  * reduce stack consumption.
50  */
51 struct tg_thread_info {
52         /* server and client data buffers */
53         struct lr_server_data  tti_lsd;
54         struct lsd_client_data tti_lcd;
55         struct lu_buf          tti_buf;
56         loff_t                 tti_off;
57 };
58
59 static inline struct lu_buf *tti_buf_lsd(struct tg_thread_info *tti)
60 {
61         tti->tti_buf.lb_buf = &tti->tti_lsd;
62         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
63         return &tti->tti_buf;
64 }
65
66 static inline struct lu_buf *tti_buf_lcd(struct tg_thread_info *tti)
67 {
68         tti->tti_buf.lb_buf = &tti->tti_lcd;
69         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
70         return &tti->tti_buf;
71 }
72
73 extern struct lu_context_key tg_thread_key;
74
75 static inline struct tg_thread_info *tg_th_info(const struct lu_env *env)
76 {
77         struct tg_thread_info *tti;
78
79         tti = lu_context_key_get(&env->le_ctx, &tg_thread_key);
80         LASSERT(tti);
81         return tti;
82 }
83
84 /**
85  * Update client data in last_rcvd file. An obd API
86  */
87 static int obt_client_data_update(struct obd_export *exp)
88 {
89         struct tg_export_data *ted = &exp->exp_target_data;
90         struct obd_device_target *obt = &exp->exp_obd->u.obt;
91         struct lu_target *lut = class_exp2tgt(exp);
92         loff_t off = ted->ted_lr_off;
93         int rc = 0;
94
95         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
96                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
97
98         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
99                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
100                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
101
102         return rc;
103 }
104
105 /**
106  * Update server data in last_rcvd file. An obd API
107  */
108 int obt_server_data_update(struct lu_target *lut, int force_sync)
109 {
110         struct obd_device_target *obt = &lut->lut_obd->u.obt;
111         loff_t off = 0;
112         int rc;
113         ENTRY;
114
115         CDEBUG(D_SUPER,
116                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
117                lut->lut_lsd.lsd_uuid,
118                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
119                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
120
121         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
122                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
123                                  &off, force_sync);
124         if (rc)
125                 CERROR("error writing lr_server_data: rc = %d\n", rc);
126
127         RETURN(rc);
128 }
129
130 /**
131  * Update client epoch with server's one
132  */
133 void obt_client_epoch_update(struct obd_export *exp)
134 {
135         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
136         struct lu_target *lut = class_exp2tgt(exp);
137
138         /** VBR: set client last_epoch to current epoch */
139         if (le32_to_cpu(lcd->lcd_last_epoch) >=
140             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
141                 return;
142         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
143         obt_client_data_update(exp);
144 }
145
146 /**
147  * Increment server epoch. An obd API
148  */
149 static void obt_boot_epoch_update(struct lu_target *lut)
150 {
151         struct obd_device *obd = lut->lut_obd;
152         __u32 start_epoch;
153         struct ptlrpc_request *req;
154         cfs_list_t client_list;
155
156         cfs_spin_lock(&lut->lut_translock);
157         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
158         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
159                                             LR_EPOCH_BITS);
160         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
161         cfs_spin_unlock(&lut->lut_translock);
162
163         CFS_INIT_LIST_HEAD(&client_list);
164         cfs_spin_lock(&obd->obd_recovery_task_lock);
165         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
166         cfs_spin_unlock(&obd->obd_recovery_task_lock);
167
168         /**
169          * go through list of exports participated in recovery and
170          * set new epoch for them
171          */
172         cfs_list_for_each_entry(req, &client_list, rq_list) {
173                 LASSERT(!req->rq_export->exp_delayed);
174                 obt_client_epoch_update(req->rq_export);
175         }
176         /** return list back at once */
177         cfs_spin_lock(&obd->obd_recovery_task_lock);
178         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
179         cfs_spin_unlock(&obd->obd_recovery_task_lock);
180         obt_server_data_update(lut, 1);
181 }
182
183 /**
184  * Allocate in-memory data for client slot related to export.
185  */
186 int lut_client_alloc(struct obd_export *exp)
187 {
188         LASSERT(exp != exp->exp_obd->obd_self_export);
189
190         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
191         if (exp->exp_target_data.ted_lcd == NULL)
192                 RETURN(-ENOMEM);
193         /* Mark that slot is not yet valid, 0 doesn't work here */
194         exp->exp_target_data.ted_lr_idx = -1;
195         RETURN(0);
196 }
197 EXPORT_SYMBOL(lut_client_alloc);
198
199 /**
200  * Free in-memory data for client slot related to export.
201  */
202 void lut_client_free(struct obd_export *exp)
203 {
204         struct tg_export_data *ted = &exp->exp_target_data;
205         struct lu_target *lut = class_exp2tgt(exp);
206
207         LASSERT(exp != exp->exp_obd->obd_self_export);
208
209         OBD_FREE_PTR(ted->ted_lcd);
210         ted->ted_lcd = NULL;
211
212         /* Slot may be not yet assigned */
213         if (ted->ted_lr_idx < 0)
214                 return;
215         /* Clear bit when lcd is freed */
216         LASSERT(lut->lut_client_bitmap);
217         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
218                 CERROR("%s: client %u bit already clear in bitmap\n",
219                        exp->exp_obd->obd_name, ted->ted_lr_idx);
220                 LBUG();
221         }
222 }
223 EXPORT_SYMBOL(lut_client_free);
224
225 int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
226                          struct lsd_client_data *lcd, loff_t *off, int index)
227 {
228         struct tg_thread_info *tti = tg_th_info(env);
229         int rc;
230
231         tti_buf_lcd(tti);
232         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
233         if (rc == 0) {
234                 check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
235                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
236         }
237
238         CDEBUG(D_INFO, "read lcd @%lld rc = %d, uuid = %s, last_transno = "LPU64
239                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
240                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
241                "last_close_result = %u\n", *off,
242                rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
243                lcd->lcd_last_result, lcd->lcd_last_data,
244                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
245                lcd->lcd_last_close_result);
246         return rc;
247 }
248 EXPORT_SYMBOL(lut_client_data_read);
249
250 int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
251                           struct lsd_client_data *lcd, loff_t *off,
252                           struct thandle *th)
253 {
254         struct tg_thread_info *tti = tg_th_info(env);
255
256         lcd_cpu_to_le(lcd, &tti->tti_lcd);
257         tti_buf_lcd(tti);
258
259         return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
260 }
261 EXPORT_SYMBOL(lut_client_data_write);
262
263 /**
264  * Update client data in last_rcvd
265  */
266 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
267 {
268         struct tg_export_data *ted = &exp->exp_target_data;
269         struct lu_target      *tg = class_exp2tgt(exp);
270         struct tg_thread_info *tti = tg_th_info(env);
271         struct thandle        *th;
272         int                    rc = 0;
273
274         ENTRY;
275
276         th = dt_trans_create(env, tg->lut_bottom);
277         if (IS_ERR(th))
278                 RETURN(PTR_ERR(th));
279
280         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
281                                      sizeof(struct lsd_client_data),
282                                      ted->ted_lr_off, th);
283         if (rc)
284                 GOTO(out, rc);
285
286         rc = dt_trans_start_local(env, tg->lut_bottom, th);
287         if (rc)
288                 GOTO(out, rc);
289         /*
290          * Until this operations will be committed the sync is needed
291          * for this export. This should be done _after_ starting the
292          * transaction so that many connecting clients will not bring
293          * server down with lots of sync writes.
294          */
295         rc = lut_new_client_cb_add(th, exp);
296         if (rc) {
297                 /* can't add callback, do sync now */
298                 th->th_sync = 1;
299         } else {
300                 cfs_spin_lock(&exp->exp_lock);
301                 exp->exp_need_sync = 1;
302                 cfs_spin_unlock(&exp->exp_lock);
303         }
304
305         tti->tti_off = ted->ted_lr_off;
306         rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
307         EXIT;
308 out:
309         dt_trans_stop(env, tg->lut_bottom, th);
310         CDEBUG(D_INFO, "write last_rcvd, rc = %d:\n"
311                "uuid = %s\nlast_transno = "LPU64"\n",
312                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
313
314         return rc;
315 }
316
317 int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
318 {
319         struct tg_thread_info *tti = tg_th_info(env);
320         int rc;
321
322         tti->tti_off = 0;
323         tti_buf_lsd(tti);
324         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
325         if (rc == 0)
326                 lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
327
328         CDEBUG(D_INFO, "%s: read last_rcvd header, rc = %d, uuid = %s, "
329                "last_transno = "LPU64"\n", tg->lut_obd->obd_name, rc,
330                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
331         return rc;
332 }
333 EXPORT_SYMBOL(lut_server_data_read);
334
335 int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
336                           struct thandle *th)
337 {
338         struct tg_thread_info *tti = tg_th_info(env);
339         int rc;
340         ENTRY;
341
342         tti->tti_off = 0;
343         tti_buf_lsd(tti);
344         lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
345
346         rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
347                              &tti->tti_off, th);
348
349         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
350                "uuid = %s\nlast_transno = "LPU64"\n",
351                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
352
353         RETURN(rc);
354 }
355 EXPORT_SYMBOL(lut_server_data_write);
356
357 /**
358  * Update server data in last_rcvd
359  */
360 int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
361                            int sync)
362 {
363         struct tg_thread_info *tti = tg_th_info(env);
364         struct thandle  *th;
365         int                 rc = 0;
366
367         ENTRY;
368
369         CDEBUG(D_SUPER,
370                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
371                tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
372                tg->lut_last_transno);
373
374         /* Always save latest transno to keep it fresh */
375         cfs_spin_lock(&tg->lut_translock);
376         tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
377         cfs_spin_unlock(&tg->lut_translock);
378
379         th = dt_trans_create(env, tg->lut_bottom);
380         if (IS_ERR(th))
381                 RETURN(PTR_ERR(th));
382
383         th->th_sync = sync;
384
385         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
386                                      sizeof(struct lr_server_data),
387                                      tti->tti_off, th);
388         if (rc)
389                 GOTO(out, rc);
390
391         rc = dt_trans_start(env, tg->lut_bottom, th);
392         if (rc)
393                 GOTO(out, rc);
394
395         rc = lut_server_data_write(env, tg, th);
396 out:
397         dt_trans_stop(env, tg->lut_bottom, th);
398
399         CDEBUG(D_INFO, "write last_rcvd header, rc = %d:\n"
400                "uuid = %s\nlast_transno = "LPU64"\n",
401                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
402         RETURN(rc);
403 }
404 EXPORT_SYMBOL(lut_server_data_update);
405
406 int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
407                            loff_t size)
408 {
409         struct dt_object *dt = tg->lut_last_rcvd;
410         struct thandle   *th;
411         struct lu_attr    attr;
412         int               rc;
413
414         ENTRY;
415
416         attr.la_size = size;
417         attr.la_valid = LA_SIZE;
418
419         th = dt_trans_create(env, tg->lut_bottom);
420         if (IS_ERR(th))
421                 RETURN(PTR_ERR(th));
422         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
423         if (rc)
424                 GOTO(cleanup, rc);
425         rc = dt_declare_attr_set(env, dt, &attr, th);
426         if (rc)
427                 GOTO(cleanup, rc);
428         rc = dt_trans_start_local(env, tg->lut_bottom, th);
429         if (rc)
430                 GOTO(cleanup, rc);
431
432         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
433         if (rc == 0)
434                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
435
436 cleanup:
437         dt_trans_stop(env, tg->lut_bottom, th);
438
439         RETURN(rc);
440 }
441 EXPORT_SYMBOL(lut_truncate_last_rcvd);
442
443 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
444 {
445         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
446         struct lu_target *lut = class_exp2tgt(exp);
447
448         LASSERT(lut->lut_bottom);
449         /** VBR: set client last_epoch to current epoch */
450         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
451                 return;
452         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
453         lut_client_data_update(env, exp);
454 }
455
456 /**
457  * Update boot epoch when recovery ends
458  */
459 void lut_boot_epoch_update(struct lu_target *lut)
460 {
461         struct lu_env env;
462         struct ptlrpc_request *req;
463         __u32 start_epoch;
464         cfs_list_t client_list;
465         int rc;
466
467         if (lut->lut_obd->obd_stopping)
468                 return;
469         /** Increase server epoch after recovery */
470         if (lut->lut_bottom == NULL)
471                 return obt_boot_epoch_update(lut);
472
473         rc = lu_env_init(&env, LCT_DT_THREAD);
474         if (rc) {
475                 CERROR("Can't initialize environment rc=%d\n", rc);
476                 return;
477         }
478
479         cfs_spin_lock(&lut->lut_translock);
480         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
481         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
482         lut->lut_lsd.lsd_start_epoch = start_epoch;
483         cfs_spin_unlock(&lut->lut_translock);
484
485         CFS_INIT_LIST_HEAD(&client_list);
486         /**
487          * The recovery is not yet finished and final queue can still be updated
488          * with resend requests. Move final list to separate one for processing
489          */
490         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
491         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
492         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
493
494         /**
495          * go through list of exports participated in recovery and
496          * set new epoch for them
497          */
498         cfs_list_for_each_entry(req, &client_list, rq_list) {
499                 LASSERT(!req->rq_export->exp_delayed);
500                 if (!req->rq_export->exp_vbr_failed)
501                         lut_client_epoch_update(&env, req->rq_export);
502         }
503         /** return list back at once */
504         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
505         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
506         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
507         /** update server epoch */
508         lut_server_data_update(&env, lut, 1);
509         lu_env_fini(&env);
510 }
511 EXPORT_SYMBOL(lut_boot_epoch_update);
512
513 /**
514  * commit callback, need to update last_commited value
515  */
516 struct lut_last_committed_callback {
517         struct dt_txn_commit_cb llcc_cb;
518         struct lu_target       *llcc_lut;
519         struct obd_export      *llcc_exp;
520         __u64                   llcc_transno;
521 };
522
523 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
524                            struct dt_txn_commit_cb *cb, int err)
525 {
526         struct lut_last_committed_callback *ccb;
527
528         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
529
530         LASSERT(ccb->llcc_lut != NULL);
531         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
532
533         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
534         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
535                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
536
537         LASSERT(ccb->llcc_exp);
538         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
539                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
540                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
541                 ptlrpc_commit_replies(ccb->llcc_exp);
542         } else {
543                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
544         }
545         class_export_cb_put(ccb->llcc_exp);
546         if (ccb->llcc_transno)
547                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
548                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
549         OBD_FREE_PTR(ccb);
550 }
551
552 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
553                            struct obd_export *exp, __u64 transno)
554 {
555         struct lut_last_committed_callback *ccb;
556         struct dt_txn_commit_cb            *dcb;
557         int                                rc;
558
559         OBD_ALLOC_PTR(ccb);
560         if (ccb == NULL)
561                 return -ENOMEM;
562
563         ccb->llcc_lut     = lut;
564         ccb->llcc_exp     = class_export_cb_get(exp);
565         ccb->llcc_transno = transno;
566
567         dcb            = &ccb->llcc_cb;
568         dcb->dcb_func  = lut_cb_last_committed;
569         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
570         strncpy(dcb->dcb_name, "lut_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
571         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
572
573         rc = dt_trans_cb_add(th, dcb);
574         if (rc) {
575                 class_export_cb_put(exp);
576                 OBD_FREE_PTR(ccb);
577         }
578         return rc;
579 }
580 EXPORT_SYMBOL(lut_last_commit_cb_add);
581
582 struct lut_new_client_callback {
583         struct dt_txn_commit_cb lncc_cb;
584         struct obd_export      *lncc_exp;
585 };
586
587 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
588                        struct dt_txn_commit_cb *cb, int err)
589 {
590         struct lut_new_client_callback *ccb;
591
592         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
593
594         LASSERT(ccb->lncc_exp->exp_obd);
595
596         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
597                ccb->lncc_exp->exp_obd->obd_name,
598                ccb->lncc_exp->exp_client_uuid.uuid);
599
600         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
601         ccb->lncc_exp->exp_need_sync = 0;
602         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
603         class_export_cb_put(ccb->lncc_exp);
604
605         OBD_FREE_PTR(ccb);
606 }
607
608 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
609 {
610         struct lut_new_client_callback *ccb;
611         struct dt_txn_commit_cb        *dcb;
612         int                            rc;
613
614         OBD_ALLOC_PTR(ccb);
615         if (ccb == NULL)
616                 return -ENOMEM;
617
618         ccb->lncc_exp  = class_export_cb_get(exp);
619
620         dcb            = &ccb->lncc_cb;
621         dcb->dcb_func  = lut_cb_new_client;
622         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
623         strncpy(dcb->dcb_name, "lut_cb_new_client", MAX_COMMIT_CB_STR_LEN);
624         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
625
626         rc = dt_trans_cb_add(th, dcb);
627         if (rc) {
628                 class_export_cb_put(exp);
629                 OBD_FREE_PTR(ccb);
630         }
631         return rc;
632 }
633
634 /**
635  * Add new client to the last_rcvd upon new connection.
636  *
637  * We use a bitmap to locate a free space in the last_rcvd file and initialize
638  * tg_export_data.
639  */
640 int lut_client_new(const struct lu_env *env, struct obd_export *exp)
641 {
642         struct tg_export_data *ted = &exp->exp_target_data;
643         struct lu_target *tg = class_exp2tgt(exp);
644         int rc = 0, idx;
645
646         ENTRY;
647
648         LASSERT(tg->lut_client_bitmap != NULL);
649         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
650                 RETURN(0);
651
652         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
653          * there's no need for extra complication here
654          */
655         idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
656 repeat:
657         if (idx >= LR_MAX_CLIENTS ||
658             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
659                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
660                        tg->lut_obd->obd_name,  idx);
661                 RETURN(-EOVERFLOW);
662         }
663         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
664                 idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
665                                              LR_MAX_CLIENTS, idx);
666                 goto repeat;
667         }
668
669         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
670                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
671
672         ted->ted_lr_idx = idx;
673         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
674                           idx * tg->lut_lsd.lsd_client_size;
675
676         cfs_mutex_init(&ted->ted_lcd_lock);
677
678         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
679
680         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
681                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
682                ted->ted_lcd->lcd_uuid);
683
684         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
685                 RETURN(-ENOSPC);
686
687         rc = lut_client_data_update(env, exp);
688         if (rc)
689                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
690                        tg->lut_obd->obd_name, idx, rc);
691
692         RETURN(rc);
693 }
694 EXPORT_SYMBOL(lut_client_new);
695
696 /* Add client data to the MDS.  We use a bitmap to locate a free space
697  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
698  * Otherwise, we just have to read the data from the last_rcvd file and
699  * we know its offset.
700  *
701  * It should not be possible to fail adding an existing client - otherwise
702  * mdt_init_server_data() callsite needs to be fixed.
703  */
704 int lut_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
705 {
706         struct tg_export_data *ted = &exp->exp_target_data;
707         struct lu_target *tg = class_exp2tgt(exp);
708
709         ENTRY;
710
711         LASSERT(tg->lut_client_bitmap != NULL);
712         LASSERTF(idx >= 0, "%d\n", idx);
713
714         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
715                 RETURN(0);
716
717         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
718                 CERROR("%s: client %d: bit already set in bitmap!!\n",
719                        tg->lut_obd->obd_name,  idx);
720                 LBUG();
721         }
722
723         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
724                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
725
726         ted->ted_lr_idx = idx;
727         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
728                           idx * tg->lut_lsd.lsd_client_size;
729
730         cfs_mutex_init(&ted->ted_lcd_lock);
731
732         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
733
734         RETURN(0);
735 }
736 EXPORT_SYMBOL(lut_client_add);
737
738 int lut_client_del(const struct lu_env *env, struct obd_export *exp)
739 {
740         struct tg_export_data *ted = &exp->exp_target_data;
741         struct lu_target      *tg = class_exp2tgt(exp);
742         int                    rc;
743
744         ENTRY;
745
746         LASSERT(ted->ted_lcd);
747
748         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
749         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
750                     (char *)tg->lut_obd->obd_uuid.uuid))
751                 RETURN(0);
752
753         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
754                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
755                ted->ted_lcd->lcd_uuid);
756
757         /* Clear the bit _after_ zeroing out the client so we don't
758            race with filter_client_add and zero out new clients.*/
759         if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
760                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
761                        tg->lut_obd->obd_name, ted->ted_lr_idx);
762                 LBUG();
763         }
764
765         /* Do not erase record for recoverable client. */
766         if (exp->exp_flags & OBD_OPT_FAILOVER)
767                 RETURN(0);
768
769         /* Make sure the server's last_transno is up to date.
770          * This should be done before zeroing client slot so last_transno will
771          * be in server data or in client data in case of failure */
772         rc = lut_server_data_update(env, tg, 0);
773         if (rc != 0) {
774                 CERROR("%s: failed to update server data, skip client %s "
775                        "zeroing, rc %d\n", tg->lut_obd->obd_name,
776                        ted->ted_lcd->lcd_uuid, rc);
777                 RETURN(rc);
778         }
779
780         cfs_mutex_lock(&ted->ted_lcd_lock);
781         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
782         rc = lut_client_data_update(env, exp);
783         cfs_mutex_unlock(&ted->ted_lcd_lock);
784
785         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
786                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
787                tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
788                ted->ted_lr_idx, ted->ted_lr_off, rc);
789         RETURN(rc);
790 }
791 EXPORT_SYMBOL(lut_client_del);
792
793 int lut_init(const struct lu_env *env, struct lu_target *lut,
794              struct obd_device *obd, struct dt_device *dt)
795 {
796         struct dt_object_format dof;
797         struct lu_attr          attr;
798         struct lu_fid           fid;
799         struct dt_object       *o;
800         int                     rc = 0;
801         ENTRY;
802
803         LASSERT(lut);
804         LASSERT(obd);
805         lut->lut_obd = obd;
806         lut->lut_bottom = dt;
807         lut->lut_last_rcvd = NULL;
808         obd->u.obt.obt_lut = lut;
809
810         cfs_spin_lock_init(&lut->lut_translock);
811
812         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
813         if (lut->lut_client_bitmap == NULL)
814                 RETURN(-ENOMEM);
815
816         /** obdfilter has no lu_device stack yet */
817         if (dt == NULL)
818                 RETURN(rc);
819
820         memset(&attr, 0, sizeof(attr));
821         attr.la_valid = LA_MODE;
822         attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
823         dof.dof_type = dt_mode_to_dft(S_IFREG);
824
825         lu_local_obj_fid(&fid, MDT_LAST_RECV_OID);
826
827         o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
828         if (!IS_ERR(o)) {
829                 lut->lut_last_rcvd = o;
830         } else {
831                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
832                 lut->lut_client_bitmap = NULL;
833                 rc = PTR_ERR(o);
834                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
835         }
836
837         RETURN(rc);
838 }
839 EXPORT_SYMBOL(lut_init);
840
841 void lut_fini(const struct lu_env *env, struct lu_target *lut)
842 {
843         ENTRY;
844
845         if (lut->lut_client_bitmap) {
846                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
847                 lut->lut_client_bitmap = NULL;
848         }
849         if (lut->lut_last_rcvd) {
850                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
851                 lut->lut_last_rcvd = NULL;
852         }
853         EXIT;
854 }
855 EXPORT_SYMBOL(lut_fini);
856
857 /* context key constructor/destructor: tg_key_init, tg_key_fini */
858 LU_KEY_INIT_FINI(tg, struct tg_thread_info);
859 /* context key: tg_thread_key */
860 LU_CONTEXT_KEY_DEFINE(tg, LCT_MD_THREAD|LCT_DT_THREAD);
861 LU_KEY_INIT_GENERIC(tg);
862 EXPORT_SYMBOL(tg_thread_key);
863
864 int lut_mod_init(void)
865 {
866         tg_key_init_generic(&tg_thread_key, NULL);
867         lu_context_key_register_many(&tg_thread_key, NULL);
868         return 0;
869 }
870
871 void lut_mod_exit(void)
872 {
873         lu_context_key_degister_many(&tg_thread_key, NULL);
874 }
875