Whamcloud - gitweb
cd006e759e361db6359229dd932a9d8dc0bb4757
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Serializes walks of, and insertions into/removals from, obd_types. */
static DEFINE_SPINLOCK(obd_types_lock);
/* Every registered obd type, linked through obd_type::typ_chain. */
static LIST_HEAD(obd_types);
/* Taken around lookups in and updates of the obd_devs[] table. */
DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a registered device's slot index is kept in its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(). obd_device_cachep backs obd_device_alloc(). */
static struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
static struct kmem_cache *import_cachep;

/* NOTE(review): presumably queues of imports/exports awaiting deferred
 * destruction, guarded by obd_zombie_impexp_lock; their users are outside
 * this part of the file -- confirm. */
static LIST_HEAD(obd_zombie_imports);
static LIST_HEAD(obd_zombie_exports);
static DEFINE_SPINLOCK(obd_zombie_impexp_lock);

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping; the code using these is outside
 * this part of the file -- confirm semantics there. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook invoked by class_export_destroy() to release an export's connection.
 * It is not assigned anywhere in this part of the file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74
75 /*
76  * support functions: we could use inter-module communication, but this
77  * is more portable to other OS's
78  */
79 static struct obd_device *obd_device_alloc(void)
80 {
81         struct obd_device *obd;
82
83         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84         if (obd != NULL) {
85                 obd->obd_magic = OBD_DEVICE_MAGIC;
86         }
87         return obd;
88 }
89
90 static void obd_device_free(struct obd_device *obd)
91 {
92         LASSERT(obd != NULL);
93         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95         if (obd->obd_namespace != NULL) {
96                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97                        obd, obd->obd_namespace, obd->obd_force);
98                 LBUG();
99         }
100         lu_ref_fini(&obd->obd_reference);
101         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
102 }
103
104 struct obd_type *class_search_type(const char *name)
105 {
106         struct list_head *tmp;
107         struct obd_type *type;
108
109         spin_lock(&obd_types_lock);
110         list_for_each(tmp, &obd_types) {
111                 type = list_entry(tmp, struct obd_type, typ_chain);
112                 if (strcmp(type->typ_name, name) == 0) {
113                         spin_unlock(&obd_types_lock);
114                         return type;
115                 }
116         }
117         spin_unlock(&obd_types_lock);
118         return NULL;
119 }
120 EXPORT_SYMBOL(class_search_type);
121
122 struct obd_type *class_get_type(const char *name)
123 {
124         struct obd_type *type = class_search_type(name);
125
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
127         if (!type) {
128                 const char *modname = name;
129
130                 if (strcmp(modname, "obdfilter") == 0)
131                         modname = "ofd";
132
133                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134                         modname = LUSTRE_OSP_NAME;
135
136                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137                         modname = LUSTRE_MDT_NAME;
138
139                 if (!request_module("%s", modname)) {
140                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141                         type = class_search_type(name);
142                 } else {
143                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144                                            modname);
145                 }
146         }
147 #endif
148         if (type) {
149                 spin_lock(&type->obd_type_lock);
150                 type->typ_refcnt++;
151                 try_module_get(type->typ_dt_ops->o_owner);
152                 spin_unlock(&type->obd_type_lock);
153         }
154         return type;
155 }
156
157 void class_put_type(struct obd_type *type)
158 {
159         LASSERT(type);
160         spin_lock(&type->obd_type_lock);
161         type->typ_refcnt--;
162         module_put(type->typ_dt_ops->o_owner);
163         spin_unlock(&type->obd_type_lock);
164 }
165
166 static void class_sysfs_release(struct kobject *kobj)
167 {
168         struct obd_type *type = container_of(kobj, struct obd_type,
169                                              typ_kobj);
170
171         complete(&type->typ_kobj_unregister);
172 }
173
174 static struct kobj_type class_ktype = {
175         .sysfs_ops      = &lustre_sysfs_ops,
176         .release        = class_sysfs_release,
177 };
178
179 #define CLASS_MAX_NAME 1024
180
181 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
182                         bool enable_proc, struct lprocfs_vars *vars,
183                         const char *name, struct lu_device_type *ldt)
184 {
185         struct obd_type *type;
186         int rc = 0;
187         ENTRY;
188
189         /* sanity check */
190         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
191
192         if (class_search_type(name)) {
193                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
194                 RETURN(-EEXIST);
195         }
196
197         rc = -ENOMEM;
198         OBD_ALLOC(type, sizeof(*type));
199         if (type == NULL)
200                 RETURN(rc);
201
202         OBD_ALLOC_PTR(type->typ_dt_ops);
203         OBD_ALLOC_PTR(type->typ_md_ops);
204         OBD_ALLOC(type->typ_name, strlen(name) + 1);
205
206         if (type->typ_dt_ops == NULL ||
207             type->typ_md_ops == NULL ||
208             type->typ_name == NULL)
209                 GOTO (failed, rc);
210
211         *(type->typ_dt_ops) = *dt_ops;
212         /* md_ops is optional */
213         if (md_ops)
214                 *(type->typ_md_ops) = *md_ops;
215         strcpy(type->typ_name, name);
216         spin_lock_init(&type->obd_type_lock);
217
218 #ifdef CONFIG_PROC_FS
219         if (enable_proc) {
220                 type->typ_procroot = lprocfs_register(type->typ_name,
221                                                       proc_lustre_root,
222                                                       vars, type);
223                 if (IS_ERR(type->typ_procroot)) {
224                         rc = PTR_ERR(type->typ_procroot);
225                         type->typ_procroot = NULL;
226                         GOTO(failed, rc);
227                 }
228         }
229 #endif
230         type->typ_kobj.kset = lustre_kset;
231         init_completion(&type->typ_kobj_unregister);
232         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
233                                   &lustre_kset->kobj, "%s", type->typ_name);
234         if (rc)
235                 GOTO(failed, rc);
236
237         if (ldt) {
238                 type->typ_lu = ldt;
239                 rc = lu_device_type_init(ldt);
240                 if (rc) {
241                         kobject_put(&type->typ_kobj);
242                         GOTO(failed, rc);
243                 }
244         }
245
246         spin_lock(&obd_types_lock);
247         list_add(&type->typ_chain, &obd_types);
248         spin_unlock(&obd_types_lock);
249
250         RETURN(0);
251
252 failed:
253         if (type->typ_name != NULL) {
254 #ifdef CONFIG_PROC_FS
255                 if (type->typ_procroot != NULL)
256                         remove_proc_subtree(type->typ_name, proc_lustre_root);
257 #endif
258                 OBD_FREE(type->typ_name, strlen(name) + 1);
259         }
260         if (type->typ_md_ops != NULL)
261                 OBD_FREE_PTR(type->typ_md_ops);
262         if (type->typ_dt_ops != NULL)
263                 OBD_FREE_PTR(type->typ_dt_ops);
264         OBD_FREE(type, sizeof(*type));
265         RETURN(rc);
266 }
267 EXPORT_SYMBOL(class_register_type);
268
/*
 * Unregister the obd type called \a name and free it.
 *
 * Returns 0 on success, -EINVAL when no such type is registered, and -EBUSY
 * when the type still has references. In the -EBUSY case the ops tables are
 * freed but the type itself stays on the obd_types list.
 */
