Whamcloud - gitweb
- update from 1_5
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  *
24  * These are the only exported functions, they provide some generic
25  * infrastructure for managing object devices
26  */
27
28 #define DEBUG_SUBSYSTEM S_CLASS
29 #ifndef __KERNEL__
30 #include <liblustre.h>
31 #endif
32 #include <obd_ost.h>
33 #include <obd_class.h>
34 #include <lprocfs_status.h>
35
36 extern struct list_head obd_types;
37 spinlock_t obd_types_lock;
38
39 cfs_mem_cache_t *obd_device_cachep;
40 cfs_mem_cache_t *obdo_cachep;
41 EXPORT_SYMBOL(obdo_cachep);
42 cfs_mem_cache_t *import_cachep;
43
44 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
45
46 /*
47  * support functions: we could use inter-module communication, but this
48  * is more portable to other OS's
49  */
50 static struct obd_device *obd_device_alloc(void)
51 {
52         struct obd_device *obd;
53
54         OBD_SLAB_ALLOC(obd, obd_device_cachep, SLAB_KERNEL, sizeof(*obd));
55         if (obd != NULL) {
56                 obd->obd_magic = OBD_DEVICE_MAGIC;
57         }
58         return obd;
59 }
60 EXPORT_SYMBOL(obd_device_alloc);
61
62 static void obd_device_free(struct obd_device *obd)
63 {
64         LASSERT(obd != NULL);
65         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n", 
66                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
67         OBD_SLAB_FREE(obd, obd_device_cachep, sizeof(*obd));
68 }
69 EXPORT_SYMBOL(obd_device_free);
70
71 struct obd_type *class_search_type(const char *name)
72 {
73         struct list_head *tmp;
74         struct obd_type *type;
75
76         spin_lock(&obd_types_lock);
77         list_for_each(tmp, &obd_types) {
78                 type = list_entry(tmp, struct obd_type, typ_chain);
79                 if (strcmp(type->typ_name, name) == 0) {
80                         spin_unlock(&obd_types_lock);
81                         return type;
82                 }
83         }
84         spin_unlock(&obd_types_lock);
85         return NULL;
86 }
87
88 struct obd_type *class_get_type(const char *name)
89 {
90         struct obd_type *type = class_search_type(name);
91
92 #ifdef CONFIG_KMOD
93         if (!type) {
94                 const char *modname = name;
95                 if (!request_module(modname)) {
96                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
97                         type = class_search_type(name);
98                 } else {
99                         LCONSOLE_ERROR("Can't load module '%s'\n", modname);
100                 }
101         }
102 #endif
103         if (type) {
104                 spin_lock(&type->obd_type_lock);
105                 type->typ_refcnt++;
106                 try_module_get(type->typ_dt_ops->o_owner);
107                 spin_unlock(&type->obd_type_lock);
108         }
109         return type;
110 }
111
112 void class_put_type(struct obd_type *type)
113 {
114         LASSERT(type);
115         spin_lock(&type->obd_type_lock);
116         type->typ_refcnt--;
117         module_put(type->typ_dt_ops->o_owner);
118         spin_unlock(&type->obd_type_lock);
119 }
120
121 #define CLASS_MAX_NAME 1024
122
123 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops, 
124                         struct lprocfs_vars *vars, const char *name, 
125                         struct lu_device_type *ldt)
126 {
127         struct obd_type *type;
128         int rc = 0;
129         ENTRY;
130
131         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);    /* sanity check */
132
133         if (class_search_type(name)) {
134                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
135                 RETURN(-EEXIST);
136         }
137
138         rc = -ENOMEM;
139         OBD_ALLOC(type, sizeof(*type));
140         if (type == NULL)
141                 RETURN(rc);
142
143         OBD_ALLOC(type->typ_dt_ops, sizeof(*type->typ_dt_ops));
144         OBD_ALLOC(type->typ_md_ops, sizeof(*type->typ_md_ops));
145         OBD_ALLOC(type->typ_name, strlen(name) + 1);
146         
147         if (type->typ_dt_ops == NULL || 
148             type->typ_md_ops == NULL || 
149             type->typ_name == NULL)
150                 GOTO (failed, rc);
151
152         *(type->typ_dt_ops) = *dt_ops;
153         /* md_ops is optional */
154         if (md_ops)
155                 *(type->typ_md_ops) = *md_ops;
156         strcpy(type->typ_name, name);
157         spin_lock_init(&type->obd_type_lock);
158
159 #ifdef LPROCFS
160         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
161                                               vars, type);
162         if (IS_ERR(type->typ_procroot)) {
163                 rc = PTR_ERR(type->typ_procroot);
164                 type->typ_procroot = NULL;
165                 GOTO (failed, rc);
166         }
167 #endif
168         if (ldt != NULL) {
169                 type->typ_lu = ldt;
170                 rc = ldt->ldt_ops->ldto_init(ldt);
171                 if (rc != 0)
172                         GOTO (failed, rc);
173         }
174
175         spin_lock(&obd_types_lock);
176         list_add(&type->typ_chain, &obd_types);
177         spin_unlock(&obd_types_lock);
178
179         RETURN (0);
180
181  failed:
182         if (type->typ_name != NULL)
183                 OBD_FREE(type->typ_name, strlen(name) + 1);
184         if (type->typ_md_ops != NULL)
185                 OBD_FREE (type->typ_md_ops, sizeof (*type->typ_md_ops));
186         if (type->typ_dt_ops != NULL)
187                 OBD_FREE (type->typ_dt_ops, sizeof (*type->typ_dt_ops));
188         OBD_FREE(type, sizeof(*type));
189         RETURN(rc);
190 }
191
192 int class_unregister_type(const char *name)
193 {
194         struct obd_type *type = class_search_type(name);
195         ENTRY;
196
197         if (!type) {
198                 CERROR("unknown obd type\n");
199                 RETURN(-EINVAL);
200         }
201
202         if (type->typ_refcnt) {
203                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
204                 /* This is a bad situation, let's make the best of it */
205                 /* Remove ops, but leave the name for debugging */
206                 OBD_FREE(type->typ_dt_ops, sizeof(*type->typ_dt_ops));
207                 OBD_FREE(type->typ_md_ops, sizeof(*type->typ_md_ops));
208                 RETURN(-EBUSY);
209         }
210
211         if (type->typ_procroot) {
212                 lprocfs_remove(type->typ_procroot);
213                 type->typ_procroot = NULL;
214         }
215
216         if (type->typ_lu)
217                 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
218
219         spin_lock(&obd_types_lock);
220         list_del(&type->typ_chain);
221         spin_unlock(&obd_types_lock);
222         OBD_FREE(type->typ_name, strlen(name) + 1);
223         if (type->typ_dt_ops != NULL)
224                 OBD_FREE(type->typ_dt_ops, sizeof(*type->typ_dt_ops));
225         if (type->typ_md_ops != NULL)
226                 OBD_FREE(type->typ_md_ops, sizeof(*type->typ_md_ops));
227         OBD_FREE(type, sizeof(*type));
228         RETURN(0);
229 } /* class_unregister_type */
230
231 struct obd_device *class_newdev(const char *type_name, const char *name)
232 {
233         struct obd_device *result = NULL;
234         struct obd_type *type = NULL;
235         int i;
236         int new_obd_minor = 0;
237
238         if (strlen(name) > MAX_OBD_NAME) {
239                 CERROR("name/uuid must be < %u bytes long\n",MAX_OBD_NAME);
240                 RETURN(ERR_PTR(-EINVAL));
241         }
242
243         type = class_get_type(type_name); 
244         if (type == NULL){
245                 CERROR("OBD: unknown type: %s\n", type_name);
246                 RETURN(ERR_PTR(-ENODEV));
247         }
248
249         spin_lock(&obd_dev_lock);
250         for (i = 0 ; i < class_devno_max(); i++) {
251                 struct obd_device *obd = class_num2obd(i);
252                 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)) {
253                         CERROR("Device %s already exists, won't add\n", name);
254                         if (result) {
255                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
256                                          "%p obd_magic %08x != %08x\n",
257                                          result, result->obd_magic, OBD_DEVICE_MAGIC);
258                                 LASSERTF(result->obd_minor == new_obd_minor,
259                                          "%p obd_minor %d != %d\n",
260                                          result, result->obd_minor, new_obd_minor);
261
262                                 obd_devs[result->obd_minor] = NULL;
263                                 result->obd_name[0]='\0';
264                                 obd_device_free(result);
265                         }
266                         result = ERR_PTR(-EEXIST);
267                         break;
268                 }
269                 if (!result && !obd) {
270                         obd = obd_device_alloc();
271
272                         if(obd == NULL)
273                                 GOTO(out,result = ERR_PTR(-ENOMEM));
274
275                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
276                         obd->obd_minor = i;
277                         new_obd_minor = i;
278                         obd->obd_type = type;
279                         memcpy(obd->obd_name, name, strlen(name));
280
281                         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
282                                obd->obd_name, obd);
283                         result = obd;
284                         obd_devs[i] = result;
285                         obd = NULL;
286                 }
287         }
288         spin_unlock(&obd_dev_lock);
289 out :
290         if (IS_ERR(result)) {
291                 class_put_type(type);
292         }
293         return result;
294 }
295
296 void class_release_dev(struct obd_device *obd)
297 {
298         struct obd_type *obd_type = obd->obd_type;
299
300         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
301                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
302         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
303                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
304         LASSERT(obd_type != NULL);
305
306         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
307                obd->obd_name,obd->obd_type->typ_name);
308
309         spin_lock(&obd_dev_lock);
310         obd_devs[obd->obd_minor] = NULL;
311         obd_device_free(obd);
312         spin_unlock(&obd_dev_lock);
313
314         class_put_type(obd_type);
315 }
316
317 int class_name2dev(const char *name)
318 {
319         int i;
320
321         if (!name)
322                 return -1;
323
324         spin_lock(&obd_dev_lock);
325         for (i = 0; i < class_devno_max(); i++) {
326                 struct obd_device *obd = class_num2obd(i);
327                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
328                         /* Make sure we finished attaching before we give
329                            out any references */
330                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
331                         if (obd->obd_attached) {
332                                 spin_unlock(&obd_dev_lock);
333                                 return i;
334                         }
335                         break;
336                 }
337         }
338         spin_unlock(&obd_dev_lock);
339
340         return -1;
341 }
342
343 struct obd_device *class_name2obd(const char *name)
344 {
345         int dev = class_name2dev(name);
346
347         if (dev < 0 || dev > class_devno_max())
348                 return NULL;
349         return class_num2obd(dev);
350 }
351
352 int class_uuid2dev(struct obd_uuid *uuid)
353 {
354         int i;
355
356         spin_lock(&obd_dev_lock);
357         for (i = 0; i < class_devno_max(); i++) {
358                 struct obd_device *obd = class_num2obd(i);
359                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
360                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
361                         spin_unlock(&obd_dev_lock);
362                         return i;
363                 }
364         }
365         spin_unlock(&obd_dev_lock);
366
367         return -1;
368 }
369
370 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
371 {
372         int dev = class_uuid2dev(uuid);
373         if (dev < 0)
374                 return NULL;
375         return class_num2obd(dev);
376 }
377
378 struct obd_device *class_num2obd(int num)
379 {
380         struct obd_device *obd = NULL;
381
382         if (num < class_devno_max()) {
383                 obd = obd_devs[num];
384                 if (obd == NULL) {
385                         return NULL;
386                 }
387
388                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
389                          "%p obd_magic %08x != %08x\n",
390                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
391                 LASSERTF(obd->obd_minor == num,
392                          "%p obd_minor %0d != %0d\n",
393                          obd, obd->obd_minor, num);
394         }
395
396         return obd;
397 }
398
399 void class_obd_list(void)
400 {
401         char *status;
402         int i;
403
404         spin_lock(&obd_dev_lock);
405         for (i = 0; i < class_devno_max(); i++) {
406                 struct obd_device *obd = class_num2obd(i);
407                 if (obd == NULL)
408                         continue;
409                 if (obd->obd_stopping)
410                         status = "ST";
411                 else if (obd->obd_set_up)
412                         status = "UP";
413                 else if (obd->obd_attached)
414                         status = "AT";
415                 else
416                         status = "--";
417                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
418                          i, status, obd->obd_type->typ_name,
419                          obd->obd_name, obd->obd_uuid.uuid,
420                          atomic_read(&obd->obd_refcount));
421         }
422         spin_unlock(&obd_dev_lock);
423         return;
424 }
425
426 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
427    specified, then only the client with that uuid is returned,
428    otherwise any client connected to the tgt is returned. */
429 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
430                                           const char * typ_name,
431                                           struct obd_uuid *grp_uuid)
432 {
433         int i;
434
435         spin_lock(&obd_dev_lock);
436         for (i = 0; i < class_devno_max(); i++) {
437                 struct obd_device *obd = class_num2obd(i);
438                 if (obd == NULL)
439                         continue;
440                 if ((strncmp(obd->obd_type->typ_name, typ_name,
441                              strlen(typ_name)) == 0)) {
442                         if (obd_uuid_equals(tgt_uuid,
443                                             &obd->u.cli.cl_target_uuid) &&
444                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
445                                                          &obd->obd_uuid) : 1)) {
446                                 spin_unlock(&obd_dev_lock);
447                                 return obd;
448                         }
449                 }
450         }
451         spin_unlock(&obd_dev_lock);
452
453         return NULL;
454 }
455
456 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
457                                             struct obd_uuid *grp_uuid)
458 {
459         struct obd_device *obd;
460
461         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
462         if (!obd)
463                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
464                                             grp_uuid);
465         return obd;
466 }
467
468 /* Iterate the obd_device list looking devices have grp_uuid. Start
469    searching at *next, and if a device is found, the next index to look
470    at is saved in *next. If next is NULL, then the first matching device
471    will always be returned. */
472 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
473 {
474         int i;
475
476         if (next == NULL)
477                 i = 0;
478         else if (*next >= 0 && *next < class_devno_max())
479                 i = *next;
480         else
481                 return NULL;
482
483         spin_lock(&obd_dev_lock);
484         for (; i < class_devno_max(); i++) {
485                 struct obd_device *obd = class_num2obd(i);
486                 if (obd == NULL)
487                         continue;
488                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
489                         if (next != NULL)
490                                 *next = i+1;
491                         spin_unlock(&obd_dev_lock);
492                         return obd;
493                 }
494         }
495         spin_unlock(&obd_dev_lock);
496
497         return NULL;
498 }
499
500
501 void obd_cleanup_caches(void)
502 {
503         int rc;
504
505         ENTRY;
506         if (obd_device_cachep) {
507                 rc = cfs_mem_cache_destroy(obd_device_cachep);
508                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
509                 obd_device_cachep = NULL;
510         }
511         if (obdo_cachep) {
512                 rc = cfs_mem_cache_destroy(obdo_cachep);
513                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
514                 obdo_cachep = NULL;
515         }
516         if (import_cachep) {
517                 rc = cfs_mem_cache_destroy(import_cachep);
518                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
519                 import_cachep = NULL;
520         }
521         EXIT;
522 }
523
524 int obd_init_caches(void)
525 {
526         ENTRY;
527
528         LASSERT(obd_device_cachep == NULL);
529         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
530                                               sizeof(struct obd_device), 0, 0);
531         if (!obd_device_cachep)
532                 GOTO(out, -ENOMEM);
533
534         LASSERT(obdo_cachep == NULL);
535         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
536                                         0, 0);
537         if (!obdo_cachep)
538                 GOTO(out, -ENOMEM);
539
540         LASSERT(import_cachep == NULL);
541         import_cachep = cfs_mem_cache_create("ll_import_cache",
542                                           sizeof(struct obd_import),
543                                           0, 0);
544         if (!import_cachep)
545                 GOTO(out, -ENOMEM);
546
547         RETURN(0);
548  out:
549         obd_cleanup_caches();
550         RETURN(-ENOMEM);
551
552 }
553
554 /* map connection to client */
555 struct obd_export *class_conn2export(struct lustre_handle *conn)
556 {
557         struct obd_export *export;
558         ENTRY;
559
560         if (!conn) {
561                 CDEBUG(D_CACHE, "looking for null handle\n");
562                 RETURN(NULL);
563         }
564
565         if (conn->cookie == -1) {  /* this means assign a new connection */
566                 CDEBUG(D_CACHE, "want a new connection\n");
567                 RETURN(NULL);
568         }
569
570         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
571         export = class_handle2object(conn->cookie);
572         RETURN(export);
573 }
574
575 struct obd_device *class_exp2obd(struct obd_export *exp)
576 {
577         if (exp)
578                 return exp->exp_obd;
579         return NULL;
580 }
581
582 struct obd_device *class_conn2obd(struct lustre_handle *conn)
583 {
584         struct obd_export *export;
585         export = class_conn2export(conn);
586         if (export) {
587                 struct obd_device *obd = export->exp_obd;
588                 class_export_put(export);
589                 return obd;
590         }
591         return NULL;
592 }
593
594 struct obd_import *class_exp2cliimp(struct obd_export *exp)
595 {
596         struct obd_device *obd = exp->exp_obd;
597         if (obd == NULL)
598                 return NULL;
599         return obd->u.cli.cl_import;
600 }
601
602 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
603 {
604         struct obd_device *obd = class_conn2obd(conn);
605         if (obd == NULL)
606                 return NULL;
607         return obd->u.cli.cl_import;
608 }
609
610 /* Export management functions */
/* Callback registered with class_handle_hash() for exports: takes an
 * export reference on the object it is handed. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
615
616 void __class_export_put(struct obd_export *exp)
617 {
618         if (atomic_dec_and_test(&exp->exp_refcount)) {
619                 struct obd_device *obd = exp->exp_obd;
620                 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
621                        exp->exp_client_uuid.uuid);
622
623                 LASSERT(obd != NULL);
624
625                 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
626                 if (exp->exp_connection)
627                         ptlrpc_put_connection_superhack(exp->exp_connection);
628
629                 LASSERT(list_empty(&exp->exp_outstanding_replies));
630                 LASSERT(list_empty(&exp->exp_handle.h_link));
631                 obd_destroy_export(exp);
632
633                 OBD_FREE(exp, sizeof(*exp));
634                 class_decref(obd);
635         }
636 }
637 EXPORT_SYMBOL(__class_export_put);
638
639 /* Creates a new export, adds it to the hash table, and returns a
640  * pointer to it. The refcount is 2: one for the hash reference, and
641  * one for the pointer returned by this function. */
642 struct obd_export *class_new_export(struct obd_device *obd,
643                                     struct obd_uuid *cluuid)
644 {
645         struct obd_export *export, *tmp;
646
647         OBD_ALLOC(export, sizeof(*export));
648         if (!export)
649                 return ERR_PTR(-ENOMEM);
650
651         export->exp_conn_cnt = 0;
652         atomic_set(&export->exp_refcount, 2);
653         export->exp_obd = obd;
654         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
655         /* XXX this should be in LDLM init */
656         CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
657         spin_lock_init(&export->exp_ldlm_data.led_lock);
658
659         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
660         class_handle_hash(&export->exp_handle, export_handle_addref);
661         export->exp_last_request_time = CURRENT_SECONDS;
662         spin_lock_init(&export->exp_lock);
663
664         export->exp_client_uuid = *cluuid;
665         obd_init_export(export);
666
667         spin_lock(&obd->obd_dev_lock);
668         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
669                 list_for_each_entry(tmp, &obd->obd_exports, exp_obd_chain) {
670                         if (obd_uuid_equals(cluuid, &tmp->exp_client_uuid)) {
671                                 spin_unlock(&obd->obd_dev_lock);
672                                 CWARN("%s: denying duplicate export for %s\n",
673                                       obd->obd_name, cluuid->uuid);
674                                 class_handle_unhash(&export->exp_handle);
675                                 OBD_FREE_PTR(export);
676                                 return ERR_PTR(-EALREADY);
677                         }
678                 }
679         }
680         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
681         class_incref(obd);
682         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
683         list_add_tail(&export->exp_obd_chain_timed,
684                       &export->exp_obd->obd_exports_timed);
685         export->exp_obd->obd_num_exports++;
686         spin_unlock(&obd->obd_dev_lock);
687
688         return export;
689 }
690 EXPORT_SYMBOL(class_new_export);
691
692 void class_unlink_export(struct obd_export *exp)
693 {
694         class_handle_unhash(&exp->exp_handle);
695
696         spin_lock(&exp->exp_obd->obd_dev_lock);
697         list_del_init(&exp->exp_obd_chain);
698         list_del_init(&exp->exp_obd_chain_timed);
699         exp->exp_obd->obd_num_exports--;
700         spin_unlock(&exp->exp_obd->obd_dev_lock);
701
702         class_export_put(exp);
703 }
704 EXPORT_SYMBOL(class_unlink_export);
705
706 /* Import management functions */
/* Callback registered with class_handle_hash() for imports: takes an
 * import reference on the object it is handed. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
711
712 struct obd_import *class_import_get(struct obd_import *import)
713 {
714         LASSERT(atomic_read(&import->imp_refcount) >= 0);
715         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
716         atomic_inc(&import->imp_refcount);
717         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
718                atomic_read(&import->imp_refcount));
719         return import;
720 }
721 EXPORT_SYMBOL(class_import_get);
722
723 void class_import_put(struct obd_import *import)
724 {
725         ENTRY;
726
727         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
728                atomic_read(&import->imp_refcount) - 1);
729
730         LASSERT(atomic_read(&import->imp_refcount) > 0);
731         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
732         if (!atomic_dec_and_test(&import->imp_refcount)) {
733                 EXIT;
734                 return;
735         }
736
737         CDEBUG(D_IOCTL, "destroying import %p\n", import);
738
739         ptlrpc_put_connection_superhack(import->imp_connection);
740
741         while (!list_empty(&import->imp_conn_list)) {
742                 struct obd_import_conn *imp_conn;
743
744                 imp_conn = list_entry(import->imp_conn_list.next,
745                                       struct obd_import_conn, oic_item);
746                 list_del(&imp_conn->oic_item);
747                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
748                 OBD_FREE(imp_conn, sizeof(*imp_conn));
749         }
750
751         LASSERT(list_empty(&import->imp_handle.h_link));
752         LASSERT(import->imp_sec == NULL);
753         class_decref(import->imp_obd);
754         OBD_FREE(import, sizeof(*import));
755         EXIT;
756 }
757 EXPORT_SYMBOL(class_import_put);
758
759 struct obd_import *class_new_import(struct obd_device *obd)
760 {
761         struct obd_import *imp;
762
763         OBD_ALLOC(imp, sizeof(*imp));
764         if (imp == NULL)
765                 return NULL;
766
767         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
768         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
769         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
770         spin_lock_init(&imp->imp_lock);
771         imp->imp_last_success_conn = 0;
772         imp->imp_state = LUSTRE_IMP_NEW;
773         imp->imp_obd = class_incref(obd);
774         cfs_waitq_init(&imp->imp_recovery_waitq);
775
776         atomic_set(&imp->imp_refcount, 2);
777         atomic_set(&imp->imp_inflight, 0);
778         atomic_set(&imp->imp_replay_inflight, 0);
779         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
780         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
781         class_handle_hash(&imp->imp_handle, import_handle_addref);
782
783         /* the default magic is V1, will be used in connect RPC, and
784          * then adjusted according to the flags in request/reply. */
785         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
786
787         return imp;
788 }
789 EXPORT_SYMBOL(class_new_import);
790
791 void class_destroy_import(struct obd_import *import)
792 {
793         LASSERT(import != NULL);
794         LASSERT(import != LP_POISON);
795
796         class_handle_unhash(&import->imp_handle);
797
798         import->imp_generation++;
799         class_import_put(import);
800 }
801 EXPORT_SYMBOL(class_destroy_import);
802
803 /* A connection defines an export context in which preallocation can
804    be managed. This releases the export pointer reference, and returns
805    the export handle, so the export refcount is 1 when this function
806    returns. */
807 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
808                   struct obd_uuid *cluuid)
809 {
810         struct obd_export *export;
811         LASSERT(conn != NULL);
812         LASSERT(obd != NULL);
813         LASSERT(cluuid != NULL);
814         ENTRY;
815
816         export = class_new_export(obd, cluuid);
817         if (IS_ERR(export))
818                 RETURN(PTR_ERR(export));
819
820         conn->cookie = export->exp_handle.h_cookie;
821         class_export_put(export);
822
823         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
824                cluuid->uuid, conn->cookie);
825         RETURN(0);
826 }
827 EXPORT_SYMBOL(class_connect);
828
829 /* This function removes two references from the export: one for the
830  * hash entry and one for the export pointer passed in.  The export
831  * pointer passed to this function is destroyed should not be used
832  * again. */
833 int class_disconnect(struct obd_export *export)
834 {
835         int already_disconnected;
836         ENTRY;
837
838         if (export == NULL) {
839                 fixme();
840                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
841                 RETURN(-EINVAL);
842         }
843
844         spin_lock(&export->exp_lock);
845         already_disconnected = export->exp_disconnected;
846         export->exp_disconnected = 1;
847         spin_unlock(&export->exp_lock);
848
849         /* class_cleanup(), abort_recovery(), and class_fail_export()
850          * all end up in here, and if any of them race we shouldn't
851          * call extra class_export_puts(). */
852         if (already_disconnected)
853                 RETURN(0);
854
855         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
856                export->exp_handle.h_cookie);
857
858         class_unlink_export(export);
859         class_export_put(export);
860         RETURN(0);
861 }
862
/* Disconnect every export found on @list, stamping each one with @flags.
 *
 * The loop always works on the first entry and runs until the list is
 * empty, so it relies on each iteration (directly, or via obd_disconnect())
 * taking the processed export off the list.
 *
 * list:  head of the chain of exports (linked through exp_obd_chain)
 * flags: value stored into exp_flags of every export before disconnecting */
