Whamcloud - gitweb
LU-8066 obd: migrate to ksets
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
57
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
61
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66                               const char *status, int locks, int debug_level);
67
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
71
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74
75 /*
76  * support functions: we could use inter-module communication, but this
77  * is more portable to other OS's
78  */
79 static struct obd_device *obd_device_alloc(void)
80 {
81         struct obd_device *obd;
82
83         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84         if (obd != NULL) {
85                 obd->obd_magic = OBD_DEVICE_MAGIC;
86         }
87         return obd;
88 }
89
90 static void obd_device_free(struct obd_device *obd)
91 {
92         LASSERT(obd != NULL);
93         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95         if (obd->obd_namespace != NULL) {
96                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97                        obd, obd->obd_namespace, obd->obd_force);
98                 LBUG();
99         }
100         lu_ref_fini(&obd->obd_reference);
101         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
102 }
103
104 struct obd_type *class_search_type(const char *name)
105 {
106         struct list_head *tmp;
107         struct obd_type *type;
108
109         spin_lock(&obd_types_lock);
110         list_for_each(tmp, &obd_types) {
111                 type = list_entry(tmp, struct obd_type, typ_chain);
112                 if (strcmp(type->typ_name, name) == 0) {
113                         spin_unlock(&obd_types_lock);
114                         return type;
115                 }
116         }
117         spin_unlock(&obd_types_lock);
118         return NULL;
119 }
120 EXPORT_SYMBOL(class_search_type);
121
122 struct obd_type *class_get_type(const char *name)
123 {
124         struct obd_type *type = class_search_type(name);
125
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
127         if (!type) {
128                 const char *modname = name;
129
130                 if (strcmp(modname, "obdfilter") == 0)
131                         modname = "ofd";
132
133                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134                         modname = LUSTRE_OSP_NAME;
135
136                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137                         modname = LUSTRE_MDT_NAME;
138
139                 if (!request_module("%s", modname)) {
140                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141                         type = class_search_type(name);
142                 } else {
143                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144                                            modname);
145                 }
146         }
147 #endif
148         if (type) {
149                 spin_lock(&type->obd_type_lock);
150                 type->typ_refcnt++;
151                 try_module_get(type->typ_dt_ops->o_owner);
152                 spin_unlock(&type->obd_type_lock);
153         }
154         return type;
155 }
156
157 void class_put_type(struct obd_type *type)
158 {
159         LASSERT(type);
160         spin_lock(&type->obd_type_lock);
161         type->typ_refcnt--;
162         module_put(type->typ_dt_ops->o_owner);
163         spin_unlock(&type->obd_type_lock);
164 }
165
166 static void class_sysfs_release(struct kobject *kobj)
167 {
168         struct obd_type *type = container_of(kobj, struct obd_type,
169                                              typ_kobj);
170
171         complete(&type->typ_kobj_unregister);
172 }
173
174 static struct kobj_type class_ktype = {
175         .sysfs_ops      = &lustre_sysfs_ops,
176         .release        = class_sysfs_release,
177 };
178
179 #define CLASS_MAX_NAME 1024
180
181 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
182                         bool enable_proc, struct lprocfs_vars *vars,
183                         const char *name, struct lu_device_type *ldt)
184 {
185         struct obd_type *type;
186         int rc = 0;
187         ENTRY;
188
189         /* sanity check */
190         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
191
192         if (class_search_type(name)) {
193                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
194                 RETURN(-EEXIST);
195         }
196
197         rc = -ENOMEM;
198         OBD_ALLOC(type, sizeof(*type));
199         if (type == NULL)
200                 RETURN(rc);
201
202         OBD_ALLOC_PTR(type->typ_dt_ops);
203         OBD_ALLOC_PTR(type->typ_md_ops);
204         OBD_ALLOC(type->typ_name, strlen(name) + 1);
205
206         if (type->typ_dt_ops == NULL ||
207             type->typ_md_ops == NULL ||
208             type->typ_name == NULL)
209                 GOTO (failed, rc);
210
211         *(type->typ_dt_ops) = *dt_ops;
212         /* md_ops is optional */
213         if (md_ops)
214                 *(type->typ_md_ops) = *md_ops;
215         strcpy(type->typ_name, name);
216         spin_lock_init(&type->obd_type_lock);
217
218 #ifdef CONFIG_PROC_FS
219         if (enable_proc) {
220                 type->typ_procroot = lprocfs_register(type->typ_name,
221                                                       proc_lustre_root,
222                                                       vars, type);
223                 if (IS_ERR(type->typ_procroot)) {
224                         rc = PTR_ERR(type->typ_procroot);
225                         type->typ_procroot = NULL;
226                         GOTO(failed, rc);
227                 }
228         }
229 #endif
230         type->typ_kobj.kset = lustre_kset;
231         init_completion(&type->typ_kobj_unregister);
232         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
233                                   &lustre_kset->kobj, "%s", type->typ_name);
234         if (rc)
235                 GOTO(failed, rc);
236
237         if (ldt) {
238                 type->typ_lu = ldt;
239                 rc = lu_device_type_init(ldt);
240                 if (rc) {
241                         kobject_put(&type->typ_kobj);
242                         GOTO(failed, rc);
243                 }
244         }
245
246         spin_lock(&obd_types_lock);
247         list_add(&type->typ_chain, &obd_types);
248         spin_unlock(&obd_types_lock);
249
250         RETURN(0);
251
252 failed:
253         if (type->typ_name != NULL) {
254 #ifdef CONFIG_PROC_FS
255                 if (type->typ_procroot != NULL)
256                         remove_proc_subtree(type->typ_name, proc_lustre_root);
257 #endif
258                 OBD_FREE(type->typ_name, strlen(name) + 1);
259         }
260         if (type->typ_md_ops != NULL)
261                 OBD_FREE_PTR(type->typ_md_ops);
262         if (type->typ_dt_ops != NULL)
263                 OBD_FREE_PTR(type->typ_dt_ops);
264         OBD_FREE(type, sizeof(*type));
265         RETURN(rc);
266 }
267 EXPORT_SYMBOL(class_register_type);
268
269 int class_unregister_type(const char *name)
270 {
271         struct obd_type *type = class_search_type(name);
272         ENTRY;
273
274         if (!type) {
275                 CERROR("unknown obd type\n");
276                 RETURN(-EINVAL);
277         }
278
279         if (type->typ_refcnt) {
280                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
281                 /* This is a bad situation, let's make the best of it */
282                 /* Remove ops, but leave the name for debugging */
283                 OBD_FREE_PTR(type->typ_dt_ops);
284                 OBD_FREE_PTR(type->typ_md_ops);
285                 RETURN(-EBUSY);
286         }
287
288         kobject_put(&type->typ_kobj);
289         wait_for_completion(&type->typ_kobj_unregister);
290
291         /* we do not use type->typ_procroot as for compatibility purposes
292          * other modules can share names (i.e. lod can use lov entry). so
293          * we can't reference pointer as it can get invalided when another
294          * module removes the entry */
295 #ifdef CONFIG_PROC_FS
296         if (type->typ_procroot != NULL)
297                 remove_proc_subtree(type->typ_name, proc_lustre_root);
298         if (type->typ_procsym != NULL)
299                 lprocfs_remove(&type->typ_procsym);
300 #endif
301         if (type->typ_lu)
302                 lu_device_type_fini(type->typ_lu);
303
304         spin_lock(&obd_types_lock);
305         list_del(&type->typ_chain);
306         spin_unlock(&obd_types_lock);
307         OBD_FREE(type->typ_name, strlen(name) + 1);
308         if (type->typ_dt_ops != NULL)
309                 OBD_FREE_PTR(type->typ_dt_ops);
310         if (type->typ_md_ops != NULL)
311                 OBD_FREE_PTR(type->typ_md_ops);
312         OBD_FREE(type, sizeof(*type));
313         RETURN(0);
314 } /* class_unregister_type */
315 EXPORT_SYMBOL(class_unregister_type);
316
317 /**
318  * Create a new obd device.
319  *
320  * Allocate the new obd_device and initialize it.
321  *
322  * \param[in] type_name obd device type string.
323  * \param[in] name      obd device name.
324  * \param[in] uuid      obd device UUID
325  *
326  * \retval newdev         pointer to created obd_device
327  * \retval ERR_PTR(errno) on error
328  */
329 struct obd_device *class_newdev(const char *type_name, const char *name,
330                                 const char *uuid)
331 {
332         struct obd_device *newdev;
333         struct obd_type *type = NULL;
334         ENTRY;
335
336         if (strlen(name) >= MAX_OBD_NAME) {
337                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
338                 RETURN(ERR_PTR(-EINVAL));
339         }
340
341         type = class_get_type(type_name);
342         if (type == NULL){
343                 CERROR("OBD: unknown type: %s\n", type_name);
344                 RETURN(ERR_PTR(-ENODEV));
345         }
346
347         newdev = obd_device_alloc();
348         if (newdev == NULL) {
349                 class_put_type(type);
350                 RETURN(ERR_PTR(-ENOMEM));
351         }
352         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
353         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
354         newdev->obd_type = type;
355         newdev->obd_minor = -1;
356
357         rwlock_init(&newdev->obd_pool_lock);
358         newdev->obd_pool_limit = 0;
359         newdev->obd_pool_slv = 0;
360
361         INIT_LIST_HEAD(&newdev->obd_exports);
362         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
363         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
364         INIT_LIST_HEAD(&newdev->obd_exports_timed);
365         INIT_LIST_HEAD(&newdev->obd_nid_stats);
366         spin_lock_init(&newdev->obd_nid_lock);
367         spin_lock_init(&newdev->obd_dev_lock);
368         mutex_init(&newdev->obd_dev_mutex);
369         spin_lock_init(&newdev->obd_osfs_lock);
370         /* newdev->obd_osfs_age must be set to a value in the distant
371          * past to guarantee a fresh statfs is fetched on mount. */
372         newdev->obd_osfs_age = cfs_time_shift_64(-1000);
373
374         /* XXX belongs in setup not attach  */
375         init_rwsem(&newdev->obd_observer_link_sem);
376         /* recovery data */
377         init_timer(&newdev->obd_recovery_timer);
378         spin_lock_init(&newdev->obd_recovery_task_lock);
379         init_waitqueue_head(&newdev->obd_next_transno_waitq);
380         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
381         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
382         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
383         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
384         INIT_LIST_HEAD(&newdev->obd_evict_list);
385         INIT_LIST_HEAD(&newdev->obd_lwp_list);
386
387         llog_group_init(&newdev->obd_olg);
388         /* Detach drops this */
389         atomic_set(&newdev->obd_refcount, 1);
390         lu_ref_init(&newdev->obd_reference);
391         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
392
393         newdev->obd_conn_inprogress = 0;
394
395         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
396
397         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
398                newdev->obd_name, newdev);
399
400         return newdev;
401 }
402
403 /**
404  * Free obd device.
405  *
406  * \param[in] obd obd_device to be freed
407  *
408  * \retval none
409  */
410 void class_free_dev(struct obd_device *obd)
411 {
412         struct obd_type *obd_type = obd->obd_type;
413
414         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
415                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
416         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
417                  "obd %p != obd_devs[%d] %p\n",
418                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
419         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
420                  "obd_refcount should be 0, not %d\n",
421                  atomic_read(&obd->obd_refcount));
422         LASSERT(obd_type != NULL);
423
424         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
425                obd->obd_name, obd->obd_type->typ_name);
426
427         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
428                          obd->obd_name, obd->obd_uuid.uuid);
429         if (obd->obd_stopping) {
430                 int err;
431
432                 /* If we're not stopping, we were never set up */
433                 err = obd_cleanup(obd);
434                 if (err)
435                         CERROR("Cleanup %s returned %d\n",
436                                 obd->obd_name, err);
437         }
438
439         obd_device_free(obd);
440
441         class_put_type(obd_type);
442 }
443
444 /**
445  * Unregister obd device.
446  *
447  * Free slot in obd_dev[] used by \a obd.
448  *
449  * \param[in] new_obd obd_device to be unregistered
450  *
451  * \retval none
452  */
453 void class_unregister_device(struct obd_device *obd)
454 {
455         write_lock(&obd_dev_lock);
456         if (obd->obd_minor >= 0) {
457                 LASSERT(obd_devs[obd->obd_minor] == obd);
458                 obd_devs[obd->obd_minor] = NULL;
459                 obd->obd_minor = -1;
460         }
461         write_unlock(&obd_dev_lock);
462 }
463
464 /**
465  * Register obd device.
466  *
467  * Find free slot in obd_devs[], fills it with \a new_obd.
468  *
469  * \param[in] new_obd obd_device to be registered
470  *
471  * \retval 0          success
472  * \retval -EEXIST    device with this name is registered
473  * \retval -EOVERFLOW obd_devs[] is full
474  */
475 int class_register_device(struct obd_device *new_obd)
476 {
477         int ret = 0;
478         int i;
479         int new_obd_minor = 0;
480         bool minor_assign = false;
481
482         write_lock(&obd_dev_lock);
483         for (i = 0; i < class_devno_max(); i++) {
484                 struct obd_device *obd = class_num2obd(i);
485
486                 if (obd != NULL &&
487                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
488                         CERROR("%s: already exists, won't add\n",
489                                obd->obd_name);
490                         /* in case we found a free slot before duplicate */
491                         minor_assign = false;
492                         ret = -EEXIST;
493                         break;
494                 }
495                 if (!minor_assign && obd == NULL) {
496                         new_obd_minor = i;
497                         minor_assign = true;
498                 }
499         }
500
501         if (minor_assign) {
502                 new_obd->obd_minor = new_obd_minor;
503                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
504                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
505                 obd_devs[new_obd_minor] = new_obd;
506         } else {
507                 if (ret == 0) {
508                         ret = -EOVERFLOW;
509                         CERROR("%s: all %u/%u devices used, increase "
510                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
511                                i, class_devno_max(), ret);
512                 }
513         }
514         write_unlock(&obd_dev_lock);
515
516         RETURN(ret);
517 }
518
519 static int class_name2dev_nolock(const char *name)
520 {
521         int i;
522
523         if (!name)
524                 return -1;
525
526         for (i = 0; i < class_devno_max(); i++) {
527                 struct obd_device *obd = class_num2obd(i);
528
529                 if (obd && strcmp(name, obd->obd_name) == 0) {
530                         /* Make sure we finished attaching before we give
531                            out any references */
532                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
533                         if (obd->obd_attached) {
534                                 return i;
535                         }
536                         break;
537                 }
538         }
539
540         return -1;
541 }
542
543 int class_name2dev(const char *name)
544 {
545         int i;
546
547         if (!name)
548                 return -1;
549
550         read_lock(&obd_dev_lock);
551         i = class_name2dev_nolock(name);
552         read_unlock(&obd_dev_lock);
553
554         return i;
555 }
556 EXPORT_SYMBOL(class_name2dev);
557
558 struct obd_device *class_name2obd(const char *name)
559 {
560         int dev = class_name2dev(name);
561
562         if (dev < 0 || dev > class_devno_max())
563                 return NULL;
564         return class_num2obd(dev);
565 }
566 EXPORT_SYMBOL(class_name2obd);
567
568 int class_uuid2dev_nolock(struct obd_uuid *uuid)
569 {
570         int i;
571
572         for (i = 0; i < class_devno_max(); i++) {
573                 struct obd_device *obd = class_num2obd(i);
574
575                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
576                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
577                         return i;
578                 }
579         }
580
581         return -1;
582 }
583
584 int class_uuid2dev(struct obd_uuid *uuid)
585 {
586         int i;
587
588         read_lock(&obd_dev_lock);
589         i = class_uuid2dev_nolock(uuid);
590         read_unlock(&obd_dev_lock);
591
592         return i;
593 }
594 EXPORT_SYMBOL(class_uuid2dev);
595
596 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
597 {
598         int dev = class_uuid2dev(uuid);
599         if (dev < 0)
600                 return NULL;
601         return class_num2obd(dev);
602 }
603 EXPORT_SYMBOL(class_uuid2obd);
604
605 /**
606  * Get obd device from ::obd_devs[]
607  *
608  * \param num [in] array index
609  *
610  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
611  *         otherwise return the obd device there.
612  */
613 struct obd_device *class_num2obd(int num)
614 {
615         struct obd_device *obd = NULL;
616
617         if (num < class_devno_max()) {
618                 obd = obd_devs[num];
619                 if (obd == NULL)
620                         return NULL;
621
622                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
623                          "%p obd_magic %08x != %08x\n",
624                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
625                 LASSERTF(obd->obd_minor == num,
626                          "%p obd_minor %0d != %0d\n",
627                          obd, obd->obd_minor, num);
628         }
629
630         return obd;
631 }
632
633 /**
634  * Find obd in obd_dev[] by name or uuid.
635  *
636  * Increment obd's refcount if found.
637  *
638  * \param[in] str obd name or uuid
639  *
640  * \retval NULL    if not found
641  * \retval target  pointer to found obd_device
642  */
643 struct obd_device *class_dev_by_str(const char *str)
644 {
645         struct obd_device *target = NULL;
646         struct obd_uuid tgtuuid;
647         int rc;
648
649         obd_str2uuid(&tgtuuid, str);
650
651         read_lock(&obd_dev_lock);
652         rc = class_uuid2dev_nolock(&tgtuuid);
653         if (rc < 0)
654                 rc = class_name2dev_nolock(str);
655
656         if (rc >= 0)
657                 target = class_num2obd(rc);
658
659         if (target != NULL)
660                 class_incref(target, "find", current);
661         read_unlock(&obd_dev_lock);
662
663         RETURN(target);
664 }
665 EXPORT_SYMBOL(class_dev_by_str);
666
667 /**
668  * Get obd devices count. Device in any
669  *    state are counted
670  * \retval obd device count
671  */
672 int get_devices_count(void)
673 {
674         int index, max_index = class_devno_max(), dev_count = 0;
675
676         read_lock(&obd_dev_lock);
677         for (index = 0; index <= max_index; index++) {
678                 struct obd_device *obd = class_num2obd(index);
679                 if (obd != NULL)
680                         dev_count++;
681         }
682         read_unlock(&obd_dev_lock);
683
684         return dev_count;
685 }
686 EXPORT_SYMBOL(get_devices_count);
687
688 void class_obd_list(void)
689 {
690         char *status;
691         int i;
692
693         read_lock(&obd_dev_lock);
694         for (i = 0; i < class_devno_max(); i++) {
695                 struct obd_device *obd = class_num2obd(i);
696
697                 if (obd == NULL)
698                         continue;
699                 if (obd->obd_stopping)
700                         status = "ST";
701                 else if (obd->obd_set_up)
702                         status = "UP";
703                 else if (obd->obd_attached)
704                         status = "AT";
705                 else
706                         status = "--";
707                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
708                          i, status, obd->obd_type->typ_name,
709                          obd->obd_name, obd->obd_uuid.uuid,
710                          atomic_read(&obd->obd_refcount));
711         }
712         read_unlock(&obd_dev_lock);
713         return;
714 }
715
716 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
717    specified, then only the client with that uuid is returned,
718    otherwise any client connected to the tgt is returned. */
719 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
720                                           const char * typ_name,
721                                           struct obd_uuid *grp_uuid)
722 {
723         int i;
724
725         read_lock(&obd_dev_lock);
726         for (i = 0; i < class_devno_max(); i++) {
727                 struct obd_device *obd = class_num2obd(i);
728
729                 if (obd == NULL)
730                         continue;
731                 if ((strncmp(obd->obd_type->typ_name, typ_name,
732                              strlen(typ_name)) == 0)) {
733                         if (obd_uuid_equals(tgt_uuid,
734                                             &obd->u.cli.cl_target_uuid) &&
735                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
736                                                          &obd->obd_uuid) : 1)) {
737                                 read_unlock(&obd_dev_lock);
738                                 return obd;
739                         }
740                 }
741         }
742         read_unlock(&obd_dev_lock);
743
744         return NULL;
745 }
746 EXPORT_SYMBOL(class_find_client_obd);
747
748 /* Iterate the obd_device list looking devices have grp_uuid. Start
749    searching at *next, and if a device is found, the next index to look
750    at is saved in *next. If next is NULL, then the first matching device
751    will always be returned. */
752 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
753 {
754         int i;
755
756         if (next == NULL)
757                 i = 0;
758         else if (*next >= 0 && *next < class_devno_max())
759                 i = *next;
760         else
761                 return NULL;
762
763         read_lock(&obd_dev_lock);
764         for (; i < class_devno_max(); i++) {
765                 struct obd_device *obd = class_num2obd(i);
766
767                 if (obd == NULL)
768                         continue;
769                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
770                         if (next != NULL)
771                                 *next = i+1;
772                         read_unlock(&obd_dev_lock);
773                         return obd;
774                 }
775         }
776         read_unlock(&obd_dev_lock);
777
778         return NULL;
779 }
780 EXPORT_SYMBOL(class_devices_in_group);
781
782 /**
783  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
784  * adjust sptlrpc settings accordingly.
785  */
786 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
787 {
788         struct obd_device  *obd;
789         const char         *type;
790         int                 i, rc = 0, rc2;
791
792         LASSERT(namelen > 0);
793
794         read_lock(&obd_dev_lock);
795         for (i = 0; i < class_devno_max(); i++) {
796                 obd = class_num2obd(i);
797
798                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
799                         continue;
800
801                 /* only notify mdc, osc, osp, lwp, mdt, ost
802                  * because only these have a -sptlrpc llog */
803                 type = obd->obd_type->typ_name;
804                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
805                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
806                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
807                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
808                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
809                     strcmp(type, LUSTRE_OST_NAME) != 0)
810                         continue;
811
812                 if (strncmp(obd->obd_name, fsname, namelen))
813                         continue;
814
815                 class_incref(obd, __FUNCTION__, obd);
816                 read_unlock(&obd_dev_lock);
817                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
818                                          sizeof(KEY_SPTLRPC_CONF),
819                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
820                 rc = rc ? rc : rc2;
821                 class_decref(obd, __FUNCTION__, obd);
822                 read_lock(&obd_dev_lock);
823         }
824         read_unlock(&obd_dev_lock);
825         return rc;
826 }
827 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
828
829 void obd_cleanup_caches(void)
830 {
831         ENTRY;
832         if (obd_device_cachep) {
833                 kmem_cache_destroy(obd_device_cachep);
834                 obd_device_cachep = NULL;
835         }
836         if (obdo_cachep) {
837                 kmem_cache_destroy(obdo_cachep);
838                 obdo_cachep = NULL;
839         }
840         if (import_cachep) {
841                 kmem_cache_destroy(import_cachep);
842                 import_cachep = NULL;
843         }
844
845         EXIT;
846 }
847
848 int obd_init_caches(void)
849 {
850         int rc;
851         ENTRY;
852
853         LASSERT(obd_device_cachep == NULL);
854         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
855                                               sizeof(struct obd_device),
856                                               0, 0, NULL);
857         if (!obd_device_cachep)
858                 GOTO(out, rc = -ENOMEM);
859
860         LASSERT(obdo_cachep == NULL);
861         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
862                                         0, 0, NULL);
863         if (!obdo_cachep)
864                 GOTO(out, rc = -ENOMEM);
865
866         LASSERT(import_cachep == NULL);
867         import_cachep = kmem_cache_create("ll_import_cache",
868                                           sizeof(struct obd_import),
869                                           0, 0, NULL);
870         if (!import_cachep)
871                 GOTO(out, rc = -ENOMEM);
872
873         RETURN(0);
874 out:
875         obd_cleanup_caches();
876         RETURN(rc);
877 }
878
879 /* map connection to client */
880 struct obd_export *class_conn2export(struct lustre_handle *conn)
881 {
882         struct obd_export *export;
883         ENTRY;
884
885         if (!conn) {
886                 CDEBUG(D_CACHE, "looking for null handle\n");
887                 RETURN(NULL);
888         }
889
890         if (conn->cookie == -1) {  /* this means assign a new connection */
891                 CDEBUG(D_CACHE, "want a new connection\n");
892                 RETURN(NULL);
893         }
894
895         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
896         export = class_handle2object(conn->cookie, NULL);
897         RETURN(export);
898 }
899 EXPORT_SYMBOL(class_conn2export);
900
901 struct obd_device *class_exp2obd(struct obd_export *exp)
902 {
903         if (exp)
904                 return exp->exp_obd;
905         return NULL;
906 }
907 EXPORT_SYMBOL(class_exp2obd);
908
909 struct obd_device *class_conn2obd(struct lustre_handle *conn)
910 {
911         struct obd_export *export;
912         export = class_conn2export(conn);
913         if (export) {
914                 struct obd_device *obd = export->exp_obd;
915                 class_export_put(export);
916                 return obd;
917         }
918         return NULL;
919 }
920
921 struct obd_import *class_exp2cliimp(struct obd_export *exp)
922 {
923         struct obd_device *obd = exp->exp_obd;
924         if (obd == NULL)
925                 return NULL;
926         return obd->u.cli.cl_import;
927 }
928 EXPORT_SYMBOL(class_exp2cliimp);
929
930 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
931 {
932         struct obd_device *obd = class_conn2obd(conn);
933         if (obd == NULL)
934                 return NULL;
935         return obd->u.cli.cl_import;
936 }
937
938 /* Export management functions */
939 static void class_export_destroy(struct obd_export *exp)
940 {
941         struct obd_device *obd = exp->exp_obd;
942         ENTRY;
943
944         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
945         LASSERT(obd != NULL);
946
947         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
948                exp->exp_client_uuid.uuid, obd->obd_name);
949
950         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
951         if (exp->exp_connection)
952                 ptlrpc_put_connection_superhack(exp->exp_connection);
953
954         LASSERT(list_empty(&exp->exp_outstanding_replies));
955         LASSERT(list_empty(&exp->exp_uncommitted_replies));
956         LASSERT(list_empty(&exp->exp_req_replay_queue));
957         LASSERT(list_empty(&exp->exp_hp_rpcs));
958         obd_destroy_export(exp);
959         /* self export doesn't hold a reference to an obd, although it
960          * exists until freeing of the obd */
961         if (exp != obd->obd_self_export)
962                 class_decref(obd, "export", exp);
963
964         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
965         EXIT;
966 }
967
968 static void export_handle_addref(void *export)
969 {
970         class_export_get(export);
971 }
972
973 static struct portals_handle_ops export_handle_ops = {
974         .hop_addref = export_handle_addref,
975         .hop_free   = NULL,
976 };
977
978 struct obd_export *class_export_get(struct obd_export *exp)
979 {
980         atomic_inc(&exp->exp_refcount);
981         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
982                atomic_read(&exp->exp_refcount));
983         return exp;
984 }
985 EXPORT_SYMBOL(class_export_get);
986
987 void class_export_put(struct obd_export *exp)
988 {
989         LASSERT(exp != NULL);
990         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
991         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
992                atomic_read(&exp->exp_refcount) - 1);
993
994         if (atomic_dec_and_test(&exp->exp_refcount)) {
995                 struct obd_device *obd = exp->exp_obd;
996
997                 CDEBUG(D_IOCTL, "final put %p/%s\n",
998                        exp, exp->exp_client_uuid.uuid);
999
1000                 /* release nid stat refererence */
1001                 lprocfs_exp_cleanup(exp);
1002
1003                 if (exp == obd->obd_self_export) {
1004                         /* self export should be destroyed without
1005                          * zombie thread as it doesn't hold a
1006                          * reference to obd and doesn't hold any
1007                          * resources */
1008                         class_export_destroy(exp);
1009                         /* self export is destroyed, no class
1010                          * references exist and it is safe to free
1011                          * obd */
1012                         class_free_dev(obd);
1013                 } else {
1014                         LASSERT(!list_empty(&exp->exp_obd_chain));
1015                         obd_zombie_export_add(exp);
1016                 }
1017
1018         }
1019 }
1020 EXPORT_SYMBOL(class_export_put);
1021 /* Creates a new export, adds it to the hash table, and returns a
1022  * pointer to it. The refcount is 2: one for the hash reference, and
1023  * one for the pointer returned by this function. */
1024 struct obd_export *__class_new_export(struct obd_device *obd,
1025                                       struct obd_uuid *cluuid, bool is_self)
1026 {
1027         struct obd_export *export;
1028         struct cfs_hash *hash = NULL;
1029         int rc = 0;
1030         ENTRY;
1031
1032         OBD_ALLOC_PTR(export);
1033         if (!export)
1034                 return ERR_PTR(-ENOMEM);
1035
1036         export->exp_conn_cnt = 0;
1037         export->exp_lock_hash = NULL;
1038         export->exp_flock_hash = NULL;
1039         /* 2 = class_handle_hash + last */
1040         atomic_set(&export->exp_refcount, 2);
1041         atomic_set(&export->exp_rpc_count, 0);
1042         atomic_set(&export->exp_cb_count, 0);
1043         atomic_set(&export->exp_locks_count, 0);
1044 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1045         INIT_LIST_HEAD(&export->exp_locks_list);
1046         spin_lock_init(&export->exp_locks_list_guard);
1047 #endif
1048         atomic_set(&export->exp_replay_count, 0);
1049         export->exp_obd = obd;
1050         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1051         spin_lock_init(&export->exp_uncommitted_replies_lock);
1052         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1053         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1054         INIT_LIST_HEAD(&export->exp_handle.h_link);
1055         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1056         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1057         class_handle_hash(&export->exp_handle, &export_handle_ops);
1058         export->exp_last_request_time = ktime_get_real_seconds();
1059         spin_lock_init(&export->exp_lock);
1060         spin_lock_init(&export->exp_rpc_lock);
1061         INIT_HLIST_NODE(&export->exp_uuid_hash);
1062         INIT_HLIST_NODE(&export->exp_nid_hash);
1063         INIT_HLIST_NODE(&export->exp_gen_hash);
1064         spin_lock_init(&export->exp_bl_list_lock);
1065         INIT_LIST_HEAD(&export->exp_bl_list);
1066         INIT_LIST_HEAD(&export->exp_stale_list);
1067
1068         export->exp_sp_peer = LUSTRE_SP_ANY;
1069         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1070         export->exp_client_uuid = *cluuid;
1071         obd_init_export(export);
1072
1073         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1074                 spin_lock(&obd->obd_dev_lock);
1075                 /* shouldn't happen, but might race */
1076                 if (obd->obd_stopping)
1077                         GOTO(exit_unlock, rc = -ENODEV);
1078
1079                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1080                 if (hash == NULL)
1081                         GOTO(exit_unlock, rc = -ENODEV);
1082                 spin_unlock(&obd->obd_dev_lock);
1083
1084                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1085                 if (rc != 0) {
1086                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1087                                       obd->obd_name, cluuid->uuid, rc);
1088                         GOTO(exit_err, rc = -EALREADY);
1089                 }
1090         }
1091
1092         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1093         spin_lock(&obd->obd_dev_lock);
1094         if (obd->obd_stopping) {
1095                 if (hash)
1096                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1097                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1098         }
1099
1100         if (!is_self) {
1101                 class_incref(obd, "export", export);
1102                 list_add_tail(&export->exp_obd_chain_timed,
1103                               &obd->obd_exports_timed);
1104                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1105                 obd->obd_num_exports++;
1106         } else {
1107                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1108                 INIT_LIST_HEAD(&export->exp_obd_chain);
1109         }
1110         spin_unlock(&obd->obd_dev_lock);
1111         if (hash)
1112                 cfs_hash_putref(hash);
1113         RETURN(export);
1114
1115 exit_unlock:
1116         spin_unlock(&obd->obd_dev_lock);
1117 exit_err:
1118         if (hash)
1119                 cfs_hash_putref(hash);
1120         class_handle_unhash(&export->exp_handle);
1121         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1122         obd_destroy_export(export);
1123         OBD_FREE_PTR(export);
1124         return ERR_PTR(rc);
1125 }
1126
1127 struct obd_export *class_new_export(struct obd_device *obd,
1128                                     struct obd_uuid *uuid)
1129 {
1130         return __class_new_export(obd, uuid, false);
1131 }
1132 EXPORT_SYMBOL(class_new_export);
1133
1134 struct obd_export *class_new_export_self(struct obd_device *obd,
1135                                          struct obd_uuid *uuid)
1136 {
1137         return __class_new_export(obd, uuid, true);
1138 }
1139
1140 void class_unlink_export(struct obd_export *exp)
1141 {
1142         class_handle_unhash(&exp->exp_handle);
1143
1144         if (exp->exp_obd->obd_self_export == exp) {
1145                 class_export_put(exp);
1146                 return;
1147         }
1148
1149         spin_lock(&exp->exp_obd->obd_dev_lock);
1150         /* delete an uuid-export hashitem from hashtables */
1151         if (!hlist_unhashed(&exp->exp_uuid_hash))
1152                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1153                              &exp->exp_client_uuid,
1154                              &exp->exp_uuid_hash);
1155
1156 #ifdef HAVE_SERVER_SUPPORT
1157         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1158                 struct tg_export_data   *ted = &exp->exp_target_data;
1159                 struct cfs_hash         *hash;
1160
1161                 /* Because obd_gen_hash will not be released until
1162                  * class_cleanup(), so hash should never be NULL here */
1163                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1164                 LASSERT(hash != NULL);
1165                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1166                              &exp->exp_gen_hash);
1167                 cfs_hash_putref(hash);
1168         }
1169 #endif /* HAVE_SERVER_SUPPORT */
1170
1171         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1172         list_del_init(&exp->exp_obd_chain_timed);
1173         exp->exp_obd->obd_num_exports--;
1174         spin_unlock(&exp->exp_obd->obd_dev_lock);
1175         atomic_inc(&obd_stale_export_num);
1176
1177         /* A reference is kept by obd_stale_exports list */
1178         obd_stale_export_put(exp);
1179 }
1180 EXPORT_SYMBOL(class_unlink_export);
1181
1182 /* Import management functions */
1183 static void class_import_destroy(struct obd_import *imp)
1184 {
1185         ENTRY;
1186
1187         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1188                 imp->imp_obd->obd_name);
1189
1190         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1191
1192         ptlrpc_put_connection_superhack(imp->imp_connection);
1193
1194         while (!list_empty(&imp->imp_conn_list)) {
1195                 struct obd_import_conn *imp_conn;
1196
1197                 imp_conn = list_entry(imp->imp_conn_list.next,
1198                                       struct obd_import_conn, oic_item);
1199                 list_del_init(&imp_conn->oic_item);
1200                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1201                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1202         }
1203
1204         LASSERT(imp->imp_sec == NULL);
1205         class_decref(imp->imp_obd, "import", imp);
1206         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1207         EXIT;
1208 }
1209
1210 static void import_handle_addref(void *import)
1211 {
1212         class_import_get(import);
1213 }
1214
1215 static struct portals_handle_ops import_handle_ops = {
1216         .hop_addref = import_handle_addref,
1217         .hop_free   = NULL,
1218 };
1219
1220 struct obd_import *class_import_get(struct obd_import *import)
1221 {
1222         atomic_inc(&import->imp_refcount);
1223         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1224                atomic_read(&import->imp_refcount),
1225                import->imp_obd->obd_name);
1226         return import;
1227 }
1228 EXPORT_SYMBOL(class_import_get);
1229
1230 void class_import_put(struct obd_import *imp)
1231 {
1232         ENTRY;
1233
1234         LASSERT(list_empty(&imp->imp_zombie_chain));
1235         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1236
1237         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1238                atomic_read(&imp->imp_refcount) - 1,
1239                imp->imp_obd->obd_name);
1240
1241         if (atomic_dec_and_test(&imp->imp_refcount)) {
1242                 CDEBUG(D_INFO, "final put import %p\n", imp);
1243                 obd_zombie_import_add(imp);
1244         }
1245
1246         /* catch possible import put race */
1247         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1248         EXIT;
1249 }
1250 EXPORT_SYMBOL(class_import_put);
1251
1252 static void init_imp_at(struct imp_at *at) {
1253         int i;
1254         at_init(&at->iat_net_latency, 0, 0);
1255         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1256                 /* max service estimates are tracked on the server side, so
1257                    don't use the AT history here, just use the last reported
1258                    val. (But keep hist for proc histogram, worst_ever) */
1259                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1260                         AT_FLG_NOHIST);
1261         }
1262 }
1263
1264 struct obd_import *class_new_import(struct obd_device *obd)
1265 {
1266         struct obd_import *imp;
1267         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1268
1269         OBD_ALLOC(imp, sizeof(*imp));
1270         if (imp == NULL)
1271                 return NULL;
1272
1273         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1274         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1275         INIT_LIST_HEAD(&imp->imp_replay_list);
1276         INIT_LIST_HEAD(&imp->imp_sending_list);
1277         INIT_LIST_HEAD(&imp->imp_delayed_list);
1278         INIT_LIST_HEAD(&imp->imp_committed_list);
1279         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1280         imp->imp_known_replied_xid = 0;
1281         imp->imp_replay_cursor = &imp->imp_committed_list;
1282         spin_lock_init(&imp->imp_lock);
1283         imp->imp_last_success_conn = 0;
1284         imp->imp_state = LUSTRE_IMP_NEW;
1285         imp->imp_obd = class_incref(obd, "import", imp);
1286         mutex_init(&imp->imp_sec_mutex);
1287         init_waitqueue_head(&imp->imp_recovery_waitq);
1288
1289         if (curr_pid_ns->child_reaper)
1290                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1291         else
1292                 imp->imp_sec_refpid = 1;
1293
1294         atomic_set(&imp->imp_refcount, 2);
1295         atomic_set(&imp->imp_unregistering, 0);
1296         atomic_set(&imp->imp_inflight, 0);
1297         atomic_set(&imp->imp_replay_inflight, 0);
1298         atomic_set(&imp->imp_inval_count, 0);
1299         INIT_LIST_HEAD(&imp->imp_conn_list);
1300         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1301         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1302         init_imp_at(&imp->imp_at);
1303
1304         /* the default magic is V2, will be used in connect RPC, and
1305          * then adjusted according to the flags in request/reply. */
1306         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1307
1308         return imp;
1309 }
1310 EXPORT_SYMBOL(class_new_import);
1311
1312 void class_destroy_import(struct obd_import *import)
1313 {
1314         LASSERT(import != NULL);
1315         LASSERT(import != LP_POISON);
1316
1317         class_handle_unhash(&import->imp_handle);
1318
1319         spin_lock(&import->imp_lock);
1320         import->imp_generation++;
1321         spin_unlock(&import->imp_lock);
1322         class_import_put(import);
1323 }
1324 EXPORT_SYMBOL(class_destroy_import);
1325
1326 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1327
1328 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1329 {
1330         spin_lock(&exp->exp_locks_list_guard);
1331
1332         LASSERT(lock->l_exp_refs_nr >= 0);
1333
1334         if (lock->l_exp_refs_target != NULL &&
1335             lock->l_exp_refs_target != exp) {
1336                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1337                               exp, lock, lock->l_exp_refs_target);
1338         }
1339         if ((lock->l_exp_refs_nr ++) == 0) {
1340                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1341                 lock->l_exp_refs_target = exp;
1342         }
1343         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1344                lock, exp, lock->l_exp_refs_nr);
1345         spin_unlock(&exp->exp_locks_list_guard);
1346 }
1347 EXPORT_SYMBOL(__class_export_add_lock_ref);
1348
1349 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1350 {
1351         spin_lock(&exp->exp_locks_list_guard);
1352         LASSERT(lock->l_exp_refs_nr > 0);
1353         if (lock->l_exp_refs_target != exp) {
1354                 LCONSOLE_WARN("lock %p, "
1355                               "mismatching export pointers: %p, %p\n",
1356                               lock, lock->l_exp_refs_target, exp);
1357         }
1358         if (-- lock->l_exp_refs_nr == 0) {
1359                 list_del_init(&lock->l_exp_refs_link);
1360                 lock->l_exp_refs_target = NULL;
1361         }
1362         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1363                lock, exp, lock->l_exp_refs_nr);
1364         spin_unlock(&exp->exp_locks_list_guard);
1365 }
1366 EXPORT_SYMBOL(__class_export_del_lock_ref);
1367 #endif
1368
1369 /* A connection defines an export context in which preallocation can
1370    be managed. This releases the export pointer reference, and returns
1371    the export handle, so the export refcount is 1 when this function
1372    returns. */
1373 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1374                   struct obd_uuid *cluuid)
1375 {
1376         struct obd_export *export;
1377         LASSERT(conn != NULL);
1378         LASSERT(obd != NULL);
1379         LASSERT(cluuid != NULL);
1380         ENTRY;
1381
1382         export = class_new_export(obd, cluuid);
1383         if (IS_ERR(export))
1384                 RETURN(PTR_ERR(export));
1385
1386         conn->cookie = export->exp_handle.h_cookie;
1387         class_export_put(export);
1388
1389         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1390                cluuid->uuid, conn->cookie);
1391         RETURN(0);
1392 }
1393 EXPORT_SYMBOL(class_connect);
1394
1395 /* if export is involved in recovery then clean up related things */
1396 static void class_export_recovery_cleanup(struct obd_export *exp)
1397 {
1398         struct obd_device *obd = exp->exp_obd;
1399
1400         spin_lock(&obd->obd_recovery_task_lock);
1401         if (obd->obd_recovering) {
1402                 if (exp->exp_in_recovery) {
1403                         spin_lock(&exp->exp_lock);
1404                         exp->exp_in_recovery = 0;
1405                         spin_unlock(&exp->exp_lock);
1406                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1407                         atomic_dec(&obd->obd_connected_clients);
1408                 }
1409
1410                 /* if called during recovery then should update
1411                  * obd_stale_clients counter,
1412                  * lightweight exports are not counted */
1413                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1414                         exp->exp_obd->obd_stale_clients++;
1415         }
1416         spin_unlock(&obd->obd_recovery_task_lock);
1417
1418         spin_lock(&exp->exp_lock);
1419         /** Cleanup req replay fields */
1420         if (exp->exp_req_replay_needed) {
1421                 exp->exp_req_replay_needed = 0;
1422
1423                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1424                 atomic_dec(&obd->obd_req_replay_clients);
1425         }
1426
1427         /** Cleanup lock replay data */
1428         if (exp->exp_lock_replay_needed) {
1429                 exp->exp_lock_replay_needed = 0;
1430
1431                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1432                 atomic_dec(&obd->obd_lock_replay_clients);
1433         }
1434         spin_unlock(&exp->exp_lock);
1435 }
1436
1437 /* This function removes 1-3 references from the export:
1438  * 1 - for export pointer passed
1439  * and if disconnect really need
1440  * 2 - removing from hash
1441  * 3 - in client_unlink_export
1442  * The export pointer passed to this function can destroyed */
1443 int class_disconnect(struct obd_export *export)
1444 {
1445         int already_disconnected;
1446         ENTRY;
1447
1448         if (export == NULL) {
1449                 CWARN("attempting to free NULL export %p\n", export);
1450                 RETURN(-EINVAL);
1451         }
1452
1453         spin_lock(&export->exp_lock);
1454         already_disconnected = export->exp_disconnected;
1455         export->exp_disconnected = 1;
1456         /*  We hold references of export for uuid hash
1457          *  and nid_hash and export link at least. So
1458          *  it is safe to call cfs_hash_del in there.  */
1459         if (!hlist_unhashed(&export->exp_nid_hash))
1460                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1461                              &export->exp_connection->c_peer.nid,
1462                              &export->exp_nid_hash);
1463         spin_unlock(&export->exp_lock);
1464
1465         /* class_cleanup(), abort_recovery(), and class_fail_export()
1466          * all end up in here, and if any of them race we shouldn't
1467          * call extra class_export_puts(). */
1468         if (already_disconnected) {
1469                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1470                 GOTO(no_disconn, already_disconnected);
1471         }
1472
1473         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1474                export->exp_handle.h_cookie);
1475
1476         class_export_recovery_cleanup(export);
1477         class_unlink_export(export);
1478 no_disconn:
1479         class_export_put(export);
1480         RETURN(0);
1481 }
1482 EXPORT_SYMBOL(class_disconnect);
1483
1484 /* Return non-zero for a fully connected export */
1485 int class_connected_export(struct obd_export *exp)
1486 {
1487         int connected = 0;
1488
1489         if (exp) {
1490                 spin_lock(&exp->exp_lock);
1491                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1492                 spin_unlock(&exp->exp_lock);
1493         }
1494         return connected;
1495 }
1496 EXPORT_SYMBOL(class_connected_export);
1497
1498 static void class_disconnect_export_list(struct list_head *list,
1499                                          enum obd_option flags)
1500 {
1501         int rc;
1502         struct obd_export *exp;
1503         ENTRY;
1504
1505         /* It's possible that an export may disconnect itself, but
1506          * nothing else will be added to this list. */
1507         while (!list_empty(list)) {
1508                 exp = list_entry(list->next, struct obd_export,
1509                                  exp_obd_chain);
1510                 /* need for safe call CDEBUG after obd_disconnect */
1511                 class_export_get(exp);
1512
1513                 spin_lock(&exp->exp_lock);
1514                 exp->exp_flags = flags;
1515                 spin_unlock(&exp->exp_lock);
1516
1517                 if (obd_uuid_equals(&exp->exp_client_uuid,
1518                                     &exp->exp_obd->obd_uuid)) {
1519                         CDEBUG(D_HA,
1520                                "exp %p export uuid == obd uuid, don't discon\n",
1521                                exp);
1522                         /* Need to delete this now so we don't end up pointing
1523                          * to work_list later when this export is cleaned up. */
1524                         list_del_init(&exp->exp_obd_chain);
1525                         class_export_put(exp);
1526                         continue;
1527                 }
1528
1529                 class_export_get(exp);
1530                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1531                        "last request at %lld\n",
1532                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1533                        exp, exp->exp_last_request_time);
1534                 /* release one export reference anyway */
1535                 rc = obd_disconnect(exp);
1536
1537                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1538                        obd_export_nid2str(exp), exp, rc);
1539                 class_export_put(exp);
1540         }
1541         EXIT;
1542 }
1543
1544 void class_disconnect_exports(struct obd_device *obd)
1545 {
1546         struct list_head work_list;
1547         ENTRY;
1548
1549         /* Move all of the exports from obd_exports to a work list, en masse. */
1550         INIT_LIST_HEAD(&work_list);
1551         spin_lock(&obd->obd_dev_lock);
1552         list_splice_init(&obd->obd_exports, &work_list);
1553         list_splice_init(&obd->obd_delayed_exports, &work_list);
1554         spin_unlock(&obd->obd_dev_lock);
1555
1556         if (!list_empty(&work_list)) {
1557                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1558                        "disconnecting them\n", obd->obd_minor, obd);
1559                 class_disconnect_export_list(&work_list,
1560                                              exp_flags_from_obd(obd));
1561         } else
1562                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1563                        obd->obd_minor, obd);
1564         EXIT;
1565 }
1566 EXPORT_SYMBOL(class_disconnect_exports);
1567
1568 /* Remove exports that have not completed recovery.
1569  */
1570 void class_disconnect_stale_exports(struct obd_device *obd,
1571                                     int (*test_export)(struct obd_export *))
1572 {
1573         struct list_head work_list;
1574         struct obd_export *exp, *n;
1575         int evicted = 0;
1576         ENTRY;
1577
1578         INIT_LIST_HEAD(&work_list);
1579         spin_lock(&obd->obd_dev_lock);
1580         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1581                                  exp_obd_chain) {
1582                 /* don't count self-export as client */
1583                 if (obd_uuid_equals(&exp->exp_client_uuid,
1584                                     &exp->exp_obd->obd_uuid))
1585                         continue;
1586
1587                 /* don't evict clients which have no slot in last_rcvd
1588                  * (e.g. lightweight connection) */
1589                 if (exp->exp_target_data.ted_lr_idx == -1)
1590                         continue;
1591
1592                 spin_lock(&exp->exp_lock);
1593                 if (exp->exp_failed || test_export(exp)) {
1594                         spin_unlock(&exp->exp_lock);
1595                         continue;
1596                 }
1597                 exp->exp_failed = 1;
1598                 spin_unlock(&exp->exp_lock);
1599
1600                 list_move(&exp->exp_obd_chain, &work_list);
1601                 evicted++;
1602                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1603                        obd->obd_name, exp->exp_client_uuid.uuid,
1604                        exp->exp_connection == NULL ? "<unknown>" :
1605                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1606                 print_export_data(exp, "EVICTING", 0, D_HA);
1607         }
1608         spin_unlock(&obd->obd_dev_lock);
1609
1610         if (evicted)
1611                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1612                               obd->obd_name, evicted);
1613
1614         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1615                                                  OBD_OPT_ABORT_RECOV);
1616         EXIT;
1617 }
1618 EXPORT_SYMBOL(class_disconnect_stale_exports);
1619
1620 void class_fail_export(struct obd_export *exp)
1621 {
1622         int rc, already_failed;
1623
1624         spin_lock(&exp->exp_lock);
1625         already_failed = exp->exp_failed;
1626         exp->exp_failed = 1;
1627         spin_unlock(&exp->exp_lock);
1628
1629         if (already_failed) {
1630                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1631                        exp, exp->exp_client_uuid.uuid);
1632                 return;
1633         }
1634
1635         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1636                exp, exp->exp_client_uuid.uuid);
1637
1638         if (obd_dump_on_timeout)
1639                 libcfs_debug_dumplog();
1640
1641         /* need for safe call CDEBUG after obd_disconnect */
1642         class_export_get(exp);
1643
1644         /* Most callers into obd_disconnect are removing their own reference
1645          * (request, for example) in addition to the one from the hash table.
1646          * We don't have such a reference here, so make one. */
1647         class_export_get(exp);
1648         rc = obd_disconnect(exp);
1649         if (rc)
1650                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1651         else
1652                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1653                        exp, exp->exp_client_uuid.uuid);
1654         class_export_put(exp);
1655 }
1656 EXPORT_SYMBOL(class_fail_export);
1657
1658 char *obd_export_nid2str(struct obd_export *exp)
1659 {
1660         if (exp->exp_connection != NULL)
1661                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1662
1663         return "(no nid)";
1664 }
1665 EXPORT_SYMBOL(obd_export_nid2str);
1666
1667 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1668 {
1669         struct cfs_hash *nid_hash;
1670         struct obd_export *doomed_exp = NULL;
1671         int exports_evicted = 0;
1672
1673         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1674
1675         spin_lock(&obd->obd_dev_lock);
1676         /* umount has run already, so evict thread should leave
1677          * its task to umount thread now */
1678         if (obd->obd_stopping) {
1679                 spin_unlock(&obd->obd_dev_lock);
1680                 return exports_evicted;
1681         }
1682         nid_hash = obd->obd_nid_hash;
1683         cfs_hash_getref(nid_hash);
1684         spin_unlock(&obd->obd_dev_lock);
1685
1686         do {
1687                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1688                 if (doomed_exp == NULL)
1689                         break;
1690
1691                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1692                          "nid %s found, wanted nid %s, requested nid %s\n",
1693                          obd_export_nid2str(doomed_exp),
1694                          libcfs_nid2str(nid_key), nid);
1695                 LASSERTF(doomed_exp != obd->obd_self_export,
1696                          "self-export is hashed by NID?\n");
1697                 exports_evicted++;
1698                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1699                               "request\n", obd->obd_name,
1700                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1701                               obd_export_nid2str(doomed_exp));
1702                 class_fail_export(doomed_exp);
1703                 class_export_put(doomed_exp);
1704         } while (1);
1705
1706         cfs_hash_putref(nid_hash);
1707
1708         if (!exports_evicted)
1709                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1710                        obd->obd_name, nid);
1711         return exports_evicted;
1712 }
1713 EXPORT_SYMBOL(obd_export_evict_by_nid);
1714
1715 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1716 {
1717         struct cfs_hash *uuid_hash;
1718         struct obd_export *doomed_exp = NULL;
1719         struct obd_uuid doomed_uuid;
1720         int exports_evicted = 0;
1721
1722         spin_lock(&obd->obd_dev_lock);
1723         if (obd->obd_stopping) {
1724                 spin_unlock(&obd->obd_dev_lock);
1725                 return exports_evicted;
1726         }
1727         uuid_hash = obd->obd_uuid_hash;
1728         cfs_hash_getref(uuid_hash);
1729         spin_unlock(&obd->obd_dev_lock);
1730
1731         obd_str2uuid(&doomed_uuid, uuid);
1732         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1733                 CERROR("%s: can't evict myself\n", obd->obd_name);
1734                 cfs_hash_putref(uuid_hash);
1735                 return exports_evicted;
1736         }
1737
1738         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1739
1740         if (doomed_exp == NULL) {
1741                 CERROR("%s: can't disconnect %s: no exports found\n",
1742                        obd->obd_name, uuid);
1743         } else {
1744                 CWARN("%s: evicting %s at adminstrative request\n",
1745                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1746                 class_fail_export(doomed_exp);
1747                 class_export_put(doomed_exp);
1748                 exports_evicted++;
1749         }
1750         cfs_hash_putref(uuid_hash);
1751
1752         return exports_evicted;
1753 }
1754
1755 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1756 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1757 EXPORT_SYMBOL(class_export_dump_hook);
1758 #endif
1759
1760 static void print_export_data(struct obd_export *exp, const char *status,
1761                               int locks, int debug_level)
1762 {
1763         struct ptlrpc_reply_state *rs;
1764         struct ptlrpc_reply_state *first_reply = NULL;
1765         int nreplies = 0;
1766
1767         spin_lock(&exp->exp_lock);
1768         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1769                             rs_exp_list) {
1770                 if (nreplies == 0)
1771                         first_reply = rs;
1772                 nreplies++;
1773         }
1774         spin_unlock(&exp->exp_lock);
1775
1776         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1777                "%p %s %llu stale:%d\n",
1778                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1779                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1780                atomic_read(&exp->exp_rpc_count),
1781                atomic_read(&exp->exp_cb_count),
1782                atomic_read(&exp->exp_locks_count),
1783                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1784                nreplies, first_reply, nreplies > 3 ? "..." : "",
1785                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1786 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1787         if (locks && class_export_dump_hook != NULL)
1788                 class_export_dump_hook(exp);
1789 #endif
1790 }
1791
1792 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1793 {
1794         struct obd_export *exp;
1795
1796         spin_lock(&obd->obd_dev_lock);
1797         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1798                 print_export_data(exp, "ACTIVE", locks, debug_level);
1799         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1800                 print_export_data(exp, "UNLINKED", locks, debug_level);
1801         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1802                 print_export_data(exp, "DELAYED", locks, debug_level);
1803         spin_unlock(&obd->obd_dev_lock);
1804         spin_lock(&obd_zombie_impexp_lock);
1805         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1806                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1807         spin_unlock(&obd_zombie_impexp_lock);
1808 }
1809
1810 void obd_exports_barrier(struct obd_device *obd)
1811 {
1812         int waited = 2;
1813         LASSERT(list_empty(&obd->obd_exports));
1814         spin_lock(&obd->obd_dev_lock);
1815         while (!list_empty(&obd->obd_unlinked_exports)) {
1816                 spin_unlock(&obd->obd_dev_lock);
1817                 set_current_state(TASK_UNINTERRUPTIBLE);
1818                 schedule_timeout(cfs_time_seconds(waited));
1819                 if (waited > 5 && is_power_of_2(waited)) {
1820                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1821                                       "more than %d seconds. "
1822                                       "The obd refcount = %d. Is it stuck?\n",
1823                                       obd->obd_name, waited,
1824                                       atomic_read(&obd->obd_refcount));
1825                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1826                 }
1827                 waited *= 2;
1828                 spin_lock(&obd->obd_dev_lock);
1829         }
1830         spin_unlock(&obd->obd_dev_lock);
1831 }
1832 EXPORT_SYMBOL(obd_exports_barrier);
1833
1834 /* Total amount of zombies to be destroyed */
1835 static int zombies_count = 0;
1836
1837 /**
1838  * kill zombie imports and exports
1839  */
1840 void obd_zombie_impexp_cull(void)
1841 {
1842         struct obd_import *import;
1843         struct obd_export *export;
1844         ENTRY;
1845
1846         do {
1847                 spin_lock(&obd_zombie_impexp_lock);
1848
1849                 import = NULL;
1850                 if (!list_empty(&obd_zombie_imports)) {
1851                         import = list_entry(obd_zombie_imports.next,
1852                                             struct obd_import,
1853                                             imp_zombie_chain);
1854                         list_del_init(&import->imp_zombie_chain);
1855                 }
1856
1857                 export = NULL;
1858                 if (!list_empty(&obd_zombie_exports)) {
1859                         export = list_entry(obd_zombie_exports.next,
1860                                             struct obd_export,
1861                                             exp_obd_chain);
1862                         list_del_init(&export->exp_obd_chain);
1863                 }
1864
1865                 spin_unlock(&obd_zombie_impexp_lock);
1866
1867                 if (import != NULL) {
1868                         class_import_destroy(import);
1869                         spin_lock(&obd_zombie_impexp_lock);
1870                         zombies_count--;
1871                         spin_unlock(&obd_zombie_impexp_lock);
1872                 }
1873
1874                 if (export != NULL) {
1875                         class_export_destroy(export);
1876                         spin_lock(&obd_zombie_impexp_lock);
1877                         zombies_count--;
1878                         spin_unlock(&obd_zombie_impexp_lock);
1879                 }
1880
1881                 cond_resched();
1882         } while (import != NULL || export != NULL);
1883         EXIT;
1884 }
1885
1886 static DECLARE_COMPLETION(obd_zombie_start);
1887 static DECLARE_COMPLETION(obd_zombie_stop);
1888 static unsigned long obd_zombie_flags;
1889 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1890 static pid_t obd_zombie_pid;
1891
1892 enum {
1893         OBD_ZOMBIE_STOP         = 0x0001,
1894 };
1895
1896 /**
1897  * check for work for kill zombie import/export thread.
1898  */
1899 static int obd_zombie_impexp_check(void *arg)
1900 {
1901         int rc;
1902
1903         spin_lock(&obd_zombie_impexp_lock);
1904         rc = (zombies_count == 0) &&
1905              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1906         spin_unlock(&obd_zombie_impexp_lock);
1907
1908         RETURN(rc);
1909 }
1910
1911 /**
1912  * Add export to the obd_zombe thread and notify it.
1913  */
1914 static void obd_zombie_export_add(struct obd_export *exp) {
1915         atomic_dec(&obd_stale_export_num);
1916         spin_lock(&exp->exp_obd->obd_dev_lock);
1917         LASSERT(!list_empty(&exp->exp_obd_chain));
1918         list_del_init(&exp->exp_obd_chain);
1919         spin_unlock(&exp->exp_obd->obd_dev_lock);
1920         spin_lock(&obd_zombie_impexp_lock);
1921         zombies_count++;
1922         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1923         spin_unlock(&obd_zombie_impexp_lock);
1924
1925         obd_zombie_impexp_notify();
1926 }
1927
1928 /**
1929  * Add import to the obd_zombe thread and notify it.
1930  */
1931 static void obd_zombie_import_add(struct obd_import *imp) {
1932         LASSERT(imp->imp_sec == NULL);
1933         spin_lock(&obd_zombie_impexp_lock);
1934         LASSERT(list_empty(&imp->imp_zombie_chain));
1935         zombies_count++;
1936         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1937         spin_unlock(&obd_zombie_impexp_lock);
1938
1939         obd_zombie_impexp_notify();
1940 }
1941
1942 /**
1943  * notify import/export destroy thread about new zombie.
1944  */
1945 static void obd_zombie_impexp_notify(void)
1946 {
1947         /*
1948          * Make sure obd_zomebie_impexp_thread get this notification.
1949          * It is possible this signal only get by obd_zombie_barrier, and
1950          * barrier gulps this notification and sleeps away and hangs ensues
1951          */
1952         wake_up_all(&obd_zombie_waitq);
1953 }
1954
1955 /**
1956  * check whether obd_zombie is idle
1957  */
1958 static int obd_zombie_is_idle(void)
1959 {
1960         int rc;
1961
1962         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1963         spin_lock(&obd_zombie_impexp_lock);
1964         rc = (zombies_count == 0);
1965         spin_unlock(&obd_zombie_impexp_lock);
1966         return rc;
1967 }
1968
1969 /**
1970  * wait when obd_zombie import/export queues become empty
1971  */
1972 void obd_zombie_barrier(void)
1973 {
1974         struct l_wait_info lwi = { 0 };
1975
1976         if (obd_zombie_pid == current_pid())
1977                 /* don't wait for myself */
1978                 return;
1979         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1980 }
1981 EXPORT_SYMBOL(obd_zombie_barrier);
1982
1983
1984 struct obd_export *obd_stale_export_get(void)
1985 {
1986         struct obd_export *exp = NULL;
1987         ENTRY;
1988
1989         spin_lock(&obd_stale_export_lock);
1990         if (!list_empty(&obd_stale_exports)) {
1991                 exp = list_entry(obd_stale_exports.next,
1992                                  struct obd_export, exp_stale_list);
1993                 list_del_init(&exp->exp_stale_list);
1994         }
1995         spin_unlock(&obd_stale_export_lock);
1996
1997         if (exp) {
1998                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1999                        atomic_read(&obd_stale_export_num));
2000         }
2001         RETURN(exp);
2002 }
2003 EXPORT_SYMBOL(obd_stale_export_get);
2004
2005 void obd_stale_export_put(struct obd_export *exp)
2006 {
2007         ENTRY;
2008
2009         LASSERT(list_empty(&exp->exp_stale_list));
2010         if (exp->exp_lock_hash &&
2011             atomic_read(&exp->exp_lock_hash->hs_count)) {
2012                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2013                        atomic_read(&obd_stale_export_num));
2014
2015                 spin_lock_bh(&exp->exp_bl_list_lock);
2016                 spin_lock(&obd_stale_export_lock);
2017                 /* Add to the tail if there is no blocked locks,
2018                  * to the head otherwise. */
2019                 if (list_empty(&exp->exp_bl_list))
2020                         list_add_tail(&exp->exp_stale_list,
2021                                       &obd_stale_exports);
2022                 else
2023                         list_add(&exp->exp_stale_list,
2024                                  &obd_stale_exports);
2025
2026                 spin_unlock(&obd_stale_export_lock);
2027                 spin_unlock_bh(&exp->exp_bl_list_lock);
2028         } else {
2029                 class_export_put(exp);
2030         }
2031         EXIT;
2032 }
2033 EXPORT_SYMBOL(obd_stale_export_put);
2034
2035 /**
2036  * Adjust the position of the export in the stale list,
2037  * i.e. move to the head of the list if is needed.
2038  **/
2039 void obd_stale_export_adjust(struct obd_export *exp)
2040 {
2041         LASSERT(exp != NULL);
2042         spin_lock_bh(&exp->exp_bl_list_lock);
2043         spin_lock(&obd_stale_export_lock);
2044
2045         if (!list_empty(&exp->exp_stale_list) &&
2046             !list_empty(&exp->exp_bl_list))
2047                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2048
2049         spin_unlock(&obd_stale_export_lock);
2050         spin_unlock_bh(&exp->exp_bl_list_lock);
2051 }
2052 EXPORT_SYMBOL(obd_stale_export_adjust);
2053
2054 /**
2055  * destroy zombie export/import thread.
2056  */
2057 static int obd_zombie_impexp_thread(void *unused)
2058 {
2059         unshare_fs_struct();
2060         complete(&obd_zombie_start);
2061
2062         obd_zombie_pid = current_pid();
2063
2064         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2065                 struct l_wait_info lwi = { 0 };
2066
2067                 l_wait_event(obd_zombie_waitq,
2068                              !obd_zombie_impexp_check(NULL), &lwi);
2069                 obd_zombie_impexp_cull();
2070
2071                 /*
2072                  * Notify obd_zombie_barrier callers that queues
2073                  * may be empty.
2074                  */
2075                 wake_up(&obd_zombie_waitq);
2076         }
2077
2078         complete(&obd_zombie_stop);
2079
2080         RETURN(0);
2081 }
2082
2083
2084 /**
2085  * start destroy zombie import/export thread
2086  */
2087 int obd_zombie_impexp_init(void)
2088 {
2089         struct task_struct *task;
2090
2091         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2092         if (IS_ERR(task))
2093                 RETURN(PTR_ERR(task));
2094
2095         wait_for_completion(&obd_zombie_start);
2096         RETURN(0);
2097 }
2098 /**
2099  * stop destroy zombie import/export thread
2100  */
2101 void obd_zombie_impexp_stop(void)
2102 {
2103         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2104         obd_zombie_impexp_notify();
2105         wait_for_completion(&obd_zombie_stop);
2106         LASSERT(list_empty(&obd_stale_exports));
2107 }
2108
2109 /***** Kernel-userspace comm helpers *******/
2110
2111 /* Get length of entire message, including header */
2112 int kuc_len(int payload_len)
2113 {
2114         return sizeof(struct kuc_hdr) + payload_len;
2115 }
2116 EXPORT_SYMBOL(kuc_len);
2117
2118 /* Get a pointer to kuc header, given a ptr to the payload
2119  * @param p Pointer to payload area
2120  * @returns Pointer to kuc header
2121  */
2122 struct kuc_hdr * kuc_ptr(void *p)
2123 {
2124         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2125         LASSERT(lh->kuc_magic == KUC_MAGIC);
2126         return lh;
2127 }
2128 EXPORT_SYMBOL(kuc_ptr);
2129
2130 /* Alloc space for a message, and fill in header
2131  * @return Pointer to payload area
2132  */
2133 void *kuc_alloc(int payload_len, int transport, int type)
2134 {
2135         struct kuc_hdr *lh;
2136         int len = kuc_len(payload_len);
2137
2138         OBD_ALLOC(lh, len);
2139         if (lh == NULL)
2140                 return ERR_PTR(-ENOMEM);
2141
2142         lh->kuc_magic = KUC_MAGIC;
2143         lh->kuc_transport = transport;
2144         lh->kuc_msgtype = type;
2145         lh->kuc_msglen = len;
2146
2147         return (void *)(lh + 1);
2148 }
2149 EXPORT_SYMBOL(kuc_alloc);
2150
2151 /* Takes pointer to payload area */
2152 void kuc_free(void *p, int payload_len)
2153 {
2154         struct kuc_hdr *lh = kuc_ptr(p);
2155         OBD_FREE(lh, kuc_len(payload_len));
2156 }
2157 EXPORT_SYMBOL(kuc_free);
2158
2159 struct obd_request_slot_waiter {
2160         struct list_head        orsw_entry;
2161         wait_queue_head_t       orsw_waitq;
2162         bool                    orsw_signaled;
2163 };
2164
2165 static bool obd_request_slot_avail(struct client_obd *cli,
2166                                    struct obd_request_slot_waiter *orsw)
2167 {
2168         bool avail;
2169
2170         spin_lock(&cli->cl_loi_list_lock);
2171         avail = !!list_empty(&orsw->orsw_entry);
2172         spin_unlock(&cli->cl_loi_list_lock);
2173
2174         return avail;
2175 };
2176
2177 /*
2178  * For network flow control, the RPC sponsor needs to acquire a credit
2179  * before sending the RPC. The credits count for a connection is defined
2180  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2181  * the subsequent RPC sponsors need to wait until others released their
2182  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2183  */
2184 int obd_get_request_slot(struct client_obd *cli)
2185 {
2186         struct obd_request_slot_waiter   orsw;
2187         struct l_wait_info               lwi;
2188         int                              rc;
2189
2190         spin_lock(&cli->cl_loi_list_lock);
2191         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
2192                 cli->cl_r_in_flight++;
2193                 spin_unlock(&cli->cl_loi_list_lock);
2194                 return 0;
2195         }
2196
2197         init_waitqueue_head(&orsw.orsw_waitq);
2198         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
2199         orsw.orsw_signaled = false;
2200         spin_unlock(&cli->cl_loi_list_lock);
2201
2202         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2203         rc = l_wait_event(orsw.orsw_waitq,
2204                           obd_request_slot_avail(cli, &orsw) ||
2205                           orsw.orsw_signaled,
2206                           &lwi);
2207
2208         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2209          * freed but other (such as obd_put_request_slot) is using it. */
2210         spin_lock(&cli->cl_loi_list_lock);
2211         if (rc != 0) {
2212                 if (!orsw.orsw_signaled) {
2213                         if (list_empty(&orsw.orsw_entry))
2214                                 cli->cl_r_in_flight--;
2215                         else
2216                                 list_del(&orsw.orsw_entry);
2217                 }
2218         }
2219
2220         if (orsw.orsw_signaled) {
2221                 LASSERT(list_empty(&orsw.orsw_entry));
2222
2223                 rc = -EINTR;
2224         }
2225         spin_unlock(&cli->cl_loi_list_lock);
2226
2227         return rc;
2228 }
2229 EXPORT_SYMBOL(obd_get_request_slot);
2230
2231 void obd_put_request_slot(struct client_obd *cli)
2232 {
2233         struct obd_request_slot_waiter *orsw;
2234
2235         spin_lock(&cli->cl_loi_list_lock);
2236         cli->cl_r_in_flight--;
2237
2238         /* If there is free slot, wakeup the first waiter. */
2239         if (!list_empty(&cli->cl_loi_read_list) &&
2240             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2241                 orsw = list_entry(cli->cl_loi_read_list.next,
2242                                   struct obd_request_slot_waiter, orsw_entry);
2243                 list_del_init(&orsw->orsw_entry);
2244                 cli->cl_r_in_flight++;
2245                 wake_up(&orsw->orsw_waitq);
2246         }
2247         spin_unlock(&cli->cl_loi_list_lock);
2248 }
2249 EXPORT_SYMBOL(obd_put_request_slot);
2250
2251 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2252 {
2253         return cli->cl_max_rpcs_in_flight;
2254 }
2255 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2256
2257 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2258 {
2259         struct obd_request_slot_waiter *orsw;
2260         __u32                           old;
2261         int                             diff;
2262         int                             i;
2263         char                            *typ_name;
2264         int                             rc;
2265
2266         if (max > OBD_MAX_RIF_MAX || max < 1)
2267                 return -ERANGE;
2268
2269         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2270         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2271                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2272                  * strictly lower that max_rpcs_in_flight */
2273                 if (max < 2) {
2274                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2275                                "because it must be higher than "
2276                                "max_mod_rpcs_in_flight value",
2277                                cli->cl_import->imp_obd->obd_name);
2278                         return -ERANGE;
2279                 }
2280                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2281                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2282                         if (rc != 0)
2283                                 return rc;
2284                 }
2285         }
2286
2287         spin_lock(&cli->cl_loi_list_lock);
2288         old = cli->cl_max_rpcs_in_flight;
2289         cli->cl_max_rpcs_in_flight = max;
2290         diff = max - old;
2291
2292         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2293         for (i = 0; i < diff; i++) {
2294                 if (list_empty(&cli->cl_loi_read_list))
2295                         break;
2296
2297                 orsw = list_entry(cli->cl_loi_read_list.next,
2298                                   struct obd_request_slot_waiter, orsw_entry);
2299                 list_del_init(&orsw->orsw_entry);
2300                 cli->cl_r_in_flight++;
2301                 wake_up(&orsw->orsw_waitq);
2302         }
2303         spin_unlock(&cli->cl_loi_list_lock);
2304
2305         return 0;
2306 }
2307 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2308
2309 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2310 {
2311         return cli->cl_max_mod_rpcs_in_flight;
2312 }
2313 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2314
2315 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2316 {
2317         struct obd_connect_data *ocd;
2318         __u16 maxmodrpcs;
2319         __u16 prev;
2320
2321         if (max > OBD_MAX_RIF_MAX || max < 1)
2322                 return -ERANGE;
2323
2324         /* cannot exceed or equal max_rpcs_in_flight */
2325         if (max >= cli->cl_max_rpcs_in_flight) {
2326                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2327                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2328                        cli->cl_import->imp_obd->obd_name,
2329                        max, cli->cl_max_rpcs_in_flight);
2330                 return -ERANGE;
2331         }
2332
2333         /* cannot exceed max modify RPCs in flight supported by the server */
2334         ocd = &cli->cl_import->imp_connect_data;
2335         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2336                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2337         else
2338                 maxmodrpcs = 1;
2339         if (max > maxmodrpcs) {
2340                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2341                        "higher than max_mod_rpcs_per_client value (%hu) "
2342                        "returned by the server at connection\n",
2343                        cli->cl_import->imp_obd->obd_name,
2344                        max, maxmodrpcs);
2345                 return -ERANGE;
2346         }
2347
2348         spin_lock(&cli->cl_mod_rpcs_lock);
2349
2350         prev = cli->cl_max_mod_rpcs_in_flight;
2351         cli->cl_max_mod_rpcs_in_flight = max;
2352
2353         /* wakeup waiters if limit has been increased */
2354         if (cli->cl_max_mod_rpcs_in_flight > prev)
2355                 wake_up(&cli->cl_mod_rpcs_waitq);
2356
2357         spin_unlock(&cli->cl_mod_rpcs_lock);
2358
2359         return 0;
2360 }
2361 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2362
2363
2364 #define pct(a, b) (b ? a * 100 / b : 0)
2365 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2366                                struct seq_file *seq)
2367 {
2368         unsigned long mod_tot = 0, mod_cum;
2369         struct timespec64 now;
2370         int i;
2371
2372         ktime_get_real_ts64(&now);
2373
2374         spin_lock(&cli->cl_mod_rpcs_lock);
2375
2376         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2377                    (s64)now.tv_sec, now.tv_nsec);
2378         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2379                    cli->cl_mod_rpcs_in_flight);
2380
2381         seq_printf(seq, "\n\t\t\tmodify\n");
2382         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2383
2384         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2385
2386         mod_cum = 0;
2387         for (i = 0; i < OBD_HIST_MAX; i++) {
2388                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2389                 mod_cum += mod;
2390                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2391                            i, mod, pct(mod, mod_tot),
2392                            pct(mod_cum, mod_tot));
2393                 if (mod_cum == mod_tot)
2394                         break;
2395         }
2396
2397         spin_unlock(&cli->cl_mod_rpcs_lock);
2398
2399         return 0;
2400 }
2401 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2402 #undef pct
2403
2404
2405 /* The number of modify RPCs sent in parallel is limited
2406  * because the server has a finite number of slots per client to
2407  * store request result and ensure reply reconstruction when needed.
2408  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2409  * that takes into account server limit and cl_max_rpcs_in_flight
2410  * value.
2411  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2412  * one close request is allowed above the maximum.
2413  */
2414 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2415                                                  bool close_req)
2416 {
2417         bool avail;
2418
2419         /* A slot is available if
2420          * - number of modify RPCs in flight is less than the max
2421          * - it's a close RPC and no other close request is in flight
2422          */
2423         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2424                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2425
2426         return avail;
2427 }
2428
2429 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2430                                          bool close_req)
2431 {
2432         bool avail;
2433
2434         spin_lock(&cli->cl_mod_rpcs_lock);
2435         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2436         spin_unlock(&cli->cl_mod_rpcs_lock);
2437         return avail;
2438 }
2439
2440 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2441 {
2442         if (it != NULL &&
2443             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2444              it->it_op == IT_READDIR ||
2445              (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2446                         return true;
2447         return false;
2448 }
2449
2450 /* Get a modify RPC slot from the obd client @cli according
2451  * to the kind of operation @opc that is going to be sent
2452  * and the intent @it of the operation if it applies.
2453  * If the maximum number of modify RPCs in flight is reached
2454  * the thread is put to sleep.
2455  * Returns the tag to be set in the request message. Tag 0
2456  * is reserved for non-modifying requests.
2457  */
2458 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2459                            struct lookup_intent *it)
2460 {
2461         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2462         bool                    close_req = false;
2463         __u16                   i, max;
2464
2465         /* read-only metadata RPCs don't consume a slot on MDT
2466          * for reply reconstruction
2467          */
2468         if (obd_skip_mod_rpc_slot(it))
2469                 return 0;
2470
2471         if (opc == MDS_CLOSE)
2472                 close_req = true;
2473
2474         do {
2475                 spin_lock(&cli->cl_mod_rpcs_lock);
2476                 max = cli->cl_max_mod_rpcs_in_flight;
2477                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2478                         /* there is a slot available */
2479                         cli->cl_mod_rpcs_in_flight++;
2480                         if (close_req)
2481                                 cli->cl_close_rpcs_in_flight++;
2482                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2483                                          cli->cl_mod_rpcs_in_flight);
2484                         /* find a free tag */
2485                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2486                                                 max + 1);
2487                         LASSERT(i < OBD_MAX_RIF_MAX);
2488                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2489                         spin_unlock(&cli->cl_mod_rpcs_lock);
2490                         /* tag 0 is reserved for non-modify RPCs */
2491                         return i + 1;
2492                 }
2493                 spin_unlock(&cli->cl_mod_rpcs_lock);
2494
2495                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2496                        "opc %u, max %hu\n",
2497                        cli->cl_import->imp_obd->obd_name, opc, max);
2498
2499                 l_wait_event(cli->cl_mod_rpcs_waitq,
2500                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2501         } while (true);
2502 }
2503 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2504
2505 /* Put a modify RPC slot from the obd client @cli according
2506  * to the kind of operation @opc that has been sent and the
2507  * intent @it of the operation if it applies.
2508  */
2509 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2510                           struct lookup_intent *it, __u16 tag)
2511 {
2512         bool                    close_req = false;
2513
2514         if (obd_skip_mod_rpc_slot(it))
2515                 return;
2516
2517         if (opc == MDS_CLOSE)
2518                 close_req = true;
2519
2520         spin_lock(&cli->cl_mod_rpcs_lock);
2521         cli->cl_mod_rpcs_in_flight--;
2522         if (close_req)
2523                 cli->cl_close_rpcs_in_flight--;
2524         /* release the tag in the bitmap */
2525         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2526         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2527         spin_unlock(&cli->cl_mod_rpcs_lock);
2528         wake_up(&cli->cl_mod_rpcs_waitq);
2529 }
2530 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2531