int class_unregister_type(const char *name)
{
        struct obd_type *type = class_search_type(name);
        ENTRY;

        if (!type) {
                CERROR("unknown obd type\n");
                RETURN(-EINVAL);
        }

        if (type->typ_refcnt) {
                CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
                /* This is a bad situation, let's make the best of it */
                /* Remove ops, but leave the name for debugging */
                OBD_FREE_PTR(type->typ_dt_ops);
                OBD_FREE_PTR(type->typ_md_ops);
                RETURN(-EBUSY);
        }

        /* Drop the kobject and wait until class_sysfs_release() has
         * signalled that it is gone. */
        kobject_put(&type->typ_kobj);
        wait_for_completion(&type->typ_kobj_unregister);

        /* We do not use type->typ_procroot for the removal: for
         * compatibility purposes other modules can share names (i.e. lod
         * can use the lov entry), so the pointer can become invalid when
         * another module removes the entry. Remove by name instead. */
#ifdef CONFIG_PROC_FS
        if (type->typ_procroot != NULL)
                remove_proc_subtree(type->typ_name, proc_lustre_root);
        if (type->typ_procsym != NULL)
                lprocfs_remove(&type->typ_procsym);
#endif
        if (type->typ_lu)
                lu_device_type_fini(type->typ_lu);

        spin_lock(&obd_types_lock);
        list_del(&type->typ_chain);
        spin_unlock(&obd_types_lock);
        /* name compared equal to typ_name in class_search_type(), so its
         * length matches the size used when typ_name was allocated. */
        OBD_FREE(type->typ_name, strlen(name) + 1);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(0);
} /* class_unregister_type */
315 EXPORT_SYMBOL(class_unregister_type);
316
317 /**
318  * Create a new obd device.
319  *
320  * Allocate the new obd_device and initialize it.
321  *
322  * \param[in] type_name obd device type string.
323  * \param[in] name      obd device name.
324  * \param[in] uuid      obd device UUID
325  *
326  * \retval newdev         pointer to created obd_device
327  * \retval ERR_PTR(errno) on error
328  */
329 struct obd_device *class_newdev(const char *type_name, const char *name,
330                                 const char *uuid)
331 {
332         struct obd_device *newdev;
333         struct obd_type *type = NULL;
334         ENTRY;
335
336         if (strlen(name) >= MAX_OBD_NAME) {
337                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
338                 RETURN(ERR_PTR(-EINVAL));
339         }
340
341         type = class_get_type(type_name);
342         if (type == NULL){
343                 CERROR("OBD: unknown type: %s\n", type_name);
344                 RETURN(ERR_PTR(-ENODEV));
345         }
346
347         newdev = obd_device_alloc();
348         if (newdev == NULL) {
349                 class_put_type(type);
350                 RETURN(ERR_PTR(-ENOMEM));
351         }
352         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
353         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
354         newdev->obd_type = type;
355         newdev->obd_minor = -1;
356
357         rwlock_init(&newdev->obd_pool_lock);
358         newdev->obd_pool_limit = 0;
359         newdev->obd_pool_slv = 0;
360
361         INIT_LIST_HEAD(&newdev->obd_exports);
362         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
363         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
364         INIT_LIST_HEAD(&newdev->obd_exports_timed);
365         INIT_LIST_HEAD(&newdev->obd_nid_stats);
366         spin_lock_init(&newdev->obd_nid_lock);
367         spin_lock_init(&newdev->obd_dev_lock);
368         mutex_init(&newdev->obd_dev_mutex);
369         spin_lock_init(&newdev->obd_osfs_lock);
370         /* newdev->obd_osfs_age must be set to a value in the distant
371          * past to guarantee a fresh statfs is fetched on mount. */
372         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
373
374         /* XXX belongs in setup not attach  */
375         init_rwsem(&newdev->obd_observer_link_sem);
376         /* recovery data */
377         init_timer(&newdev->obd_recovery_timer);
378         spin_lock_init(&newdev->obd_recovery_task_lock);
379         init_waitqueue_head(&newdev->obd_next_transno_waitq);
380         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
381         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
382         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
383         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
384         INIT_LIST_HEAD(&newdev->obd_evict_list);
385         INIT_LIST_HEAD(&newdev->obd_lwp_list);
386
387         llog_group_init(&newdev->obd_olg);
388         /* Detach drops this */
389         atomic_set(&newdev->obd_refcount, 1);
390         lu_ref_init(&newdev->obd_reference);
391         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
392
393         newdev->obd_conn_inprogress = 0;
394
395         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
396
397         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
398                newdev->obd_name, newdev);
399
400         return newdev;
401 }
402
403 /**
404  * Free obd device.
405  *
406  * \param[in] obd obd_device to be freed
407  *
408  * \retval none
409  */
410 void class_free_dev(struct obd_device *obd)
411 {
412         struct obd_type *obd_type = obd->obd_type;
413
414         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
415                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
416         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
417                  "obd %p != obd_devs[%d] %p\n",
418                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
419         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
420                  "obd_refcount should be 0, not %d\n",
421                  atomic_read(&obd->obd_refcount));
422         LASSERT(obd_type != NULL);
423
424         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
425                obd->obd_name, obd->obd_type->typ_name);
426
427         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
428                          obd->obd_name, obd->obd_uuid.uuid);
429         if (obd->obd_stopping) {
430                 int err;
431
432                 /* If we're not stopping, we were never set up */
433                 err = obd_cleanup(obd);
434                 if (err)
435                         CERROR("Cleanup %s returned %d\n",
436                                 obd->obd_name, err);
437         }
438
439         obd_device_free(obd);
440
441         class_put_type(obd_type);
442 }
443
444 /**
445  * Unregister obd device.
446  *
447  * Free slot in obd_dev[] used by \a obd.
448  *
449  * \param[in] new_obd obd_device to be unregistered
450  *
451  * \retval none
452  */
453 void class_unregister_device(struct obd_device *obd)
454 {
455         write_lock(&obd_dev_lock);
456         if (obd->obd_minor >= 0) {
457                 LASSERT(obd_devs[obd->obd_minor] == obd);
458                 obd_devs[obd->obd_minor] = NULL;
459                 obd->obd_minor = -1;
460         }
461         write_unlock(&obd_dev_lock);
462 }
463
464 /**
465  * Register obd device.
466  *
467  * Find free slot in obd_devs[], fills it with \a new_obd.
468  *
469  * \param[in] new_obd obd_device to be registered
470  *
471  * \retval 0          success
472  * \retval -EEXIST    device with this name is registered
473  * \retval -EOVERFLOW obd_devs[] is full
474  */
475 int class_register_device(struct obd_device *new_obd)
476 {
477         int ret = 0;
478         int i;
479         int new_obd_minor = 0;
480         bool minor_assign = false;
481         bool retried = false;
482
483 again:
484         write_lock(&obd_dev_lock);
485         for (i = 0; i < class_devno_max(); i++) {
486                 struct obd_device *obd = class_num2obd(i);
487
488                 if (obd != NULL &&
489                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
490
491                         if (!retried) {
492                                 write_unlock(&obd_dev_lock);
493
494                                 /* the obd_device could be waited to be
495                                  * destroyed by the "obd_zombie_impexp_thread".
496                                  */
497                                 obd_zombie_barrier();
498                                 retried = true;
499                                 goto again;
500                         }
501
502                         CERROR("%s: already exists, won't add\n",
503                                obd->obd_name);
504                         /* in case we found a free slot before duplicate */
505                         minor_assign = false;
506                         ret = -EEXIST;
507                         break;
508                 }
509                 if (!minor_assign && obd == NULL) {
510                         new_obd_minor = i;
511                         minor_assign = true;
512                 }
513         }
514
515         if (minor_assign) {
516                 new_obd->obd_minor = new_obd_minor;
517                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
518                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
519                 obd_devs[new_obd_minor] = new_obd;
520         } else {
521                 if (ret == 0) {
522                         ret = -EOVERFLOW;
523                         CERROR("%s: all %u/%u devices used, increase "
524                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
525                                i, class_devno_max(), ret);
526                 }
527         }
528         write_unlock(&obd_dev_lock);
529
530         RETURN(ret);
531 }
532
533 static int class_name2dev_nolock(const char *name)
534 {
535         int i;
536
537         if (!name)
538                 return -1;
539
540         for (i = 0; i < class_devno_max(); i++) {
541                 struct obd_device *obd = class_num2obd(i);
542
543                 if (obd && strcmp(name, obd->obd_name) == 0) {
544                         /* Make sure we finished attaching before we give
545                            out any references */
546                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
547                         if (obd->obd_attached) {
548                                 return i;
549                         }
550                         break;
551                 }
552         }
553
554         return -1;
555 }
556
557 int class_name2dev(const char *name)
558 {
559         int i;
560
561         if (!name)
562                 return -1;
563
564         read_lock(&obd_dev_lock);
565         i = class_name2dev_nolock(name);
566         read_unlock(&obd_dev_lock);
567
568         return i;
569 }
570 EXPORT_SYMBOL(class_name2dev);
571
572 struct obd_device *class_name2obd(const char *name)
573 {
574         int dev = class_name2dev(name);
575
576         if (dev < 0 || dev > class_devno_max())
577                 return NULL;
578         return class_num2obd(dev);
579 }
580 EXPORT_SYMBOL(class_name2obd);
581
582 int class_uuid2dev_nolock(struct obd_uuid *uuid)
583 {
584         int i;
585
586         for (i = 0; i < class_devno_max(); i++) {
587                 struct obd_device *obd = class_num2obd(i);
588
589                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
590                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
591                         return i;
592                 }
593         }
594
595         return -1;
596 }
597
598 int class_uuid2dev(struct obd_uuid *uuid)
599 {
600         int i;
601
602         read_lock(&obd_dev_lock);
603         i = class_uuid2dev_nolock(uuid);
604         read_unlock(&obd_dev_lock);
605
606         return i;
607 }
608 EXPORT_SYMBOL(class_uuid2dev);
609
610 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
611 {
612         int dev = class_uuid2dev(uuid);
613         if (dev < 0)
614                 return NULL;
615         return class_num2obd(dev);
616 }
617 EXPORT_SYMBOL(class_uuid2obd);
618
619 /**
620  * Get obd device from ::obd_devs[]
621  *
622  * \param num [in] array index
623  *
624  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
625  *         otherwise return the obd device there.
626  */
627 struct obd_device *class_num2obd(int num)
628 {
629         struct obd_device *obd = NULL;
630
631         if (num < class_devno_max()) {
632                 obd = obd_devs[num];
633                 if (obd == NULL)
634                         return NULL;
635
636                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
637                          "%p obd_magic %08x != %08x\n",
638                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
639                 LASSERTF(obd->obd_minor == num,
640                          "%p obd_minor %0d != %0d\n",
641                          obd, obd->obd_minor, num);
642         }
643
644         return obd;
645 }
646
647 /**
648  * Find obd in obd_dev[] by name or uuid.
649  *
650  * Increment obd's refcount if found.
651  *
652  * \param[in] str obd name or uuid
653  *
654  * \retval NULL    if not found
655  * \retval target  pointer to found obd_device
656  */
657 struct obd_device *class_dev_by_str(const char *str)
658 {
659         struct obd_device *target = NULL;
660         struct obd_uuid tgtuuid;
661         int rc;
662
663         obd_str2uuid(&tgtuuid, str);
664
665         read_lock(&obd_dev_lock);
666         rc = class_uuid2dev_nolock(&tgtuuid);
667         if (rc < 0)
668                 rc = class_name2dev_nolock(str);
669
670         if (rc >= 0)
671                 target = class_num2obd(rc);
672
673         if (target != NULL)
674                 class_incref(target, "find", current);
675         read_unlock(&obd_dev_lock);
676
677         RETURN(target);
678 }
679 EXPORT_SYMBOL(class_dev_by_str);
680
681 /**
682  * Get obd devices count. Device in any
683  *    state are counted
684  * \retval obd device count
685  */
686 int get_devices_count(void)
687 {
688         int index, max_index = class_devno_max(), dev_count = 0;
689
690         read_lock(&obd_dev_lock);
691         for (index = 0; index <= max_index; index++) {
692                 struct obd_device *obd = class_num2obd(index);
693                 if (obd != NULL)
694                         dev_count++;
695         }
696         read_unlock(&obd_dev_lock);
697
698         return dev_count;
699 }
700 EXPORT_SYMBOL(get_devices_count);
701
702 void class_obd_list(void)
703 {
704         char *status;
705         int i;
706
707         read_lock(&obd_dev_lock);
708         for (i = 0; i < class_devno_max(); i++) {
709                 struct obd_device *obd = class_num2obd(i);
710
711                 if (obd == NULL)
712                         continue;
713                 if (obd->obd_stopping)
714                         status = "ST";
715                 else if (obd->obd_set_up)
716                         status = "UP";
717                 else if (obd->obd_attached)
718                         status = "AT";
719                 else
720                         status = "--";
721                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
722                          i, status, obd->obd_type->typ_name,
723                          obd->obd_name, obd->obd_uuid.uuid,
724                          atomic_read(&obd->obd_refcount));
725         }
726         read_unlock(&obd_dev_lock);
727         return;
728 }
729
730 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
731    specified, then only the client with that uuid is returned,
732    otherwise any client connected to the tgt is returned. */
733 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
734                                           const char * typ_name,
735                                           struct obd_uuid *grp_uuid)
736 {
737         int i;
738
739         read_lock(&obd_dev_lock);
740         for (i = 0; i < class_devno_max(); i++) {
741                 struct obd_device *obd = class_num2obd(i);
742
743                 if (obd == NULL)
744                         continue;
745                 if ((strncmp(obd->obd_type->typ_name, typ_name,
746                              strlen(typ_name)) == 0)) {
747                         if (obd_uuid_equals(tgt_uuid,
748                                             &obd->u.cli.cl_target_uuid) &&
749                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
750                                                          &obd->obd_uuid) : 1)) {
751                                 read_unlock(&obd_dev_lock);
752                                 return obd;
753                         }
754                 }
755         }
756         read_unlock(&obd_dev_lock);
757
758         return NULL;
759 }
760 EXPORT_SYMBOL(class_find_client_obd);
761
762 /* Iterate the obd_device list looking devices have grp_uuid. Start
763    searching at *next, and if a device is found, the next index to look
764    at is saved in *next. If next is NULL, then the first matching device
765    will always be returned. */
766 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
767 {
768         int i;
769
770         if (next == NULL)
771                 i = 0;
772         else if (*next >= 0 && *next < class_devno_max())
773                 i = *next;
774         else
775                 return NULL;
776
777         read_lock(&obd_dev_lock);
778         for (; i < class_devno_max(); i++) {
779                 struct obd_device *obd = class_num2obd(i);
780
781                 if (obd == NULL)
782                         continue;
783                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
784                         if (next != NULL)
785                                 *next = i+1;
786                         read_unlock(&obd_dev_lock);
787                         return obd;
788                 }
789         }
790         read_unlock(&obd_dev_lock);
791
792         return NULL;
793 }
794 EXPORT_SYMBOL(class_devices_in_group);
795
/**
 * Notify that the sptlrpc log for \a fsname has changed, so that every
 * relevant OBD can adjust its sptlrpc settings accordingly.
 *
 * Every set-up, non-stopping device of type mdc, osc, osp, lwp, mdt or ost
 * whose name starts with the first \a namelen bytes of \a fsname receives
 * an obd_set_info_async() call with KEY_SPTLRPC_CONF.
 *
 * \param[in] fsname  filesystem name used as a device-name prefix
 * \param[in] namelen number of bytes of \a fsname to compare; must be > 0
 *
 * \retval 0 if every notification succeeded, otherwise the first non-zero
 *         value returned by obd_set_info_async()
 */