static void class_disconnect_export_list(struct list_head *list, int flags)
{
        int rc;
        struct lustre_handle fake_conn;
        struct obd_export *fake_exp, *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!list_empty(list)) {
                exp = list_entry(list->next, struct obd_export, exp_obd_chain);
                /* Hold our own reference while we work on this export. */
                class_export_get(exp);
                exp->exp_flags = flags;

                /* An export whose client uuid equals its own obd's uuid is
                 * only unlinked from the list, never disconnected. */
                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        list_del_init(&exp->exp_obd_chain);
                        class_export_put(exp);
                        continue;
                }

                /* Look the export up again through its handle cookie; the
                 * result is what gets handed to obd_disconnect().
                 * NOTE(review): presumably class_conn2export() returns a
                 * referenced export that obd_disconnect() consumes - confirm. */
                fake_conn.cookie = exp->exp_handle.h_cookie;
                fake_exp = class_conn2export(&fake_conn);
                if (!fake_exp) {
                        /* NOTE(review): exp is not removed from the list on
                         * this path, so loop termination depends on something
                         * else unlinking it - confirm this cannot spin. */
                        class_export_put(exp);
                        continue;
                }
                fake_exp->exp_flags = flags;
                rc = obd_disconnect(fake_exp);
                class_export_put(exp);
                /* exp is only printed as a pointer value below, it is not
                 * dereferenced after the put. */
                if (rc) {
                        CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
                               exp, rc);
                } else {
                        CDEBUG(D_HA, "export %p disconnected\n", exp);
                }
        }
        EXIT;
}
907
908 static inline int get_exp_flags_from_obd(struct obd_device *obd)
909 {
910         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
911                 (obd->obd_force ? OBD_OPT_FORCE : 0));
912 }
913
914 void class_disconnect_exports(struct obd_device *obd)
915 {
916         struct list_head work_list;
917         ENTRY;
918
919         /* Move all of the exports from obd_exports to a work list, en masse. */
920         spin_lock(&obd->obd_dev_lock);
921         list_add(&work_list, &obd->obd_exports);
922         list_del_init(&obd->obd_exports);
923         spin_unlock(&obd->obd_dev_lock);
924
925         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
926                "disconnecting them\n", obd->obd_minor, obd);
927         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
928         EXIT;
929 }
930 EXPORT_SYMBOL(class_disconnect_exports);
931
932 /* Remove exports that have not completed recovery.
933  */
934 void class_disconnect_stale_exports(struct obd_device *obd)
935 {
936         struct list_head work_list;
937         struct list_head *pos, *n;
938         struct obd_export *exp;
939         int cnt = 0;
940         ENTRY;
941
942         CFS_INIT_LIST_HEAD(&work_list);
943         spin_lock(&obd->obd_dev_lock);
944         list_for_each_safe(pos, n, &obd->obd_exports) {
945                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
946                 if (exp->exp_replay_needed) {
947                         list_del(&exp->exp_obd_chain);
948                         list_add(&exp->exp_obd_chain, &work_list);
949                         cnt++;
950                 }
951         }
952         spin_unlock(&obd->obd_dev_lock);
953
954         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
955                obd->obd_name, cnt);
956         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
957         EXIT;
958 }
959 EXPORT_SYMBOL(class_disconnect_stale_exports);
960
961 int oig_init(struct obd_io_group **oig_out)
962 {
963         struct obd_io_group *oig;
964         ENTRY;
965
966         OBD_ALLOC(oig, sizeof(*oig));
967         if (oig == NULL)
968                 RETURN(-ENOMEM);
969
970         spin_lock_init(&oig->oig_lock);
971         oig->oig_rc = 0;
972         oig->oig_pending = 0;
973         atomic_set(&oig->oig_refcount, 1);
974         cfs_waitq_init(&oig->oig_waitq);
975         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
976
977         *oig_out = oig;
978         RETURN(0);
979 };
980 EXPORT_SYMBOL(oig_init);
981
/* Take an additional reference on @oig by incrementing oig_refcount.
 * The matching decrement is done in oig_release(). */
