Whamcloud - gitweb
LU-1711 mount: obd_mount to start osd
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124                 if (!cfs_request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 cfs_spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 cfs_try_module_get(type->typ_dt_ops->o_owner);
137                 cfs_spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141 EXPORT_SYMBOL(class_get_type);
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151 EXPORT_SYMBOL(class_put_type);
152
153 #define CLASS_MAX_NAME 1024
154
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156                         struct lprocfs_vars *vars, const char *name,
157                         struct lu_device_type *ldt)
158 {
159         struct obd_type *type;
160         int rc = 0;
161         ENTRY;
162
163         /* sanity check */
164         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165
166         if (class_search_type(name)) {
167                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168                 RETURN(-EEXIST);
169         }
170
171         rc = -ENOMEM;
172         OBD_ALLOC(type, sizeof(*type));
173         if (type == NULL)
174                 RETURN(rc);
175
176         OBD_ALLOC_PTR(type->typ_dt_ops);
177         OBD_ALLOC_PTR(type->typ_md_ops);
178         OBD_ALLOC(type->typ_name, strlen(name) + 1);
179
180         if (type->typ_dt_ops == NULL ||
181             type->typ_md_ops == NULL ||
182             type->typ_name == NULL)
183                 GOTO (failed, rc);
184
185         *(type->typ_dt_ops) = *dt_ops;
186         /* md_ops is optional */
187         if (md_ops)
188                 *(type->typ_md_ops) = *md_ops;
189         strcpy(type->typ_name, name);
190         cfs_spin_lock_init(&type->obd_type_lock);
191
192 #ifdef LPROCFS
193         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194                                               vars, type);
195         if (IS_ERR(type->typ_procroot)) {
196                 rc = PTR_ERR(type->typ_procroot);
197                 type->typ_procroot = NULL;
198                 GOTO (failed, rc);
199         }
200 #endif
201         if (ldt != NULL) {
202                 type->typ_lu = ldt;
203                 rc = lu_device_type_init(ldt);
204                 if (rc != 0)
205                         GOTO (failed, rc);
206         }
207
208         cfs_spin_lock(&obd_types_lock);
209         cfs_list_add(&type->typ_chain, &obd_types);
210         cfs_spin_unlock(&obd_types_lock);
211
212         RETURN (0);
213
214  failed:
215         if (type->typ_name != NULL)
216                 OBD_FREE(type->typ_name, strlen(name) + 1);
217         if (type->typ_md_ops != NULL)
218                 OBD_FREE_PTR(type->typ_md_ops);
219         if (type->typ_dt_ops != NULL)
220                 OBD_FREE_PTR(type->typ_dt_ops);
221         OBD_FREE(type, sizeof(*type));
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(class_register_type);
225
226 int class_unregister_type(const char *name)
227 {
228         struct obd_type *type = class_search_type(name);
229         ENTRY;
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 RETURN(-EINVAL);
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 RETURN(-EBUSY);
243         }
244
245         if (type->typ_procroot) {
246                 lprocfs_remove(&type->typ_procroot);
247         }
248
249         if (type->typ_lu)
250                 lu_device_type_fini(type->typ_lu);
251
252         cfs_spin_lock(&obd_types_lock);
253         cfs_list_del(&type->typ_chain);
254         cfs_spin_unlock(&obd_types_lock);
255         OBD_FREE(type->typ_name, strlen(name) + 1);
256         if (type->typ_dt_ops != NULL)
257                 OBD_FREE_PTR(type->typ_dt_ops);
258         if (type->typ_md_ops != NULL)
259                 OBD_FREE_PTR(type->typ_md_ops);
260         OBD_FREE(type, sizeof(*type));
261         RETURN(0);
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *         pointer created.
275  */
276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278         struct obd_device *result = NULL;
279         struct obd_device *newdev;
280         struct obd_type *type = NULL;
281         int i;
282         int new_obd_minor = 0;
283         ENTRY;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0]='\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         cfs_write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 RETURN(ERR_PTR(-EOVERFLOW));
341         }
342
343         if (IS_ERR(result)) {
344                 obd_device_free(newdev);
345                 class_put_type(type);
346         } else {
347                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                        result->obd_name, result);
349         }
350         RETURN(result);
351 }
352
353 void class_release_dev(struct obd_device *obd)
354 {
355         struct obd_type *obd_type = obd->obd_type;
356
357         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361         LASSERT(obd_type != NULL);
362
363         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365
366         cfs_write_lock(&obd_dev_lock);
367         obd_devs[obd->obd_minor] = NULL;
368         cfs_write_unlock(&obd_dev_lock);
369         obd_device_free(obd);
370
371         class_put_type(obd_type);
372 }
373
374 int class_name2dev(const char *name)
375 {
376         int i;
377
378         if (!name)
379                 return -1;
380
381         cfs_read_lock(&obd_dev_lock);
382         for (i = 0; i < class_devno_max(); i++) {
383                 struct obd_device *obd = class_num2obd(i);
384
385                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386                         /* Make sure we finished attaching before we give
387                            out any references */
388                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389                         if (obd->obd_attached) {
390                                 cfs_read_unlock(&obd_dev_lock);
391                                 return i;
392                         }
393                         break;
394                 }
395         }
396         cfs_read_unlock(&obd_dev_lock);
397
398         return -1;
399 }
400 EXPORT_SYMBOL(class_name2dev);
401
402 struct obd_device *class_name2obd(const char *name)
403 {
404         int dev = class_name2dev(name);
405
406         if (dev < 0 || dev > class_devno_max())
407                 return NULL;
408         return class_num2obd(dev);
409 }
410 EXPORT_SYMBOL(class_name2obd);
411
412 int class_uuid2dev(struct obd_uuid *uuid)
413 {
414         int i;
415
416         cfs_read_lock(&obd_dev_lock);
417         for (i = 0; i < class_devno_max(); i++) {
418                 struct obd_device *obd = class_num2obd(i);
419
420                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         cfs_read_unlock(&obd_dev_lock);
423                         return i;
424                 }
425         }
426         cfs_read_unlock(&obd_dev_lock);
427
428         return -1;
429 }
430 EXPORT_SYMBOL(class_uuid2dev);
431
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 {
434         int dev = class_uuid2dev(uuid);
435         if (dev < 0)
436                 return NULL;
437         return class_num2obd(dev);
438 }
439 EXPORT_SYMBOL(class_uuid2obd);
440
441 /**
442  * Get obd device from ::obd_devs[]
443  *
444  * \param num [in] array index
445  *
446  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447  *         otherwise return the obd device there.
448  */
449 struct obd_device *class_num2obd(int num)
450 {
451         struct obd_device *obd = NULL;
452
453         if (num < class_devno_max()) {
454                 obd = obd_devs[num];
455                 if (obd == NULL)
456                         return NULL;
457
458                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459                          "%p obd_magic %08x != %08x\n",
460                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461                 LASSERTF(obd->obd_minor == num,
462                          "%p obd_minor %0d != %0d\n",
463                          obd, obd->obd_minor, num);
464         }
465
466         return obd;
467 }
468 EXPORT_SYMBOL(class_num2obd);
469
470 void class_obd_list(void)
471 {
472         char *status;
473         int i;
474
475         cfs_read_lock(&obd_dev_lock);
476         for (i = 0; i < class_devno_max(); i++) {
477                 struct obd_device *obd = class_num2obd(i);
478
479                 if (obd == NULL)
480                         continue;
481                 if (obd->obd_stopping)
482                         status = "ST";
483                 else if (obd->obd_set_up)
484                         status = "UP";
485                 else if (obd->obd_attached)
486                         status = "AT";
487                 else
488                         status = "--";
489                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490                          i, status, obd->obd_type->typ_name,
491                          obd->obd_name, obd->obd_uuid.uuid,
492                          cfs_atomic_read(&obd->obd_refcount));
493         }
494         cfs_read_unlock(&obd_dev_lock);
495         return;
496 }
497
498 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
499    specified, then only the client with that uuid is returned,
500    otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502                                           const char * typ_name,
503                                           struct obd_uuid *grp_uuid)
504 {
505         int i;
506
507         cfs_read_lock(&obd_dev_lock);
508         for (i = 0; i < class_devno_max(); i++) {
509                 struct obd_device *obd = class_num2obd(i);
510
511                 if (obd == NULL)
512                         continue;
513                 if ((strncmp(obd->obd_type->typ_name, typ_name,
514                              strlen(typ_name)) == 0)) {
515                         if (obd_uuid_equals(tgt_uuid,
516                                             &obd->u.cli.cl_target_uuid) &&
517                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
518                                                          &obd->obd_uuid) : 1)) {
519                                 cfs_read_unlock(&obd_dev_lock);
520                                 return obd;
521                         }
522                 }
523         }
524         cfs_read_unlock(&obd_dev_lock);
525
526         return NULL;
527 }
528 EXPORT_SYMBOL(class_find_client_obd);
529
530 /* Iterate the obd_device list looking devices have grp_uuid. Start
531    searching at *next, and if a device is found, the next index to look
532    at is saved in *next. If next is NULL, then the first matching device
533    will always be returned. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
535 {
536         int i;
537
538         if (next == NULL)
539                 i = 0;
540         else if (*next >= 0 && *next < class_devno_max())
541                 i = *next;
542         else
543                 return NULL;
544
545         cfs_read_lock(&obd_dev_lock);
546         for (; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552                         if (next != NULL)
553                                 *next = i+1;
554                         cfs_read_unlock(&obd_dev_lock);
555                         return obd;
556                 }
557         }
558         cfs_read_unlock(&obd_dev_lock);
559
560         return NULL;
561 }
562 EXPORT_SYMBOL(class_devices_in_group);
563
564 /**
565  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566  * adjust sptlrpc settings accordingly.
567  */
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 {
570         struct obd_device  *obd;
571         const char         *type;
572         int                 i, rc = 0, rc2;
573
574         LASSERT(namelen > 0);
575
576         cfs_read_lock(&obd_dev_lock);
577         for (i = 0; i < class_devno_max(); i++) {
578                 obd = class_num2obd(i);
579
580                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581                         continue;
582
583                 /* only notify mdc, osc, mdt, ost */
584                 type = obd->obd_type->typ_name;
585                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588                     strcmp(type, LUSTRE_OST_NAME) != 0)
589                         continue;
590
591                 if (strncmp(obd->obd_name, fsname, namelen))
592                         continue;
593
594                 class_incref(obd, __FUNCTION__, obd);
595                 cfs_read_unlock(&obd_dev_lock);
596                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
597                                          sizeof(KEY_SPTLRPC_CONF),
598                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
599                 rc = rc ? rc : rc2;
600                 class_decref(obd, __FUNCTION__, obd);
601                 cfs_read_lock(&obd_dev_lock);
602         }
603         cfs_read_unlock(&obd_dev_lock);
604         return rc;
605 }
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
607
608 void obd_cleanup_caches(void)
609 {
610         int rc;
611
612         ENTRY;
613         if (obd_device_cachep) {
614                 rc = cfs_mem_cache_destroy(obd_device_cachep);
615                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616                 obd_device_cachep = NULL;
617         }
618         if (obdo_cachep) {
619                 rc = cfs_mem_cache_destroy(obdo_cachep);
620                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
621                 obdo_cachep = NULL;
622         }
623         if (import_cachep) {
624                 rc = cfs_mem_cache_destroy(import_cachep);
625                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626                 import_cachep = NULL;
627         }
628         if (capa_cachep) {
629                 rc = cfs_mem_cache_destroy(capa_cachep);
630                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
631                 capa_cachep = NULL;
632         }
633         EXIT;
634 }
635
636 int obd_init_caches(void)
637 {
638         ENTRY;
639
640         LASSERT(obd_device_cachep == NULL);
641         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642                                                  sizeof(struct obd_device),
643                                                  0, 0);
644         if (!obd_device_cachep)
645                 GOTO(out, -ENOMEM);
646
647         LASSERT(obdo_cachep == NULL);
648         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
649                                            0, 0);
650         if (!obdo_cachep)
651                 GOTO(out, -ENOMEM);
652
653         LASSERT(import_cachep == NULL);
654         import_cachep = cfs_mem_cache_create("ll_import_cache",
655                                              sizeof(struct obd_import),
656                                              0, 0);
657         if (!import_cachep)
658                 GOTO(out, -ENOMEM);
659
660         LASSERT(capa_cachep == NULL);
661         capa_cachep = cfs_mem_cache_create("capa_cache",
662                                            sizeof(struct obd_capa), 0, 0);
663         if (!capa_cachep)
664                 GOTO(out, -ENOMEM);
665
666         RETURN(0);
667  out:
668         obd_cleanup_caches();
669         RETURN(-ENOMEM);
670
671 }
672
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 {
676         struct obd_export *export;
677         ENTRY;
678
679         if (!conn) {
680                 CDEBUG(D_CACHE, "looking for null handle\n");
681                 RETURN(NULL);
682         }
683
684         if (conn->cookie == -1) {  /* this means assign a new connection */
685                 CDEBUG(D_CACHE, "want a new connection\n");
686                 RETURN(NULL);
687         }
688
689         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690         export = class_handle2object(conn->cookie);
691         RETURN(export);
692 }
693 EXPORT_SYMBOL(class_conn2export);
694
695 struct obd_device *class_exp2obd(struct obd_export *exp)
696 {
697         if (exp)
698                 return exp->exp_obd;
699         return NULL;
700 }
701 EXPORT_SYMBOL(class_exp2obd);
702
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         export = class_conn2export(conn);
707         if (export) {
708                 struct obd_device *obd = export->exp_obd;
709                 class_export_put(export);
710                 return obd;
711         }
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_conn2obd);
715
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 {
718         struct obd_device *obd = exp->exp_obd;
719         if (obd == NULL)
720                 return NULL;
721         return obd->u.cli.cl_import;
722 }
723 EXPORT_SYMBOL(class_exp2cliimp);
724
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 {
727         struct obd_device *obd = class_conn2obd(conn);
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_conn2cliimp);
733
734 /* Export management functions */
735 static void class_export_destroy(struct obd_export *exp)
736 {
737         struct obd_device *obd = exp->exp_obd;
738         ENTRY;
739
740         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
741         LASSERT(obd != NULL);
742
743         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
744                exp->exp_client_uuid.uuid, obd->obd_name);
745
746         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
747         if (exp->exp_connection)
748                 ptlrpc_put_connection_superhack(exp->exp_connection);
749
750         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
751         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
752         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
753         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
754         obd_destroy_export(exp);
755         class_decref(obd, "export", exp);
756
757         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
758         EXIT;
759 }
760
/* hop_addref callback for export handles: takes a reference on the
 * export via class_export_get(). */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