int class_notify_sptlrpc_conf(const char *fsname, int namelen)
{
        struct obd_device  *obd;
        const char         *type;
        int                 i, rc = 0, rc2;

        LASSERT(namelen > 0);

        read_lock(&obd_dev_lock);
        for (i = 0; i < class_devno_max(); i++) {
                obd = class_num2obd(i);

                if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
                        continue;

                /* only notify mdc, osc, osp, lwp, mdt, ost
                 * because only these have a -sptlrpc llog */
                type = obd->obd_type->typ_name;
                if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
                    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
                    strcmp(type, LUSTRE_OSP_NAME) != 0 &&
                    strcmp(type, LUSTRE_LWP_NAME) != 0 &&
                    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
                    strcmp(type, LUSTRE_OST_NAME) != 0)
                        continue;

                if (strncmp(obd->obd_name, fsname, namelen))
                        continue;

                /* Hold a reference on the device while obd_dev_lock is
                 * dropped around the notification call; the lock is taken
                 * again before the scan continues. */
                class_incref(obd, __FUNCTION__, obd);
                read_unlock(&obd_dev_lock);
                rc2 = obd_set_info_async(NULL, obd->obd_self_export,
                                         sizeof(KEY_SPTLRPC_CONF),
                                         KEY_SPTLRPC_CONF, 0, NULL, NULL);
                /* keep the first error, but keep notifying the rest */
                rc = rc ? rc : rc2;
                class_decref(obd, __FUNCTION__, obd);
                read_lock(&obd_dev_lock);
        }
        read_unlock(&obd_dev_lock);
        return rc;
}
841 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
842
843 void obd_cleanup_caches(void)
844 {
845         ENTRY;
846         if (obd_device_cachep) {
847                 kmem_cache_destroy(obd_device_cachep);
848                 obd_device_cachep = NULL;
849         }
850         if (obdo_cachep) {
851                 kmem_cache_destroy(obdo_cachep);
852                 obdo_cachep = NULL;
853         }
854         if (import_cachep) {
855                 kmem_cache_destroy(import_cachep);
856                 import_cachep = NULL;
857         }
858
859         EXIT;
860 }
861
862 int obd_init_caches(void)
863 {
864         int rc;
865         ENTRY;
866
867         LASSERT(obd_device_cachep == NULL);
868         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
869                                               sizeof(struct obd_device),
870                                               0, 0, NULL);
871         if (!obd_device_cachep)
872                 GOTO(out, rc = -ENOMEM);
873
874         LASSERT(obdo_cachep == NULL);
875         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
876                                         0, 0, NULL);
877         if (!obdo_cachep)
878                 GOTO(out, rc = -ENOMEM);
879
880         LASSERT(import_cachep == NULL);
881         import_cachep = kmem_cache_create("ll_import_cache",
882                                           sizeof(struct obd_import),
883                                           0, 0, NULL);
884         if (!import_cachep)
885                 GOTO(out, rc = -ENOMEM);
886
887         RETURN(0);
888 out:
889         obd_cleanup_caches();
890         RETURN(rc);
891 }
892
893 /* map connection to client */
894 struct obd_export *class_conn2export(struct lustre_handle *conn)
895 {
896         struct obd_export *export;
897         ENTRY;
898
899         if (!conn) {
900                 CDEBUG(D_CACHE, "looking for null handle\n");
901                 RETURN(NULL);
902         }
903
904         if (conn->cookie == -1) {  /* this means assign a new connection */
905                 CDEBUG(D_CACHE, "want a new connection\n");
906                 RETURN(NULL);
907         }
908
909         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
910         export = class_handle2object(conn->cookie, NULL);
911         RETURN(export);
912 }
913 EXPORT_SYMBOL(class_conn2export);
914
915 struct obd_device *class_exp2obd(struct obd_export *exp)
916 {
917         if (exp)
918                 return exp->exp_obd;
919         return NULL;
920 }
921 EXPORT_SYMBOL(class_exp2obd);
922
923 struct obd_device *class_conn2obd(struct lustre_handle *conn)
924 {
925         struct obd_export *export;
926         export = class_conn2export(conn);
927         if (export) {
928                 struct obd_device *obd = export->exp_obd;
929                 class_export_put(export);
930                 return obd;
931         }
932         return NULL;
933 }
934
935 struct obd_import *class_exp2cliimp(struct obd_export *exp)
936 {
937         struct obd_device *obd = exp->exp_obd;
938         if (obd == NULL)
939                 return NULL;
940         return obd->u.cli.cl_import;
941 }
942 EXPORT_SYMBOL(class_exp2cliimp);
943
944 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
945 {
946         struct obd_device *obd = class_conn2obd(conn);
947         if (obd == NULL)
948                 return NULL;
949         return obd->u.cli.cl_import;
950 }
951
952 /* Export management functions */
953 static void class_export_destroy(struct obd_export *exp)
954 {
955         struct obd_device *obd = exp->exp_obd;
956         ENTRY;
957
958         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
959         LASSERT(obd != NULL);
960
961         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
962                exp->exp_client_uuid.uuid, obd->obd_name);
963
964         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
965         if (exp->exp_connection)
966                 ptlrpc_put_connection_superhack(exp->exp_connection);
967
968         LASSERT(list_empty(&exp->exp_outstanding_replies));
969         LASSERT(list_empty(&exp->exp_uncommitted_replies));
970         LASSERT(list_empty(&exp->exp_req_replay_queue));
971         LASSERT(list_empty(&exp->exp_hp_rpcs));
972         obd_destroy_export(exp);
973         /* self export doesn't hold a reference to an obd, although it
974          * exists until freeing of the obd */
975         if (exp != obd->obd_self_export)
976                 class_decref(obd, "export", exp);
977
978         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
979         EXIT;
980 }
981
/* hop_addref callback for export handles: take a reference on the export. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
986
987 static struct portals_handle_ops export_handle_ops = {
988         .hop_addref = export_handle_addref,
989         .hop_free   = NULL,
990 };
991
992 struct obd_export *class_export_get(struct obd_export *exp)
993 {
994         atomic_inc(&exp->exp_refcount);
995         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
996                atomic_read(&exp->exp_refcount));
997         return exp;
998 }
999 EXPORT_SYMBOL(class_export_get);
1000
1001 void class_export_put(struct obd_export *exp)
1002 {
1003         LASSERT(exp != NULL);
1004         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1005         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1006                atomic_read(&exp->exp_refcount) - 1);
1007
1008         if (atomic_dec_and_test(&exp->exp_refcount)) {
1009                 struct obd_device *obd = exp->exp_obd;
1010
1011                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1012                        exp, exp->exp_client_uuid.uuid);
1013
1014                 /* release nid stat refererence */
1015                 lprocfs_exp_cleanup(exp);
1016
1017                 if (exp == obd->obd_self_export) {
1018                         /* self export should be destroyed without
1019                          * zombie thread as it doesn't hold a
1020                          * reference to obd and doesn't hold any
1021                          * resources */
1022                         class_export_destroy(exp);
1023                         /* self export is destroyed, no class
1024                          * references exist and it is safe to free
1025                          * obd */
1026                         class_free_dev(obd);
1027                 } else {
1028                         LASSERT(!list_empty(&exp->exp_obd_chain));
1029                         obd_zombie_export_add(exp);
1030                 }
1031
1032         }
1033 }
1034 EXPORT_SYMBOL(class_export_put);
1035 /* Creates a new export, adds it to the hash table, and returns a
1036  * pointer to it. The refcount is 2: one for the hash reference, and
1037  * one for the pointer returned by this function. */
1038 struct obd_export *__class_new_export(struct obd_device *obd,
1039                                       struct obd_uuid *cluuid, bool is_self)
1040 {
1041         struct obd_export *export;
1042         struct cfs_hash *hash = NULL;
1043         int rc = 0;
1044         ENTRY;
1045
1046         OBD_ALLOC_PTR(export);
1047         if (!export)
1048                 return ERR_PTR(-ENOMEM);
1049
1050         export->exp_conn_cnt = 0;
1051         export->exp_lock_hash = NULL;
1052         export->exp_flock_hash = NULL;
1053         /* 2 = class_handle_hash + last */
1054         atomic_set(&export->exp_refcount, 2);
1055         atomic_set(&export->exp_rpc_count, 0);
1056         atomic_set(&export->exp_cb_count, 0);
1057         atomic_set(&export->exp_locks_count, 0);
1058 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1059         INIT_LIST_HEAD(&export->exp_locks_list);
1060         spin_lock_init(&export->exp_locks_list_guard);
1061 #endif
1062         atomic_set(&export->exp_replay_count, 0);
1063         export->exp_obd = obd;
1064         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1065         spin_lock_init(&export->exp_uncommitted_replies_lock);
1066         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1067         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1068         INIT_LIST_HEAD(&export->exp_handle.h_link);
1069         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1070         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1071         class_handle_hash(&export->exp_handle, &export_handle_ops);
1072         export->exp_last_request_time = ktime_get_real_seconds();
1073         spin_lock_init(&export->exp_lock);
1074         spin_lock_init(&export->exp_rpc_lock);
1075         INIT_HLIST_NODE(&export->exp_uuid_hash);
1076         INIT_HLIST_NODE(&export->exp_nid_hash);
1077         INIT_HLIST_NODE(&export->exp_gen_hash);
1078         spin_lock_init(&export->exp_bl_list_lock);
1079         INIT_LIST_HEAD(&export->exp_bl_list);
1080         INIT_LIST_HEAD(&export->exp_stale_list);
1081
1082         export->exp_sp_peer = LUSTRE_SP_ANY;
1083         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1084         export->exp_client_uuid = *cluuid;
1085         obd_init_export(export);
1086
1087         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1088                 spin_lock(&obd->obd_dev_lock);
1089                 /* shouldn't happen, but might race */
1090                 if (obd->obd_stopping)
1091                         GOTO(exit_unlock, rc = -ENODEV);
1092
1093                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1094                 if (hash == NULL)
1095                         GOTO(exit_unlock, rc = -ENODEV);
1096                 spin_unlock(&obd->obd_dev_lock);
1097
1098                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1099                 if (rc != 0) {
1100                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1101                                       obd->obd_name, cluuid->uuid, rc);
1102                         GOTO(exit_err, rc = -EALREADY);
1103                 }
1104         }
1105
1106         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1107         spin_lock(&obd->obd_dev_lock);
1108         if (obd->obd_stopping) {
1109                 if (hash)
1110                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1111                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1112         }
1113
1114         if (!is_self) {
1115                 class_incref(obd, "export", export);
1116                 list_add_tail(&export->exp_obd_chain_timed,
1117                               &obd->obd_exports_timed);
1118                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1119                 obd->obd_num_exports++;
1120         } else {
1121                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1122                 INIT_LIST_HEAD(&export->exp_obd_chain);
1123         }
1124         spin_unlock(&obd->obd_dev_lock);
1125         if (hash)
1126                 cfs_hash_putref(hash);
1127         RETURN(export);
1128
1129 exit_unlock:
1130         spin_unlock(&obd->obd_dev_lock);
1131 exit_err:
1132         if (hash)
1133                 cfs_hash_putref(hash);
1134         class_handle_unhash(&export->exp_handle);
1135         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1136         obd_destroy_export(export);
1137         OBD_FREE_PTR(export);
1138         return ERR_PTR(rc);
1139 }
1140
1141 struct obd_export *class_new_export(struct obd_device *obd,
1142                                     struct obd_uuid *uuid)
1143 {
1144         return __class_new_export(obd, uuid, false);
1145 }
1146 EXPORT_SYMBOL(class_new_export);
1147
1148 struct obd_export *class_new_export_self(struct obd_device *obd,
1149                                          struct obd_uuid *uuid)
1150 {
1151         return __class_new_export(obd, uuid, true);
1152 }
1153
1154 void class_unlink_export(struct obd_export *exp)
1155 {
1156         class_handle_unhash(&exp->exp_handle);
1157
1158         if (exp->exp_obd->obd_self_export == exp) {
1159                 class_export_put(exp);
1160                 return;
1161         }
1162
1163         spin_lock(&exp->exp_obd->obd_dev_lock);
1164         /* delete an uuid-export hashitem from hashtables */
1165         if (!hlist_unhashed(&exp->exp_uuid_hash))
1166                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1167                              &exp->exp_client_uuid,
1168                              &exp->exp_uuid_hash);
1169
1170 #ifdef HAVE_SERVER_SUPPORT
1171         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1172                 struct tg_export_data   *ted = &exp->exp_target_data;
1173                 struct cfs_hash         *hash;
1174
1175                 /* Because obd_gen_hash will not be released until
1176                  * class_cleanup(), so hash should never be NULL here */
1177                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1178                 LASSERT(hash != NULL);
1179                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1180                              &exp->exp_gen_hash);
1181                 cfs_hash_putref(hash);
1182         }
1183 #endif /* HAVE_SERVER_SUPPORT */
1184
1185         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1186         list_del_init(&exp->exp_obd_chain_timed);
1187         exp->exp_obd->obd_num_exports--;
1188         spin_unlock(&exp->exp_obd->obd_dev_lock);
1189         atomic_inc(&obd_stale_export_num);
1190
1191         /* A reference is kept by obd_stale_exports list */
1192         obd_stale_export_put(exp);
1193 }
1194 EXPORT_SYMBOL(class_unlink_export);
1195
1196 /* Import management functions */
1197 static void class_import_destroy(struct obd_import *imp)
1198 {
1199         ENTRY;
1200
1201         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1202                 imp->imp_obd->obd_name);
1203
1204         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1205
1206         ptlrpc_put_connection_superhack(imp->imp_connection);
1207
1208         while (!list_empty(&imp->imp_conn_list)) {
1209                 struct obd_import_conn *imp_conn;
1210
1211                 imp_conn = list_entry(imp->imp_conn_list.next,
1212                                       struct obd_import_conn, oic_item);
1213                 list_del_init(&imp_conn->oic_item);
1214                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1215                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1216         }
1217
1218         LASSERT(imp->imp_sec == NULL);
1219         class_decref(imp->imp_obd, "import", imp);
1220         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1221         EXIT;
1222 }
1223
/* hop_addref callback for import handles: take a reference on the import. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
1228
1229 static struct portals_handle_ops import_handle_ops = {
1230         .hop_addref = import_handle_addref,
1231         .hop_free   = NULL,
1232 };
1233
1234 struct obd_import *class_import_get(struct obd_import *import)
1235 {
1236         atomic_inc(&import->imp_refcount);
1237         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1238                atomic_read(&import->imp_refcount),
1239                import->imp_obd->obd_name);
1240         return import;
1241 }
1242 EXPORT_SYMBOL(class_import_get);
1243
1244 void class_import_put(struct obd_import *imp)
1245 {
1246         ENTRY;
1247
1248         LASSERT(list_empty(&imp->imp_zombie_chain));
1249         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1250
1251         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1252                atomic_read(&imp->imp_refcount) - 1,
1253                imp->imp_obd->obd_name);
1254
1255         if (atomic_dec_and_test(&imp->imp_refcount)) {
1256                 CDEBUG(D_INFO, "final put import %p\n", imp);
1257                 obd_zombie_import_add(imp);
1258         }
1259
1260         /* catch possible import put race */
1261         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1262         EXIT;
1263 }
1264 EXPORT_SYMBOL(class_import_put);
1265
1266 static void init_imp_at(struct imp_at *at) {
1267         int i;
1268         at_init(&at->iat_net_latency, 0, 0);
1269         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1270                 /* max service estimates are tracked on the server side, so
1271                    don't use the AT history here, just use the last reported
1272                    val. (But keep hist for proc histogram, worst_ever) */
1273                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1274                         AT_FLG_NOHIST);
1275         }
1276 }
1277
1278 struct obd_import *class_new_import(struct obd_device *obd)
1279 {
1280         struct obd_import *imp;
1281         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1282
1283         OBD_ALLOC(imp, sizeof(*imp));
1284         if (imp == NULL)
1285                 return NULL;
1286
1287         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1288         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1289         INIT_LIST_HEAD(&imp->imp_replay_list);
1290         INIT_LIST_HEAD(&imp->imp_sending_list);
1291         INIT_LIST_HEAD(&imp->imp_delayed_list);
1292         INIT_LIST_HEAD(&imp->imp_committed_list);
1293         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1294         imp->imp_known_replied_xid = 0;
1295         imp->imp_replay_cursor = &imp->imp_committed_list;
1296         spin_lock_init(&imp->imp_lock);
1297         imp->imp_last_success_conn = 0;
1298         imp->imp_state = LUSTRE_IMP_NEW;
1299         imp->imp_obd = class_incref(obd, "import", imp);
1300         mutex_init(&imp->imp_sec_mutex);
1301         init_waitqueue_head(&imp->imp_recovery_waitq);
1302
1303         if (curr_pid_ns->child_reaper)
1304                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1305         else
1306                 imp->imp_sec_refpid = 1;
1307
1308         atomic_set(&imp->imp_refcount, 2);
1309         atomic_set(&imp->imp_unregistering, 0);
1310         atomic_set(&imp->imp_inflight, 0);
1311         atomic_set(&imp->imp_replay_inflight, 0);
1312         atomic_set(&imp->imp_inval_count, 0);
1313         INIT_LIST_HEAD(&imp->imp_conn_list);
1314         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1315         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1316         init_imp_at(&imp->imp_at);
1317
1318         /* the default magic is V2, will be used in connect RPC, and
1319          * then adjusted according to the flags in request/reply. */
1320         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1321
1322         return imp;
1323 }
1324 EXPORT_SYMBOL(class_new_import);
1325
1326 void class_destroy_import(struct obd_import *import)
1327 {
1328         LASSERT(import != NULL);
1329         LASSERT(import != LP_POISON);
1330
1331         class_handle_unhash(&import->imp_handle);
1332
1333         spin_lock(&import->imp_lock);
1334         import->imp_generation++;
1335         spin_unlock(&import->imp_lock);
1336         class_import_put(import);
1337 }
1338 EXPORT_SYMBOL(class_destroy_import);
1339
1340 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1341
1342 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1343 {
1344         spin_lock(&exp->exp_locks_list_guard);
1345
1346         LASSERT(lock->l_exp_refs_nr >= 0);
1347
1348         if (lock->l_exp_refs_target != NULL &&
1349             lock->l_exp_refs_target != exp) {
1350                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1351                               exp, lock, lock->l_exp_refs_target);
1352         }
1353         if ((lock->l_exp_refs_nr ++) == 0) {
1354                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1355                 lock->l_exp_refs_target = exp;
1356         }
1357         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1358                lock, exp, lock->l_exp_refs_nr);
1359         spin_unlock(&exp->exp_locks_list_guard);
1360 }
1361 EXPORT_SYMBOL(__class_export_add_lock_ref);
1362
1363 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1364 {
1365         spin_lock(&exp->exp_locks_list_guard);
1366         LASSERT(lock->l_exp_refs_nr > 0);
1367         if (lock->l_exp_refs_target != exp) {
1368                 LCONSOLE_WARN("lock %p, "
1369                               "mismatching export pointers: %p, %p\n",
1370                               lock, lock->l_exp_refs_target, exp);
1371         }
1372         if (-- lock->l_exp_refs_nr == 0) {
1373                 list_del_init(&lock->l_exp_refs_link);
1374                 lock->l_exp_refs_target = NULL;
1375         }
1376         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1377                lock, exp, lock->l_exp_refs_nr);
1378         spin_unlock(&exp->exp_locks_list_guard);
1379 }
1380 EXPORT_SYMBOL(__class_export_del_lock_ref);
1381 #endif
1382
1383 /* A connection defines an export context in which preallocation can
1384    be managed. This releases the export pointer reference, and returns
1385    the export handle, so the export refcount is 1 when this function
1386    returns. */
1387 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1388                   struct obd_uuid *cluuid)
1389 {
1390         struct obd_export *export;
1391         LASSERT(conn != NULL);
1392         LASSERT(obd != NULL);
1393         LASSERT(cluuid != NULL);
1394         ENTRY;
1395
1396         export = class_new_export(obd, cluuid);
1397         if (IS_ERR(export))
1398                 RETURN(PTR_ERR(export));
1399
1400         conn->cookie = export->exp_handle.h_cookie;
1401         class_export_put(export);
1402
1403         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1404                cluuid->uuid, conn->cookie);
1405         RETURN(0);
1406 }
1407 EXPORT_SYMBOL(class_connect);
1408
1409 /* if export is involved in recovery then clean up related things */
1410 static void class_export_recovery_cleanup(struct obd_export *exp)
1411 {
1412         struct obd_device *obd = exp->exp_obd;
1413
1414         spin_lock(&obd->obd_recovery_task_lock);
1415         if (obd->obd_recovering) {
1416                 if (exp->exp_in_recovery) {
1417                         spin_lock(&exp->exp_lock);
1418                         exp->exp_in_recovery = 0;
1419                         spin_unlock(&exp->exp_lock);
1420                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1421                         atomic_dec(&obd->obd_connected_clients);
1422                 }
1423
1424                 /* if called during recovery then should update
1425                  * obd_stale_clients counter,
1426                  * lightweight exports are not counted */
1427                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1428                         exp->exp_obd->obd_stale_clients++;
1429         }
1430         spin_unlock(&obd->obd_recovery_task_lock);
1431
1432         spin_lock(&exp->exp_lock);
1433         /** Cleanup req replay fields */
1434         if (exp->exp_req_replay_needed) {
1435                 exp->exp_req_replay_needed = 0;
1436
1437                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1438                 atomic_dec(&obd->obd_req_replay_clients);
1439         }
1440
1441         /** Cleanup lock replay data */
1442         if (exp->exp_lock_replay_needed) {
1443                 exp->exp_lock_replay_needed = 0;
1444
1445                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1446                 atomic_dec(&obd->obd_lock_replay_clients);
1447         }
1448         spin_unlock(&exp->exp_lock);
1449 }
1450
1451 /* This function removes 1-3 references from the export:
1452  * 1 - for export pointer passed
1453  * and if disconnect really need
1454  * 2 - removing from hash
1455  * 3 - in client_unlink_export
1456  * The export pointer passed to this function can destroyed */
1457 int class_disconnect(struct obd_export *export)
1458 {
1459         int already_disconnected;
1460         ENTRY;
1461
1462         if (export == NULL) {
1463                 CWARN("attempting to free NULL export %p\n", export);
1464                 RETURN(-EINVAL);
1465         }
1466
1467         spin_lock(&export->exp_lock);
1468         already_disconnected = export->exp_disconnected;
1469         export->exp_disconnected = 1;
1470         /*  We hold references of export for uuid hash
1471          *  and nid_hash and export link at least. So
1472          *  it is safe to call cfs_hash_del in there.  */
1473         if (!hlist_unhashed(&export->exp_nid_hash))
1474                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1475                              &export->exp_connection->c_peer.nid,
1476                              &export->exp_nid_hash);
1477         spin_unlock(&export->exp_lock);
1478
1479         /* class_cleanup(), abort_recovery(), and class_fail_export()
1480          * all end up in here, and if any of them race we shouldn't
1481          * call extra class_export_puts(). */
1482         if (already_disconnected) {
1483                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1484                 GOTO(no_disconn, already_disconnected);
1485         }
1486
1487         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1488                export->exp_handle.h_cookie);
1489
1490         class_export_recovery_cleanup(export);
1491         class_unlink_export(export);
1492 no_disconn:
1493         class_export_put(export);
1494         RETURN(0);
1495 }
1496 EXPORT_SYMBOL(class_disconnect);
1497
1498 /* Return non-zero for a fully connected export */
1499 int class_connected_export(struct obd_export *exp)
1500 {
1501         int connected = 0;
1502
1503         if (exp) {
1504                 spin_lock(&exp->exp_lock);
1505                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1506                 spin_unlock(&exp->exp_lock);
1507         }
1508         return connected;
1509 }
1510 EXPORT_SYMBOL(class_connected_export);
1511
1512 static void class_disconnect_export_list(struct list_head *list,
1513                                          enum obd_option flags)
1514 {
1515         int rc;
1516         struct obd_export *exp;
1517         ENTRY;
1518
1519         /* It's possible that an export may disconnect itself, but
1520          * nothing else will be added to this list. */
1521         while (!list_empty(list)) {
1522                 exp = list_entry(list->next, struct obd_export,
1523                                  exp_obd_chain);
1524                 /* need for safe call CDEBUG after obd_disconnect */
1525                 class_export_get(exp);
1526
1527                 spin_lock(&exp->exp_lock);
1528                 exp->exp_flags = flags;
1529                 spin_unlock(&exp->exp_lock);
1530
1531                 if (obd_uuid_equals(&exp->exp_client_uuid,
1532                                     &exp->exp_obd->obd_uuid)) {
1533                         CDEBUG(D_HA,
1534                                "exp %p export uuid == obd uuid, don't discon\n",
1535                                exp);
1536                         /* Need to delete this now so we don't end up pointing
1537                          * to work_list later when this export is cleaned up. */
1538                         list_del_init(&exp->exp_obd_chain);
1539                         class_export_put(exp);
1540                         continue;
1541                 }
1542
1543                 class_export_get(exp);
1544                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1545                        "last request at %lld\n",
1546                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1547                        exp, exp->exp_last_request_time);
1548                 /* release one export reference anyway */
1549                 rc = obd_disconnect(exp);
1550
1551                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1552                        obd_export_nid2str(exp), exp, rc);
1553                 class_export_put(exp);
1554         }
1555         EXIT;
1556 }
1557
1558 void class_disconnect_exports(struct obd_device *obd)
1559 {
1560         struct list_head work_list;
1561         ENTRY;
1562
1563         /* Move all of the exports from obd_exports to a work list, en masse. */
1564         INIT_LIST_HEAD(&work_list);
1565         spin_lock(&obd->obd_dev_lock);
1566         list_splice_init(&obd->obd_exports, &work_list);
1567         list_splice_init(&obd->obd_delayed_exports, &work_list);
1568         spin_unlock(&obd->obd_dev_lock);
1569
1570         if (!list_empty(&work_list)) {
1571                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1572                        "disconnecting them\n", obd->obd_minor, obd);
1573                 class_disconnect_export_list(&work_list,
1574                                              exp_flags_from_obd(obd));
1575         } else
1576                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1577                        obd->obd_minor, obd);
1578         EXIT;
1579 }
1580 EXPORT_SYMBOL(class_disconnect_exports);
1581
1582 /* Remove exports that have not completed recovery.
1583  */
1584 void class_disconnect_stale_exports(struct obd_device *obd,
1585                                     int (*test_export)(struct obd_export *))
1586 {
1587         struct list_head work_list;
1588         struct obd_export *exp, *n;
1589         int evicted = 0;
1590         ENTRY;
1591
1592         INIT_LIST_HEAD(&work_list);
1593         spin_lock(&obd->obd_dev_lock);
1594         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1595                                  exp_obd_chain) {
1596                 /* don't count self-export as client */
1597                 if (obd_uuid_equals(&exp->exp_client_uuid,
1598                                     &exp->exp_obd->obd_uuid))
1599                         continue;
1600
1601                 /* don't evict clients which have no slot in last_rcvd
1602                  * (e.g. lightweight connection) */
1603                 if (exp->exp_target_data.ted_lr_idx == -1)
1604                         continue;
1605
1606                 spin_lock(&exp->exp_lock);
1607                 if (exp->exp_failed || test_export(exp)) {
1608                         spin_unlock(&exp->exp_lock);
1609                         continue;
1610                 }
1611                 exp->exp_failed = 1;
1612                 spin_unlock(&exp->exp_lock);
1613
1614                 list_move(&exp->exp_obd_chain, &work_list);
1615                 evicted++;
1616                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1617                        obd->obd_name, exp->exp_client_uuid.uuid,
1618                        exp->exp_connection == NULL ? "<unknown>" :
1619                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1620                 print_export_data(exp, "EVICTING", 0, D_HA);
1621         }
1622         spin_unlock(&obd->obd_dev_lock);
1623
1624         if (evicted)
1625                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1626                               obd->obd_name, evicted);
1627
1628         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1629                                                  OBD_OPT_ABORT_RECOV);
1630         EXIT;
1631 }
1632 EXPORT_SYMBOL(class_disconnect_stale_exports);
1633
1634 void class_fail_export(struct obd_export *exp)
1635 {
1636         int rc, already_failed;
1637
1638         spin_lock(&exp->exp_lock);
1639         already_failed = exp->exp_failed;
1640         exp->exp_failed = 1;
1641         spin_unlock(&exp->exp_lock);
1642
1643         if (already_failed) {
1644                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1645                        exp, exp->exp_client_uuid.uuid);
1646                 return;
1647         }
1648
1649         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1650                exp, exp->exp_client_uuid.uuid);
1651
1652         if (obd_dump_on_timeout)
1653                 libcfs_debug_dumplog();
1654
1655         /* need for safe call CDEBUG after obd_disconnect */
1656         class_export_get(exp);
1657
1658         /* Most callers into obd_disconnect are removing their own reference
1659          * (request, for example) in addition to the one from the hash table.
1660          * We don't have such a reference here, so make one. */
1661         class_export_get(exp);
1662         rc = obd_disconnect(exp);
1663         if (rc)
1664                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1665         else
1666                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1667                        exp, exp->exp_client_uuid.uuid);
1668         class_export_put(exp);
1669 }
1670 EXPORT_SYMBOL(class_fail_export);
1671
1672 char *obd_export_nid2str(struct obd_export *exp)
1673 {
1674         if (exp->exp_connection != NULL)
1675                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1676
1677         return "(no nid)";
1678 }
1679 EXPORT_SYMBOL(obd_export_nid2str);
1680
1681 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1682 {
1683         struct cfs_hash *nid_hash;
1684         struct obd_export *doomed_exp = NULL;
1685         int exports_evicted = 0;
1686
1687         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1688
1689         spin_lock(&obd->obd_dev_lock);
1690         /* umount has run already, so evict thread should leave
1691          * its task to umount thread now */
1692         if (obd->obd_stopping) {
1693                 spin_unlock(&obd->obd_dev_lock);
1694                 return exports_evicted;
1695         }
1696         nid_hash = obd->obd_nid_hash;
1697         cfs_hash_getref(nid_hash);
1698         spin_unlock(&obd->obd_dev_lock);
1699
1700         do {
1701                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1702                 if (doomed_exp == NULL)
1703                         break;
1704
1705                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1706                          "nid %s found, wanted nid %s, requested nid %s\n",
1707                          obd_export_nid2str(doomed_exp),
1708                          libcfs_nid2str(nid_key), nid);
1709                 LASSERTF(doomed_exp != obd->obd_self_export,
1710                          "self-export is hashed by NID?\n");
1711                 exports_evicted++;
1712                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1713                               "request\n", obd->obd_name,
1714                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1715                               obd_export_nid2str(doomed_exp));
1716                 class_fail_export(doomed_exp);
1717                 class_export_put(doomed_exp);
1718         } while (1);
1719
1720         cfs_hash_putref(nid_hash);
1721
1722         if (!exports_evicted)
1723                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1724                        obd->obd_name, nid);
1725         return exports_evicted;
1726 }
1727 EXPORT_SYMBOL(obd_export_evict_by_nid);
1728
1729 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1730 {
1731         struct cfs_hash *uuid_hash;
1732         struct obd_export *doomed_exp = NULL;
1733         struct obd_uuid doomed_uuid;
1734         int exports_evicted = 0;
1735
1736         spin_lock(&obd->obd_dev_lock);
1737         if (obd->obd_stopping) {
1738                 spin_unlock(&obd->obd_dev_lock);
1739                 return exports_evicted;
1740         }
1741         uuid_hash = obd->obd_uuid_hash;
1742         cfs_hash_getref(uuid_hash);
1743         spin_unlock(&obd->obd_dev_lock);
1744
1745         obd_str2uuid(&doomed_uuid, uuid);
1746         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1747                 CERROR("%s: can't evict myself\n", obd->obd_name);
1748                 cfs_hash_putref(uuid_hash);
1749                 return exports_evicted;
1750         }
1751
1752         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1753
1754         if (doomed_exp == NULL) {
1755                 CERROR("%s: can't disconnect %s: no exports found\n",
1756                        obd->obd_name, uuid);
1757         } else {
1758                 CWARN("%s: evicting %s at adminstrative request\n",
1759                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1760                 class_fail_export(doomed_exp);
1761                 class_export_put(doomed_exp);
1762                 exports_evicted++;
1763         }
1764         cfs_hash_putref(uuid_hash);
1765
1766         return exports_evicted;
1767 }
1768
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export callback invoked by print_export_data() when lock
 * dumping is requested; it is NULL until something installs it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1773