static inline void oig_grab(struct obd_io_group *oig)
{
        atomic_inc(&oig->oig_refcount);
}
986
987 void oig_release(struct obd_io_group *oig)
988 {
989         if (atomic_dec_and_test(&oig->oig_refcount))
990                 OBD_FREE(oig, sizeof(*oig));
991 }
992 EXPORT_SYMBOL(oig_release);
993
994 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
995 {
996         int rc = 0;
997         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
998         spin_lock(&oig->oig_lock);
999         if (oig->oig_rc) {
1000                 rc = oig->oig_rc;
1001         } else {
1002                 oig->oig_pending++;
1003                 if (occ != NULL)
1004                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1005         }
1006         spin_unlock(&oig->oig_lock);
1007         oig_grab(oig);
1008
1009         return 0;
1010 }
1011 EXPORT_SYMBOL(oig_add_one);
1012
1013 void oig_complete_one(struct obd_io_group *oig,
1014                       struct oig_callback_context *occ, int rc)
1015 {
1016         cfs_waitq_t *wake = NULL;
1017         int old_rc;
1018
1019         spin_lock(&oig->oig_lock);
1020
1021         if (occ != NULL)
1022                 list_del_init(&occ->occ_oig_item);
1023
1024         old_rc = oig->oig_rc;
1025         if (oig->oig_rc == 0 && rc != 0)
1026                 oig->oig_rc = rc;
1027
1028         if (--oig->oig_pending <= 0)
1029                 wake = &oig->oig_waitq;
1030
1031         spin_unlock(&oig->oig_lock);
1032
1033         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1034                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1035                         oig->oig_pending);
1036         if (wake)
1037                 cfs_waitq_signal(wake);
1038         oig_release(oig);
1039 }
1040 EXPORT_SYMBOL(oig_complete_one);
1041
1042 static int oig_done(struct obd_io_group *oig)
1043 {
1044         int rc = 0;
1045         spin_lock(&oig->oig_lock);
1046         if (oig->oig_pending <= 0)
1047                 rc = 1;
1048         spin_unlock(&oig->oig_lock);
1049         return rc;
1050 }
1051
/* Interrupt handler installed by oig_wait(): invoke occ_interrupted() once
 * on every callback context currently queued on the group.
 *
 * data: the struct obd_io_group being waited on.
 *
 * Each context is flagged as interrupted before its callback runs, so a
 * context is never called twice even though the list scan is restarted. */
