Whamcloud - gitweb
LU-6032 obdclass: new wrapper to convert NID to string
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
57
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
61
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66                               const char *status, int locks, int debug_level);
67
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
71
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74
75 /*
76  * support functions: we could use inter-module communication, but this
77  * is more portable to other OS's
78  */
79 static struct obd_device *obd_device_alloc(void)
80 {
81         struct obd_device *obd;
82
83         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84         if (obd != NULL) {
85                 obd->obd_magic = OBD_DEVICE_MAGIC;
86         }
87         return obd;
88 }
89
90 static void obd_device_free(struct obd_device *obd)
91 {
92         LASSERT(obd != NULL);
93         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95         if (obd->obd_namespace != NULL) {
96                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97                        obd, obd->obd_namespace, obd->obd_force);
98                 LBUG();
99         }
100         lu_ref_fini(&obd->obd_reference);
101         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
102 }
103
104 struct obd_type *class_search_type(const char *name)
105 {
106         struct list_head *tmp;
107         struct obd_type *type;
108
109         spin_lock(&obd_types_lock);
110         list_for_each(tmp, &obd_types) {
111                 type = list_entry(tmp, struct obd_type, typ_chain);
112                 if (strcmp(type->typ_name, name) == 0) {
113                         spin_unlock(&obd_types_lock);
114                         return type;
115                 }
116         }
117         spin_unlock(&obd_types_lock);
118         return NULL;
119 }
120 EXPORT_SYMBOL(class_search_type);
121
122 struct obd_type *class_get_type(const char *name)
123 {
124         struct obd_type *type = class_search_type(name);
125
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
127         if (!type) {
128                 const char *modname = name;
129
130                 if (strcmp(modname, "obdfilter") == 0)
131                         modname = "ofd";
132
133                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134                         modname = LUSTRE_OSP_NAME;
135
136                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137                         modname = LUSTRE_MDT_NAME;
138
139                 if (!request_module("%s", modname)) {
140                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141                         type = class_search_type(name);
142                 } else {
143                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144                                            modname);
145                 }
146         }
147 #endif
148         if (type) {
149                 spin_lock(&type->obd_type_lock);
150                 type->typ_refcnt++;
151                 try_module_get(type->typ_dt_ops->o_owner);
152                 spin_unlock(&type->obd_type_lock);
153         }
154         return type;
155 }
156
157 void class_put_type(struct obd_type *type)
158 {
159         LASSERT(type);
160         spin_lock(&type->obd_type_lock);
161         type->typ_refcnt--;
162         module_put(type->typ_dt_ops->o_owner);
163         spin_unlock(&type->obd_type_lock);
164 }
165
166 static void class_sysfs_release(struct kobject *kobj)
167 {
168         OBD_FREE(kobj, sizeof(*kobj));
169 }
170
171 static struct kobj_type class_ktype = {
172         .sysfs_ops      = &lustre_sysfs_ops,
173         .release        = class_sysfs_release,
174 };
175
176 struct kobject *class_setup_tunables(const char *name)
177 {
178         struct kobject *kobj;
179         int rc;
180
181 #ifdef HAVE_SERVER_SUPPORT
182         kobj = kset_find_obj(lustre_kset, name);
183         if (kobj)
184                 return kobj;
185 #endif
186         OBD_ALLOC(kobj, sizeof(*kobj));
187         if (!kobj)
188                 return ERR_PTR(-ENOMEM);
189
190         kobj->kset = lustre_kset;
191         kobject_init(kobj, &class_ktype);
192         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
193         if (rc) {
194                 kobject_put(kobj);
195                 return ERR_PTR(rc);
196         }
197         return kobj;
198 }
199 EXPORT_SYMBOL(class_setup_tunables);
200
201 #define CLASS_MAX_NAME 1024
202
203 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
204                         bool enable_proc, struct lprocfs_vars *vars,
205                         const char *name, struct lu_device_type *ldt)
206 {
207         struct obd_type *type;
208 #ifdef HAVE_SERVER_SUPPORT
209         struct qstr dname;
210 #endif /* HAVE_SERVER_SUPPORT */
211         int rc = 0;
212
213         ENTRY;
214         /* sanity check */
215         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
216
217         if (class_search_type(name)) {
218                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
219                 RETURN(-EEXIST);
220         }
221
222         rc = -ENOMEM;
223         OBD_ALLOC(type, sizeof(*type));
224         if (type == NULL)
225                 RETURN(rc);
226
227         OBD_ALLOC_PTR(type->typ_dt_ops);
228         OBD_ALLOC_PTR(type->typ_md_ops);
229         OBD_ALLOC(type->typ_name, strlen(name) + 1);
230
231         if (type->typ_dt_ops == NULL ||
232             type->typ_md_ops == NULL ||
233             type->typ_name == NULL)
234                 GOTO (failed, rc);
235
236         *(type->typ_dt_ops) = *dt_ops;
237         /* md_ops is optional */
238         if (md_ops)
239                 *(type->typ_md_ops) = *md_ops;
240         strcpy(type->typ_name, name);
241         spin_lock_init(&type->obd_type_lock);
242
243 #ifdef CONFIG_PROC_FS
244         if (enable_proc) {
245                 type->typ_procroot = lprocfs_register(type->typ_name,
246                                                       proc_lustre_root,
247                                                       vars, type);
248                 if (IS_ERR(type->typ_procroot)) {
249                         rc = PTR_ERR(type->typ_procroot);
250                         type->typ_procroot = NULL;
251                         GOTO(failed, rc);
252                 }
253         }
254 #endif
255 #ifdef HAVE_SERVER_SUPPORT
256         dname.name = name;
257         dname.len = strlen(dname.name);
258         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
259                                        dname.len);
260         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
261         if (type->typ_debugfs_entry) {
262                 dput(type->typ_debugfs_entry);
263                 type->typ_sym_filter = true;
264                 goto dir_exist;
265         }
266 #endif /* HAVE_SERVER_SUPPORT */
267
268         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
269                                                     debugfs_lustre_root,
270                                                     NULL, type);
271         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
272                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
273                                              : -ENOMEM;
274                 type->typ_debugfs_entry = NULL;
275                 GOTO(failed, rc);
276         }
277 #ifdef HAVE_SERVER_SUPPORT
278 dir_exist:
279 #endif
280         type->typ_kobj = class_setup_tunables(type->typ_name);
281         if (IS_ERR(type->typ_kobj))
282                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
283
284         if (ldt) {
285                 type->typ_lu = ldt;
286                 rc = lu_device_type_init(ldt);
287                 if (rc) {
288                         kobject_put(type->typ_kobj);
289                         GOTO(failed, rc);
290                 }
291         }
292
293         spin_lock(&obd_types_lock);
294         list_add(&type->typ_chain, &obd_types);
295         spin_unlock(&obd_types_lock);
296
297         RETURN(0);
298
299 failed:
300 #ifdef HAVE_SERVER_SUPPORT
301         if (type->typ_sym_filter)
302                 type->typ_debugfs_entry = NULL;
303 #endif
304         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
305                 ldebugfs_remove(&type->typ_debugfs_entry);
306         if (type->typ_name != NULL) {
307 #ifdef CONFIG_PROC_FS
308                 if (type->typ_procroot != NULL)
309                         remove_proc_subtree(type->typ_name, proc_lustre_root);
310 #endif
311                 OBD_FREE(type->typ_name, strlen(name) + 1);
312         }
313         if (type->typ_md_ops != NULL)
314                 OBD_FREE_PTR(type->typ_md_ops);
315         if (type->typ_dt_ops != NULL)
316                 OBD_FREE_PTR(type->typ_dt_ops);
317         OBD_FREE(type, sizeof(*type));
318         RETURN(rc);
319 }
320 EXPORT_SYMBOL(class_register_type);
321
322 int class_unregister_type(const char *name)
323 {
324         struct obd_type *type = class_search_type(name);
325         ENTRY;
326
327         if (!type) {
328                 CERROR("unknown obd type\n");
329                 RETURN(-EINVAL);
330         }
331
332         if (type->typ_refcnt) {
333                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
334                 /* This is a bad situation, let's make the best of it */
335                 /* Remove ops, but leave the name for debugging */
336                 OBD_FREE_PTR(type->typ_dt_ops);
337                 OBD_FREE_PTR(type->typ_md_ops);
338                 RETURN(-EBUSY);
339         }
340
341         kobject_put(type->typ_kobj);
342
343         /* we do not use type->typ_procroot as for compatibility purposes
344          * other modules can share names (i.e. lod can use lov entry). so
345          * we can't reference pointer as it can get invalided when another
346          * module removes the entry */
347 #ifdef CONFIG_PROC_FS
348         if (type->typ_procroot != NULL)
349                 remove_proc_subtree(type->typ_name, proc_lustre_root);
350         if (type->typ_procsym != NULL)
351                 lprocfs_remove(&type->typ_procsym);
352 #endif
353 #ifdef HAVE_SERVER_SUPPORT
354         if (type->typ_sym_filter)
355                 type->typ_debugfs_entry = NULL;
356 #endif
357         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
358                 ldebugfs_remove(&type->typ_debugfs_entry);
359
360         if (type->typ_lu)
361                 lu_device_type_fini(type->typ_lu);
362
363         spin_lock(&obd_types_lock);
364         list_del(&type->typ_chain);
365         spin_unlock(&obd_types_lock);
366         OBD_FREE(type->typ_name, strlen(name) + 1);
367         if (type->typ_dt_ops != NULL)
368                 OBD_FREE_PTR(type->typ_dt_ops);
369         if (type->typ_md_ops != NULL)
370                 OBD_FREE_PTR(type->typ_md_ops);
371         OBD_FREE(type, sizeof(*type));
372         RETURN(0);
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
375
376 /**
377  * Create a new obd device.
378  *
379  * Allocate the new obd_device and initialize it.
380  *
381  * \param[in] type_name obd device type string.
382  * \param[in] name      obd device name.
383  * \param[in] uuid      obd device UUID
384  *
385  * \retval newdev         pointer to created obd_device
386  * \retval ERR_PTR(errno) on error
387  */
388 struct obd_device *class_newdev(const char *type_name, const char *name,
389                                 const char *uuid)
390 {
391         struct obd_device *newdev;
392         struct obd_type *type = NULL;
393         ENTRY;
394
395         if (strlen(name) >= MAX_OBD_NAME) {
396                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397                 RETURN(ERR_PTR(-EINVAL));
398         }
399
400         type = class_get_type(type_name);
401         if (type == NULL){
402                 CERROR("OBD: unknown type: %s\n", type_name);
403                 RETURN(ERR_PTR(-ENODEV));
404         }
405
406         newdev = obd_device_alloc();
407         if (newdev == NULL) {
408                 class_put_type(type);
409                 RETURN(ERR_PTR(-ENOMEM));
410         }
411         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413         newdev->obd_type = type;
414         newdev->obd_minor = -1;
415
416         rwlock_init(&newdev->obd_pool_lock);
417         newdev->obd_pool_limit = 0;
418         newdev->obd_pool_slv = 0;
419
420         INIT_LIST_HEAD(&newdev->obd_exports);
421         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423         INIT_LIST_HEAD(&newdev->obd_exports_timed);
424         INIT_LIST_HEAD(&newdev->obd_nid_stats);
425         spin_lock_init(&newdev->obd_nid_lock);
426         spin_lock_init(&newdev->obd_dev_lock);
427         mutex_init(&newdev->obd_dev_mutex);
428         spin_lock_init(&newdev->obd_osfs_lock);
429         /* newdev->obd_osfs_age must be set to a value in the distant
430          * past to guarantee a fresh statfs is fetched on mount. */
431         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
432
433         /* XXX belongs in setup not attach  */
434         init_rwsem(&newdev->obd_observer_link_sem);
435         /* recovery data */
436         init_timer(&newdev->obd_recovery_timer);
437         spin_lock_init(&newdev->obd_recovery_task_lock);
438         init_waitqueue_head(&newdev->obd_next_transno_waitq);
439         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
440         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
441         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
442         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
443         INIT_LIST_HEAD(&newdev->obd_evict_list);
444         INIT_LIST_HEAD(&newdev->obd_lwp_list);
445
446         llog_group_init(&newdev->obd_olg);
447         /* Detach drops this */
448         atomic_set(&newdev->obd_refcount, 1);
449         lu_ref_init(&newdev->obd_reference);
450         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
451
452         newdev->obd_conn_inprogress = 0;
453
454         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
455
456         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
457                newdev->obd_name, newdev);
458
459         return newdev;
460 }
461
462 /**
463  * Free obd device.
464  *
465  * \param[in] obd obd_device to be freed
466  *
467  * \retval none
468  */
469 void class_free_dev(struct obd_device *obd)
470 {
471         struct obd_type *obd_type = obd->obd_type;
472
473         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
474                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
476                  "obd %p != obd_devs[%d] %p\n",
477                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
478         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
479                  "obd_refcount should be 0, not %d\n",
480                  atomic_read(&obd->obd_refcount));
481         LASSERT(obd_type != NULL);
482
483         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
484                obd->obd_name, obd->obd_type->typ_name);
485
486         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
487                          obd->obd_name, obd->obd_uuid.uuid);
488         if (obd->obd_stopping) {
489                 int err;
490
491                 /* If we're not stopping, we were never set up */
492                 err = obd_cleanup(obd);
493                 if (err)
494                         CERROR("Cleanup %s returned %d\n",
495                                 obd->obd_name, err);
496         }
497
498         obd_device_free(obd);
499
500         class_put_type(obd_type);
501 }
502
503 /**
504  * Unregister obd device.
505  *
506  * Free slot in obd_dev[] used by \a obd.
507  *
508  * \param[in] new_obd obd_device to be unregistered
509  *
510  * \retval none
511  */
512 void class_unregister_device(struct obd_device *obd)
513 {
514         write_lock(&obd_dev_lock);
515         if (obd->obd_minor >= 0) {
516                 LASSERT(obd_devs[obd->obd_minor] == obd);
517                 obd_devs[obd->obd_minor] = NULL;
518                 obd->obd_minor = -1;
519         }
520         write_unlock(&obd_dev_lock);
521 }
522
523 /**
524  * Register obd device.
525  *
526  * Find free slot in obd_devs[], fills it with \a new_obd.
527  *
528  * \param[in] new_obd obd_device to be registered
529  *
530  * \retval 0          success
531  * \retval -EEXIST    device with this name is registered
532  * \retval -EOVERFLOW obd_devs[] is full
533  */
534 int class_register_device(struct obd_device *new_obd)
535 {
536         int ret = 0;
537         int i;
538         int new_obd_minor = 0;
539         bool minor_assign = false;
540         bool retried = false;
541
542 again:
543         write_lock(&obd_dev_lock);
544         for (i = 0; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd != NULL &&
548                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
549
550                         if (!retried) {
551                                 write_unlock(&obd_dev_lock);
552
553                                 /* the obd_device could be waited to be
554                                  * destroyed by the "obd_zombie_impexp_thread".
555                                  */
556                                 obd_zombie_barrier();
557                                 retried = true;
558                                 goto again;
559                         }
560
561                         CERROR("%s: already exists, won't add\n",
562                                obd->obd_name);
563                         /* in case we found a free slot before duplicate */
564                         minor_assign = false;
565                         ret = -EEXIST;
566                         break;
567                 }
568                 if (!minor_assign && obd == NULL) {
569                         new_obd_minor = i;
570                         minor_assign = true;
571                 }
572         }
573
574         if (minor_assign) {
575                 new_obd->obd_minor = new_obd_minor;
576                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
577                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
578                 obd_devs[new_obd_minor] = new_obd;
579         } else {
580                 if (ret == 0) {
581                         ret = -EOVERFLOW;
582                         CERROR("%s: all %u/%u devices used, increase "
583                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
584                                i, class_devno_max(), ret);
585                 }
586         }
587         write_unlock(&obd_dev_lock);
588
589         RETURN(ret);
590 }
591
592 static int class_name2dev_nolock(const char *name)
593 {
594         int i;
595
596         if (!name)
597                 return -1;
598
599         for (i = 0; i < class_devno_max(); i++) {
600                 struct obd_device *obd = class_num2obd(i);
601
602                 if (obd && strcmp(name, obd->obd_name) == 0) {
603                         /* Make sure we finished attaching before we give
604                            out any references */
605                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
606                         if (obd->obd_attached) {
607                                 return i;
608                         }
609                         break;
610                 }
611         }
612
613         return -1;
614 }
615
616 int class_name2dev(const char *name)
617 {
618         int i;
619
620         if (!name)
621                 return -1;
622
623         read_lock(&obd_dev_lock);
624         i = class_name2dev_nolock(name);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_name2dev);
630
631 struct obd_device *class_name2obd(const char *name)
632 {
633         int dev = class_name2dev(name);
634
635         if (dev < 0 || dev > class_devno_max())
636                 return NULL;
637         return class_num2obd(dev);
638 }
639 EXPORT_SYMBOL(class_name2obd);
640
641 int class_uuid2dev_nolock(struct obd_uuid *uuid)
642 {
643         int i;
644
645         for (i = 0; i < class_devno_max(); i++) {
646                 struct obd_device *obd = class_num2obd(i);
647
648                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
649                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
650                         return i;
651                 }
652         }
653
654         return -1;
655 }
656
657 int class_uuid2dev(struct obd_uuid *uuid)
658 {
659         int i;
660
661         read_lock(&obd_dev_lock);
662         i = class_uuid2dev_nolock(uuid);
663         read_unlock(&obd_dev_lock);
664
665         return i;
666 }
667 EXPORT_SYMBOL(class_uuid2dev);
668
669 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
670 {
671         int dev = class_uuid2dev(uuid);
672         if (dev < 0)
673                 return NULL;
674         return class_num2obd(dev);
675 }
676 EXPORT_SYMBOL(class_uuid2obd);
677
678 /**
679  * Get obd device from ::obd_devs[]
680  *
681  * \param num [in] array index
682  *
683  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
684  *         otherwise return the obd device there.
685  */
686 struct obd_device *class_num2obd(int num)
687 {
688         struct obd_device *obd = NULL;
689
690         if (num < class_devno_max()) {
691                 obd = obd_devs[num];
692                 if (obd == NULL)
693                         return NULL;
694
695                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
696                          "%p obd_magic %08x != %08x\n",
697                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
698                 LASSERTF(obd->obd_minor == num,
699                          "%p obd_minor %0d != %0d\n",
700                          obd, obd->obd_minor, num);
701         }
702
703         return obd;
704 }
705
706 /**
707  * Find obd in obd_dev[] by name or uuid.
708  *
709  * Increment obd's refcount if found.
710  *
711  * \param[in] str obd name or uuid
712  *
713  * \retval NULL    if not found
714  * \retval target  pointer to found obd_device
715  */
716 struct obd_device *class_dev_by_str(const char *str)
717 {
718         struct obd_device *target = NULL;
719         struct obd_uuid tgtuuid;
720         int rc;
721
722         obd_str2uuid(&tgtuuid, str);
723
724         read_lock(&obd_dev_lock);
725         rc = class_uuid2dev_nolock(&tgtuuid);
726         if (rc < 0)
727                 rc = class_name2dev_nolock(str);
728
729         if (rc >= 0)
730                 target = class_num2obd(rc);
731
732         if (target != NULL)
733                 class_incref(target, "find", current);
734         read_unlock(&obd_dev_lock);
735
736         RETURN(target);
737 }
738 EXPORT_SYMBOL(class_dev_by_str);
739
740 /**
741  * Get obd devices count. Device in any
742  *    state are counted
743  * \retval obd device count
744  */
745 int get_devices_count(void)
746 {
747         int index, max_index = class_devno_max(), dev_count = 0;
748
749         read_lock(&obd_dev_lock);
750         for (index = 0; index <= max_index; index++) {
751                 struct obd_device *obd = class_num2obd(index);
752                 if (obd != NULL)
753                         dev_count++;
754         }
755         read_unlock(&obd_dev_lock);
756
757         return dev_count;
758 }
759 EXPORT_SYMBOL(get_devices_count);
760
761 void class_obd_list(void)
762 {
763         char *status;
764         int i;
765
766         read_lock(&obd_dev_lock);
767         for (i = 0; i < class_devno_max(); i++) {
768                 struct obd_device *obd = class_num2obd(i);
769
770                 if (obd == NULL)
771                         continue;
772                 if (obd->obd_stopping)
773                         status = "ST";
774                 else if (obd->obd_set_up)
775                         status = "UP";
776                 else if (obd->obd_attached)
777                         status = "AT";
778                 else
779                         status = "--";
780                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
781                          i, status, obd->obd_type->typ_name,
782                          obd->obd_name, obd->obd_uuid.uuid,
783                          atomic_read(&obd->obd_refcount));
784         }
785         read_unlock(&obd_dev_lock);
786         return;
787 }
788
789 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
790    specified, then only the client with that uuid is returned,
791    otherwise any client connected to the tgt is returned. */
792 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
793                                           const char * typ_name,
794                                           struct obd_uuid *grp_uuid)
795 {
796         int i;
797
798         read_lock(&obd_dev_lock);
799         for (i = 0; i < class_devno_max(); i++) {
800                 struct obd_device *obd = class_num2obd(i);
801
802                 if (obd == NULL)
803                         continue;
804                 if ((strncmp(obd->obd_type->typ_name, typ_name,
805                              strlen(typ_name)) == 0)) {
806                         if (obd_uuid_equals(tgt_uuid,
807                                             &obd->u.cli.cl_target_uuid) &&
808                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
809                                                          &obd->obd_uuid) : 1)) {
810                                 read_unlock(&obd_dev_lock);
811                                 return obd;
812                         }
813                 }
814         }
815         read_unlock(&obd_dev_lock);
816
817         return NULL;
818 }
819 EXPORT_SYMBOL(class_find_client_obd);
820
821 /* Iterate the obd_device list looking devices have grp_uuid. Start
822    searching at *next, and if a device is found, the next index to look
823    at is saved in *next. If next is NULL, then the first matching device
824    will always be returned. */
825 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
826 {
827         int i;
828
829         if (next == NULL)
830                 i = 0;
831         else if (*next >= 0 && *next < class_devno_max())
832                 i = *next;
833         else
834                 return NULL;
835
836         read_lock(&obd_dev_lock);
837         for (; i < class_devno_max(); i++) {
838                 struct obd_device *obd = class_num2obd(i);
839
840                 if (obd == NULL)
841                         continue;
842                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
843                         if (next != NULL)
844                                 *next = i+1;
845                         read_unlock(&obd_dev_lock);
846                         return obd;
847                 }
848         }
849         read_unlock(&obd_dev_lock);
850
851         return NULL;
852 }
853 EXPORT_SYMBOL(class_devices_in_group);
854
855 /**
856  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
857  * adjust sptlrpc settings accordingly.
858  */
859 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
860 {
861         struct obd_device  *obd;
862         const char         *type;
863         int                 i, rc = 0, rc2;
864
865         LASSERT(namelen > 0);
866
867         read_lock(&obd_dev_lock);
868         for (i = 0; i < class_devno_max(); i++) {
869                 obd = class_num2obd(i);
870
871                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
872                         continue;
873
874                 /* only notify mdc, osc, osp, lwp, mdt, ost
875                  * because only these have a -sptlrpc llog */
876                 type = obd->obd_type->typ_name;
877                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
878                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
879                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
880                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
881                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
882                     strcmp(type, LUSTRE_OST_NAME) != 0)
883                         continue;
884
885                 if (strncmp(obd->obd_name, fsname, namelen))
886                         continue;
887
888                 class_incref(obd, __FUNCTION__, obd);
889                 read_unlock(&obd_dev_lock);
890                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
891                                          sizeof(KEY_SPTLRPC_CONF),
892                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
893                 rc = rc ? rc : rc2;
894                 class_decref(obd, __FUNCTION__, obd);
895                 read_lock(&obd_dev_lock);
896         }
897         read_unlock(&obd_dev_lock);
898         return rc;
899 }
900 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
901
902 void obd_cleanup_caches(void)
903 {
904         ENTRY;
905         if (obd_device_cachep) {
906                 kmem_cache_destroy(obd_device_cachep);
907                 obd_device_cachep = NULL;
908         }
909         if (obdo_cachep) {
910                 kmem_cache_destroy(obdo_cachep);
911                 obdo_cachep = NULL;
912         }
913         if (import_cachep) {
914                 kmem_cache_destroy(import_cachep);
915                 import_cachep = NULL;
916         }
917
918         EXIT;
919 }
920
921 int obd_init_caches(void)
922 {
923         int rc;
924         ENTRY;
925
926         LASSERT(obd_device_cachep == NULL);
927         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
928                                               sizeof(struct obd_device),
929                                               0, 0, NULL);
930         if (!obd_device_cachep)
931                 GOTO(out, rc = -ENOMEM);
932
933         LASSERT(obdo_cachep == NULL);
934         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
935                                         0, 0, NULL);
936         if (!obdo_cachep)
937                 GOTO(out, rc = -ENOMEM);
938
939         LASSERT(import_cachep == NULL);
940         import_cachep = kmem_cache_create("ll_import_cache",
941                                           sizeof(struct obd_import),
942                                           0, 0, NULL);
943         if (!import_cachep)
944                 GOTO(out, rc = -ENOMEM);
945
946         RETURN(0);
947 out:
948         obd_cleanup_caches();
949         RETURN(rc);
950 }
951
952 /* map connection to client */
953 struct obd_export *class_conn2export(struct lustre_handle *conn)
954 {
955         struct obd_export *export;
956         ENTRY;
957
958         if (!conn) {
959                 CDEBUG(D_CACHE, "looking for null handle\n");
960                 RETURN(NULL);
961         }
962
963         if (conn->cookie == -1) {  /* this means assign a new connection */
964                 CDEBUG(D_CACHE, "want a new connection\n");
965                 RETURN(NULL);
966         }
967
968         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
969         export = class_handle2object(conn->cookie, NULL);
970         RETURN(export);
971 }
972 EXPORT_SYMBOL(class_conn2export);
973
974 struct obd_device *class_exp2obd(struct obd_export *exp)
975 {
976         if (exp)
977                 return exp->exp_obd;
978         return NULL;
979 }
980 EXPORT_SYMBOL(class_exp2obd);
981
982 struct obd_device *class_conn2obd(struct lustre_handle *conn)
983 {
984         struct obd_export *export;
985         export = class_conn2export(conn);
986         if (export) {
987                 struct obd_device *obd = export->exp_obd;
988                 class_export_put(export);
989                 return obd;
990         }
991         return NULL;
992 }
993
994 struct obd_import *class_exp2cliimp(struct obd_export *exp)
995 {
996         struct obd_device *obd = exp->exp_obd;
997         if (obd == NULL)
998                 return NULL;
999         return obd->u.cli.cl_import;
1000 }
1001 EXPORT_SYMBOL(class_exp2cliimp);
1002
1003 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1004 {
1005         struct obd_device *obd = class_conn2obd(conn);
1006         if (obd == NULL)
1007                 return NULL;
1008         return obd->u.cli.cl_import;
1009 }
1010
1011 /* Export management functions */
1012 static void class_export_destroy(struct obd_export *exp)
1013 {
1014         struct obd_device *obd = exp->exp_obd;
1015         ENTRY;
1016
1017         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1018         LASSERT(obd != NULL);
1019
1020         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1021                exp->exp_client_uuid.uuid, obd->obd_name);
1022
1023         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1024         if (exp->exp_connection)
1025                 ptlrpc_put_connection_superhack(exp->exp_connection);
1026
1027         LASSERT(list_empty(&exp->exp_outstanding_replies));
1028         LASSERT(list_empty(&exp->exp_uncommitted_replies));
1029         LASSERT(list_empty(&exp->exp_req_replay_queue));
1030         LASSERT(list_empty(&exp->exp_hp_rpcs));
1031         obd_destroy_export(exp);
1032         /* self export doesn't hold a reference to an obd, although it
1033          * exists until freeing of the obd */
1034         if (exp != obd->obd_self_export)
1035                 class_decref(obd, "export", exp);
1036
1037         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1038         EXIT;
1039 }
1040
1041 static void export_handle_addref(void *export)
1042 {
1043         class_export_get(export);
1044 }
1045
1046 static struct portals_handle_ops export_handle_ops = {
1047         .hop_addref = export_handle_addref,
1048         .hop_free   = NULL,
1049 };
1050
1051 struct obd_export *class_export_get(struct obd_export *exp)
1052 {
1053         atomic_inc(&exp->exp_refcount);
1054         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1055                atomic_read(&exp->exp_refcount));
1056         return exp;
1057 }
1058 EXPORT_SYMBOL(class_export_get);
1059
1060 void class_export_put(struct obd_export *exp)
1061 {
1062         LASSERT(exp != NULL);
1063         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1064         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1065                atomic_read(&exp->exp_refcount) - 1);
1066
1067         if (atomic_dec_and_test(&exp->exp_refcount)) {
1068                 struct obd_device *obd = exp->exp_obd;
1069
1070                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1071                        exp, exp->exp_client_uuid.uuid);
1072
1073                 /* release nid stat refererence */
1074                 lprocfs_exp_cleanup(exp);
1075
1076                 if (exp == obd->obd_self_export) {
1077                         /* self export should be destroyed without
1078                          * zombie thread as it doesn't hold a
1079                          * reference to obd and doesn't hold any
1080                          * resources */
1081                         class_export_destroy(exp);
1082                         /* self export is destroyed, no class
1083                          * references exist and it is safe to free
1084                          * obd */
1085                         class_free_dev(obd);
1086                 } else {
1087                         LASSERT(!list_empty(&exp->exp_obd_chain));
1088                         obd_zombie_export_add(exp);
1089                 }
1090
1091         }
1092 }
1093 EXPORT_SYMBOL(class_export_put);
1094 /* Creates a new export, adds it to the hash table, and returns a
1095  * pointer to it. The refcount is 2: one for the hash reference, and
1096  * one for the pointer returned by this function. */
1097 struct obd_export *__class_new_export(struct obd_device *obd,
1098                                       struct obd_uuid *cluuid, bool is_self)
1099 {
1100         struct obd_export *export;
1101         struct cfs_hash *hash = NULL;
1102         int rc = 0;
1103         ENTRY;
1104
1105         OBD_ALLOC_PTR(export);
1106         if (!export)
1107                 return ERR_PTR(-ENOMEM);
1108
1109         export->exp_conn_cnt = 0;
1110         export->exp_lock_hash = NULL;
1111         export->exp_flock_hash = NULL;
1112         /* 2 = class_handle_hash + last */
1113         atomic_set(&export->exp_refcount, 2);
1114         atomic_set(&export->exp_rpc_count, 0);
1115         atomic_set(&export->exp_cb_count, 0);
1116         atomic_set(&export->exp_locks_count, 0);
1117 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1118         INIT_LIST_HEAD(&export->exp_locks_list);
1119         spin_lock_init(&export->exp_locks_list_guard);
1120 #endif
1121         atomic_set(&export->exp_replay_count, 0);
1122         export->exp_obd = obd;
1123         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1124         spin_lock_init(&export->exp_uncommitted_replies_lock);
1125         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1126         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1127         INIT_LIST_HEAD(&export->exp_handle.h_link);
1128         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1129         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1130         class_handle_hash(&export->exp_handle, &export_handle_ops);
1131         export->exp_last_request_time = ktime_get_real_seconds();
1132         spin_lock_init(&export->exp_lock);
1133         spin_lock_init(&export->exp_rpc_lock);
1134         INIT_HLIST_NODE(&export->exp_uuid_hash);
1135         INIT_HLIST_NODE(&export->exp_nid_hash);
1136         INIT_HLIST_NODE(&export->exp_gen_hash);
1137         spin_lock_init(&export->exp_bl_list_lock);
1138         INIT_LIST_HEAD(&export->exp_bl_list);
1139         INIT_LIST_HEAD(&export->exp_stale_list);
1140
1141         export->exp_sp_peer = LUSTRE_SP_ANY;
1142         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1143         export->exp_client_uuid = *cluuid;
1144         obd_init_export(export);
1145
1146         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1147                 spin_lock(&obd->obd_dev_lock);
1148                 /* shouldn't happen, but might race */
1149                 if (obd->obd_stopping)
1150                         GOTO(exit_unlock, rc = -ENODEV);
1151
1152                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1153                 if (hash == NULL)
1154                         GOTO(exit_unlock, rc = -ENODEV);
1155                 spin_unlock(&obd->obd_dev_lock);
1156
1157                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1158                 if (rc != 0) {
1159                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1160                                       obd->obd_name, cluuid->uuid, rc);
1161                         GOTO(exit_err, rc = -EALREADY);
1162                 }
1163         }
1164
1165         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1166         spin_lock(&obd->obd_dev_lock);
1167         if (obd->obd_stopping) {
1168                 if (hash)
1169                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1170                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1171         }
1172
1173         if (!is_self) {
1174                 class_incref(obd, "export", export);
1175                 list_add_tail(&export->exp_obd_chain_timed,
1176                               &obd->obd_exports_timed);
1177                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1178                 obd->obd_num_exports++;
1179         } else {
1180                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1181                 INIT_LIST_HEAD(&export->exp_obd_chain);
1182         }
1183         spin_unlock(&obd->obd_dev_lock);
1184         if (hash)
1185                 cfs_hash_putref(hash);
1186         RETURN(export);
1187
1188 exit_unlock:
1189         spin_unlock(&obd->obd_dev_lock);
1190 exit_err:
1191         if (hash)
1192                 cfs_hash_putref(hash);
1193         class_handle_unhash(&export->exp_handle);
1194         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1195         obd_destroy_export(export);
1196         OBD_FREE_PTR(export);
1197         return ERR_PTR(rc);
1198 }
1199
1200 struct obd_export *class_new_export(struct obd_device *obd,
1201                                     struct obd_uuid *uuid)
1202 {
1203         return __class_new_export(obd, uuid, false);
1204 }
1205 EXPORT_SYMBOL(class_new_export);
1206
1207 struct obd_export *class_new_export_self(struct obd_device *obd,
1208                                          struct obd_uuid *uuid)
1209 {
1210         return __class_new_export(obd, uuid, true);
1211 }
1212
1213 void class_unlink_export(struct obd_export *exp)
1214 {
1215         class_handle_unhash(&exp->exp_handle);
1216
1217         if (exp->exp_obd->obd_self_export == exp) {
1218                 class_export_put(exp);
1219                 return;
1220         }
1221
1222         spin_lock(&exp->exp_obd->obd_dev_lock);
1223         /* delete an uuid-export hashitem from hashtables */
1224         if (!hlist_unhashed(&exp->exp_uuid_hash))
1225                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1226                              &exp->exp_client_uuid,
1227                              &exp->exp_uuid_hash);
1228
1229 #ifdef HAVE_SERVER_SUPPORT
1230         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1231                 struct tg_export_data   *ted = &exp->exp_target_data;
1232                 struct cfs_hash         *hash;
1233
1234                 /* Because obd_gen_hash will not be released until
1235                  * class_cleanup(), so hash should never be NULL here */
1236                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1237                 LASSERT(hash != NULL);
1238                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1239                              &exp->exp_gen_hash);
1240                 cfs_hash_putref(hash);
1241         }
1242 #endif /* HAVE_SERVER_SUPPORT */
1243
1244         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1245         list_del_init(&exp->exp_obd_chain_timed);
1246         exp->exp_obd->obd_num_exports--;
1247         spin_unlock(&exp->exp_obd->obd_dev_lock);
1248         atomic_inc(&obd_stale_export_num);
1249
1250         /* A reference is kept by obd_stale_exports list */
1251         obd_stale_export_put(exp);
1252 }
1253 EXPORT_SYMBOL(class_unlink_export);
1254
1255 /* Import management functions */
1256 static void class_import_destroy(struct obd_import *imp)
1257 {
1258         ENTRY;
1259
1260         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1261                 imp->imp_obd->obd_name);
1262
1263         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1264
1265         ptlrpc_put_connection_superhack(imp->imp_connection);
1266
1267         while (!list_empty(&imp->imp_conn_list)) {
1268                 struct obd_import_conn *imp_conn;
1269
1270                 imp_conn = list_entry(imp->imp_conn_list.next,
1271                                       struct obd_import_conn, oic_item);
1272                 list_del_init(&imp_conn->oic_item);
1273                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1274                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1275         }
1276
1277         LASSERT(imp->imp_sec == NULL);
1278         class_decref(imp->imp_obd, "import", imp);
1279         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1280         EXIT;
1281 }
1282
1283 static void import_handle_addref(void *import)
1284 {
1285         class_import_get(import);
1286 }
1287
1288 static struct portals_handle_ops import_handle_ops = {
1289         .hop_addref = import_handle_addref,
1290         .hop_free   = NULL,
1291 };
1292
1293 struct obd_import *class_import_get(struct obd_import *import)
1294 {
1295         atomic_inc(&import->imp_refcount);
1296         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1297                atomic_read(&import->imp_refcount),
1298                import->imp_obd->obd_name);
1299         return import;
1300 }
1301 EXPORT_SYMBOL(class_import_get);
1302
1303 void class_import_put(struct obd_import *imp)
1304 {
1305         ENTRY;
1306
1307         LASSERT(list_empty(&imp->imp_zombie_chain));
1308         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1309
1310         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1311                atomic_read(&imp->imp_refcount) - 1,
1312                imp->imp_obd->obd_name);
1313
1314         if (atomic_dec_and_test(&imp->imp_refcount)) {
1315                 CDEBUG(D_INFO, "final put import %p\n", imp);
1316                 obd_zombie_import_add(imp);
1317         }
1318
1319         /* catch possible import put race */
1320         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1321         EXIT;
1322 }
1323 EXPORT_SYMBOL(class_import_put);
1324
1325 static void init_imp_at(struct imp_at *at) {
1326         int i;
1327         at_init(&at->iat_net_latency, 0, 0);
1328         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1329                 /* max service estimates are tracked on the server side, so
1330                    don't use the AT history here, just use the last reported
1331                    val. (But keep hist for proc histogram, worst_ever) */
1332                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1333                         AT_FLG_NOHIST);
1334         }
1335 }
1336
1337 struct obd_import *class_new_import(struct obd_device *obd)
1338 {
1339         struct obd_import *imp;
1340         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1341
1342         OBD_ALLOC(imp, sizeof(*imp));
1343         if (imp == NULL)
1344                 return NULL;
1345
1346         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1347         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1348         INIT_LIST_HEAD(&imp->imp_replay_list);
1349         INIT_LIST_HEAD(&imp->imp_sending_list);
1350         INIT_LIST_HEAD(&imp->imp_delayed_list);
1351         INIT_LIST_HEAD(&imp->imp_committed_list);
1352         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1353         imp->imp_known_replied_xid = 0;
1354         imp->imp_replay_cursor = &imp->imp_committed_list;
1355         spin_lock_init(&imp->imp_lock);
1356         imp->imp_last_success_conn = 0;
1357         imp->imp_state = LUSTRE_IMP_NEW;
1358         imp->imp_obd = class_incref(obd, "import", imp);
1359         mutex_init(&imp->imp_sec_mutex);
1360         init_waitqueue_head(&imp->imp_recovery_waitq);
1361
1362         if (curr_pid_ns->child_reaper)
1363                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1364         else
1365                 imp->imp_sec_refpid = 1;
1366
1367         atomic_set(&imp->imp_refcount, 2);
1368         atomic_set(&imp->imp_unregistering, 0);
1369         atomic_set(&imp->imp_inflight, 0);
1370         atomic_set(&imp->imp_replay_inflight, 0);
1371         atomic_set(&imp->imp_inval_count, 0);
1372         INIT_LIST_HEAD(&imp->imp_conn_list);
1373         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1374         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1375         init_imp_at(&imp->imp_at);
1376
1377         /* the default magic is V2, will be used in connect RPC, and
1378          * then adjusted according to the flags in request/reply. */
1379         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1380
1381         return imp;
1382 }
1383 EXPORT_SYMBOL(class_new_import);
1384
1385 void class_destroy_import(struct obd_import *import)
1386 {
1387         LASSERT(import != NULL);
1388         LASSERT(import != LP_POISON);
1389
1390         class_handle_unhash(&import->imp_handle);
1391
1392         spin_lock(&import->imp_lock);
1393         import->imp_generation++;
1394         spin_unlock(&import->imp_lock);
1395         class_import_put(import);
1396 }
1397 EXPORT_SYMBOL(class_destroy_import);
1398
1399 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1400
1401 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1402 {
1403         spin_lock(&exp->exp_locks_list_guard);
1404
1405         LASSERT(lock->l_exp_refs_nr >= 0);
1406
1407         if (lock->l_exp_refs_target != NULL &&
1408             lock->l_exp_refs_target != exp) {
1409                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1410                               exp, lock, lock->l_exp_refs_target);
1411         }
1412         if ((lock->l_exp_refs_nr ++) == 0) {
1413                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1414                 lock->l_exp_refs_target = exp;
1415         }
1416         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1417                lock, exp, lock->l_exp_refs_nr);
1418         spin_unlock(&exp->exp_locks_list_guard);
1419 }
1420 EXPORT_SYMBOL(__class_export_add_lock_ref);
1421
1422 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1423 {
1424         spin_lock(&exp->exp_locks_list_guard);
1425         LASSERT(lock->l_exp_refs_nr > 0);
1426         if (lock->l_exp_refs_target != exp) {
1427                 LCONSOLE_WARN("lock %p, "
1428                               "mismatching export pointers: %p, %p\n",
1429                               lock, lock->l_exp_refs_target, exp);
1430         }
1431         if (-- lock->l_exp_refs_nr == 0) {
1432                 list_del_init(&lock->l_exp_refs_link);
1433                 lock->l_exp_refs_target = NULL;
1434         }
1435         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1436                lock, exp, lock->l_exp_refs_nr);
1437         spin_unlock(&exp->exp_locks_list_guard);
1438 }
1439 EXPORT_SYMBOL(__class_export_del_lock_ref);
1440 #endif
1441
1442 /* A connection defines an export context in which preallocation can
1443    be managed. This releases the export pointer reference, and returns
1444    the export handle, so the export refcount is 1 when this function
1445    returns. */
1446 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1447                   struct obd_uuid *cluuid)
1448 {
1449         struct obd_export *export;
1450         LASSERT(conn != NULL);
1451         LASSERT(obd != NULL);
1452         LASSERT(cluuid != NULL);
1453         ENTRY;
1454
1455         export = class_new_export(obd, cluuid);
1456         if (IS_ERR(export))
1457                 RETURN(PTR_ERR(export));
1458
1459         conn->cookie = export->exp_handle.h_cookie;
1460         class_export_put(export);
1461
1462         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1463                cluuid->uuid, conn->cookie);
1464         RETURN(0);
1465 }
1466 EXPORT_SYMBOL(class_connect);
1467
1468 /* if export is involved in recovery then clean up related things */
1469 static void class_export_recovery_cleanup(struct obd_export *exp)
1470 {
1471         struct obd_device *obd = exp->exp_obd;
1472
1473         spin_lock(&obd->obd_recovery_task_lock);
1474         if (obd->obd_recovering) {
1475                 if (exp->exp_in_recovery) {
1476                         spin_lock(&exp->exp_lock);
1477                         exp->exp_in_recovery = 0;
1478                         spin_unlock(&exp->exp_lock);
1479                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1480                         atomic_dec(&obd->obd_connected_clients);
1481                 }
1482
1483                 /* if called during recovery then should update
1484                  * obd_stale_clients counter,
1485                  * lightweight exports are not counted */
1486                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1487                         exp->exp_obd->obd_stale_clients++;
1488         }
1489         spin_unlock(&obd->obd_recovery_task_lock);
1490
1491         spin_lock(&exp->exp_lock);
1492         /** Cleanup req replay fields */
1493         if (exp->exp_req_replay_needed) {
1494                 exp->exp_req_replay_needed = 0;
1495
1496                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1497                 atomic_dec(&obd->obd_req_replay_clients);
1498         }
1499
1500         /** Cleanup lock replay data */
1501         if (exp->exp_lock_replay_needed) {
1502                 exp->exp_lock_replay_needed = 0;
1503
1504                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1505                 atomic_dec(&obd->obd_lock_replay_clients);
1506         }
1507         spin_unlock(&exp->exp_lock);
1508 }
1509
1510 /* This function removes 1-3 references from the export:
1511  * 1 - for export pointer passed
1512  * and if disconnect really need
1513  * 2 - removing from hash
1514  * 3 - in client_unlink_export
1515  * The export pointer passed to this function can destroyed */
1516 int class_disconnect(struct obd_export *export)
1517 {
1518         int already_disconnected;
1519         ENTRY;
1520
1521         if (export == NULL) {
1522                 CWARN("attempting to free NULL export %p\n", export);
1523                 RETURN(-EINVAL);
1524         }
1525
1526         spin_lock(&export->exp_lock);
1527         already_disconnected = export->exp_disconnected;
1528         export->exp_disconnected = 1;
1529         /*  We hold references of export for uuid hash
1530          *  and nid_hash and export link at least. So
1531          *  it is safe to call cfs_hash_del in there.  */
1532         if (!hlist_unhashed(&export->exp_nid_hash))
1533                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1534                              &export->exp_connection->c_peer.nid,
1535                              &export->exp_nid_hash);
1536         spin_unlock(&export->exp_lock);
1537
1538         /* class_cleanup(), abort_recovery(), and class_fail_export()
1539          * all end up in here, and if any of them race we shouldn't
1540          * call extra class_export_puts(). */
1541         if (already_disconnected) {
1542                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1543                 GOTO(no_disconn, already_disconnected);
1544         }
1545
1546         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1547                export->exp_handle.h_cookie);
1548
1549         class_export_recovery_cleanup(export);
1550         class_unlink_export(export);
1551 no_disconn:
1552         class_export_put(export);
1553         RETURN(0);
1554 }
1555 EXPORT_SYMBOL(class_disconnect);
1556
1557 /* Return non-zero for a fully connected export */
1558 int class_connected_export(struct obd_export *exp)
1559 {
1560         int connected = 0;
1561
1562         if (exp) {
1563                 spin_lock(&exp->exp_lock);
1564                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1565                 spin_unlock(&exp->exp_lock);
1566         }
1567         return connected;
1568 }
1569 EXPORT_SYMBOL(class_connected_export);
1570
1571 static void class_disconnect_export_list(struct list_head *list,
1572                                          enum obd_option flags)
1573 {
1574         int rc;
1575         struct obd_export *exp;
1576         ENTRY;
1577
1578         /* It's possible that an export may disconnect itself, but
1579          * nothing else will be added to this list. */
1580         while (!list_empty(list)) {
1581                 exp = list_entry(list->next, struct obd_export,
1582                                  exp_obd_chain);
1583                 /* need for safe call CDEBUG after obd_disconnect */
1584                 class_export_get(exp);
1585
1586                 spin_lock(&exp->exp_lock);
1587                 exp->exp_flags = flags;
1588                 spin_unlock(&exp->exp_lock);
1589
1590                 if (obd_uuid_equals(&exp->exp_client_uuid,
1591                                     &exp->exp_obd->obd_uuid)) {
1592                         CDEBUG(D_HA,
1593                                "exp %p export uuid == obd uuid, don't discon\n",
1594                                exp);
1595                         /* Need to delete this now so we don't end up pointing
1596                          * to work_list later when this export is cleaned up. */
1597                         list_del_init(&exp->exp_obd_chain);
1598                         class_export_put(exp);
1599                         continue;
1600                 }
1601
1602                 class_export_get(exp);
1603                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1604                        "last request at %lld\n",
1605                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1606                        exp, exp->exp_last_request_time);
1607                 /* release one export reference anyway */
1608                 rc = obd_disconnect(exp);
1609
1610                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1611                        obd_export_nid2str(exp), exp, rc);
1612                 class_export_put(exp);
1613         }
1614         EXIT;
1615 }
1616
1617 void class_disconnect_exports(struct obd_device *obd)
1618 {
1619         struct list_head work_list;
1620         ENTRY;
1621
1622         /* Move all of the exports from obd_exports to a work list, en masse. */
1623         INIT_LIST_HEAD(&work_list);
1624         spin_lock(&obd->obd_dev_lock);
1625         list_splice_init(&obd->obd_exports, &work_list);
1626         list_splice_init(&obd->obd_delayed_exports, &work_list);
1627         spin_unlock(&obd->obd_dev_lock);
1628
1629         if (!list_empty(&work_list)) {
1630                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1631                        "disconnecting them\n", obd->obd_minor, obd);
1632                 class_disconnect_export_list(&work_list,
1633                                              exp_flags_from_obd(obd));
1634         } else
1635                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1636                        obd->obd_minor, obd);
1637         EXIT;
1638 }
1639 EXPORT_SYMBOL(class_disconnect_exports);
1640
1641 /* Remove exports that have not completed recovery.
1642  */
1643 void class_disconnect_stale_exports(struct obd_device *obd,
1644                                     int (*test_export)(struct obd_export *))
1645 {
1646         struct list_head work_list;
1647         struct obd_export *exp, *n;
1648         int evicted = 0;
1649         ENTRY;
1650
1651         INIT_LIST_HEAD(&work_list);
1652         spin_lock(&obd->obd_dev_lock);
1653         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1654                                  exp_obd_chain) {
1655                 /* don't count self-export as client */
1656                 if (obd_uuid_equals(&exp->exp_client_uuid,
1657                                     &exp->exp_obd->obd_uuid))
1658                         continue;
1659
1660                 /* don't evict clients which have no slot in last_rcvd
1661                  * (e.g. lightweight connection) */
1662                 if (exp->exp_target_data.ted_lr_idx == -1)
1663                         continue;
1664
1665                 spin_lock(&exp->exp_lock);
1666                 if (exp->exp_failed || test_export(exp)) {
1667                         spin_unlock(&exp->exp_lock);
1668                         continue;
1669                 }
1670                 exp->exp_failed = 1;
1671                 spin_unlock(&exp->exp_lock);
1672
1673                 list_move(&exp->exp_obd_chain, &work_list);
1674                 evicted++;
1675                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1676                        obd->obd_name, exp->exp_client_uuid.uuid,
1677                        obd_export_nid2str(exp));
1678                 print_export_data(exp, "EVICTING", 0, D_HA);
1679         }
1680         spin_unlock(&obd->obd_dev_lock);
1681
1682         if (evicted)
1683                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1684                               obd->obd_name, evicted);
1685
1686         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1687                                                  OBD_OPT_ABORT_RECOV);
1688         EXIT;
1689 }
1690 EXPORT_SYMBOL(class_disconnect_stale_exports);
1691
1692 void class_fail_export(struct obd_export *exp)
1693 {
1694         int rc, already_failed;
1695
1696         spin_lock(&exp->exp_lock);
1697         already_failed = exp->exp_failed;
1698         exp->exp_failed = 1;
1699         spin_unlock(&exp->exp_lock);
1700
1701         if (already_failed) {
1702                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1703                        exp, exp->exp_client_uuid.uuid);
1704                 return;
1705         }
1706
1707         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1708                exp, exp->exp_client_uuid.uuid);
1709
1710         if (obd_dump_on_timeout)
1711                 libcfs_debug_dumplog();
1712
1713         /* need for safe call CDEBUG after obd_disconnect */
1714         class_export_get(exp);
1715
1716         /* Most callers into obd_disconnect are removing their own reference
1717          * (request, for example) in addition to the one from the hash table.
1718          * We don't have such a reference here, so make one. */
1719         class_export_get(exp);
1720         rc = obd_disconnect(exp);
1721         if (rc)
1722                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1723         else
1724                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1725                        exp, exp->exp_client_uuid.uuid);
1726         class_export_put(exp);
1727 }
1728 EXPORT_SYMBOL(class_fail_export);
1729
1730 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1731 {
1732         struct cfs_hash *nid_hash;
1733         struct obd_export *doomed_exp = NULL;
1734         int exports_evicted = 0;
1735
1736         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1737
1738         spin_lock(&obd->obd_dev_lock);
1739         /* umount has run already, so evict thread should leave
1740          * its task to umount thread now */
1741         if (obd->obd_stopping) {
1742                 spin_unlock(&obd->obd_dev_lock);
1743                 return exports_evicted;
1744         }
1745         nid_hash = obd->obd_nid_hash;
1746         cfs_hash_getref(nid_hash);
1747         spin_unlock(&obd->obd_dev_lock);
1748
1749         do {
1750                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1751                 if (doomed_exp == NULL)
1752                         break;
1753
1754                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1755                          "nid %s found, wanted nid %s, requested nid %s\n",
1756                          obd_export_nid2str(doomed_exp),
1757                          libcfs_nid2str(nid_key), nid);
1758                 LASSERTF(doomed_exp != obd->obd_self_export,
1759                          "self-export is hashed by NID?\n");
1760                 exports_evicted++;
1761                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1762                               "request\n", obd->obd_name,
1763                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1764                               obd_export_nid2str(doomed_exp));
1765                 class_fail_export(doomed_exp);
1766                 class_export_put(doomed_exp);
1767         } while (1);
1768
1769         cfs_hash_putref(nid_hash);
1770
1771         if (!exports_evicted)
1772                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1773                        obd->obd_name, nid);
1774         return exports_evicted;
1775 }
1776 EXPORT_SYMBOL(obd_export_evict_by_nid);
1777
1778 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1779 {
1780         struct cfs_hash *uuid_hash;
1781         struct obd_export *doomed_exp = NULL;
1782         struct obd_uuid doomed_uuid;
1783         int exports_evicted = 0;
1784
1785         spin_lock(&obd->obd_dev_lock);
1786         if (obd->obd_stopping) {
1787                 spin_unlock(&obd->obd_dev_lock);
1788                 return exports_evicted;
1789         }
1790         uuid_hash = obd->obd_uuid_hash;
1791         cfs_hash_getref(uuid_hash);
1792         spin_unlock(&obd->obd_dev_lock);
1793
1794         obd_str2uuid(&doomed_uuid, uuid);
1795         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1796                 CERROR("%s: can't evict myself\n", obd->obd_name);
1797                 cfs_hash_putref(uuid_hash);
1798                 return exports_evicted;
1799         }
1800
1801         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1802
1803         if (doomed_exp == NULL) {
1804                 CERROR("%s: can't disconnect %s: no exports found\n",
1805                        obd->obd_name, uuid);
1806         } else {
1807                 CWARN("%s: evicting %s at adminstrative request\n",
1808                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1809                 class_fail_export(doomed_exp);
1810                 class_export_put(doomed_exp);
1811                 exports_evicted++;
1812         }
1813         cfs_hash_putref(uuid_hash);
1814
1815         return exports_evicted;
1816 }
1817
1818 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1819 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1820 EXPORT_SYMBOL(class_export_dump_hook);
1821 #endif
1822
1823 static void print_export_data(struct obd_export *exp, const char *status,
1824                               int locks, int debug_level)
1825 {
1826         struct ptlrpc_reply_state *rs;
1827         struct ptlrpc_reply_state *first_reply = NULL;
1828         int nreplies = 0;
1829
1830         spin_lock(&exp->exp_lock);
1831         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1832                             rs_exp_list) {
1833                 if (nreplies == 0)
1834                         first_reply = rs;
1835                 nreplies++;
1836         }
1837         spin_unlock(&exp->exp_lock);
1838
1839         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1840                "%p %s %llu stale:%d\n",
1841                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1842                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1843                atomic_read(&exp->exp_rpc_count),
1844                atomic_read(&exp->exp_cb_count),
1845                atomic_read(&exp->exp_locks_count),
1846                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1847                nreplies, first_reply, nreplies > 3 ? "..." : "",
1848                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1849 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1850         if (locks && class_export_dump_hook != NULL)
1851                 class_export_dump_hook(exp);
1852 #endif
1853 }
1854
1855 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1856 {
1857         struct obd_export *exp;
1858
1859         spin_lock(&obd->obd_dev_lock);
1860         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1861                 print_export_data(exp, "ACTIVE", locks, debug_level);
1862         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1863                 print_export_data(exp, "UNLINKED", locks, debug_level);
1864         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1865                 print_export_data(exp, "DELAYED", locks, debug_level);
1866         spin_unlock(&obd->obd_dev_lock);
1867         spin_lock(&obd_zombie_impexp_lock);
1868         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1869                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1870         spin_unlock(&obd_zombie_impexp_lock);
1871 }
1872
1873 void obd_exports_barrier(struct obd_device *obd)
1874 {
1875         int waited = 2;
1876         LASSERT(list_empty(&obd->obd_exports));
1877         spin_lock(&obd->obd_dev_lock);
1878         while (!list_empty(&obd->obd_unlinked_exports)) {
1879                 spin_unlock(&obd->obd_dev_lock);
1880                 set_current_state(TASK_UNINTERRUPTIBLE);
1881                 schedule_timeout(cfs_time_seconds(waited));
1882                 if (waited > 5 && is_power_of_2(waited)) {
1883                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1884                                       "more than %d seconds. "
1885                                       "The obd refcount = %d. Is it stuck?\n",
1886                                       obd->obd_name, waited,
1887                                       atomic_read(&obd->obd_refcount));
1888                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1889                 }
1890                 waited *= 2;
1891                 spin_lock(&obd->obd_dev_lock);
1892         }
1893         spin_unlock(&obd->obd_dev_lock);
1894 }
1895 EXPORT_SYMBOL(obd_exports_barrier);
1896
1897 /* Total amount of zombies to be destroyed */
1898 static int zombies_count = 0;
1899
1900 /**
1901  * kill zombie imports and exports
1902  */
1903 void obd_zombie_impexp_cull(void)
1904 {
1905         struct obd_import *import;
1906         struct obd_export *export;
1907         ENTRY;
1908
1909         do {
1910                 spin_lock(&obd_zombie_impexp_lock);
1911
1912                 import = NULL;
1913                 if (!list_empty(&obd_zombie_imports)) {
1914                         import = list_entry(obd_zombie_imports.next,
1915                                             struct obd_import,
1916                                             imp_zombie_chain);
1917                         list_del_init(&import->imp_zombie_chain);
1918                 }
1919
1920                 export = NULL;
1921                 if (!list_empty(&obd_zombie_exports)) {
1922                         export = list_entry(obd_zombie_exports.next,
1923                                             struct obd_export,
1924                                             exp_obd_chain);
1925                         list_del_init(&export->exp_obd_chain);
1926                 }
1927
1928                 spin_unlock(&obd_zombie_impexp_lock);
1929
1930                 if (import != NULL) {
1931                         class_import_destroy(import);
1932                         spin_lock(&obd_zombie_impexp_lock);
1933                         zombies_count--;
1934                         spin_unlock(&obd_zombie_impexp_lock);
1935                 }
1936
1937                 if (export != NULL) {
1938                         class_export_destroy(export);
1939                         spin_lock(&obd_zombie_impexp_lock);
1940                         zombies_count--;
1941                         spin_unlock(&obd_zombie_impexp_lock);
1942                 }
1943
1944                 cond_resched();
1945         } while (import != NULL || export != NULL);
1946         EXIT;
1947 }
1948
1949 static DECLARE_COMPLETION(obd_zombie_start);
1950 static DECLARE_COMPLETION(obd_zombie_stop);
1951 static unsigned long obd_zombie_flags;
1952 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1953 static pid_t obd_zombie_pid;
1954
1955 enum {
1956         OBD_ZOMBIE_STOP         = 0x0001,
1957 };
1958
1959 /**
1960  * check for work for kill zombie import/export thread.
1961  */
1962 static int obd_zombie_impexp_check(void *arg)
1963 {
1964         int rc;
1965
1966         spin_lock(&obd_zombie_impexp_lock);
1967         rc = (zombies_count == 0) &&
1968              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1969         spin_unlock(&obd_zombie_impexp_lock);
1970
1971         RETURN(rc);
1972 }
1973
1974 /**
1975  * Add export to the obd_zombe thread and notify it.
1976  */
1977 static void obd_zombie_export_add(struct obd_export *exp) {
1978         atomic_dec(&obd_stale_export_num);
1979         spin_lock(&exp->exp_obd->obd_dev_lock);
1980         LASSERT(!list_empty(&exp->exp_obd_chain));
1981         list_del_init(&exp->exp_obd_chain);
1982         spin_unlock(&exp->exp_obd->obd_dev_lock);
1983         spin_lock(&obd_zombie_impexp_lock);
1984         zombies_count++;
1985         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1986         spin_unlock(&obd_zombie_impexp_lock);
1987
1988         obd_zombie_impexp_notify();
1989 }
1990
1991 /**
1992  * Add import to the obd_zombe thread and notify it.
1993  */
1994 static void obd_zombie_import_add(struct obd_import *imp) {
1995         LASSERT(imp->imp_sec == NULL);
1996         spin_lock(&obd_zombie_impexp_lock);
1997         LASSERT(list_empty(&imp->imp_zombie_chain));
1998         zombies_count++;
1999         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
2000         spin_unlock(&obd_zombie_impexp_lock);
2001
2002         obd_zombie_impexp_notify();
2003 }
2004
2005 /**
2006  * notify import/export destroy thread about new zombie.
2007  */
2008 static void obd_zombie_impexp_notify(void)
2009 {
2010         /*
2011          * Make sure obd_zomebie_impexp_thread get this notification.
2012          * It is possible this signal only get by obd_zombie_barrier, and
2013          * barrier gulps this notification and sleeps away and hangs ensues
2014          */
2015         wake_up_all(&obd_zombie_waitq);
2016 }
2017
2018 /**
2019  * check whether obd_zombie is idle
2020  */
2021 static int obd_zombie_is_idle(void)
2022 {
2023         int rc;
2024
2025         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
2026         spin_lock(&obd_zombie_impexp_lock);
2027         rc = (zombies_count == 0);
2028         spin_unlock(&obd_zombie_impexp_lock);
2029         return rc;
2030 }
2031
2032 /**
2033  * wait when obd_zombie import/export queues become empty
2034  */
2035 void obd_zombie_barrier(void)
2036 {
2037         struct l_wait_info lwi = { 0 };
2038
2039         if (obd_zombie_pid == current_pid())
2040                 /* don't wait for myself */
2041                 return;
2042         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
2043 }
2044 EXPORT_SYMBOL(obd_zombie_barrier);
2045
2046
2047 struct obd_export *obd_stale_export_get(void)
2048 {
2049         struct obd_export *exp = NULL;
2050         ENTRY;
2051
2052         spin_lock(&obd_stale_export_lock);
2053         if (!list_empty(&obd_stale_exports)) {
2054                 exp = list_entry(obd_stale_exports.next,
2055                                  struct obd_export, exp_stale_list);
2056                 list_del_init(&exp->exp_stale_list);
2057         }
2058         spin_unlock(&obd_stale_export_lock);
2059
2060         if (exp) {
2061                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
2062                        atomic_read(&obd_stale_export_num));
2063         }
2064         RETURN(exp);
2065 }
2066 EXPORT_SYMBOL(obd_stale_export_get);
2067
2068 void obd_stale_export_put(struct obd_export *exp)
2069 {
2070         ENTRY;
2071
2072         LASSERT(list_empty(&exp->exp_stale_list));
2073         if (exp->exp_lock_hash &&
2074             atomic_read(&exp->exp_lock_hash->hs_count)) {
2075                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2076                        atomic_read(&obd_stale_export_num));
2077
2078                 spin_lock_bh(&exp->exp_bl_list_lock);
2079                 spin_lock(&obd_stale_export_lock);
2080                 /* Add to the tail if there is no blocked locks,
2081                  * to the head otherwise. */
2082                 if (list_empty(&exp->exp_bl_list))
2083                         list_add_tail(&exp->exp_stale_list,
2084                                       &obd_stale_exports);
2085                 else
2086                         list_add(&exp->exp_stale_list,
2087                                  &obd_stale_exports);
2088
2089                 spin_unlock(&obd_stale_export_lock);
2090                 spin_unlock_bh(&exp->exp_bl_list_lock);
2091         } else {
2092                 class_export_put(exp);
2093         }
2094         EXIT;
2095 }
2096 EXPORT_SYMBOL(obd_stale_export_put);
2097
2098 /**
2099  * Adjust the position of the export in the stale list,
2100  * i.e. move to the head of the list if is needed.
2101  **/
2102 void obd_stale_export_adjust(struct obd_export *exp)
2103 {
2104         LASSERT(exp != NULL);
2105         spin_lock_bh(&exp->exp_bl_list_lock);
2106         spin_lock(&obd_stale_export_lock);
2107
2108         if (!list_empty(&exp->exp_stale_list) &&
2109             !list_empty(&exp->exp_bl_list))
2110                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2111
2112         spin_unlock(&obd_stale_export_lock);
2113         spin_unlock_bh(&exp->exp_bl_list_lock);
2114 }
2115 EXPORT_SYMBOL(obd_stale_export_adjust);
2116
2117 /**
2118  * destroy zombie export/import thread.
2119  */
2120 static int obd_zombie_impexp_thread(void *unused)
2121 {
2122         unshare_fs_struct();
2123         complete(&obd_zombie_start);
2124
2125         obd_zombie_pid = current_pid();
2126
2127         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2128                 struct l_wait_info lwi = { 0 };
2129
2130                 l_wait_event(obd_zombie_waitq,
2131                              !obd_zombie_impexp_check(NULL), &lwi);
2132                 obd_zombie_impexp_cull();
2133
2134                 /*
2135                  * Notify obd_zombie_barrier callers that queues
2136                  * may be empty.
2137                  */
2138                 wake_up(&obd_zombie_waitq);
2139         }
2140
2141         complete(&obd_zombie_stop);
2142
2143         RETURN(0);
2144 }
2145
2146
2147 /**
2148  * start destroy zombie import/export thread
2149  */
2150 int obd_zombie_impexp_init(void)
2151 {
2152         struct task_struct *task;
2153
2154         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2155         if (IS_ERR(task))
2156                 RETURN(PTR_ERR(task));
2157
2158         wait_for_completion(&obd_zombie_start);
2159         RETURN(0);
2160 }
2161 /**
2162  * stop destroy zombie import/export thread
2163  */
2164 void obd_zombie_impexp_stop(void)
2165 {
2166         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2167         obd_zombie_impexp_notify();
2168         wait_for_completion(&obd_zombie_stop);
2169         LASSERT(list_empty(&obd_stale_exports));
2170 }
2171
2172 /***** Kernel-userspace comm helpers *******/
2173
2174 /* Get length of entire message, including header */
2175 int kuc_len(int payload_len)
2176 {
2177         return sizeof(struct kuc_hdr) + payload_len;
2178 }
2179 EXPORT_SYMBOL(kuc_len);
2180
2181 /* Get a pointer to kuc header, given a ptr to the payload
2182  * @param p Pointer to payload area
2183  * @returns Pointer to kuc header
2184  */
2185 struct kuc_hdr * kuc_ptr(void *p)
2186 {
2187         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2188         LASSERT(lh->kuc_magic == KUC_MAGIC);
2189         return lh;
2190 }
2191 EXPORT_SYMBOL(kuc_ptr);
2192
2193 /* Alloc space for a message, and fill in header
2194  * @return Pointer to payload area
2195  */
2196 void *kuc_alloc(int payload_len, int transport, int type)
2197 {
2198         struct kuc_hdr *lh;
2199         int len = kuc_len(payload_len);
2200
2201         OBD_ALLOC(lh, len);
2202         if (lh == NULL)
2203                 return ERR_PTR(-ENOMEM);
2204
2205         lh->kuc_magic = KUC_MAGIC;
2206         lh->kuc_transport = transport;
2207         lh->kuc_msgtype = type;
2208         lh->kuc_msglen = len;
2209
2210         return (void *)(lh + 1);
2211 }
2212 EXPORT_SYMBOL(kuc_alloc);
2213
2214 /* Takes pointer to payload area */
2215 void kuc_free(void *p, int payload_len)
2216 {
2217         struct kuc_hdr *lh = kuc_ptr(p);
2218         OBD_FREE(lh, kuc_len(payload_len));
2219 }
2220 EXPORT_SYMBOL(kuc_free);
2221
2222 struct obd_request_slot_waiter {
2223         struct list_head        orsw_entry;
2224         wait_queue_head_t       orsw_waitq;
2225         bool                    orsw_signaled;
2226 };
2227
2228 static bool obd_request_slot_avail(struct client_obd *cli,
2229                                    struct obd_request_slot_waiter *orsw)
2230 {
2231         bool avail;
2232
2233         spin_lock(&cli->cl_loi_list_lock);
2234         avail = !!list_empty(&orsw->orsw_entry);
2235         spin_unlock(&cli->cl_loi_list_lock);
2236
2237         return avail;
2238 };
2239
2240 /*
2241  * For network flow control, the RPC sponsor needs to acquire a credit
2242  * before sending the RPC. The credits count for a connection is defined
2243  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2244  * the subsequent RPC sponsors need to wait until others released their
2245  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2246  */
2247 int obd_get_request_slot(struct client_obd *cli)
2248 {
2249         struct obd_request_slot_waiter   orsw;
2250         struct l_wait_info               lwi;
2251         int                              rc;
2252
2253         spin_lock(&cli->cl_loi_list_lock);
2254         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2255                 cli->cl_rpcs_in_flight++;
2256                 spin_unlock(&cli->cl_loi_list_lock);
2257                 return 0;
2258         }
2259
2260         init_waitqueue_head(&orsw.orsw_waitq);
2261         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2262         orsw.orsw_signaled = false;
2263         spin_unlock(&cli->cl_loi_list_lock);
2264
2265         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2266         rc = l_wait_event(orsw.orsw_waitq,
2267                           obd_request_slot_avail(cli, &orsw) ||
2268                           orsw.orsw_signaled,
2269                           &lwi);
2270
2271         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2272          * freed but other (such as obd_put_request_slot) is using it. */
2273         spin_lock(&cli->cl_loi_list_lock);
2274         if (rc != 0) {
2275                 if (!orsw.orsw_signaled) {
2276                         if (list_empty(&orsw.orsw_entry))
2277                                 cli->cl_rpcs_in_flight--;
2278                         else
2279                                 list_del(&orsw.orsw_entry);
2280                 }
2281         }
2282
2283         if (orsw.orsw_signaled) {
2284                 LASSERT(list_empty(&orsw.orsw_entry));
2285
2286                 rc = -EINTR;
2287         }
2288         spin_unlock(&cli->cl_loi_list_lock);
2289
2290         return rc;
2291 }
2292 EXPORT_SYMBOL(obd_get_request_slot);
2293
2294 void obd_put_request_slot(struct client_obd *cli)
2295 {
2296         struct obd_request_slot_waiter *orsw;
2297
2298         spin_lock(&cli->cl_loi_list_lock);
2299         cli->cl_rpcs_in_flight--;
2300
2301         /* If there is free slot, wakeup the first waiter. */
2302         if (!list_empty(&cli->cl_flight_waiters) &&
2303             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2304                 orsw = list_entry(cli->cl_flight_waiters.next,
2305                                   struct obd_request_slot_waiter, orsw_entry);
2306                 list_del_init(&orsw->orsw_entry);
2307                 cli->cl_rpcs_in_flight++;
2308                 wake_up(&orsw->orsw_waitq);
2309         }
2310         spin_unlock(&cli->cl_loi_list_lock);
2311 }
2312 EXPORT_SYMBOL(obd_put_request_slot);
2313
2314 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2315 {
2316         return cli->cl_max_rpcs_in_flight;
2317 }
2318 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2319
2320 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2321 {
2322         struct obd_request_slot_waiter *orsw;
2323         __u32                           old;
2324         int                             diff;
2325         int                             i;
2326         char                            *typ_name;
2327         int                             rc;
2328
2329         if (max > OBD_MAX_RIF_MAX || max < 1)
2330                 return -ERANGE;
2331
2332         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2333         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2334                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2335                  * strictly lower that max_rpcs_in_flight */
2336                 if (max < 2) {
2337                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2338                                "because it must be higher than "
2339                                "max_mod_rpcs_in_flight value",
2340                                cli->cl_import->imp_obd->obd_name);
2341                         return -ERANGE;
2342                 }
2343                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2344                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2345                         if (rc != 0)
2346                                 return rc;
2347                 }
2348         }
2349
2350         spin_lock(&cli->cl_loi_list_lock);
2351         old = cli->cl_max_rpcs_in_flight;
2352         cli->cl_max_rpcs_in_flight = max;
2353         client_adjust_max_dirty(cli);
2354
2355         diff = max - old;
2356
2357         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2358         for (i = 0; i < diff; i++) {
2359                 if (list_empty(&cli->cl_flight_waiters))
2360                         break;
2361
2362                 orsw = list_entry(cli->cl_flight_waiters.next,
2363                                   struct obd_request_slot_waiter, orsw_entry);
2364                 list_del_init(&orsw->orsw_entry);
2365                 cli->cl_rpcs_in_flight++;
2366                 wake_up(&orsw->orsw_waitq);
2367         }
2368         spin_unlock(&cli->cl_loi_list_lock);
2369
2370         return 0;
2371 }
2372 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2373
2374 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2375 {
2376         return cli->cl_max_mod_rpcs_in_flight;
2377 }
2378 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2379
2380 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2381 {
2382         struct obd_connect_data *ocd;
2383         __u16 maxmodrpcs;
2384         __u16 prev;
2385
2386         if (max > OBD_MAX_RIF_MAX || max < 1)
2387                 return -ERANGE;
2388
2389         /* cannot exceed or equal max_rpcs_in_flight */
2390         if (max >= cli->cl_max_rpcs_in_flight) {
2391                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2392                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2393                        cli->cl_import->imp_obd->obd_name,
2394                        max, cli->cl_max_rpcs_in_flight);
2395                 return -ERANGE;
2396         }
2397
2398         /* cannot exceed max modify RPCs in flight supported by the server */
2399         ocd = &cli->cl_import->imp_connect_data;
2400         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2401                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2402         else
2403                 maxmodrpcs = 1;
2404         if (max > maxmodrpcs) {
2405                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2406                        "higher than max_mod_rpcs_per_client value (%hu) "
2407                        "returned by the server at connection\n",
2408                        cli->cl_import->imp_obd->obd_name,
2409                        max, maxmodrpcs);
2410                 return -ERANGE;
2411         }
2412
2413         spin_lock(&cli->cl_mod_rpcs_lock);
2414
2415         prev = cli->cl_max_mod_rpcs_in_flight;
2416         cli->cl_max_mod_rpcs_in_flight = max;
2417
2418         /* wakeup waiters if limit has been increased */
2419         if (cli->cl_max_mod_rpcs_in_flight > prev)
2420                 wake_up(&cli->cl_mod_rpcs_waitq);
2421
2422         spin_unlock(&cli->cl_mod_rpcs_lock);
2423
2424         return 0;
2425 }
2426 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2427
2428
2429 #define pct(a, b) (b ? a * 100 / b : 0)
2430 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2431                                struct seq_file *seq)
2432 {
2433         unsigned long mod_tot = 0, mod_cum;
2434         struct timespec64 now;
2435         int i;
2436
2437         ktime_get_real_ts64(&now);
2438
2439         spin_lock(&cli->cl_mod_rpcs_lock);
2440
2441         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2442                    (s64)now.tv_sec, now.tv_nsec);
2443         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2444                    cli->cl_mod_rpcs_in_flight);
2445
2446         seq_printf(seq, "\n\t\t\tmodify\n");
2447         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2448
2449         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2450
2451         mod_cum = 0;
2452         for (i = 0; i < OBD_HIST_MAX; i++) {
2453                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2454                 mod_cum += mod;
2455                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2456                            i, mod, pct(mod, mod_tot),
2457                            pct(mod_cum, mod_tot));
2458                 if (mod_cum == mod_tot)
2459                         break;
2460         }
2461
2462         spin_unlock(&cli->cl_mod_rpcs_lock);
2463
2464         return 0;
2465 }
2466 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2467 #undef pct
2468
2469
2470 /* The number of modify RPCs sent in parallel is limited
2471  * because the server has a finite number of slots per client to
2472  * store request result and ensure reply reconstruction when needed.
2473  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2474  * that takes into account server limit and cl_max_rpcs_in_flight
2475  * value.
2476  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2477  * one close request is allowed above the maximum.
2478  */
2479 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2480                                                  bool close_req)
2481 {
2482         bool avail;
2483
2484         /* A slot is available if
2485          * - number of modify RPCs in flight is less than the max
2486          * - it's a close RPC and no other close request is in flight
2487          */
2488         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2489                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2490
2491         return avail;
2492 }
2493
2494 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2495                                          bool close_req)
2496 {
2497         bool avail;
2498
2499         spin_lock(&cli->cl_mod_rpcs_lock);
2500         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2501         spin_unlock(&cli->cl_mod_rpcs_lock);
2502         return avail;
2503 }
2504
2505 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2506 {
2507         if (it != NULL &&
2508             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2509              it->it_op == IT_READDIR ||
2510              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2511                         return true;
2512         return false;
2513 }
2514
2515 /* Get a modify RPC slot from the obd client @cli according
2516  * to the kind of operation @opc that is going to be sent
2517  * and the intent @it of the operation if it applies.
2518  * If the maximum number of modify RPCs in flight is reached
2519  * the thread is put to sleep.
2520  * Returns the tag to be set in the request message. Tag 0
2521  * is reserved for non-modifying requests.
2522  */
2523 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2524                            struct lookup_intent *it)
2525 {
2526         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2527         bool                    close_req = false;
2528         __u16                   i, max;
2529
2530         /* read-only metadata RPCs don't consume a slot on MDT
2531          * for reply reconstruction
2532          */
2533         if (obd_skip_mod_rpc_slot(it))
2534                 return 0;
2535
2536         if (opc == MDS_CLOSE)
2537                 close_req = true;
2538
2539         do {
2540                 spin_lock(&cli->cl_mod_rpcs_lock);
2541                 max = cli->cl_max_mod_rpcs_in_flight;
2542                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2543                         /* there is a slot available */
2544                         cli->cl_mod_rpcs_in_flight++;
2545                         if (close_req)
2546                                 cli->cl_close_rpcs_in_flight++;
2547                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2548                                          cli->cl_mod_rpcs_in_flight);
2549                         /* find a free tag */
2550                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2551                                                 max + 1);
2552                         LASSERT(i < OBD_MAX_RIF_MAX);
2553                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2554                         spin_unlock(&cli->cl_mod_rpcs_lock);
2555                         /* tag 0 is reserved for non-modify RPCs */
2556                         return i + 1;
2557                 }
2558                 spin_unlock(&cli->cl_mod_rpcs_lock);
2559
2560                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2561                        "opc %u, max %hu\n",
2562                        cli->cl_import->imp_obd->obd_name, opc, max);
2563
2564                 l_wait_event(cli->cl_mod_rpcs_waitq,
2565                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2566         } while (true);
2567 }
2568 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2569
2570 /* Put a modify RPC slot from the obd client @cli according
2571  * to the kind of operation @opc that has been sent and the
2572  * intent @it of the operation if it applies.
2573  */
2574 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2575                           struct lookup_intent *it, __u16 tag)
2576 {
2577         bool                    close_req = false;
2578
2579         if (obd_skip_mod_rpc_slot(it))
2580                 return;
2581
2582         if (opc == MDS_CLOSE)
2583                 close_req = true;
2584
2585         spin_lock(&cli->cl_mod_rpcs_lock);
2586         cli->cl_mod_rpcs_in_flight--;
2587         if (close_req)
2588                 cli->cl_close_rpcs_in_flight--;
2589         /* release the tag in the bitmap */
2590         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2591         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2592         spin_unlock(&cli->cl_mod_rpcs_lock);
2593         wake_up(&cli->cl_mod_rpcs_waitq);
2594 }
2595 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2596