Whamcloud - gitweb
- fixed bug in fld_cache_insert(). Just allocated struct fld_cache was substituted...
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_request.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
54
55 #ifdef __KERNEL__
56 static __u32
57 fld_cache_hash(__u64 seq)
58 {
59         return (__u32)seq;
60 }
61
62 static int
63 fld_cache_insert(struct fld_cache_info *fld_cache,
64                  __u64 seq, __u64 mds)
65 {
66         struct fld_cache_entry *flde, *fldt;
67         struct hlist_head *bucket;
68         struct hlist_node *scan;
69         int rc = 0;
70         ENTRY;
71
72         OBD_ALLOC_PTR(flde);
73         if (!flde)
74                 RETURN(-ENOMEM);
75
76         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
77                                         fld_cache->fld_hash_mask);
78
79         spin_lock(&fld_cache->fld_lock);
80         hlist_for_each_entry(fldt, scan, bucket, fld_list) {
81                 if (fldt->fld_seq == seq)
82                         GOTO(exit_unlock, rc = -EEXIST);
83         }
84
85         INIT_HLIST_NODE(&flde->fld_list);
86         flde->fld_mds = mds;
87         flde->fld_seq = seq;
88         
89         hlist_add_head(&flde->fld_list, bucket);
90
91         EXIT;
92 exit_unlock:
93         spin_unlock(&fld_cache->fld_lock);
94         if (rc != 0)
95                 OBD_FREE_PTR(flde);
96         return rc;
97 }
98
99 static void
100 fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
101 {
102         struct fld_cache_entry *flde;
103         struct hlist_head *bucket;
104         struct hlist_node *scan;
105         ENTRY;
106
107         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
108                                         fld_cache->fld_hash_mask);
109
110         spin_lock(&fld_cache->fld_lock);
111         hlist_for_each_entry(flde, scan, bucket, fld_list) {
112                 if (flde->fld_seq == seq) {
113                         hlist_del_init(&flde->fld_list);
114                         GOTO(out_unlock, 0);
115                 }
116         }
117
118         EXIT;
119 out_unlock:
120         spin_unlock(&fld_cache->fld_lock);
121         return;
122 }
123
124 static struct fld_cache_entry *
125 fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
126 {
127         struct fld_cache_entry *flde;
128         struct hlist_head *bucket;
129         struct hlist_node *scan;
130         ENTRY;
131
132         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
133                                         fld_cache->fld_hash_mask);
134
135         spin_lock(&fld_cache->fld_lock);
136         hlist_for_each_entry(flde, scan, bucket, fld_list) {
137                 if (flde->fld_seq == seq) {
138                         spin_unlock(&fld_cache->fld_lock);
139                         RETURN(flde);
140                 }
141         }
142         spin_unlock(&fld_cache->fld_lock);
143         RETURN(NULL);
144 }
145 #endif
146
147 static int
148 fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
149 {
150         if (fld->fld_count == 0)
151                 return 0;
152         
153         return do_div(seq, fld->fld_count);
154 }
155
156 static int
157 fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
158 {
159         /* XXX: here should DHT hash */
160         return fld_rrb_hash(fld, seq);
161 }
162
163 struct lu_fld_hash fld_hash[3] = {
164         {
165                 .fh_name = "DHT",
166                 .fh_func = fld_dht_hash
167         },
168         {
169                 .fh_name = "Round Robin",
170                 .fh_func = fld_rrb_hash
171         },
172         {
173                 0,
174         }
175 };
176
177 static struct obd_export *
178 fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
179 {
180         struct obd_export *fld_exp;
181         int count = 0, hash;
182         ENTRY;
183
184         LASSERT(fld->fld_hash != NULL);
185
186         spin_lock(&fld->fld_lock);
187         hash = fld->fld_hash->fh_func(fld, seq);
188         spin_unlock(&fld->fld_lock);
189
190         spin_lock(&fld->fld_lock);
191         list_for_each_entry(fld_exp,
192                             &fld->fld_exports, exp_fld_chain) {
193                 if (count == hash) {
194                         spin_unlock(&fld->fld_lock);
195                         RETURN(fld_exp);
196                 }
197                 count++;
198         }
199         spin_unlock(&fld->fld_lock);
200         RETURN(NULL);
201 }
202
203 /* add export to FLD. This is usually done by CMM and LMV as they are main users
204  * of FLD module. */
205 int
206 fld_client_add_export(struct lu_client_fld *fld,
207                       struct obd_export *exp)
208 {
209         struct obd_export *fld_exp;
210         ENTRY;
211
212         LASSERT(exp != NULL);
213
214         CDEBUG(D_INFO|D_WARNING, "FLD(cli): adding export %s\n",
215                exp->exp_client_uuid.uuid);
216         
217         spin_lock(&fld->fld_lock);
218         list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
219                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
220                                     &exp->exp_client_uuid))
221                 {
222                         spin_unlock(&fld->fld_lock);
223                         RETURN(-EEXIST);
224                 }
225         }
226         
227         fld_exp = class_export_get(exp);
228         list_add_tail(&fld_exp->exp_fld_chain,
229                       &fld->fld_exports);
230         fld->fld_count++;
231         
232         spin_unlock(&fld->fld_lock);
233         
234         RETURN(0);
235 }
236 EXPORT_SYMBOL(fld_client_add_export);
237
238 /* remove export from FLD */
239 int
240 fld_client_del_export(struct lu_client_fld *fld,
241                           struct obd_export *exp)
242 {
243         struct obd_export *fld_exp;
244         struct obd_export *tmp;
245         ENTRY;
246
247         spin_lock(&fld->fld_lock);
248         list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
249                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
250                                     &exp->exp_client_uuid))
251                 {
252                         fld->fld_count--;
253                         list_del(&fld_exp->exp_fld_chain);
254                         class_export_get(fld_exp);
255
256                         spin_unlock(&fld->fld_lock);
257                         RETURN(0);
258                 }
259         }
260         spin_unlock(&fld->fld_lock);
261         RETURN(-ENOENT);
262 }
263 EXPORT_SYMBOL(fld_client_del_export);
264
265 #ifdef LPROCFS
266 static int
267 fld_client_proc_init(struct lu_client_fld *fld)
268 {
269         int rc;
270         ENTRY;
271
272         fld->fld_proc_dir = lprocfs_register(fld->fld_name,
273                                              proc_lustre_root,
274                                              NULL, NULL);
275         
276         if (IS_ERR(fld->fld_proc_dir)) {
277                 CERROR("LProcFS failed in fld-init\n");
278                 rc = PTR_ERR(fld->fld_proc_dir);
279                 GOTO(err, rc);
280         }
281
282         rc = lprocfs_add_vars(fld->fld_proc_dir,
283                               fld_client_proc_list, fld);
284         if (rc) {
285                 CERROR("can't init FLD "
286                        "proc, rc %d\n", rc);
287                 GOTO(err, rc);
288         }
289
290         RETURN(0);
291
292 err:
293         fld->fld_proc_dir = NULL;
294         return rc;
295 }
296
297 static void
298 fld_client_proc_fini(struct lu_client_fld *fld)
299 {
300         ENTRY;
301         if (fld->fld_proc_dir) {
302                 lprocfs_remove(fld->fld_proc_dir);
303                 fld->fld_proc_dir = NULL;
304         }
305         EXIT;
306 }
307 #endif
308
309 int
310 fld_client_init(struct lu_client_fld *fld,
311                 const char *uuid, int hash)
312 {
313         int rc = 0;
314         ENTRY;
315
316         LASSERT(fld != NULL);
317
318         if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
319                 CERROR("wrong hash function 0x%x\n", hash);
320                 RETURN(-EINVAL);
321         }
322         
323         INIT_LIST_HEAD(&fld->fld_exports);
324         spin_lock_init(&fld->fld_lock);
325         fld->fld_hash = &fld_hash[hash];
326         fld->fld_count = 0;
327         
328         snprintf(fld->fld_name, sizeof(fld->fld_name),
329                  "%s-%s", LUSTRE_FLD_NAME, uuid);
330         
331 #ifdef LPROCFS
332         rc = fld_client_proc_init(fld);
333         if (rc)
334                 GOTO(out, rc);
335 #endif
336         EXIT;
337 out:
338         if (rc)
339                 fld_client_fini(fld);
340         else 
341                 CDEBUG(D_INFO|D_WARNING,
342                        "Client FLD, using \"%s\" hash\n",
343                        fld->fld_hash->fh_name);
344         return rc;
345 }
346 EXPORT_SYMBOL(fld_client_init);
347
348 void
349 fld_client_fini(struct lu_client_fld *fld)
350 {
351         struct obd_export *fld_exp;
352         struct obd_export *tmp;
353         ENTRY;
354
355 #ifdef LPROCFS
356         fld_client_proc_fini(fld);
357 #endif
358         
359         spin_lock(&fld->fld_lock);
360         list_for_each_entry_safe(fld_exp, tmp,
361                                  &fld->fld_exports, exp_fld_chain) {
362                 fld->fld_count--;
363                 list_del(&fld_exp->exp_fld_chain);
364                 class_export_get(fld_exp);
365         }
366         spin_unlock(&fld->fld_lock);
367         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
368         EXIT;
369 }
370 EXPORT_SYMBOL(fld_client_fini);
371
372 static int
373 fld_client_rpc(struct obd_export *exp,
374                struct md_fld *mf, __u32 fld_op)
375 {
376         int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
377         int mf_size = sizeof(struct md_fld);
378         struct ptlrpc_request *req;
379         struct md_fld *pmf;
380         __u32 *op;
381         ENTRY;
382
383         LASSERT(exp != NULL);
384
385         req = ptlrpc_prep_req(class_exp2cliimp(exp),
386                               LUSTRE_MDS_VERSION, FLD_QUERY,
387                               2, size, NULL);
388         if (req == NULL)
389                 RETURN(-ENOMEM);
390
391         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
392         *op = fld_op;
393
394         pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
395         memcpy(pmf, mf, sizeof(*mf));
396
397         req->rq_replen = lustre_msg_size(1, &mf_size);
398         req->rq_request_portal = MDS_FLD_PORTAL;
399
400         rc = ptlrpc_queue_wait(req);
401         if (rc)
402                 GOTO(out_req, rc);
403
404         pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
405                                  lustre_swab_md_fld);
406         *mf = *pmf; 
407 out_req:
408         ptlrpc_req_finished(req);
409         RETURN(rc);
410 }
411
412 int
413 fld_client_create(struct lu_client_fld *fld,
414                   __u64 seq, mdsno_t mds)
415 {
416         struct obd_export *fld_exp;
417         struct md_fld      md_fld;
418         __u32 rc;
419         ENTRY;
420
421         fld_exp = fld_client_get_export(fld, seq);
422         if (!fld_exp)
423                 RETURN(-EINVAL);
424         md_fld.mf_seq = seq;
425         md_fld.mf_mds = mds;
426         
427         rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
428         
429 #ifdef __KERNEL__
430         if (rc  == 0)
431                 rc = fld_cache_insert(fld_cache, seq, mds);
432 #endif
433         
434         RETURN(rc);
435 }
436 EXPORT_SYMBOL(fld_client_create);
437
438 int
439 fld_client_delete(struct lu_client_fld *fld,
440                   __u64 seq)
441 {
442         struct obd_export *fld_exp;
443         struct md_fld      md_fld;
444         __u32 rc;
445
446 #ifdef __KERNEL__
447         fld_cache_delete(fld_cache, seq);
448 #endif
449         
450         fld_exp = fld_client_get_export(fld, seq);
451         if (!fld_exp)
452                 RETURN(-EINVAL);
453
454         md_fld.mf_seq = seq;
455         md_fld.mf_mds = 0;
456
457         rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
458         RETURN(rc);
459 }
460 EXPORT_SYMBOL(fld_client_delete);
461
462 static int
463 fld_client_get(struct lu_client_fld *fld,
464                __u64 seq, mdsno_t *mds)
465 {
466         struct obd_export *fld_exp;
467         struct md_fld md_fld;
468         int rc;
469         ENTRY;
470
471         fld_exp = fld_client_get_export(fld, seq);
472         if (!fld_exp)
473                 RETURN(-EINVAL);
474                 
475         md_fld.mf_seq = seq;
476         rc = fld_client_rpc(fld_exp,
477                             &md_fld, FLD_LOOKUP);
478         if (rc == 0)
479                 *mds = md_fld.mf_mds;
480
481         RETURN(rc);
482 }
483
484 /* lookup fid in the namespace of pfid according to the name */
485 int
486 fld_client_lookup(struct lu_client_fld *fld,
487                   __u64 seq, mdsno_t *mds)
488 {
489 #ifdef __KERNEL__
490         struct fld_cache_entry *flde;
491 #endif
492         int rc;
493         ENTRY;
494
495 #ifdef __KERNEL__
496         /* lookup it in the cache */
497         flde = fld_cache_lookup(fld_cache, seq);
498         if (flde != NULL) {
499                 *mds = flde->fld_mds;
500                 RETURN(0);
501         }
502 #endif
503         
504         /* can not find it in the cache */
505         rc = fld_client_get(fld, seq, mds);
506         if (rc)
507                 RETURN(rc);
508
509 #ifdef __KERNEL__
510         if (rc == 0)
511                 rc = fld_cache_insert(fld_cache, seq, *mds);
512 #endif
513         
514         RETURN(rc);
515 }
516 EXPORT_SYMBOL(fld_client_lookup);