static void interrupted_oig(void *data)
{
        struct obd_io_group *oig = data;
        struct oig_callback_context *occ;

        spin_lock(&oig->oig_lock);
        /* We need to restart the processing each time we drop the lock, as
         * it is possible other threads called oig_complete_one() to remove
         * an entry elsewhere in the list while we dropped lock.  We need to
         * drop the lock because osc_ap_completion() calls oig_complete_one()
         * which re-gets this lock ;-) as well as a lock ordering issue. */
restart:
        list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
                /* Already handled on an earlier pass. */
                if (occ->interrupted)
                        continue;
                /* Flag it while still holding the lock, then call out with
                 * the lock released. */
                occ->interrupted = 1;
                spin_unlock(&oig->oig_lock);
                occ->occ_interrupted(occ);
                spin_lock(&oig->oig_lock);
                /* The list may have changed while unlocked; rescan from
                 * the head. */
                goto restart;
        }
        spin_unlock(&oig->oig_lock);
}
1075
1076 int oig_wait(struct obd_io_group *oig)
1077 {
1078         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1079         int rc;
1080
1081         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1082
1083         do {
1084                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1085                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1086                 /* we can't continue until the oig has emptied and stopped
1087                  * referencing state that the caller will free upon return */
1088                 if (rc == -EINTR)
1089                         lwi = (struct l_wait_info){ 0, };
1090         } while (rc == -EINTR);
1091
1092         LASSERTF(oig->oig_pending == 0,
1093                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1094                  oig->oig_pending);
1095
1096         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1097         return oig->oig_rc;
1098 }
1099 EXPORT_SYMBOL(oig_wait);
1100
1101 void class_fail_export(struct obd_export *exp)
1102 {
1103         int rc, already_failed;
1104
1105         spin_lock(&exp->exp_lock);
1106         already_failed = exp->exp_failed;
1107         exp->exp_failed = 1;
1108         spin_unlock(&exp->exp_lock);
1109
1110         if (already_failed) {
1111                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1112                        exp, exp->exp_client_uuid.uuid);
1113                 return;
1114         }
1115
1116         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1117                exp, exp->exp_client_uuid.uuid);
1118
1119         if (obd_dump_on_timeout)
1120                 libcfs_debug_dumplog();
1121
1122         /* Most callers into obd_disconnect are removing their own reference
1123          * (request, for example) in addition to the one from the hash table.
1124          * We don't have such a reference here, so make one. */
1125         class_export_get(exp);
1126         rc = obd_disconnect(exp);
1127         if (rc)
1128                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1129         else
1130                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1131                        exp, exp->exp_client_uuid.uuid);
1132 }
1133 EXPORT_SYMBOL(class_fail_export);
1134
1135 char *obd_export_nid2str(struct obd_export *exp)
1136 {
1137         if (exp->exp_connection != NULL)
1138                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1139
1140         return "(no nid)";
1141 }
1142 EXPORT_SYMBOL(obd_export_nid2str);
1143
1144 #define EVICT_BATCH 32
1145 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1146 {
1147         struct obd_export *doomed_exp[EVICT_BATCH] = { NULL };
1148         struct list_head *p;
1149         int exports_evicted = 0, num_to_evict = 0, i;
1150
1151 search_again:
1152         spin_lock(&obd->obd_dev_lock);
1153         list_for_each(p, &obd->obd_exports) {
1154                 doomed_exp[num_to_evict] = list_entry(p, struct obd_export,
1155                                                       exp_obd_chain);
1156                 if (strcmp(obd_export_nid2str(doomed_exp[num_to_evict]),
1157                            nid) == 0) {
1158                         class_export_get(doomed_exp[num_to_evict]);
1159                         if (++num_to_evict == EVICT_BATCH)
1160                                 break;
1161                 }
1162         }
1163         spin_unlock(&obd->obd_dev_lock);
1164
1165         for (i = 0; i < num_to_evict; i++) {
1166                 exports_evicted++;
1167                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1168                        obd->obd_name, nid, doomed_exp[i]->exp_client_uuid.uuid,
1169                        exports_evicted);
1170                 class_fail_export(doomed_exp[i]);
1171                 class_export_put(doomed_exp[i]);
1172         }
1173         if (num_to_evict == EVICT_BATCH) {
1174                 num_to_evict = 0;
1175                 goto search_again;
1176         }
1177
1178         if (!exports_evicted)
1179                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1180                        obd->obd_name, nid);
1181         return exports_evicted;
1182 }
1183 EXPORT_SYMBOL(obd_export_evict_by_nid);
1184
1185 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1186 {
1187         struct obd_export *doomed_exp = NULL;
1188         struct list_head *p;
1189         struct obd_uuid doomed;
1190         int exports_evicted = 0;
1191
1192         obd_str2uuid(&doomed, uuid);
1193
1194         spin_lock(&obd->obd_dev_lock);
1195         list_for_each(p, &obd->obd_exports) {
1196                 doomed_exp = list_entry(p, struct obd_export, exp_obd_chain);
1197
1198                 if (obd_uuid_equals(&doomed, &doomed_exp->exp_client_uuid)) {
1199                         class_export_get(doomed_exp);
1200                         break;
1201                 }
1202                 doomed_exp = NULL;
1203         }
1204         spin_unlock(&obd->obd_dev_lock);
1205
1206         if (doomed_exp == NULL) {
1207                 CERROR("%s: can't disconnect %s: no exports found\n",
1208                        obd->obd_name, uuid);
1209         } else {
1210                 CWARN("%s: evicting %s at adminstrative request\n",
1211                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1212                 class_fail_export(doomed_exp);
1213                 class_export_put(doomed_exp);
1214                 exports_evicted++;
1215         }
1216
1217         return exports_evicted;
1218 }
1219 EXPORT_SYMBOL(obd_export_evict_by_uuid);