1774 static void print_export_data(struct obd_export *exp, const char *status,
1775                               int locks, int debug_level)
1776 {
1777         struct ptlrpc_reply_state *rs;
1778         struct ptlrpc_reply_state *first_reply = NULL;
1779         int nreplies = 0;
1780
1781         spin_lock(&exp->exp_lock);
1782         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1783                             rs_exp_list) {
1784                 if (nreplies == 0)
1785                         first_reply = rs;
1786                 nreplies++;
1787         }
1788         spin_unlock(&exp->exp_lock);
1789
1790         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1791                "%p %s %llu stale:%d\n",
1792                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1793                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1794                atomic_read(&exp->exp_rpc_count),
1795                atomic_read(&exp->exp_cb_count),
1796                atomic_read(&exp->exp_locks_count),
1797                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1798                nreplies, first_reply, nreplies > 3 ? "..." : "",
1799                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1800 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1801         if (locks && class_export_dump_hook != NULL)
1802                 class_export_dump_hook(exp);
1803 #endif
1804 }
1805
1806 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1807 {
1808         struct obd_export *exp;
1809
1810         spin_lock(&obd->obd_dev_lock);
1811         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1812                 print_export_data(exp, "ACTIVE", locks, debug_level);
1813         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1814                 print_export_data(exp, "UNLINKED", locks, debug_level);
1815         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1816                 print_export_data(exp, "DELAYED", locks, debug_level);
1817         spin_unlock(&obd->obd_dev_lock);
1818         spin_lock(&obd_zombie_impexp_lock);
1819         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1820                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1821         spin_unlock(&obd_zombie_impexp_lock);
1822 }
1823
1824 void obd_exports_barrier(struct obd_device *obd)
1825 {
1826         int waited = 2;
1827         LASSERT(list_empty(&obd->obd_exports));
1828         spin_lock(&obd->obd_dev_lock);
1829         while (!list_empty(&obd->obd_unlinked_exports)) {
1830                 spin_unlock(&obd->obd_dev_lock);
1831                 set_current_state(TASK_UNINTERRUPTIBLE);
1832                 schedule_timeout(cfs_time_seconds(waited));
1833                 if (waited > 5 && is_power_of_2(waited)) {
1834                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1835                                       "more than %d seconds. "
1836                                       "The obd refcount = %d. Is it stuck?\n",
1837                                       obd->obd_name, waited,
1838                                       atomic_read(&obd->obd_refcount));
1839                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1840                 }
1841                 waited *= 2;
1842                 spin_lock(&obd->obd_dev_lock);
1843         }
1844         spin_unlock(&obd->obd_dev_lock);
1845 }
1846 EXPORT_SYMBOL(obd_exports_barrier);
1847
/* Number of zombie imports and exports queued for destruction; modified
 * under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1850
1851 /**
1852  * kill zombie imports and exports
1853  */
1854 void obd_zombie_impexp_cull(void)
1855 {
1856         struct obd_import *import;
1857         struct obd_export *export;
1858         ENTRY;
1859
1860         do {
1861                 spin_lock(&obd_zombie_impexp_lock);
1862
1863                 import = NULL;
1864                 if (!list_empty(&obd_zombie_imports)) {
1865                         import = list_entry(obd_zombie_imports.next,
1866                                             struct obd_import,
1867                                             imp_zombie_chain);
1868                         list_del_init(&import->imp_zombie_chain);
1869                 }
1870
1871                 export = NULL;
1872                 if (!list_empty(&obd_zombie_exports)) {
1873                         export = list_entry(obd_zombie_exports.next,
1874                                             struct obd_export,
1875                                             exp_obd_chain);
1876                         list_del_init(&export->exp_obd_chain);
1877                 }
1878
1879                 spin_unlock(&obd_zombie_impexp_lock);
1880
1881                 if (import != NULL) {
1882                         class_import_destroy(import);
1883                         spin_lock(&obd_zombie_impexp_lock);
1884                         zombies_count--;
1885                         spin_unlock(&obd_zombie_impexp_lock);
1886                 }
1887
1888                 if (export != NULL) {
1889                         class_export_destroy(export);
1890                         spin_lock(&obd_zombie_impexp_lock);
1891                         zombies_count--;
1892                         spin_unlock(&obd_zombie_impexp_lock);
1893                 }
1894
1895                 cond_resched();
1896         } while (import != NULL || export != NULL);
1897         EXIT;
1898 }
1899
1900 static DECLARE_COMPLETION(obd_zombie_start);
1901 static DECLARE_COMPLETION(obd_zombie_stop);
1902 static unsigned long obd_zombie_flags;
1903 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1904 static pid_t obd_zombie_pid;
1905
1906 enum {
1907         OBD_ZOMBIE_STOP         = 0x0001,
1908 };
1909
1910 /**
1911  * check for work for kill zombie import/export thread.
1912  */
1913 static int obd_zombie_impexp_check(void *arg)
1914 {
1915         int rc;
1916
1917         spin_lock(&obd_zombie_impexp_lock);
1918         rc = (zombies_count == 0) &&
1919              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1920         spin_unlock(&obd_zombie_impexp_lock);
1921
1922         RETURN(rc);
1923 }
1924
1925 /**
1926  * Add export to the obd_zombe thread and notify it.
1927  */
1928 static void obd_zombie_export_add(struct obd_export *exp) {
1929         atomic_dec(&obd_stale_export_num);
1930         spin_lock(&exp->exp_obd->obd_dev_lock);
1931         LASSERT(!list_empty(&exp->exp_obd_chain));
1932         list_del_init(&exp->exp_obd_chain);
1933         spin_unlock(&exp->exp_obd->obd_dev_lock);
1934         spin_lock(&obd_zombie_impexp_lock);
1935         zombies_count++;
1936         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1937         spin_unlock(&obd_zombie_impexp_lock);
1938
1939         obd_zombie_impexp_notify();
1940 }
1941
1942 /**
1943  * Add import to the obd_zombe thread and notify it.
1944  */
1945 static void obd_zombie_import_add(struct obd_import *imp) {
1946         LASSERT(imp->imp_sec == NULL);
1947         spin_lock(&obd_zombie_impexp_lock);
1948         LASSERT(list_empty(&imp->imp_zombie_chain));
1949         zombies_count++;
1950         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1951         spin_unlock(&obd_zombie_impexp_lock);
1952
1953         obd_zombie_impexp_notify();
1954 }
1955
1956 /**
1957  * notify import/export destroy thread about new zombie.
1958  */
1959 static void obd_zombie_impexp_notify(void)
1960 {
1961         /*
1962          * Make sure obd_zomebie_impexp_thread get this notification.
1963          * It is possible this signal only get by obd_zombie_barrier, and
1964          * barrier gulps this notification and sleeps away and hangs ensues
1965          */
1966         wake_up_all(&obd_zombie_waitq);
1967 }
1968
1969 /**
1970  * check whether obd_zombie is idle
1971  */
1972 static int obd_zombie_is_idle(void)
1973 {
1974         int rc;
1975
1976         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1977         spin_lock(&obd_zombie_impexp_lock);
1978         rc = (zombies_count == 0);
1979         spin_unlock(&obd_zombie_impexp_lock);
1980         return rc;
1981 }
1982
1983 /**
1984  * wait when obd_zombie import/export queues become empty
1985  */
1986 void obd_zombie_barrier(void)
1987 {
1988         struct l_wait_info lwi = { 0 };
1989
1990         if (obd_zombie_pid == current_pid())
1991                 /* don't wait for myself */
1992                 return;
1993         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1994 }
1995 EXPORT_SYMBOL(obd_zombie_barrier);
1996
1997
1998 struct obd_export *obd_stale_export_get(void)
1999 {
2000         struct obd_export *exp = NULL;
2001         ENTRY;
2002
2003         spin_lock(&obd_stale_export_lock);
2004         if (!list_empty(&obd_stale_exports)) {
2005                 exp = list_entry(obd_stale_exports.next,
2006                                  struct obd_export, exp_stale_list);
2007                 list_del_init(&exp->exp_stale_list);
2008         }
2009         spin_unlock(&obd_stale_export_lock);
2010
2011         if (exp) {
2012                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
2013                        atomic_read(&obd_stale_export_num));
2014         }
2015         RETURN(exp);
2016 }
2017 EXPORT_SYMBOL(obd_stale_export_get);
2018
2019 void obd_stale_export_put(struct obd_export *exp)
2020 {
2021         ENTRY;
2022
2023         LASSERT(list_empty(&exp->exp_stale_list));
2024         if (exp->exp_lock_hash &&
2025             atomic_read(&exp->exp_lock_hash->hs_count)) {
2026                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2027                        atomic_read(&obd_stale_export_num));
2028
2029                 spin_lock_bh(&exp->exp_bl_list_lock);
2030                 spin_lock(&obd_stale_export_lock);
2031                 /* Add to the tail if there is no blocked locks,
2032                  * to the head otherwise. */
2033                 if (list_empty(&exp->exp_bl_list))
2034                         list_add_tail(&exp->exp_stale_list,
2035                                       &obd_stale_exports);
2036                 else
2037                         list_add(&exp->exp_stale_list,
2038                                  &obd_stale_exports);
2039
2040                 spin_unlock(&obd_stale_export_lock);
2041                 spin_unlock_bh(&exp->exp_bl_list_lock);
2042         } else {
2043                 class_export_put(exp);
2044         }
2045         EXIT;
2046 }
2047 EXPORT_SYMBOL(obd_stale_export_put);
2048
2049 /**
2050  * Adjust the position of the export in the stale list,
2051  * i.e. move to the head of the list if is needed.
2052  **/
2053 void obd_stale_export_adjust(struct obd_export *exp)
2054 {
2055         LASSERT(exp != NULL);
2056         spin_lock_bh(&exp->exp_bl_list_lock);
2057         spin_lock(&obd_stale_export_lock);
2058
2059         if (!list_empty(&exp->exp_stale_list) &&
2060             !list_empty(&exp->exp_bl_list))
2061                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2062
2063         spin_unlock(&obd_stale_export_lock);
2064         spin_unlock_bh(&exp->exp_bl_list_lock);
2065 }
2066 EXPORT_SYMBOL(obd_stale_export_adjust);
2067
2068 /**
2069  * destroy zombie export/import thread.
2070  */
2071 static int obd_zombie_impexp_thread(void *unused)
2072 {
2073         unshare_fs_struct();
2074         complete(&obd_zombie_start);
2075
2076         obd_zombie_pid = current_pid();
2077
2078         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2079                 struct l_wait_info lwi = { 0 };
2080
2081                 l_wait_event(obd_zombie_waitq,
2082                              !obd_zombie_impexp_check(NULL), &lwi);
2083                 obd_zombie_impexp_cull();
2084
2085                 /*
2086                  * Notify obd_zombie_barrier callers that queues
2087                  * may be empty.
2088                  */
2089                 wake_up(&obd_zombie_waitq);
2090         }
2091
2092         complete(&obd_zombie_stop);
2093
2094         RETURN(0);
2095 }
2096
2097
2098 /**
2099  * start destroy zombie import/export thread
2100  */
2101 int obd_zombie_impexp_init(void)
2102 {
2103         struct task_struct *task;
2104
2105         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2106         if (IS_ERR(task))
2107                 RETURN(PTR_ERR(task));
2108
2109         wait_for_completion(&obd_zombie_start);
2110         RETURN(0);
2111 }
2112 /**
2113  * stop destroy zombie import/export thread
2114  */
2115 void obd_zombie_impexp_stop(void)
2116 {
2117         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2118         obd_zombie_impexp_notify();
2119         wait_for_completion(&obd_zombie_stop);
2120         LASSERT(list_empty(&obd_stale_exports));
2121 }
2122
2123 /***** Kernel-userspace comm helpers *******/
2124
2125 /* Get length of entire message, including header */
2126 int kuc_len(int payload_len)
2127 {
2128         return sizeof(struct kuc_hdr) + payload_len;
2129 }
2130 EXPORT_SYMBOL(kuc_len);
2131
2132 /* Get a pointer to kuc header, given a ptr to the payload
2133  * @param p Pointer to payload area
2134  * @returns Pointer to kuc header
2135  */
2136 struct kuc_hdr * kuc_ptr(void *p)
2137 {
2138         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2139         LASSERT(lh->kuc_magic == KUC_MAGIC);
2140         return lh;
2141 }
2142 EXPORT_SYMBOL(kuc_ptr);
2143
2144 /* Alloc space for a message, and fill in header
2145  * @return Pointer to payload area
2146  */
2147 void *kuc_alloc(int payload_len, int transport, int type)
2148 {
2149         struct kuc_hdr *lh;
2150         int len = kuc_len(payload_len);
2151
2152         OBD_ALLOC(lh, len);
2153         if (lh == NULL)
2154                 return ERR_PTR(-ENOMEM);
2155
2156         lh->kuc_magic = KUC_MAGIC;
2157         lh->kuc_transport = transport;
2158         lh->kuc_msgtype = type;
2159         lh->kuc_msglen = len;
2160
2161         return (void *)(lh + 1);
2162 }
2163 EXPORT_SYMBOL(kuc_alloc);
2164
/* Free a message given a pointer \a p to its payload area and the
 * \a payload_len the message was sized for. */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