765
766 static struct portals_handle_ops export_handle_ops = {
767         .hop_addref = export_handle_addref,
768         .hop_free   = NULL,
769 };
770
771 struct obd_export *class_export_get(struct obd_export *exp)
772 {
773         cfs_atomic_inc(&exp->exp_refcount);
774         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
775                cfs_atomic_read(&exp->exp_refcount));
776         return exp;
777 }
778 EXPORT_SYMBOL(class_export_get);
779
780 void class_export_put(struct obd_export *exp)
781 {
782         LASSERT(exp != NULL);
783         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
784         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
785                cfs_atomic_read(&exp->exp_refcount) - 1);
786
787         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
788                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
789                 CDEBUG(D_IOCTL, "final put %p/%s\n",
790                        exp, exp->exp_client_uuid.uuid);
791
792                 /* release nid stat refererence */
793                 lprocfs_exp_cleanup(exp);
794
795                 obd_zombie_export_add(exp);
796         }
797 }
798 EXPORT_SYMBOL(class_export_put);
799
800 /* Creates a new export, adds it to the hash table, and returns a
801  * pointer to it. The refcount is 2: one for the hash reference, and
802  * one for the pointer returned by this function. */
803 struct obd_export *class_new_export(struct obd_device *obd,
804                                     struct obd_uuid *cluuid)
805 {
806         struct obd_export *export;
807         cfs_hash_t *hash = NULL;
808         int rc = 0;
809         ENTRY;
810
811         OBD_ALLOC_PTR(export);
812         if (!export)
813                 return ERR_PTR(-ENOMEM);
814
815         export->exp_conn_cnt = 0;
816         export->exp_lock_hash = NULL;
817         export->exp_flock_hash = NULL;
818         cfs_atomic_set(&export->exp_refcount, 2);
819         cfs_atomic_set(&export->exp_rpc_count, 0);
820         cfs_atomic_set(&export->exp_cb_count, 0);
821         cfs_atomic_set(&export->exp_locks_count, 0);
822 #if LUSTRE_TRACKS_LOCK_EXP_REFS
823         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
824         cfs_spin_lock_init(&export->exp_locks_list_guard);
825 #endif
826         cfs_atomic_set(&export->exp_replay_count, 0);
827         export->exp_obd = obd;
828         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
829         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
830         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
831         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
832         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
833         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
834         class_handle_hash(&export->exp_handle, &export_handle_ops);
835         export->exp_last_request_time = cfs_time_current_sec();
836         cfs_spin_lock_init(&export->exp_lock);
837         cfs_spin_lock_init(&export->exp_rpc_lock);
838         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
839         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
840         cfs_spin_lock_init(&export->exp_bl_list_lock);
841         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
842
843         export->exp_sp_peer = LUSTRE_SP_ANY;
844         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
845         export->exp_client_uuid = *cluuid;
846         obd_init_export(export);
847
848         cfs_spin_lock(&obd->obd_dev_lock);
849          /* shouldn't happen, but might race */
850         if (obd->obd_stopping)
851                 GOTO(exit_unlock, rc = -ENODEV);
852
853         hash = cfs_hash_getref(obd->obd_uuid_hash);
854         if (hash == NULL)
855                 GOTO(exit_unlock, rc = -ENODEV);
856         cfs_spin_unlock(&obd->obd_dev_lock);
857
858         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
859                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
860                 if (rc != 0) {
861                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
862                                       obd->obd_name, cluuid->uuid, rc);
863                         GOTO(exit_err, rc = -EALREADY);
864                 }
865         }
866
867         cfs_spin_lock(&obd->obd_dev_lock);
868         if (obd->obd_stopping) {
869                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
870                 GOTO(exit_unlock, rc = -ENODEV);
871         }
872
873         class_incref(obd, "export", export);
874         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
875         cfs_list_add_tail(&export->exp_obd_chain_timed,
876                           &export->exp_obd->obd_exports_timed);
877         export->exp_obd->obd_num_exports++;
878         cfs_spin_unlock(&obd->obd_dev_lock);
879         cfs_hash_putref(hash);
880         RETURN(export);
881
882 exit_unlock:
883         cfs_spin_unlock(&obd->obd_dev_lock);
884 exit_err:
885         if (hash)
886                 cfs_hash_putref(hash);
887         class_handle_unhash(&export->exp_handle);
888         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
889         obd_destroy_export(export);
890         OBD_FREE_PTR(export);
891         return ERR_PTR(rc);
892 }
893 EXPORT_SYMBOL(class_new_export);
894
895 void class_unlink_export(struct obd_export *exp)
896 {
897         class_handle_unhash(&exp->exp_handle);
898
899         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
900         /* delete an uuid-export hashitem from hashtables */
901         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
902                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
903                              &exp->exp_client_uuid,
904                              &exp->exp_uuid_hash);
905
906         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
907         cfs_list_del_init(&exp->exp_obd_chain_timed);
908         exp->exp_obd->obd_num_exports--;
909         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
910         class_export_put(exp);
911 }
912 EXPORT_SYMBOL(class_unlink_export);
913
914 /* Import management functions */
915 void class_import_destroy(struct obd_import *imp)
916 {
917         ENTRY;
918
919         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
920                 imp->imp_obd->obd_name);
921
922         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
923
924         ptlrpc_put_connection_superhack(imp->imp_connection);
925
926         while (!cfs_list_empty(&imp->imp_conn_list)) {
927                 struct obd_import_conn *imp_conn;
928
929                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
930                                           struct obd_import_conn, oic_item);
931                 cfs_list_del_init(&imp_conn->oic_item);
932                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
933                 OBD_FREE(imp_conn, sizeof(*imp_conn));
934         }
935
936         LASSERT(imp->imp_sec == NULL);
937         class_decref(imp->imp_obd, "import", imp);
938         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
939         EXIT;
940 }
941
942 static void import_handle_addref(void *import)
943 {
944         class_import_get(import);
945 }
946
947 static struct portals_handle_ops import_handle_ops = {
948         .hop_addref = import_handle_addref,
949         .hop_free   = NULL,
950 };
951
952 struct obd_import *class_import_get(struct obd_import *import)
953 {
954         cfs_atomic_inc(&import->imp_refcount);
955         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
956                cfs_atomic_read(&import->imp_refcount),
957                import->imp_obd->obd_name);
958         return import;
959 }
960 EXPORT_SYMBOL(class_import_get);
961
962 void class_import_put(struct obd_import *imp)
963 {
964         ENTRY;
965
966         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
967         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
968
969         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
970                cfs_atomic_read(&imp->imp_refcount) - 1,
971                imp->imp_obd->obd_name);
972
973         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
974                 CDEBUG(D_INFO, "final put import %p\n", imp);
975                 obd_zombie_import_add(imp);
976         }
977
978         /* catch possible import put race */
979         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
980         EXIT;
981 }
982 EXPORT_SYMBOL(class_import_put);
983
984 static void init_imp_at(struct imp_at *at) {
985         int i;
986         at_init(&at->iat_net_latency, 0, 0);
987         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
988                 /* max service estimates are tracked on the server side, so
989                    don't use the AT history here, just use the last reported
990                    val. (But keep hist for proc histogram, worst_ever) */
991                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
992                         AT_FLG_NOHIST);
993         }
994 }
995
996 struct obd_import *class_new_import(struct obd_device *obd)
997 {
998         struct obd_import *imp;
999
1000         OBD_ALLOC(imp, sizeof(*imp));
1001         if (imp == NULL)
1002                 return NULL;
1003
1004         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1005         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1006         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1007         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1008         cfs_spin_lock_init(&imp->imp_lock);
1009         imp->imp_last_success_conn = 0;
1010         imp->imp_state = LUSTRE_IMP_NEW;
1011         imp->imp_obd = class_incref(obd, "import", imp);
1012         cfs_mutex_init(&imp->imp_sec_mutex);
1013         cfs_waitq_init(&imp->imp_recovery_waitq);
1014
1015         cfs_atomic_set(&imp->imp_refcount, 2);
1016         cfs_atomic_set(&imp->imp_unregistering, 0);
1017         cfs_atomic_set(&imp->imp_inflight, 0);
1018         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1019         cfs_atomic_set(&imp->imp_inval_count, 0);
1020         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1021         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1022         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1023         init_imp_at(&imp->imp_at);
1024
1025         /* the default magic is V2, will be used in connect RPC, and
1026          * then adjusted according to the flags in request/reply. */
1027         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1028
1029         return imp;
1030 }
1031 EXPORT_SYMBOL(class_new_import);
1032
1033 void class_destroy_import(struct obd_import *import)
1034 {
1035         LASSERT(import != NULL);
1036         LASSERT(import != LP_POISON);
1037
1038         class_handle_unhash(&import->imp_handle);
1039
1040         cfs_spin_lock(&import->imp_lock);
1041         import->imp_generation++;
1042         cfs_spin_unlock(&import->imp_lock);
1043         class_import_put(import);
1044 }
1045 EXPORT_SYMBOL(class_destroy_import);
1046
1047 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1048
1049 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1050 {
1051         cfs_spin_lock(&exp->exp_locks_list_guard);
1052
1053         LASSERT(lock->l_exp_refs_nr >= 0);
1054
1055         if (lock->l_exp_refs_target != NULL &&
1056             lock->l_exp_refs_target != exp) {
1057                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1058                               exp, lock, lock->l_exp_refs_target);
1059         }
1060         if ((lock->l_exp_refs_nr ++) == 0) {
1061                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1062                 lock->l_exp_refs_target = exp;
1063         }
1064         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1065                lock, exp, lock->l_exp_refs_nr);
1066         cfs_spin_unlock(&exp->exp_locks_list_guard);
1067 }
1068 EXPORT_SYMBOL(__class_export_add_lock_ref);
1069
1070 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1071 {
1072         cfs_spin_lock(&exp->exp_locks_list_guard);
1073         LASSERT(lock->l_exp_refs_nr > 0);
1074         if (lock->l_exp_refs_target != exp) {
1075                 LCONSOLE_WARN("lock %p, "
1076                               "mismatching export pointers: %p, %p\n",
1077                               lock, lock->l_exp_refs_target, exp);
1078         }
1079         if (-- lock->l_exp_refs_nr == 0) {
1080                 cfs_list_del_init(&lock->l_exp_refs_link);
1081                 lock->l_exp_refs_target = NULL;
1082         }
1083         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1084                lock, exp, lock->l_exp_refs_nr);
1085         cfs_spin_unlock(&exp->exp_locks_list_guard);
1086 }
1087 EXPORT_SYMBOL(__class_export_del_lock_ref);
1088 #endif
1089
1090 /* A connection defines an export context in which preallocation can
1091    be managed. This releases the export pointer reference, and returns
1092    the export handle, so the export refcount is 1 when this function
1093    returns. */
1094 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1095                   struct obd_uuid *cluuid)
1096 {
1097         struct obd_export *export;
1098         LASSERT(conn != NULL);
1099         LASSERT(obd != NULL);
1100         LASSERT(cluuid != NULL);
1101         ENTRY;
1102
1103         export = class_new_export(obd, cluuid);
1104         if (IS_ERR(export))
1105                 RETURN(PTR_ERR(export));
1106
1107         conn->cookie = export->exp_handle.h_cookie;
1108         class_export_put(export);
1109
1110         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1111                cluuid->uuid, conn->cookie);
1112         RETURN(0);
1113 }
1114 EXPORT_SYMBOL(class_connect);
1115
1116 /* if export is involved in recovery then clean up related things */
1117 void class_export_recovery_cleanup(struct obd_export *exp)
1118 {
1119         struct obd_device *obd = exp->exp_obd;
1120
1121         cfs_spin_lock(&obd->obd_recovery_task_lock);
1122         if (exp->exp_delayed)
1123                 obd->obd_delayed_clients--;
1124         if (obd->obd_recovering && exp->exp_in_recovery) {
1125                 cfs_spin_lock(&exp->exp_lock);
1126                 exp->exp_in_recovery = 0;
1127                 cfs_spin_unlock(&exp->exp_lock);
1128                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1129                 cfs_atomic_dec(&obd->obd_connected_clients);
1130         }
1131         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1132         /** Cleanup req replay fields */
1133         if (exp->exp_req_replay_needed) {
1134                 cfs_spin_lock(&exp->exp_lock);
1135                 exp->exp_req_replay_needed = 0;
1136                 cfs_spin_unlock(&exp->exp_lock);
1137                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1138                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1139         }
1140         /** Cleanup lock replay data */
1141         if (exp->exp_lock_replay_needed) {
1142                 cfs_spin_lock(&exp->exp_lock);
1143                 exp->exp_lock_replay_needed = 0;
1144                 cfs_spin_unlock(&exp->exp_lock);
1145                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1146                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1147         }
1148 }
1149
1150 /* This function removes 1-3 references from the export:
1151  * 1 - for export pointer passed
1152  * and if disconnect really need
1153  * 2 - removing from hash
1154  * 3 - in client_unlink_export
1155  * The export pointer passed to this function can destroyed */
1156 int class_disconnect(struct obd_export *export)
1157 {
1158         int already_disconnected;
1159         ENTRY;
1160
1161         if (export == NULL) {
1162                 CWARN("attempting to free NULL export %p\n", export);
1163                 RETURN(-EINVAL);
1164         }
1165
1166         cfs_spin_lock(&export->exp_lock);
1167         already_disconnected = export->exp_disconnected;
1168         export->exp_disconnected = 1;
1169         cfs_spin_unlock(&export->exp_lock);
1170
1171         /* class_cleanup(), abort_recovery(), and class_fail_export()
1172          * all end up in here, and if any of them race we shouldn't
1173          * call extra class_export_puts(). */
1174         if (already_disconnected) {
1175                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1176                 GOTO(no_disconn, already_disconnected);
1177         }
1178
1179         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1180                export->exp_handle.h_cookie);
1181
1182         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1183                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1184                              &export->exp_connection->c_peer.nid,
1185                              &export->exp_nid_hash);
1186
1187         class_export_recovery_cleanup(export);
1188         class_unlink_export(export);
1189 no_disconn:
1190         class_export_put(export);
1191         RETURN(0);
1192 }
1193 EXPORT_SYMBOL(class_disconnect);
1194
1195 /* Return non-zero for a fully connected export */
1196 int class_connected_export(struct obd_export *exp)
1197 {
1198         if (exp) {
1199                 int connected;
1200                 cfs_spin_lock(&exp->exp_lock);
1201                 connected = (exp->exp_conn_cnt > 0);
1202                 cfs_spin_unlock(&exp->exp_lock);
1203                 return connected;
1204         }
1205         return 0;
1206 }
1207 EXPORT_SYMBOL(class_connected_export);
1208
1209 static void class_disconnect_export_list(cfs_list_t *list,
1210                                          enum obd_option flags)
1211 {
1212         int rc;
1213         struct obd_export *exp;
1214         ENTRY;
1215
1216         /* It's possible that an export may disconnect itself, but
1217          * nothing else will be added to this list. */
1218         while (!cfs_list_empty(list)) {
1219                 exp = cfs_list_entry(list->next, struct obd_export,
1220                                      exp_obd_chain);
1221                 /* need for safe call CDEBUG after obd_disconnect */
1222                 class_export_get(exp);
1223
1224                 cfs_spin_lock(&exp->exp_lock);
1225                 exp->exp_flags = flags;
1226                 cfs_spin_unlock(&exp->exp_lock);
1227
1228                 if (obd_uuid_equals(&exp->exp_client_uuid,
1229                                     &exp->exp_obd->obd_uuid)) {
1230                         CDEBUG(D_HA,
1231                                "exp %p export uuid == obd uuid, don't discon\n",
1232                                exp);
1233                         /* Need to delete this now so we don't end up pointing
1234                          * to work_list later when this export is cleaned up. */
1235                         cfs_list_del_init(&exp->exp_obd_chain);
1236                         class_export_put(exp);
1237                         continue;
1238                 }
1239
1240                 class_export_get(exp);
1241                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1242                        "last request at "CFS_TIME_T"\n",
1243                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1244                        exp, exp->exp_last_request_time);
1245                 /* release one export reference anyway */
1246                 rc = obd_disconnect(exp);
1247
1248                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1249                        obd_export_nid2str(exp), exp, rc);
1250                 class_export_put(exp);
1251         }
1252         EXIT;
1253 }
1254
1255 void class_disconnect_exports(struct obd_device *obd)
1256 {
1257         cfs_list_t work_list;
1258         ENTRY;
1259
1260         /* Move all of the exports from obd_exports to a work list, en masse. */
1261         CFS_INIT_LIST_HEAD(&work_list);
1262         cfs_spin_lock(&obd->obd_dev_lock);
1263         cfs_list_splice_init(&obd->obd_exports, &work_list);
1264         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1265         cfs_spin_unlock(&obd->obd_dev_lock);
1266
1267         if (!cfs_list_empty(&work_list)) {
1268                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1269                        "disconnecting them\n", obd->obd_minor, obd);
1270                 class_disconnect_export_list(&work_list,
1271                                              exp_flags_from_obd(obd));
1272         } else
1273                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1274                        obd->obd_minor, obd);
1275         EXIT;
1276 }
1277 EXPORT_SYMBOL(class_disconnect_exports);
1278
1279 /* Remove exports that have not completed recovery.
1280  */
1281 void class_disconnect_stale_exports(struct obd_device *obd,
1282                                     int (*test_export)(struct obd_export *))
1283 {
1284         cfs_list_t work_list;
1285         struct obd_export *exp, *n;
1286         int evicted = 0;
1287         ENTRY;
1288
1289         CFS_INIT_LIST_HEAD(&work_list);
1290         cfs_spin_lock(&obd->obd_dev_lock);
1291         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1292                                      exp_obd_chain) {
1293                 /* don't count self-export as client */
1294                 if (obd_uuid_equals(&exp->exp_client_uuid,
1295                                     &exp->exp_obd->obd_uuid))
1296                         continue;
1297
1298                 cfs_spin_lock(&exp->exp_lock);
1299                 if (test_export(exp)) {
1300                         cfs_spin_unlock(&exp->exp_lock);
1301                         continue;
1302                 }
1303                 exp->exp_failed = 1;
1304                 cfs_spin_unlock(&exp->exp_lock);
1305
1306                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1307                 evicted++;
1308                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1309                        obd->obd_name, exp->exp_client_uuid.uuid,
1310                        exp->exp_connection == NULL ? "<unknown>" :
1311                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1312                 print_export_data(exp, "EVICTING", 0);
1313         }
1314         cfs_spin_unlock(&obd->obd_dev_lock);
1315
1316         if (evicted) {
1317                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1318                               obd->obd_name, evicted);
1319                 obd->obd_stale_clients += evicted;
1320         }
1321         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1322                                                  OBD_OPT_ABORT_RECOV);
1323         EXIT;
1324 }
1325 EXPORT_SYMBOL(class_disconnect_stale_exports);
1326
1327 void class_fail_export(struct obd_export *exp)
1328 {
1329         int rc, already_failed;
1330
1331         cfs_spin_lock(&exp->exp_lock);
1332         already_failed = exp->exp_failed;
1333         exp->exp_failed = 1;
1334         cfs_spin_unlock(&exp->exp_lock);
1335
1336         if (already_failed) {
1337                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1338                        exp, exp->exp_client_uuid.uuid);
1339                 return;
1340         }
1341
1342         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1343                exp, exp->exp_client_uuid.uuid);
1344
1345         if (obd_dump_on_timeout)
1346                 libcfs_debug_dumplog();
1347
1348         /* need for safe call CDEBUG after obd_disconnect */
1349         class_export_get(exp);
1350
1351         /* Most callers into obd_disconnect are removing their own reference
1352          * (request, for example) in addition to the one from the hash table.
1353          * We don't have such a reference here, so make one. */
1354         class_export_get(exp);
1355         rc = obd_disconnect(exp);
1356         if (rc)
1357                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1358         else
1359                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1360                        exp, exp->exp_client_uuid.uuid);
1361         class_export_put(exp);
1362 }
1363 EXPORT_SYMBOL(class_fail_export);
1364
1365 char *obd_export_nid2str(struct obd_export *exp)
1366 {
1367         if (exp->exp_connection != NULL)
1368                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1369
1370         return "(no nid)";
1371 }
1372 EXPORT_SYMBOL(obd_export_nid2str);
1373
1374 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1375 {
1376         struct obd_export *doomed_exp = NULL;
1377         int exports_evicted = 0;
1378
1379         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1380
1381         do {
1382                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1383                 if (doomed_exp == NULL)
1384                         break;
1385
1386                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1387                          "nid %s found, wanted nid %s, requested nid %s\n",
1388                          obd_export_nid2str(doomed_exp),
1389                          libcfs_nid2str(nid_key), nid);
1390                 LASSERTF(doomed_exp != obd->obd_self_export,
1391                          "self-export is hashed by NID?\n");
1392                 exports_evicted++;
1393                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1394                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1395                        exports_evicted);
1396                 class_fail_export(doomed_exp);
1397                 class_export_put(doomed_exp);
1398         } while (1);
1399
1400         if (!exports_evicted)
1401                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1402                        obd->obd_name, nid);
1403         return exports_evicted;
1404 }
1405 EXPORT_SYMBOL(obd_export_evict_by_nid);
1406
1407 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1408 {
1409         struct obd_export *doomed_exp = NULL;
1410         struct obd_uuid doomed_uuid;
1411         int exports_evicted = 0;
1412
1413         obd_str2uuid(&doomed_uuid, uuid);
1414         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1415                 CERROR("%s: can't evict myself\n", obd->obd_name);
1416                 return exports_evicted;
1417         }
1418
1419         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1420
1421         if (doomed_exp == NULL) {
1422                 CERROR("%s: can't disconnect %s: no exports found\n",
1423                        obd->obd_name, uuid);
1424         } else {
1425                 CWARN("%s: evicting %s at adminstrative request\n",
1426                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1427                 class_fail_export(doomed_exp);
1428                 class_export_put(doomed_exp);
1429                 exports_evicted++;
1430         }
1431
1432         return exports_evicted;
1433 }
1434 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1435
1436 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1437 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1438 EXPORT_SYMBOL(class_export_dump_hook);
1439 #endif
1440
1441 static void print_export_data(struct obd_export *exp, const char *status,
1442                               int locks)
1443 {
1444         struct ptlrpc_reply_state *rs;
1445         struct ptlrpc_reply_state *first_reply = NULL;
1446         int nreplies = 0;
1447
1448         cfs_spin_lock(&exp->exp_lock);
1449         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1450                                 rs_exp_list) {
1451                 if (nreplies == 0)
1452                         first_reply = rs;
1453                 nreplies++;
1454         }
1455         cfs_spin_unlock(&exp->exp_lock);
1456
1457         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1458                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1459                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1460                cfs_atomic_read(&exp->exp_rpc_count),
1461                cfs_atomic_read(&exp->exp_cb_count),
1462                cfs_atomic_read(&exp->exp_locks_count),
1463                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1464                nreplies, first_reply, nreplies > 3 ? "..." : "",
1465                exp->exp_last_committed);
1466 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1467         if (locks && class_export_dump_hook != NULL)
1468                 class_export_dump_hook(exp);
1469 #endif
1470 }
1471
1472 void dump_exports(struct obd_device *obd, int locks)
1473 {
1474         struct obd_export *exp;
1475
1476         cfs_spin_lock(&obd->obd_dev_lock);
1477         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1478                 print_export_data(exp, "ACTIVE", locks);
1479         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1480                 print_export_data(exp, "UNLINKED", locks);
1481         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1482                 print_export_data(exp, "DELAYED", locks);
1483         cfs_spin_unlock(&obd->obd_dev_lock);
1484         cfs_spin_lock(&obd_zombie_impexp_lock);
1485         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1486                 print_export_data(exp, "ZOMBIE", locks);
1487         cfs_spin_unlock(&obd_zombie_impexp_lock);
1488 }
1489 EXPORT_SYMBOL(dump_exports);
1490
1491 void obd_exports_barrier(struct obd_device *obd)
1492 {
1493         int waited = 2;
1494         LASSERT(cfs_list_empty(&obd->obd_exports));
1495         cfs_spin_lock(&obd->obd_dev_lock);
1496         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1497                 cfs_spin_unlock(&obd->obd_dev_lock);
1498                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1499                                                    cfs_time_seconds(waited));
1500                 if (waited > 5 && IS_PO2(waited)) {
1501                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1502                                       "more than %d seconds. "
1503                                       "The obd refcount = %d. Is it stuck?\n",
1504                                       obd->obd_name, waited,
1505                                       cfs_atomic_read(&obd->obd_refcount));
1506                         dump_exports(obd, 1);
1507                 }
1508                 waited *= 2;
1509                 cfs_spin_lock(&obd->obd_dev_lock);
1510         }
1511         cfs_spin_unlock(&obd->obd_dev_lock);
1512 }
1513 EXPORT_SYMBOL(obd_exports_barrier);
1514
1515 /* Total amount of zombies to be destroyed */
1516 static int zombies_count = 0;
1517
1518 /**
1519  * kill zombie imports and exports
1520  */
1521 void obd_zombie_impexp_cull(void)
1522 {
1523         struct obd_import *import;
1524         struct obd_export *export;
1525         ENTRY;
1526
1527         do {
1528                 cfs_spin_lock(&obd_zombie_impexp_lock);
1529
1530                 import = NULL;
1531                 if (!cfs_list_empty(&obd_zombie_imports)) {
1532                         import = cfs_list_entry(obd_zombie_imports.next,
1533                                                 struct obd_import,
1534                                                 imp_zombie_chain);
1535                         cfs_list_del_init(&import->imp_zombie_chain);
1536                 }
1537
1538                 export = NULL;
1539                 if (!cfs_list_empty(&obd_zombie_exports)) {
1540                         export = cfs_list_entry(obd_zombie_exports.next,
1541                                                 struct obd_export,
1542                                                 exp_obd_chain);
1543                         cfs_list_del_init(&export->exp_obd_chain);
1544                 }
1545
1546                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1547
1548                 if (import != NULL) {
1549                         class_import_destroy(import);
1550                         cfs_spin_lock(&obd_zombie_impexp_lock);
1551                         zombies_count--;
1552                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1553                 }
1554
1555                 if (export != NULL) {
1556                         class_export_destroy(export);
1557                         cfs_spin_lock(&obd_zombie_impexp_lock);
1558                         zombies_count--;
1559                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1560                 }
1561
1562                 cfs_cond_resched();
1563         } while (import != NULL || export != NULL);
1564         EXIT;
1565 }
1566
1567 static cfs_completion_t         obd_zombie_start;
1568 static cfs_completion_t         obd_zombie_stop;
1569 static unsigned long            obd_zombie_flags;
1570 static cfs_waitq_t              obd_zombie_waitq;
1571 static pid_t                    obd_zombie_pid;
1572
1573 enum {
1574         OBD_ZOMBIE_STOP   = 1 << 1
1575 };
1576
1577 /**
1578  * check for work for kill zombie import/export thread.
1579  */
1580 static int obd_zombie_impexp_check(void *arg)
1581 {
1582         int rc;
1583
1584         cfs_spin_lock(&obd_zombie_impexp_lock);
1585         rc = (zombies_count == 0) &&
1586              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1587         cfs_spin_unlock(&obd_zombie_impexp_lock);
1588
1589         RETURN(rc);
1590 }
1591
1592 /**
1593  * Add export to the obd_zombe thread and notify it.
1594  */
1595 static void obd_zombie_export_add(struct obd_export *exp) {
1596         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1597         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1598         cfs_list_del_init(&exp->exp_obd_chain);
1599         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1600         cfs_spin_lock(&obd_zombie_impexp_lock);
1601         zombies_count++;
1602         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1603         cfs_spin_unlock(&obd_zombie_impexp_lock);
1604
1605         obd_zombie_impexp_notify();
1606 }
1607
1608 /**
1609  * Add import to the obd_zombe thread and notify it.
1610  */
1611 static void obd_zombie_import_add(struct obd_import *imp) {
1612         LASSERT(imp->imp_sec == NULL);
1613         LASSERT(imp->imp_rq_pool == NULL);
1614         cfs_spin_lock(&obd_zombie_impexp_lock);
1615         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1616         zombies_count++;
1617         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1618         cfs_spin_unlock(&obd_zombie_impexp_lock);
1619
1620         obd_zombie_impexp_notify();
1621 }
1622
1623 /**
1624  * notify import/export destroy thread about new zombie.
1625  */
1626 static void obd_zombie_impexp_notify(void)
1627 {
1628         /*
1629          * Make sure obd_zomebie_impexp_thread get this notification.
1630          * It is possible this signal only get by obd_zombie_barrier, and
1631          * barrier gulps this notification and sleeps away and hangs ensues
1632          */
1633         cfs_waitq_broadcast(&obd_zombie_waitq);
1634 }
1635
1636 /**
1637  * check whether obd_zombie is idle
1638  */
1639 static int obd_zombie_is_idle(void)
1640 {
1641         int rc;
1642
1643         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1644         cfs_spin_lock(&obd_zombie_impexp_lock);
1645         rc = (zombies_count == 0);
1646         cfs_spin_unlock(&obd_zombie_impexp_lock);
1647         return rc;
1648 }
1649
1650 /**
1651  * wait when obd_zombie import/export queues become empty
1652  */
1653 void obd_zombie_barrier(void)
1654 {
1655         struct l_wait_info lwi = { 0 };
1656
1657         if (obd_zombie_pid == cfs_curproc_pid())
1658                 /* don't wait for myself */
1659                 return;
1660         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1661 }
1662 EXPORT_SYMBOL(obd_zombie_barrier);
1663
1664 #ifdef __KERNEL__
1665
1666 /**
1667  * destroy zombie export/import thread.
1668  */
1669 static int obd_zombie_impexp_thread(void *unused)
1670 {
1671         int rc;
1672
1673         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1674                 cfs_complete(&obd_zombie_start);
1675                 RETURN(rc);
1676         }
1677
1678         cfs_complete(&obd_zombie_start);
1679
1680         obd_zombie_pid = cfs_curproc_pid();
1681
1682         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1683                 struct l_wait_info lwi = { 0 };
1684
1685                 l_wait_event(obd_zombie_waitq,
1686                              !obd_zombie_impexp_check(NULL), &lwi);
1687                 obd_zombie_impexp_cull();
1688
1689                 /*
1690                  * Notify obd_zombie_barrier callers that queues
1691                  * may be empty.
1692                  */
1693                 cfs_waitq_signal(&obd_zombie_waitq);
1694         }
1695
1696         cfs_complete(&obd_zombie_stop);
1697
1698         RETURN(0);
1699 }
1700
1701 #else /* ! KERNEL */
1702
1703 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1704 static void *obd_zombie_impexp_work_cb;
1705 static void *obd_zombie_impexp_idle_cb;
1706
1707 int obd_zombie_impexp_kill(void *arg)
1708 {
1709         int rc = 0;
1710
1711         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1712                 obd_zombie_impexp_cull();
1713                 rc = 1;
1714         }
1715         cfs_atomic_dec(&zombie_recur);
1716         return rc;
1717 }
1718
1719 #endif
1720
1721 /**
1722  * start destroy zombie import/export thread
1723  */
1724 int obd_zombie_impexp_init(void)
1725 {
1726         int rc;
1727
1728         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1729         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1730         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1731         cfs_init_completion(&obd_zombie_start);
1732         cfs_init_completion(&obd_zombie_stop);
1733         cfs_waitq_init(&obd_zombie_waitq);
1734         obd_zombie_pid = 0;
1735
1736 #ifdef __KERNEL__
1737         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1738         if (rc < 0)
1739                 RETURN(rc);
1740
1741         cfs_wait_for_completion(&obd_zombie_start);
1742 #else
1743
1744         obd_zombie_impexp_work_cb =
1745                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1746                                                  &obd_zombie_impexp_kill, NULL);
1747
1748         obd_zombie_impexp_idle_cb =
1749                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1750                                                  &obd_zombie_impexp_check, NULL);
1751         rc = 0;
1752 #endif
1753         RETURN(rc);
1754 }
1755 /**
1756  * stop destroy zombie import/export thread
1757  */
1758 void obd_zombie_impexp_stop(void)
1759 {
1760         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1761         obd_zombie_impexp_notify();
1762 #ifdef __KERNEL__
1763         cfs_wait_for_completion(&obd_zombie_stop);
1764 #else
1765         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1766         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1767 #endif
1768 }
1769
1770 /***** Kernel-userspace comm helpers *******/
1771
1772 /* Get length of entire message, including header */
1773 int kuc_len(int payload_len)
1774 {
1775         return sizeof(struct kuc_hdr) + payload_len;
1776 }
1777 EXPORT_SYMBOL(kuc_len);
1778
1779 /* Get a pointer to kuc header, given a ptr to the payload
1780  * @param p Pointer to payload area
1781  * @returns Pointer to kuc header
1782  */
1783 struct kuc_hdr * kuc_ptr(void *p)
1784 {
1785         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1786         LASSERT(lh->kuc_magic == KUC_MAGIC);
1787         return lh;
1788 }
1789 EXPORT_SYMBOL(kuc_ptr);
1790
1791 /* Test if payload is part of kuc message
1792  * @param p Pointer to payload area
1793  * @returns boolean
1794  */
1795 int kuc_ispayload(void *p)
1796 {
1797         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798
1799         if (kh->kuc_magic == KUC_MAGIC)
1800                 return 1;
1801         else
1802                 return 0;
1803 }
1804 EXPORT_SYMBOL(kuc_ispayload);
1805
1806 /* Alloc space for a message, and fill in header
1807  * @return Pointer to payload area
1808  */
1809 void *kuc_alloc(int payload_len, int transport, int type)
1810 {
1811         struct kuc_hdr *lh;
1812         int len = kuc_len(payload_len);
1813
1814         OBD_ALLOC(lh, len);
1815         if (lh == NULL)
1816                 return ERR_PTR(-ENOMEM);
1817
1818         lh->kuc_magic = KUC_MAGIC;
1819         lh->kuc_transport = transport;
1820         lh->kuc_msgtype = type;
1821         lh->kuc_msglen = len;
1822
1823         return (void *)(lh + 1);
1824 }
1825 EXPORT_SYMBOL(kuc_alloc);
1826
/* Free a kuc message.
 * @param p           Pointer to payload area (not to the header)
 * @param payload_len Payload size; kuc_len(payload_len) is the size
 *                    handed to OBD_FREE, so it has to be the value the
 *                    message was allocated with
 *
 * Deliberately not declared "inline": the function is exported, and a
 * non-static inline definition does not provide an external definition
 * under C99 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1834
1835
1836