Whamcloud - gitweb
081b55e3d748caf2f92498a38db1e57ecae56959
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 cfs_spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         cfs_spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         cfs_spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         cfs_spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
120         if (!type) {
121                 const char *modname = name;
122                 if (!cfs_request_module("%s", modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 cfs_spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 cfs_try_module_get(type->typ_dt_ops->o_owner);
135                 cfs_spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139 EXPORT_SYMBOL(class_get_type);
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         cfs_spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         cfs_module_put(type->typ_dt_ops->o_owner);
147         cfs_spin_unlock(&type->obd_type_lock);
148 }
149 EXPORT_SYMBOL(class_put_type);
150
151 #define CLASS_MAX_NAME 1024
152
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154                         struct lprocfs_vars *vars, const char *name,
155                         struct lu_device_type *ldt)
156 {
157         struct obd_type *type;
158         int rc = 0;
159         ENTRY;
160
161         /* sanity check */
162         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163
164         if (class_search_type(name)) {
165                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166                 RETURN(-EEXIST);
167         }
168
169         rc = -ENOMEM;
170         OBD_ALLOC(type, sizeof(*type));
171         if (type == NULL)
172                 RETURN(rc);
173
174         OBD_ALLOC_PTR(type->typ_dt_ops);
175         OBD_ALLOC_PTR(type->typ_md_ops);
176         OBD_ALLOC(type->typ_name, strlen(name) + 1);
177
178         if (type->typ_dt_ops == NULL ||
179             type->typ_md_ops == NULL ||
180             type->typ_name == NULL)
181                 GOTO (failed, rc);
182
183         *(type->typ_dt_ops) = *dt_ops;
184         /* md_ops is optional */
185         if (md_ops)
186                 *(type->typ_md_ops) = *md_ops;
187         strcpy(type->typ_name, name);
188         cfs_spin_lock_init(&type->obd_type_lock);
189
190 #ifdef LPROCFS
191         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192                                               vars, type);
193         if (IS_ERR(type->typ_procroot)) {
194                 rc = PTR_ERR(type->typ_procroot);
195                 type->typ_procroot = NULL;
196                 GOTO (failed, rc);
197         }
198 #endif
199         if (ldt != NULL) {
200                 type->typ_lu = ldt;
201                 rc = lu_device_type_init(ldt);
202                 if (rc != 0)
203                         GOTO (failed, rc);
204         }
205
206         cfs_spin_lock(&obd_types_lock);
207         cfs_list_add(&type->typ_chain, &obd_types);
208         cfs_spin_unlock(&obd_types_lock);
209
210         RETURN (0);
211
212  failed:
213         if (type->typ_name != NULL)
214                 OBD_FREE(type->typ_name, strlen(name) + 1);
215         if (type->typ_md_ops != NULL)
216                 OBD_FREE_PTR(type->typ_md_ops);
217         if (type->typ_dt_ops != NULL)
218                 OBD_FREE_PTR(type->typ_dt_ops);
219         OBD_FREE(type, sizeof(*type));
220         RETURN(rc);
221 }
222 EXPORT_SYMBOL(class_register_type);
223
224 int class_unregister_type(const char *name)
225 {
226         struct obd_type *type = class_search_type(name);
227         ENTRY;
228
229         if (!type) {
230                 CERROR("unknown obd type\n");
231                 RETURN(-EINVAL);
232         }
233
234         if (type->typ_refcnt) {
235                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236                 /* This is a bad situation, let's make the best of it */
237                 /* Remove ops, but leave the name for debugging */
238                 OBD_FREE_PTR(type->typ_dt_ops);
239                 OBD_FREE_PTR(type->typ_md_ops);
240                 RETURN(-EBUSY);
241         }
242
243         if (type->typ_procroot) {
244                 lprocfs_remove(&type->typ_procroot);
245         }
246
247         if (type->typ_lu)
248                 lu_device_type_fini(type->typ_lu);
249
250         cfs_spin_lock(&obd_types_lock);
251         cfs_list_del(&type->typ_chain);
252         cfs_spin_unlock(&obd_types_lock);
253         OBD_FREE(type->typ_name, strlen(name) + 1);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         if (type->typ_md_ops != NULL)
257                 OBD_FREE_PTR(type->typ_md_ops);
258         OBD_FREE(type, sizeof(*type));
259         RETURN(0);
260 } /* class_unregister_type */
261 EXPORT_SYMBOL(class_unregister_type);
262
263 /**
264  * Create a new obd device.
265  *
266  * Find an empty slot in ::obd_devs[], create a new obd device in it.
267  *
268  * \param[in] type_name obd device type string.
269  * \param[in] name      obd device name.
270  *
271  * \retval NULL if create fails, otherwise return the obd device
272  *         pointer created.
273  */
274 struct obd_device *class_newdev(const char *type_name, const char *name)
275 {
276         struct obd_device *result = NULL;
277         struct obd_device *newdev;
278         struct obd_type *type = NULL;
279         int i;
280         int new_obd_minor = 0;
281         ENTRY;
282
283         if (strlen(name) >= MAX_OBD_NAME) {
284                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
285                 RETURN(ERR_PTR(-EINVAL));
286         }
287
288         type = class_get_type(type_name);
289         if (type == NULL){
290                 CERROR("OBD: unknown type: %s\n", type_name);
291                 RETURN(ERR_PTR(-ENODEV));
292         }
293
294         newdev = obd_device_alloc();
295         if (newdev == NULL) {
296                 class_put_type(type);
297                 RETURN(ERR_PTR(-ENOMEM));
298         }
299         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
300
301         cfs_write_lock(&obd_dev_lock);
302         for (i = 0; i < class_devno_max(); i++) {
303                 struct obd_device *obd = class_num2obd(i);
304
305                 if (obd && obd->obd_name &&
306                     (strcmp(name, obd->obd_name) == 0)) {
307                         CERROR("Device %s already exists at %d, won't add\n",
308                                name, i);
309                         if (result) {
310                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
311                                          "%p obd_magic %08x != %08x\n", result,
312                                          result->obd_magic, OBD_DEVICE_MAGIC);
313                                 LASSERTF(result->obd_minor == new_obd_minor,
314                                          "%p obd_minor %d != %d\n", result,
315                                          result->obd_minor, new_obd_minor);
316
317                                 obd_devs[result->obd_minor] = NULL;
318                                 result->obd_name[0]='\0';
319                          }
320                         result = ERR_PTR(-EEXIST);
321                         break;
322                 }
323                 if (!result && !obd) {
324                         result = newdev;
325                         result->obd_minor = i;
326                         new_obd_minor = i;
327                         result->obd_type = type;
328                         strncpy(result->obd_name, name,
329                                 sizeof(result->obd_name) - 1);
330                         obd_devs[i] = result;
331                 }
332         }
333         cfs_write_unlock(&obd_dev_lock);
334
335         if (result == NULL && i >= class_devno_max()) {
336                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
337                        class_devno_max());
338                 RETURN(ERR_PTR(-EOVERFLOW));
339         }
340
341         if (IS_ERR(result)) {
342                 obd_device_free(newdev);
343                 class_put_type(type);
344         } else {
345                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346                        result->obd_name, result);
347         }
348         RETURN(result);
349 }
350
351 void class_release_dev(struct obd_device *obd)
352 {
353         struct obd_type *obd_type = obd->obd_type;
354
355         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
356                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
357         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
358                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
359         LASSERT(obd_type != NULL);
360
361         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
362                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
363
364         cfs_write_lock(&obd_dev_lock);
365         obd_devs[obd->obd_minor] = NULL;
366         cfs_write_unlock(&obd_dev_lock);
367         obd_device_free(obd);
368
369         class_put_type(obd_type);
370 }
371
372 int class_name2dev(const char *name)
373 {
374         int i;
375
376         if (!name)
377                 return -1;
378
379         cfs_read_lock(&obd_dev_lock);
380         for (i = 0; i < class_devno_max(); i++) {
381                 struct obd_device *obd = class_num2obd(i);
382
383                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
384                         /* Make sure we finished attaching before we give
385                            out any references */
386                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
387                         if (obd->obd_attached) {
388                                 cfs_read_unlock(&obd_dev_lock);
389                                 return i;
390                         }
391                         break;
392                 }
393         }
394         cfs_read_unlock(&obd_dev_lock);
395
396         return -1;
397 }
398 EXPORT_SYMBOL(class_name2dev);
399
400 struct obd_device *class_name2obd(const char *name)
401 {
402         int dev = class_name2dev(name);
403
404         if (dev < 0 || dev > class_devno_max())
405                 return NULL;
406         return class_num2obd(dev);
407 }
408 EXPORT_SYMBOL(class_name2obd);
409
410 int class_uuid2dev(struct obd_uuid *uuid)
411 {
412         int i;
413
414         cfs_read_lock(&obd_dev_lock);
415         for (i = 0; i < class_devno_max(); i++) {
416                 struct obd_device *obd = class_num2obd(i);
417
418                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
419                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
420                         cfs_read_unlock(&obd_dev_lock);
421                         return i;
422                 }
423         }
424         cfs_read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428 EXPORT_SYMBOL(class_uuid2dev);
429
430 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
431 {
432         int dev = class_uuid2dev(uuid);
433         if (dev < 0)
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_uuid2obd);
438
439 /**
440  * Get obd device from ::obd_devs[]
441  *
442  * \param num [in] array index
443  *
444  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
445  *         otherwise return the obd device there.
446  */
447 struct obd_device *class_num2obd(int num)
448 {
449         struct obd_device *obd = NULL;
450
451         if (num < class_devno_max()) {
452                 obd = obd_devs[num];
453                 if (obd == NULL)
454                         return NULL;
455
456                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
457                          "%p obd_magic %08x != %08x\n",
458                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459                 LASSERTF(obd->obd_minor == num,
460                          "%p obd_minor %0d != %0d\n",
461                          obd, obd->obd_minor, num);
462         }
463
464         return obd;
465 }
466 EXPORT_SYMBOL(class_num2obd);
467
468 void class_obd_list(void)
469 {
470         char *status;
471         int i;
472
473         cfs_read_lock(&obd_dev_lock);
474         for (i = 0; i < class_devno_max(); i++) {
475                 struct obd_device *obd = class_num2obd(i);
476
477                 if (obd == NULL)
478                         continue;
479                 if (obd->obd_stopping)
480                         status = "ST";
481                 else if (obd->obd_set_up)
482                         status = "UP";
483                 else if (obd->obd_attached)
484                         status = "AT";
485                 else
486                         status = "--";
487                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
488                          i, status, obd->obd_type->typ_name,
489                          obd->obd_name, obd->obd_uuid.uuid,
490                          cfs_atomic_read(&obd->obd_refcount));
491         }
492         cfs_read_unlock(&obd_dev_lock);
493         return;
494 }
495
496 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
497    specified, then only the client with that uuid is returned,
498    otherwise any client connected to the tgt is returned. */
499 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
500                                           const char * typ_name,
501                                           struct obd_uuid *grp_uuid)
502 {
503         int i;
504
505         cfs_read_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd == NULL)
510                         continue;
511                 if ((strncmp(obd->obd_type->typ_name, typ_name,
512                              strlen(typ_name)) == 0)) {
513                         if (obd_uuid_equals(tgt_uuid,
514                                             &obd->u.cli.cl_target_uuid) &&
515                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
516                                                          &obd->obd_uuid) : 1)) {
517                                 cfs_read_unlock(&obd_dev_lock);
518                                 return obd;
519                         }
520                 }
521         }
522         cfs_read_unlock(&obd_dev_lock);
523
524         return NULL;
525 }
526 EXPORT_SYMBOL(class_find_client_obd);
527
528 /* Iterate the obd_device list looking devices have grp_uuid. Start
529    searching at *next, and if a device is found, the next index to look
530    at is saved in *next. If next is NULL, then the first matching device
531    will always be returned. */
532 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
533 {
534         int i;
535
536         if (next == NULL)
537                 i = 0;
538         else if (*next >= 0 && *next < class_devno_max())
539                 i = *next;
540         else
541                 return NULL;
542
543         cfs_read_lock(&obd_dev_lock);
544         for (; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd == NULL)
548                         continue;
549                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
550                         if (next != NULL)
551                                 *next = i+1;
552                         cfs_read_unlock(&obd_dev_lock);
553                         return obd;
554                 }
555         }
556         cfs_read_unlock(&obd_dev_lock);
557
558         return NULL;
559 }
560 EXPORT_SYMBOL(class_devices_in_group);
561
562 /**
563  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
564  * adjust sptlrpc settings accordingly.
565  */
566 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
567 {
568         struct obd_device  *obd;
569         const char         *type;
570         int                 i, rc = 0, rc2;
571
572         LASSERT(namelen > 0);
573
574         cfs_read_lock(&obd_dev_lock);
575         for (i = 0; i < class_devno_max(); i++) {
576                 obd = class_num2obd(i);
577
578                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
579                         continue;
580
581                 /* only notify mdc, osc, mdt, ost */
582                 type = obd->obd_type->typ_name;
583                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
584                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OST_NAME) != 0)
587                         continue;
588
589                 if (strncmp(obd->obd_name, fsname, namelen))
590                         continue;
591
592                 class_incref(obd, __FUNCTION__, obd);
593                 cfs_read_unlock(&obd_dev_lock);
594                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
595                                          sizeof(KEY_SPTLRPC_CONF),
596                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
597                 rc = rc ? rc : rc2;
598                 class_decref(obd, __FUNCTION__, obd);
599                 cfs_read_lock(&obd_dev_lock);
600         }
601         cfs_read_unlock(&obd_dev_lock);
602         return rc;
603 }
604 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
605
606 void obd_cleanup_caches(void)
607 {
608         int rc;
609
610         ENTRY;
611         if (obd_device_cachep) {
612                 rc = cfs_mem_cache_destroy(obd_device_cachep);
613                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
614                 obd_device_cachep = NULL;
615         }
616         if (obdo_cachep) {
617                 rc = cfs_mem_cache_destroy(obdo_cachep);
618                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
619                 obdo_cachep = NULL;
620         }
621         if (import_cachep) {
622                 rc = cfs_mem_cache_destroy(import_cachep);
623                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
624                 import_cachep = NULL;
625         }
626         if (capa_cachep) {
627                 rc = cfs_mem_cache_destroy(capa_cachep);
628                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
629                 capa_cachep = NULL;
630         }
631         EXIT;
632 }
633
634 int obd_init_caches(void)
635 {
636         ENTRY;
637
638         LASSERT(obd_device_cachep == NULL);
639         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
640                                                  sizeof(struct obd_device),
641                                                  0, 0);
642         if (!obd_device_cachep)
643                 GOTO(out, -ENOMEM);
644
645         LASSERT(obdo_cachep == NULL);
646         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
647                                            0, 0);
648         if (!obdo_cachep)
649                 GOTO(out, -ENOMEM);
650
651         LASSERT(import_cachep == NULL);
652         import_cachep = cfs_mem_cache_create("ll_import_cache",
653                                              sizeof(struct obd_import),
654                                              0, 0);
655         if (!import_cachep)
656                 GOTO(out, -ENOMEM);
657
658         LASSERT(capa_cachep == NULL);
659         capa_cachep = cfs_mem_cache_create("capa_cache",
660                                            sizeof(struct obd_capa), 0, 0);
661         if (!capa_cachep)
662                 GOTO(out, -ENOMEM);
663
664         RETURN(0);
665  out:
666         obd_cleanup_caches();
667         RETURN(-ENOMEM);
668
669 }
670
671 /* map connection to client */
672 struct obd_export *class_conn2export(struct lustre_handle *conn)
673 {
674         struct obd_export *export;
675         ENTRY;
676
677         if (!conn) {
678                 CDEBUG(D_CACHE, "looking for null handle\n");
679                 RETURN(NULL);
680         }
681
682         if (conn->cookie == -1) {  /* this means assign a new connection */
683                 CDEBUG(D_CACHE, "want a new connection\n");
684                 RETURN(NULL);
685         }
686
687         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
688         export = class_handle2object(conn->cookie);
689         RETURN(export);
690 }
691 EXPORT_SYMBOL(class_conn2export);
692
693 struct obd_device *class_exp2obd(struct obd_export *exp)
694 {
695         if (exp)
696                 return exp->exp_obd;
697         return NULL;
698 }
699 EXPORT_SYMBOL(class_exp2obd);
700
701 struct obd_device *class_conn2obd(struct lustre_handle *conn)
702 {
703         struct obd_export *export;
704         export = class_conn2export(conn);
705         if (export) {
706                 struct obd_device *obd = export->exp_obd;
707                 class_export_put(export);
708                 return obd;
709         }
710         return NULL;
711 }
712 EXPORT_SYMBOL(class_conn2obd);
713
714 struct obd_import *class_exp2cliimp(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717         if (obd == NULL)
718                 return NULL;
719         return obd->u.cli.cl_import;
720 }
721 EXPORT_SYMBOL(class_exp2cliimp);
722
723 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
724 {
725         struct obd_device *obd = class_conn2obd(conn);
726         if (obd == NULL)
727                 return NULL;
728         return obd->u.cli.cl_import;
729 }
730 EXPORT_SYMBOL(class_conn2cliimp);
731
732 /* Export management functions */
733
734 /* if export is involved in recovery then clean up related things */
735 void class_export_recovery_cleanup(struct obd_export *exp)
736 {
737         struct obd_device *obd = exp->exp_obd;
738
739         cfs_spin_lock(&obd->obd_recovery_task_lock);
740         if (exp->exp_delayed)
741                 obd->obd_delayed_clients--;
742         if (obd->obd_recovering && exp->exp_in_recovery) {
743                 cfs_spin_lock(&exp->exp_lock);
744                 exp->exp_in_recovery = 0;
745                 cfs_spin_unlock(&exp->exp_lock);
746                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
747                 cfs_atomic_dec(&obd->obd_connected_clients);
748         }
749         cfs_spin_unlock(&obd->obd_recovery_task_lock);
750         /** Cleanup req replay fields */
751         if (exp->exp_req_replay_needed) {
752                 cfs_spin_lock(&exp->exp_lock);
753                 exp->exp_req_replay_needed = 0;
754                 cfs_spin_unlock(&exp->exp_lock);
755                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
756                 cfs_atomic_dec(&obd->obd_req_replay_clients);
757         }
758         /** Cleanup lock replay data */
759         if (exp->exp_lock_replay_needed) {
760                 cfs_spin_lock(&exp->exp_lock);
761                 exp->exp_lock_replay_needed = 0;
762                 cfs_spin_unlock(&exp->exp_lock);
763                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
764                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
765         }
766 }
767
768 static void class_export_destroy(struct obd_export *exp)
769 {
770         struct obd_device *obd = exp->exp_obd;
771         ENTRY;
772
773         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774         LASSERT(obd != NULL);
775
776         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777                exp->exp_client_uuid.uuid, obd->obd_name);
778
779
780         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781         if (exp->exp_connection)
782                 ptlrpc_put_connection_superhack(exp->exp_connection);
783
784         LASSERT(cfs_list_empty(&exp->exp_flock_wait_list));
785         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789         obd_destroy_export(exp);
790         class_decref(obd, "export", exp);
791
792         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
793         EXIT;
794 }
795
796 static void export_handle_addref(void *export)
797 {
798         class_export_get(export);
799 }
800
801 static struct portals_handle_ops export_handle_ops = {
802         .hop_addref = export_handle_addref,
803         .hop_free   = NULL,
804 };
805
806 struct obd_export *class_export_get(struct obd_export *exp)
807 {
808         cfs_atomic_inc(&exp->exp_refcount);
809         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
810                cfs_atomic_read(&exp->exp_refcount));
811         return exp;
812 }
813 EXPORT_SYMBOL(class_export_get);
814
815 void class_export_put(struct obd_export *exp)
816 {
817         LASSERT(exp != NULL);
818         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
819         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
820                cfs_atomic_read(&exp->exp_refcount) - 1);
821
822         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
823                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
824                 CDEBUG(D_IOCTL, "final put %p/%s\n",
825                        exp, exp->exp_client_uuid.uuid);
826
827                 /* release nid stat refererence */
828                 lprocfs_exp_cleanup(exp);
829                 class_export_recovery_cleanup(exp);
830
831                 obd_zombie_export_add(exp);
832         }
833 }
834 EXPORT_SYMBOL(class_export_put);
835
836 /* Creates a new export, adds it to the hash table, and returns a
837  * pointer to it. The refcount is 2: one for the hash reference, and
838  * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840                                     struct obd_uuid *cluuid)
841 {
842         struct obd_export *export;
843         cfs_hash_t *hash = NULL;
844         int rc = 0;
845         ENTRY;
846
847         OBD_ALLOC_PTR(export);
848         if (!export)
849                 return ERR_PTR(-ENOMEM);
850
851         export->exp_conn_cnt = 0;
852         export->exp_lock_hash = NULL;
853         cfs_atomic_set(&export->exp_refcount, 2);
854         cfs_atomic_set(&export->exp_rpc_count, 0);
855         cfs_atomic_set(&export->exp_cb_count, 0);
856         cfs_atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
859         cfs_spin_lock_init(&export->exp_locks_list_guard);
860 #endif
861         cfs_atomic_set(&export->exp_replay_count, 0);
862         export->exp_obd = obd;
863         CFS_INIT_LIST_HEAD(&export->exp_flock_wait_list);
864         cfs_rwlock_init(&export->exp_flock_wait_lock);
865         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
866         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
867         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
868         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
869         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
870         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
871         class_handle_hash(&export->exp_handle, &export_handle_ops);
872         export->exp_last_request_time = cfs_time_current_sec();
873         cfs_spin_lock_init(&export->exp_lock);
874         cfs_spin_lock_init(&export->exp_rpc_lock);
875         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
876         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
877         cfs_spin_lock_init(&export->exp_bl_list_lock);
878         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
879
880         export->exp_sp_peer = LUSTRE_SP_ANY;
881         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
882         export->exp_client_uuid = *cluuid;
883         obd_init_export(export);
884
885         cfs_spin_lock(&obd->obd_dev_lock);
886          /* shouldn't happen, but might race */
887         if (obd->obd_stopping)
888                 GOTO(exit_unlock, rc = -ENODEV);
889
890         hash = cfs_hash_getref(obd->obd_uuid_hash);
891         if (hash == NULL)
892                 GOTO(exit_unlock, rc = -ENODEV);
893         cfs_spin_unlock(&obd->obd_dev_lock);
894
895         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
896                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897                 if (rc != 0) {
898                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
899                                       obd->obd_name, cluuid->uuid, rc);
900                         GOTO(exit_err, rc = -EALREADY);
901                 }
902         }
903
904         cfs_spin_lock(&obd->obd_dev_lock);
905         if (obd->obd_stopping) {
906                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907                 GOTO(exit_unlock, rc = -ENODEV);
908         }
909
910         class_incref(obd, "export", export);
911         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912         cfs_list_add_tail(&export->exp_obd_chain_timed,
913                           &export->exp_obd->obd_exports_timed);
914         export->exp_obd->obd_num_exports++;
915         cfs_spin_unlock(&obd->obd_dev_lock);
916         cfs_hash_putref(hash);
917         RETURN(export);
918
919 exit_unlock:
920         cfs_spin_unlock(&obd->obd_dev_lock);
921 exit_err:
922         if (hash)
923                 cfs_hash_putref(hash);
924         class_handle_unhash(&export->exp_handle);
925         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
926         obd_destroy_export(export);
927         OBD_FREE_PTR(export);
928         return ERR_PTR(rc);
929 }
930 EXPORT_SYMBOL(class_new_export);
931
932 void class_unlink_export(struct obd_export *exp)
933 {
934         class_handle_unhash(&exp->exp_handle);
935
936         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
937         /* delete an uuid-export hashitem from hashtables */
938         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
939                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940                              &exp->exp_client_uuid,
941                              &exp->exp_uuid_hash);
942
943         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
944         cfs_list_del_init(&exp->exp_obd_chain_timed);
945         exp->exp_obd->obd_num_exports--;
946         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
947         class_export_put(exp);
948 }
949 EXPORT_SYMBOL(class_unlink_export);
950
951 /* Import management functions */
952 void class_import_destroy(struct obd_import *imp)
953 {
954         ENTRY;
955
956         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
957                 imp->imp_obd->obd_name);
958
959         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
960
961         ptlrpc_put_connection_superhack(imp->imp_connection);
962
963         while (!cfs_list_empty(&imp->imp_conn_list)) {
964                 struct obd_import_conn *imp_conn;
965
966                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
967                                           struct obd_import_conn, oic_item);
968                 cfs_list_del_init(&imp_conn->oic_item);
969                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
970                 OBD_FREE(imp_conn, sizeof(*imp_conn));
971         }
972
973         LASSERT(imp->imp_sec == NULL);
974         class_decref(imp->imp_obd, "import", imp);
975         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
976         EXIT;
977 }
978
979 static void import_handle_addref(void *import)
980 {
981         class_import_get(import);
982 }
983
984 static struct portals_handle_ops import_handle_ops = {
985         .hop_addref = import_handle_addref,
986         .hop_free   = NULL,
987 };
988
989 struct obd_import *class_import_get(struct obd_import *import)
990 {
991         cfs_atomic_inc(&import->imp_refcount);
992         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
993                cfs_atomic_read(&import->imp_refcount),
994                import->imp_obd->obd_name);
995         return import;
996 }
997 EXPORT_SYMBOL(class_import_get);
998
999 void class_import_put(struct obd_import *imp)
1000 {
1001         ENTRY;
1002
1003         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1004         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1005
1006         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1007                cfs_atomic_read(&imp->imp_refcount) - 1,
1008                imp->imp_obd->obd_name);
1009
1010         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1011                 CDEBUG(D_INFO, "final put import %p\n", imp);
1012                 obd_zombie_import_add(imp);
1013         }
1014
1015         EXIT;
1016 }
1017 EXPORT_SYMBOL(class_import_put);
1018
1019 static void init_imp_at(struct imp_at *at) {
1020         int i;
1021         at_init(&at->iat_net_latency, 0, 0);
1022         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1023                 /* max service estimates are tracked on the server side, so
1024                    don't use the AT history here, just use the last reported
1025                    val. (But keep hist for proc histogram, worst_ever) */
1026                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1027                         AT_FLG_NOHIST);
1028         }
1029 }
1030
1031 struct obd_import *class_new_import(struct obd_device *obd)
1032 {
1033         struct obd_import *imp;
1034
1035         OBD_ALLOC(imp, sizeof(*imp));
1036         if (imp == NULL)
1037                 return NULL;
1038
1039         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1040         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1041         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1042         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1043         cfs_spin_lock_init(&imp->imp_lock);
1044         imp->imp_last_success_conn = 0;
1045         imp->imp_state = LUSTRE_IMP_NEW;
1046         imp->imp_obd = class_incref(obd, "import", imp);
1047         cfs_mutex_init(&imp->imp_sec_mutex);
1048         cfs_waitq_init(&imp->imp_recovery_waitq);
1049
1050         cfs_atomic_set(&imp->imp_refcount, 2);
1051         cfs_atomic_set(&imp->imp_unregistering, 0);
1052         cfs_atomic_set(&imp->imp_inflight, 0);
1053         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1054         cfs_atomic_set(&imp->imp_inval_count, 0);
1055         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1056         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1057         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1058         init_imp_at(&imp->imp_at);
1059
1060         /* the default magic is V2, will be used in connect RPC, and
1061          * then adjusted according to the flags in request/reply. */
1062         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1063
1064         return imp;
1065 }
1066 EXPORT_SYMBOL(class_new_import);
1067
1068 void class_destroy_import(struct obd_import *import)
1069 {
1070         LASSERT(import != NULL);
1071         LASSERT(import != LP_POISON);
1072
1073         class_handle_unhash(&import->imp_handle);
1074
1075         cfs_spin_lock(&import->imp_lock);
1076         import->imp_generation++;
1077         cfs_spin_unlock(&import->imp_lock);
1078         class_import_put(import);
1079 }
1080 EXPORT_SYMBOL(class_destroy_import);
1081
1082 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1083
1084 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 {
1086         cfs_spin_lock(&exp->exp_locks_list_guard);
1087
1088         LASSERT(lock->l_exp_refs_nr >= 0);
1089
1090         if (lock->l_exp_refs_target != NULL &&
1091             lock->l_exp_refs_target != exp) {
1092                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1093                               exp, lock, lock->l_exp_refs_target);
1094         }
1095         if ((lock->l_exp_refs_nr ++) == 0) {
1096                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1097                 lock->l_exp_refs_target = exp;
1098         }
1099         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1100                lock, exp, lock->l_exp_refs_nr);
1101         cfs_spin_unlock(&exp->exp_locks_list_guard);
1102 }
1103 EXPORT_SYMBOL(__class_export_add_lock_ref);
1104
1105 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1106 {
1107         cfs_spin_lock(&exp->exp_locks_list_guard);
1108         LASSERT(lock->l_exp_refs_nr > 0);
1109         if (lock->l_exp_refs_target != exp) {
1110                 LCONSOLE_WARN("lock %p, "
1111                               "mismatching export pointers: %p, %p\n",
1112                               lock, lock->l_exp_refs_target, exp);
1113         }
1114         if (-- lock->l_exp_refs_nr == 0) {
1115                 cfs_list_del_init(&lock->l_exp_refs_link);
1116                 lock->l_exp_refs_target = NULL;
1117         }
1118         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1119                lock, exp, lock->l_exp_refs_nr);
1120         cfs_spin_unlock(&exp->exp_locks_list_guard);
1121 }
1122 EXPORT_SYMBOL(__class_export_del_lock_ref);
1123 #endif
1124
1125 /* A connection defines an export context in which preallocation can
1126    be managed. This releases the export pointer reference, and returns
1127    the export handle, so the export refcount is 1 when this function
1128    returns. */
1129 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1130                   struct obd_uuid *cluuid)
1131 {
1132         struct obd_export *export;
1133         LASSERT(conn != NULL);
1134         LASSERT(obd != NULL);
1135         LASSERT(cluuid != NULL);
1136         ENTRY;
1137
1138         export = class_new_export(obd, cluuid);
1139         if (IS_ERR(export))
1140                 RETURN(PTR_ERR(export));
1141
1142         conn->cookie = export->exp_handle.h_cookie;
1143         class_export_put(export);
1144
1145         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1146                cluuid->uuid, conn->cookie);
1147         RETURN(0);
1148 }
1149 EXPORT_SYMBOL(class_connect);
1150
1151
1152 /* This function removes 1-3 references from the export:
1153  * 1 - for export pointer passed
1154  * and if disconnect really need
1155  * 2 - removing from hash
1156  * 3 - in client_unlink_export
1157  * The export pointer passed to this function can destroyed */
1158 int class_disconnect(struct obd_export *export)
1159 {
1160         int already_disconnected;
1161         ENTRY;
1162
1163         if (export == NULL) {
1164                 CWARN("attempting to free NULL export %p\n", export);
1165                 RETURN(-EINVAL);
1166         }
1167
1168         cfs_spin_lock(&export->exp_lock);
1169         already_disconnected = export->exp_disconnected;
1170         export->exp_disconnected = 1;
1171         cfs_spin_unlock(&export->exp_lock);
1172
1173         /* class_cleanup(), abort_recovery(), and class_fail_export()
1174          * all end up in here, and if any of them race we shouldn't
1175          * call extra class_export_puts(). */
1176         if (already_disconnected) {
1177                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1178                 GOTO(no_disconn, already_disconnected);
1179         }
1180
1181         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1182                export->exp_handle.h_cookie);
1183
1184         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1185                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1186                              &export->exp_connection->c_peer.nid,
1187                              &export->exp_nid_hash);
1188
1189         class_unlink_export(export);
1190 no_disconn:
1191         class_export_put(export);
1192         RETURN(0);
1193 }
1194 EXPORT_SYMBOL(class_disconnect);
1195
1196 /* Return non-zero for a fully connected export */
1197 int class_connected_export(struct obd_export *exp)
1198 {
1199         if (exp) {
1200                 int connected;
1201                 cfs_spin_lock(&exp->exp_lock);
1202                 connected = (exp->exp_conn_cnt > 0);
1203                 cfs_spin_unlock(&exp->exp_lock);
1204                 return connected;
1205         }
1206         return 0;
1207 }
1208 EXPORT_SYMBOL(class_connected_export);
1209
1210 static void class_disconnect_export_list(cfs_list_t *list,
1211                                          enum obd_option flags)
1212 {
1213         int rc;
1214         struct obd_export *exp;
1215         ENTRY;
1216
1217         /* It's possible that an export may disconnect itself, but
1218          * nothing else will be added to this list. */
1219         while (!cfs_list_empty(list)) {
1220                 exp = cfs_list_entry(list->next, struct obd_export,
1221                                      exp_obd_chain);
1222                 /* need for safe call CDEBUG after obd_disconnect */
1223                 class_export_get(exp);
1224
1225                 cfs_spin_lock(&exp->exp_lock);
1226                 exp->exp_flags = flags;
1227                 cfs_spin_unlock(&exp->exp_lock);
1228
1229                 if (obd_uuid_equals(&exp->exp_client_uuid,
1230                                     &exp->exp_obd->obd_uuid)) {
1231                         CDEBUG(D_HA,
1232                                "exp %p export uuid == obd uuid, don't discon\n",
1233                                exp);
1234                         /* Need to delete this now so we don't end up pointing
1235                          * to work_list later when this export is cleaned up. */
1236                         cfs_list_del_init(&exp->exp_obd_chain);
1237                         class_export_put(exp);
1238                         continue;
1239                 }
1240
1241                 class_export_get(exp);
1242                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1243                        "last request at "CFS_TIME_T"\n",
1244                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1245                        exp, exp->exp_last_request_time);
1246                 /* release one export reference anyway */
1247                 rc = obd_disconnect(exp);
1248
1249                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1250                        obd_export_nid2str(exp), exp, rc);
1251                 class_export_put(exp);
1252         }
1253         EXIT;
1254 }
1255
1256 void class_disconnect_exports(struct obd_device *obd)
1257 {
1258         cfs_list_t work_list;
1259         ENTRY;
1260
1261         /* Move all of the exports from obd_exports to a work list, en masse. */
1262         CFS_INIT_LIST_HEAD(&work_list);
1263         cfs_spin_lock(&obd->obd_dev_lock);
1264         cfs_list_splice_init(&obd->obd_exports, &work_list);
1265         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1266         cfs_spin_unlock(&obd->obd_dev_lock);
1267
1268         if (!cfs_list_empty(&work_list)) {
1269                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1270                        "disconnecting them\n", obd->obd_minor, obd);
1271                 class_disconnect_export_list(&work_list,
1272                                              exp_flags_from_obd(obd));
1273         } else
1274                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1275                        obd->obd_minor, obd);
1276         EXIT;
1277 }
1278 EXPORT_SYMBOL(class_disconnect_exports);
1279
1280 /* Remove exports that have not completed recovery.
1281  */
1282 void class_disconnect_stale_exports(struct obd_device *obd,
1283                                     int (*test_export)(struct obd_export *))
1284 {
1285         cfs_list_t work_list;
1286         cfs_list_t *pos, *n;
1287         struct obd_export *exp;
1288         int evicted = 0;
1289         ENTRY;
1290
1291         CFS_INIT_LIST_HEAD(&work_list);
1292         cfs_spin_lock(&obd->obd_dev_lock);
1293         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1294                 int failed;
1295
1296                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1297
1298                 /* don't count self-export as client */
1299                 if (obd_uuid_equals(&exp->exp_client_uuid,
1300                                     &exp->exp_obd->obd_uuid))
1301                         continue;
1302
1303                 if (test_export(exp))
1304                         continue;
1305
1306                 cfs_spin_lock(&exp->exp_lock);
1307                 failed = exp->exp_failed;
1308                 exp->exp_failed = 1;
1309                 cfs_spin_unlock(&exp->exp_lock);
1310                 if (failed)
1311                         continue;
1312
1313                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1314                 evicted++;
1315                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1316                        obd->obd_name, exp->exp_client_uuid.uuid,
1317                        exp->exp_connection == NULL ? "<unknown>" :
1318                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1319                 print_export_data(exp, "EVICTING", 0);
1320         }
1321         cfs_spin_unlock(&obd->obd_dev_lock);
1322
1323         if (evicted) {
1324                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1325                               obd->obd_name, evicted);
1326                 obd->obd_stale_clients += evicted;
1327         }
1328         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1329                                                  OBD_OPT_ABORT_RECOV);
1330         EXIT;
1331 }
1332 EXPORT_SYMBOL(class_disconnect_stale_exports);
1333
1334 void class_fail_export(struct obd_export *exp)
1335 {
1336         int rc, already_failed;
1337
1338         cfs_spin_lock(&exp->exp_lock);
1339         already_failed = exp->exp_failed;
1340         exp->exp_failed = 1;
1341         cfs_spin_unlock(&exp->exp_lock);
1342
1343         if (already_failed) {
1344                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1345                        exp, exp->exp_client_uuid.uuid);
1346                 return;
1347         }
1348
1349         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1350                exp, exp->exp_client_uuid.uuid);
1351
1352         if (obd_dump_on_timeout)
1353                 libcfs_debug_dumplog();
1354
1355         /* need for safe call CDEBUG after obd_disconnect */
1356         class_export_get(exp);
1357
1358         /* Most callers into obd_disconnect are removing their own reference
1359          * (request, for example) in addition to the one from the hash table.
1360          * We don't have such a reference here, so make one. */
1361         class_export_get(exp);
1362         rc = obd_disconnect(exp);
1363         if (rc)
1364                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1365         else
1366                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1367                        exp, exp->exp_client_uuid.uuid);
1368         class_export_put(exp);
1369 }
1370 EXPORT_SYMBOL(class_fail_export);
1371
1372 char *obd_export_nid2str(struct obd_export *exp)
1373 {
1374         if (exp->exp_connection != NULL)
1375                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1376
1377         return "(no nid)";
1378 }
1379 EXPORT_SYMBOL(obd_export_nid2str);
1380
1381 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1382 {
1383         struct obd_export *doomed_exp = NULL;
1384         int exports_evicted = 0;
1385
1386         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1387
1388         do {
1389                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1390                 if (doomed_exp == NULL)
1391                         break;
1392
1393                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1394                          "nid %s found, wanted nid %s, requested nid %s\n",
1395                          obd_export_nid2str(doomed_exp),
1396                          libcfs_nid2str(nid_key), nid);
1397                 LASSERTF(doomed_exp != obd->obd_self_export,
1398                          "self-export is hashed by NID?\n");
1399                 exports_evicted++;
1400                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1401                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1402                        exports_evicted);
1403                 class_fail_export(doomed_exp);
1404                 class_export_put(doomed_exp);
1405         } while (1);
1406
1407         if (!exports_evicted)
1408                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1409                        obd->obd_name, nid);
1410         return exports_evicted;
1411 }
1412 EXPORT_SYMBOL(obd_export_evict_by_nid);
1413
1414 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1415 {
1416         struct obd_export *doomed_exp = NULL;
1417         struct obd_uuid doomed_uuid;
1418         int exports_evicted = 0;
1419
1420         obd_str2uuid(&doomed_uuid, uuid);
1421         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1422                 CERROR("%s: can't evict myself\n", obd->obd_name);
1423                 return exports_evicted;
1424         }
1425
1426         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1427
1428         if (doomed_exp == NULL) {
1429                 CERROR("%s: can't disconnect %s: no exports found\n",
1430                        obd->obd_name, uuid);
1431         } else {
1432                 CWARN("%s: evicting %s at adminstrative request\n",
1433                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1434                 class_fail_export(doomed_exp);
1435                 class_export_put(doomed_exp);
1436                 exports_evicted++;
1437         }
1438
1439         return exports_evicted;
1440 }
1441 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1442
1443 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1444 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1445 EXPORT_SYMBOL(class_export_dump_hook);
1446 #endif
1447
1448 static void print_export_data(struct obd_export *exp, const char *status,
1449                               int locks)
1450 {
1451         struct ptlrpc_reply_state *rs;
1452         struct ptlrpc_reply_state *first_reply = NULL;
1453         int nreplies = 0;
1454
1455         cfs_spin_lock(&exp->exp_lock);
1456         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1457                                 rs_exp_list) {
1458                 if (nreplies == 0)
1459                         first_reply = rs;
1460                 nreplies++;
1461         }
1462         cfs_spin_unlock(&exp->exp_lock);
1463
1464         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1465                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1466                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1467                cfs_atomic_read(&exp->exp_rpc_count),
1468                cfs_atomic_read(&exp->exp_cb_count),
1469                cfs_atomic_read(&exp->exp_locks_count),
1470                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1471                nreplies, first_reply, nreplies > 3 ? "..." : "",
1472                exp->exp_last_committed);
1473 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1474         if (locks && class_export_dump_hook != NULL)
1475                 class_export_dump_hook(exp);
1476 #endif
1477 }
1478
1479 void dump_exports(struct obd_device *obd, int locks)
1480 {
1481         struct obd_export *exp;
1482
1483         cfs_spin_lock(&obd->obd_dev_lock);
1484         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1485                 print_export_data(exp, "ACTIVE", locks);
1486         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1487                 print_export_data(exp, "UNLINKED", locks);
1488         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1489                 print_export_data(exp, "DELAYED", locks);
1490         cfs_spin_unlock(&obd->obd_dev_lock);
1491         cfs_spin_lock(&obd_zombie_impexp_lock);
1492         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1493                 print_export_data(exp, "ZOMBIE", locks);
1494         cfs_spin_unlock(&obd_zombie_impexp_lock);
1495 }
1496 EXPORT_SYMBOL(dump_exports);
1497
1498 void obd_exports_barrier(struct obd_device *obd)
1499 {
1500         int waited = 2;
1501         LASSERT(cfs_list_empty(&obd->obd_exports));
1502         cfs_spin_lock(&obd->obd_dev_lock);
1503         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1504                 cfs_spin_unlock(&obd->obd_dev_lock);
1505                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1506                                                    cfs_time_seconds(waited));
1507                 if (waited > 5 && IS_PO2(waited)) {
1508                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1509                                       "more than %d seconds. "
1510                                       "The obd refcount = %d. Is it stuck?\n",
1511                                       obd->obd_name, waited,
1512                                       cfs_atomic_read(&obd->obd_refcount));
1513                         dump_exports(obd, 1);
1514                 }
1515                 waited *= 2;
1516                 cfs_spin_lock(&obd->obd_dev_lock);
1517         }
1518         cfs_spin_unlock(&obd->obd_dev_lock);
1519 }
1520 EXPORT_SYMBOL(obd_exports_barrier);
1521
1522 /* Total amount of zombies to be destroyed */
1523 static int zombies_count = 0;
1524
1525 /**
1526  * kill zombie imports and exports
1527  */
1528 void obd_zombie_impexp_cull(void)
1529 {
1530         struct obd_import *import;
1531         struct obd_export *export;
1532         ENTRY;
1533
1534         do {
1535                 cfs_spin_lock(&obd_zombie_impexp_lock);
1536
1537                 import = NULL;
1538                 if (!cfs_list_empty(&obd_zombie_imports)) {
1539                         import = cfs_list_entry(obd_zombie_imports.next,
1540                                                 struct obd_import,
1541                                                 imp_zombie_chain);
1542                         cfs_list_del_init(&import->imp_zombie_chain);
1543                 }
1544
1545                 export = NULL;
1546                 if (!cfs_list_empty(&obd_zombie_exports)) {
1547                         export = cfs_list_entry(obd_zombie_exports.next,
1548                                                 struct obd_export,
1549                                                 exp_obd_chain);
1550                         cfs_list_del_init(&export->exp_obd_chain);
1551                 }
1552
1553                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1554
1555                 if (import != NULL) {
1556                         class_import_destroy(import);
1557                         cfs_spin_lock(&obd_zombie_impexp_lock);
1558                         zombies_count--;
1559                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1560                 }
1561
1562                 if (export != NULL) {
1563                         class_export_destroy(export);
1564                         cfs_spin_lock(&obd_zombie_impexp_lock);
1565                         zombies_count--;
1566                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1567                 }
1568
1569                 cfs_cond_resched();
1570         } while (import != NULL || export != NULL);
1571         EXIT;
1572 }
1573
1574 static cfs_completion_t         obd_zombie_start;
1575 static cfs_completion_t         obd_zombie_stop;
1576 static unsigned long            obd_zombie_flags;
1577 static cfs_waitq_t              obd_zombie_waitq;
1578 static pid_t                    obd_zombie_pid;
1579
1580 enum {
1581         OBD_ZOMBIE_STOP   = 1 << 1
1582 };
1583
1584 /**
1585  * check for work for kill zombie import/export thread.
1586  */
1587 static int obd_zombie_impexp_check(void *arg)
1588 {
1589         int rc;
1590
1591         cfs_spin_lock(&obd_zombie_impexp_lock);
1592         rc = (zombies_count == 0) &&
1593              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1594         cfs_spin_unlock(&obd_zombie_impexp_lock);
1595
1596         RETURN(rc);
1597 }
1598
1599 /**
1600  * Add export to the obd_zombe thread and notify it.
1601  */
1602 static void obd_zombie_export_add(struct obd_export *exp) {
1603         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1604         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1605         cfs_list_del_init(&exp->exp_obd_chain);
1606         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1607         cfs_spin_lock(&obd_zombie_impexp_lock);
1608         zombies_count++;
1609         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1610         cfs_spin_unlock(&obd_zombie_impexp_lock);
1611
1612         obd_zombie_impexp_notify();
1613 }
1614
1615 /**
1616  * Add import to the obd_zombe thread and notify it.
1617  */
1618 static void obd_zombie_import_add(struct obd_import *imp) {
1619         LASSERT(imp->imp_sec == NULL);
1620         LASSERT(imp->imp_rq_pool == NULL);
1621         cfs_spin_lock(&obd_zombie_impexp_lock);
1622         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1623         zombies_count++;
1624         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1625         cfs_spin_unlock(&obd_zombie_impexp_lock);
1626
1627         obd_zombie_impexp_notify();
1628 }
1629
1630 /**
1631  * notify import/export destroy thread about new zombie.
1632  */
1633 static void obd_zombie_impexp_notify(void)
1634 {
1635         /*
1636          * Make sure obd_zomebie_impexp_thread get this notification.
1637          * It is possible this signal only get by obd_zombie_barrier, and
1638          * barrier gulps this notification and sleeps away and hangs ensues
1639          */
1640         cfs_waitq_broadcast(&obd_zombie_waitq);
1641 }
1642
1643 /**
1644  * check whether obd_zombie is idle
1645  */
1646 static int obd_zombie_is_idle(void)
1647 {
1648         int rc;
1649
1650         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1651         cfs_spin_lock(&obd_zombie_impexp_lock);
1652         rc = (zombies_count == 0);
1653         cfs_spin_unlock(&obd_zombie_impexp_lock);
1654         return rc;
1655 }
1656
1657 /**
1658  * wait when obd_zombie import/export queues become empty
1659  */
1660 void obd_zombie_barrier(void)
1661 {
1662         struct l_wait_info lwi = { 0 };
1663
1664         if (obd_zombie_pid == cfs_curproc_pid())
1665                 /* don't wait for myself */
1666                 return;
1667         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1668 }
1669 EXPORT_SYMBOL(obd_zombie_barrier);
1670
1671 #ifdef __KERNEL__
1672
1673 /**
1674  * destroy zombie export/import thread.
1675  */
1676 static int obd_zombie_impexp_thread(void *unused)
1677 {
1678         int rc;
1679
1680         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1681                 cfs_complete(&obd_zombie_start);
1682                 RETURN(rc);
1683         }
1684
1685         cfs_complete(&obd_zombie_start);
1686
1687         obd_zombie_pid = cfs_curproc_pid();
1688
1689         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1690                 struct l_wait_info lwi = { 0 };
1691
1692                 l_wait_event(obd_zombie_waitq,
1693                              !obd_zombie_impexp_check(NULL), &lwi);
1694                 obd_zombie_impexp_cull();
1695
1696                 /*
1697                  * Notify obd_zombie_barrier callers that queues
1698                  * may be empty.
1699                  */
1700                 cfs_waitq_signal(&obd_zombie_waitq);
1701         }
1702
1703         cfs_complete(&obd_zombie_stop);
1704
1705         RETURN(0);
1706 }
1707
1708 #else /* ! KERNEL */
1709
1710 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1711 static void *obd_zombie_impexp_work_cb;
1712 static void *obd_zombie_impexp_idle_cb;
1713
1714 int obd_zombie_impexp_kill(void *arg)
1715 {
1716         int rc = 0;
1717
1718         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1719                 obd_zombie_impexp_cull();
1720                 rc = 1;
1721         }
1722         cfs_atomic_dec(&zombie_recur);
1723         return rc;
1724 }
1725
1726 #endif
1727
1728 /**
1729  * start destroy zombie import/export thread
1730  */
1731 int obd_zombie_impexp_init(void)
1732 {
1733         int rc;
1734
1735         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1736         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1737         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1738         cfs_init_completion(&obd_zombie_start);
1739         cfs_init_completion(&obd_zombie_stop);
1740         cfs_waitq_init(&obd_zombie_waitq);
1741         obd_zombie_pid = 0;
1742
1743 #ifdef __KERNEL__
1744         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1745         if (rc < 0)
1746                 RETURN(rc);
1747
1748         cfs_wait_for_completion(&obd_zombie_start);
1749 #else
1750
1751         obd_zombie_impexp_work_cb =
1752                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1753                                                  &obd_zombie_impexp_kill, NULL);
1754
1755         obd_zombie_impexp_idle_cb =
1756                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1757                                                  &obd_zombie_impexp_check, NULL);
1758         rc = 0;
1759 #endif
1760         RETURN(rc);
1761 }
1762 /**
1763  * stop destroy zombie import/export thread
1764  */
1765 void obd_zombie_impexp_stop(void)
1766 {
1767         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1768         obd_zombie_impexp_notify();
1769 #ifdef __KERNEL__
1770         cfs_wait_for_completion(&obd_zombie_stop);
1771 #else
1772         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1773         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1774 #endif
1775 }
1776
1777 /***** Kernel-userspace comm helpers *******/
1778
1779 /* Get length of entire message, including header */
1780 int kuc_len(int payload_len)
1781 {
1782         return sizeof(struct kuc_hdr) + payload_len;
1783 }
1784 EXPORT_SYMBOL(kuc_len);
1785
1786 /* Get a pointer to kuc header, given a ptr to the payload
1787  * @param p Pointer to payload area
1788  * @returns Pointer to kuc header
1789  */
1790 struct kuc_hdr * kuc_ptr(void *p)
1791 {
1792         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1793         LASSERT(lh->kuc_magic == KUC_MAGIC);
1794         return lh;
1795 }
1796 EXPORT_SYMBOL(kuc_ptr);
1797
1798 /* Test if payload is part of kuc message
1799  * @param p Pointer to payload area
1800  * @returns boolean
1801  */
1802 int kuc_ispayload(void *p)
1803 {
1804         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1805
1806         if (kh->kuc_magic == KUC_MAGIC)
1807                 return 1;
1808         else
1809                 return 0;
1810 }
1811 EXPORT_SYMBOL(kuc_ispayload);
1812
1813 /* Alloc space for a message, and fill in header
1814  * @return Pointer to payload area
1815  */
1816 void *kuc_alloc(int payload_len, int transport, int type)
1817 {
1818         struct kuc_hdr *lh;
1819         int len = kuc_len(payload_len);
1820
1821         OBD_ALLOC(lh, len);
1822         if (lh == NULL)
1823                 return ERR_PTR(-ENOMEM);
1824
1825         lh->kuc_magic = KUC_MAGIC;
1826         lh->kuc_transport = transport;
1827         lh->kuc_msgtype = type;
1828         lh->kuc_msglen = len;
1829
1830         return (void *)(lh + 1);
1831 }
1832 EXPORT_SYMBOL(kuc_alloc);
1833
1834 /* Takes pointer to payload area */
1835 inline void kuc_free(void *p, int payload_len)
1836 {
1837         struct kuc_hdr *lh = kuc_ptr(p);
1838         OBD_FREE(lh, kuc_len(payload_len));
1839 }
1840 EXPORT_SYMBOL(kuc_free);
1841
1842
1843