2171 EXPORT_SYMBOL(kuc_free);
2172
2173 struct obd_request_slot_waiter {
2174         struct list_head        orsw_entry;
2175         wait_queue_head_t       orsw_waitq;
2176         bool                    orsw_signaled;
2177 };
2178
2179 static bool obd_request_slot_avail(struct client_obd *cli,
2180                                    struct obd_request_slot_waiter *orsw)
2181 {
2182         bool avail;
2183
2184         spin_lock(&cli->cl_loi_list_lock);
2185         avail = !!list_empty(&orsw->orsw_entry);
2186         spin_unlock(&cli->cl_loi_list_lock);
2187
2188         return avail;
2189 };
2190
2191 /*
2192  * For network flow control, the RPC sponsor needs to acquire a credit
2193  * before sending the RPC. The credits count for a connection is defined
2194  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2195  * the subsequent RPC sponsors need to wait until others released their
2196  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2197  */
2198 int obd_get_request_slot(struct client_obd *cli)
2199 {
2200         struct obd_request_slot_waiter   orsw;
2201         struct l_wait_info               lwi;
2202         int                              rc;
2203
2204         spin_lock(&cli->cl_loi_list_lock);
2205         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2206                 cli->cl_rpcs_in_flight++;
2207                 spin_unlock(&cli->cl_loi_list_lock);
2208                 return 0;
2209         }
2210
2211         init_waitqueue_head(&orsw.orsw_waitq);
2212         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2213         orsw.orsw_signaled = false;
2214         spin_unlock(&cli->cl_loi_list_lock);
2215
2216         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2217         rc = l_wait_event(orsw.orsw_waitq,
2218                           obd_request_slot_avail(cli, &orsw) ||
2219                           orsw.orsw_signaled,
2220                           &lwi);
2221
2222         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2223          * freed but other (such as obd_put_request_slot) is using it. */
2224         spin_lock(&cli->cl_loi_list_lock);
2225         if (rc != 0) {
2226                 if (!orsw.orsw_signaled) {
2227                         if (list_empty(&orsw.orsw_entry))
2228                                 cli->cl_rpcs_in_flight--;
2229                         else
2230                                 list_del(&orsw.orsw_entry);
2231                 }
2232         }
2233
2234         if (orsw.orsw_signaled) {
2235                 LASSERT(list_empty(&orsw.orsw_entry));
2236
2237                 rc = -EINTR;
2238         }
2239         spin_unlock(&cli->cl_loi_list_lock);
2240
2241         return rc;
2242 }
2243 EXPORT_SYMBOL(obd_get_request_slot);
2244
2245 void obd_put_request_slot(struct client_obd *cli)
2246 {
2247         struct obd_request_slot_waiter *orsw;
2248
2249         spin_lock(&cli->cl_loi_list_lock);
2250         cli->cl_rpcs_in_flight--;
2251
2252         /* If there is free slot, wakeup the first waiter. */
2253         if (!list_empty(&cli->cl_flight_waiters) &&
2254             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2255                 orsw = list_entry(cli->cl_flight_waiters.next,
2256                                   struct obd_request_slot_waiter, orsw_entry);
2257                 list_del_init(&orsw->orsw_entry);
2258                 cli->cl_rpcs_in_flight++;
2259                 wake_up(&orsw->orsw_waitq);
2260         }
2261         spin_unlock(&cli->cl_loi_list_lock);
2262 }
2263 EXPORT_SYMBOL(obd_put_request_slot);
2264
2265 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2266 {
2267         return cli->cl_max_rpcs_in_flight;
2268 }
2269 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2270
2271 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2272 {
2273         struct obd_request_slot_waiter *orsw;
2274         __u32                           old;
2275         int                             diff;
2276         int                             i;
2277         char                            *typ_name;
2278         int                             rc;
2279
2280         if (max > OBD_MAX_RIF_MAX || max < 1)
2281                 return -ERANGE;
2282
2283         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2284         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2285                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2286                  * strictly lower that max_rpcs_in_flight */
2287                 if (max < 2) {
2288                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2289                                "because it must be higher than "
2290                                "max_mod_rpcs_in_flight value",
2291                                cli->cl_import->imp_obd->obd_name);
2292                         return -ERANGE;
2293                 }
2294                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2295                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2296                         if (rc != 0)
2297                                 return rc;
2298                 }
2299         }
2300
2301         spin_lock(&cli->cl_loi_list_lock);
2302         old = cli->cl_max_rpcs_in_flight;
2303         cli->cl_max_rpcs_in_flight = max;
2304         client_adjust_max_dirty(cli);
2305
2306         diff = max - old;
2307
2308         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2309         for (i = 0; i < diff; i++) {
2310                 if (list_empty(&cli->cl_flight_waiters))
2311                         break;
2312
2313                 orsw = list_entry(cli->cl_flight_waiters.next,
2314                                   struct obd_request_slot_waiter, orsw_entry);
2315                 list_del_init(&orsw->orsw_entry);
2316                 cli->cl_rpcs_in_flight++;
2317                 wake_up(&orsw->orsw_waitq);
2318         }
2319         spin_unlock(&cli->cl_loi_list_lock);
2320
2321         return 0;
2322 }
2323 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2324
2325 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2326 {
2327         return cli->cl_max_mod_rpcs_in_flight;
2328 }
2329 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2330
2331 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2332 {
2333         struct obd_connect_data *ocd;
2334         __u16 maxmodrpcs;
2335         __u16 prev;
2336
2337         if (max > OBD_MAX_RIF_MAX || max < 1)
2338                 return -ERANGE;
2339
2340         /* cannot exceed or equal max_rpcs_in_flight */
2341         if (max >= cli->cl_max_rpcs_in_flight) {
2342                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2343                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2344                        cli->cl_import->imp_obd->obd_name,
2345                        max, cli->cl_max_rpcs_in_flight);
2346                 return -ERANGE;
2347         }
2348
2349         /* cannot exceed max modify RPCs in flight supported by the server */
2350         ocd = &cli->cl_import->imp_connect_data;
2351         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2352                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2353         else
2354                 maxmodrpcs = 1;
2355         if (max > maxmodrpcs) {
2356                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2357                        "higher than max_mod_rpcs_per_client value (%hu) "
2358                        "returned by the server at connection\n",
2359                        cli->cl_import->imp_obd->obd_name,
2360                        max, maxmodrpcs);
2361                 return -ERANGE;
2362         }
2363
2364         spin_lock(&cli->cl_mod_rpcs_lock);
2365
2366         prev = cli->cl_max_mod_rpcs_in_flight;
2367         cli->cl_max_mod_rpcs_in_flight = max;
2368
2369         /* wakeup waiters if limit has been increased */
2370         if (cli->cl_max_mod_rpcs_in_flight > prev)
2371                 wake_up(&cli->cl_mod_rpcs_waitq);
2372
2373         spin_unlock(&cli->cl_mod_rpcs_lock);
2374
2375         return 0;
2376 }
2377 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2378
2379
2380 #define pct(a, b) (b ? a * 100 / b : 0)
2381 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2382                                struct seq_file *seq)
2383 {
2384         unsigned long mod_tot = 0, mod_cum;
2385         struct timespec64 now;
2386         int i;
2387
2388         ktime_get_real_ts64(&now);
2389
2390         spin_lock(&cli->cl_mod_rpcs_lock);
2391
2392         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2393                    (s64)now.tv_sec, now.tv_nsec);
2394         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2395                    cli->cl_mod_rpcs_in_flight);
2396
2397         seq_printf(seq, "\n\t\t\tmodify\n");
2398         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2399
2400         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2401
2402         mod_cum = 0;
2403         for (i = 0; i < OBD_HIST_MAX; i++) {
2404                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2405                 mod_cum += mod;
2406                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2407                            i, mod, pct(mod, mod_tot),
2408                            pct(mod_cum, mod_tot));
2409                 if (mod_cum == mod_tot)
2410                         break;
2411         }
2412
2413         spin_unlock(&cli->cl_mod_rpcs_lock);
2414
2415         return 0;
2416 }
2417 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2418 #undef pct
2419
2420
2421 /* The number of modify RPCs sent in parallel is limited
2422  * because the server has a finite number of slots per client to
2423  * store request result and ensure reply reconstruction when needed.
2424  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2425  * that takes into account server limit and cl_max_rpcs_in_flight
2426  * value.
2427  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2428  * one close request is allowed above the maximum.
2429  */
2430 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2431                                                  bool close_req)
2432 {
2433         bool avail;
2434
2435         /* A slot is available if
2436          * - number of modify RPCs in flight is less than the max
2437          * - it's a close RPC and no other close request is in flight
2438          */
2439         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2440                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2441
2442         return avail;
2443 }
2444
2445 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2446                                          bool close_req)
2447 {
2448         bool avail;
2449
2450         spin_lock(&cli->cl_mod_rpcs_lock);
2451         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2452         spin_unlock(&cli->cl_mod_rpcs_lock);
2453         return avail;
2454 }
2455
2456 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2457 {
2458         if (it != NULL &&
2459             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2460              it->it_op == IT_READDIR ||
2461              (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2462                         return true;
2463         return false;
2464 }
2465
2466 /* Get a modify RPC slot from the obd client @cli according
2467  * to the kind of operation @opc that is going to be sent
2468  * and the intent @it of the operation if it applies.
2469  * If the maximum number of modify RPCs in flight is reached
2470  * the thread is put to sleep.
2471  * Returns the tag to be set in the request message. Tag 0
2472  * is reserved for non-modifying requests.
2473  */
2474 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2475                            struct lookup_intent *it)
2476 {
2477         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2478         bool                    close_req = false;
2479         __u16                   i, max;
2480
2481         /* read-only metadata RPCs don't consume a slot on MDT
2482          * for reply reconstruction
2483          */
2484         if (obd_skip_mod_rpc_slot(it))
2485                 return 0;
2486
2487         if (opc == MDS_CLOSE)
2488                 close_req = true;
2489
2490         do {
2491                 spin_lock(&cli->cl_mod_rpcs_lock);
2492                 max = cli->cl_max_mod_rpcs_in_flight;
2493                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2494                         /* there is a slot available */
2495                         cli->cl_mod_rpcs_in_flight++;
2496                         if (close_req)
2497                                 cli->cl_close_rpcs_in_flight++;
2498                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2499                                          cli->cl_mod_rpcs_in_flight);
2500                         /* find a free tag */
2501                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2502                                                 max + 1);
2503                         LASSERT(i < OBD_MAX_RIF_MAX);
2504                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2505                         spin_unlock(&cli->cl_mod_rpcs_lock);
2506                         /* tag 0 is reserved for non-modify RPCs */
2507                         return i + 1;
2508                 }
2509                 spin_unlock(&cli->cl_mod_rpcs_lock);
2510
2511                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2512                        "opc %u, max %hu\n",
2513                        cli->cl_import->imp_obd->obd_name, opc, max);
2514
2515                 l_wait_event(cli->cl_mod_rpcs_waitq,
2516                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2517         } while (true);
2518 }
2519 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2520
2521 /* Put a modify RPC slot from the obd client @cli according
2522  * to the kind of operation @opc that has been sent and the
2523  * intent @it of the operation if it applies.
2524  */
2525 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2526                           struct lookup_intent *it, __u16 tag)
2527 {
2528         bool                    close_req = false;
2529
2530         if (obd_skip_mod_rpc_slot(it))
2531                 return;
2532
2533         if (opc == MDS_CLOSE)
2534                 close_req = true;
2535
2536         spin_lock(&cli->cl_mod_rpcs_lock);
2537         cli->cl_mod_rpcs_in_flight--;
2538         if (close_req)
2539                 cli->cl_close_rpcs_in_flight--;
2540         /* release the tag in the bitmap */
2541         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2542         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2543         spin_unlock(&cli->cl_mod_rpcs_lock);
2544         wake_up(&cli->cl_mod_rpcs_waitq);
2545 }
2546 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2547