Whamcloud - gitweb
abb6d3d609eea30f21e9c7f16ba429785fac182a
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/genops.c
32  *
33  * These are the only exported functions, they provide some generic
34  * infrastructure for managing object devices
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
/* Protects obd_devs[]: lookups in this file take it for read, while
 * class_register_device()/class_unregister_device() take it for write. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's slot index is its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache used by obd_device_alloc()/obd_device_free(); created in
 * obd_init_caches() and destroyed in obd_cleanup_caches(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type used for every struct obd_type; defined further down. */
static struct kobj_type class_ktype;
static struct workqueue_struct *zombie_wq;

static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
63
64 /*
65  * support functions: we could use inter-module communication, but this
66  * is more portable to other OS's
67  */
68 static struct obd_device *obd_device_alloc(void)
69 {
70         struct obd_device *obd;
71
72         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
73         if (obd != NULL) {
74                 obd->obd_magic = OBD_DEVICE_MAGIC;
75         }
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct kobject *kobj = kset_find_obj(lustre_kset, name);
96
97         if (kobj && kobj->ktype == &class_ktype)
98                 return container_of(kobj, struct obd_type, typ_kobj);
99
100         kobject_put(kobj);
101         return NULL;
102 }
103 EXPORT_SYMBOL(class_search_type);
104
105 struct obd_type *class_get_type(const char *name)
106 {
107         struct obd_type *type;
108
109         type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
111         if (!type) {
112                 const char *modname = name;
113
114 #ifdef HAVE_SERVER_SUPPORT
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
124
125                 if (!request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 if (try_module_get(type->typ_dt_ops->o_owner)) {
136                         atomic_inc(&type->typ_refcnt);
137                         /* class_search_type() returned a counted reference,
138                          * but we don't need that count any more as
139                          * we have one through typ_refcnt.
140                          */
141                         kobject_put(&type->typ_kobj);
142                 } else {
143                         kobject_put(&type->typ_kobj);
144                         type = NULL;
145                 }
146         }
147         return type;
148 }
149 EXPORT_SYMBOL(class_get_type);
150
151 void class_put_type(struct obd_type *type)
152 {
153         LASSERT(type);
154         module_put(type->typ_dt_ops->o_owner);
155         atomic_dec(&type->typ_refcnt);
156 }
157 EXPORT_SYMBOL(class_put_type);
158
159 static void class_sysfs_release(struct kobject *kobj)
160 {
161         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162
163         debugfs_remove_recursive(type->typ_debugfs_entry);
164         type->typ_debugfs_entry = NULL;
165
166         if (type->typ_lu)
167                 lu_device_type_fini(type->typ_lu);
168
169 #ifdef CONFIG_PROC_FS
170         if (type->typ_name && type->typ_procroot)
171                 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 #endif
173         OBD_FREE(type, sizeof(*type));
174 }
175
176 static struct kobj_type class_ktype = {
177         .sysfs_ops      = &lustre_sysfs_ops,
178         .release        = class_sysfs_release,
179 };
180
181 #ifdef HAVE_SERVER_SUPPORT
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 {
184         struct dentry *symlink;
185         struct obd_type *type;
186         int rc;
187
188         type = class_search_type(name);
189         if (type) {
190                 kobject_put(&type->typ_kobj);
191                 return ERR_PTR(-EEXIST);
192         }
193
194         OBD_ALLOC(type, sizeof(*type));
195         if (!type)
196                 return ERR_PTR(-ENOMEM);
197
198         type->typ_kobj.kset = lustre_kset;
199         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200                                   &lustre_kset->kobj, "%s", name);
201         if (rc)
202                 return ERR_PTR(rc);
203
204         symlink = debugfs_create_dir(name, debugfs_lustre_root);
205         type->typ_debugfs_entry = symlink;
206         type->typ_sym_filter = true;
207
208         if (enable_proc) {
209                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210                                                       NULL, NULL);
211                 if (IS_ERR(type->typ_procroot)) {
212                         CERROR("%s: can't create compat proc entry: %d\n",
213                                name, (int)PTR_ERR(type->typ_procroot));
214                         type->typ_procroot = NULL;
215                 }
216         }
217
218         return type;
219 }
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
222
223 #define CLASS_MAX_NAME 1024
224
225 int class_register_type(const struct obd_ops *dt_ops,
226                         const struct md_ops *md_ops,
227                         bool enable_proc,
228                         const char *name, struct lu_device_type *ldt)
229 {
230         struct obd_type *type;
231         int rc;
232
233         ENTRY;
234         /* sanity check */
235         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236
237         type = class_search_type(name);
238         if (type) {
239 #ifdef HAVE_SERVER_SUPPORT
240                 if (type->typ_sym_filter)
241                         goto dir_exist;
242 #endif /* HAVE_SERVER_SUPPORT */
243                 kobject_put(&type->typ_kobj);
244                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
245                 RETURN(-EEXIST);
246         }
247
248         OBD_ALLOC(type, sizeof(*type));
249         if (type == NULL)
250                 RETURN(-ENOMEM);
251
252         type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253         type->typ_kobj.kset = lustre_kset;
254         kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
256 dir_exist:
257 #endif /* HAVE_SERVER_SUPPORT */
258
259         type->typ_dt_ops = dt_ops;
260         type->typ_md_ops = md_ops;
261
262 #ifdef HAVE_SERVER_SUPPORT
263         if (type->typ_sym_filter) {
264                 type->typ_sym_filter = false;
265                 kobject_put(&type->typ_kobj);
266                 goto setup_ldt;
267         }
268 #endif
269 #ifdef CONFIG_PROC_FS
270         if (enable_proc && !type->typ_procroot) {
271                 type->typ_procroot = lprocfs_register(name,
272                                                       proc_lustre_root,
273                                                       NULL, type);
274                 if (IS_ERR(type->typ_procroot)) {
275                         rc = PTR_ERR(type->typ_procroot);
276                         type->typ_procroot = NULL;
277                         GOTO(failed, rc);
278                 }
279         }
280 #endif
281         type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282
283         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284         if (rc)
285                 GOTO(failed, rc);
286 #ifdef HAVE_SERVER_SUPPORT
287 setup_ldt:
288 #endif
289         if (ldt) {
290                 rc = lu_device_type_init(ldt);
291                 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292                 wake_up_var(&type->typ_lu);
293                 if (rc)
294                         GOTO(failed, rc);
295         }
296
297         RETURN(0);
298
299 failed:
300         kobject_put(&type->typ_kobj);
301
302         RETURN(rc);
303 }
304 EXPORT_SYMBOL(class_register_type);
305
306 int class_unregister_type(const char *name)
307 {
308         struct obd_type *type = class_search_type(name);
309         int rc = 0;
310         ENTRY;
311
312         if (!type) {
313                 CERROR("unknown obd type\n");
314                 RETURN(-EINVAL);
315         }
316
317         if (atomic_read(&type->typ_refcnt)) {
318                 CERROR("type %s has refcount (%d)\n", name,
319                        atomic_read(&type->typ_refcnt));
320                 /* This is a bad situation, let's make the best of it */
321                 /* Remove ops, but leave the name for debugging */
322                 type->typ_dt_ops = NULL;
323                 type->typ_md_ops = NULL;
324                 GOTO(out_put, rc = -EBUSY);
325         }
326
327         /* Put the final ref */
328         kobject_put(&type->typ_kobj);
329 out_put:
330         /* Put the ref returned by class_search_type() */
331         kobject_put(&type->typ_kobj);
332
333         RETURN(rc);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
336
337 /**
338  * Create a new obd device.
339  *
340  * Allocate the new obd_device and initialize it.
341  *
342  * \param[in] type_name obd device type string.
343  * \param[in] name      obd device name.
344  * \param[in] uuid      obd device UUID
345  *
346  * \retval newdev         pointer to created obd_device
347  * \retval ERR_PTR(errno) on error
348  */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
350                                 const char *uuid)
351 {
352         struct obd_device *newdev;
353         struct obd_type *type = NULL;
354         ENTRY;
355
356         if (strlen(name) >= MAX_OBD_NAME) {
357                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358                 RETURN(ERR_PTR(-EINVAL));
359         }
360
361         type = class_get_type(type_name);
362         if (type == NULL){
363                 CERROR("OBD: unknown type: %s\n", type_name);
364                 RETURN(ERR_PTR(-ENODEV));
365         }
366
367         newdev = obd_device_alloc();
368         if (newdev == NULL) {
369                 class_put_type(type);
370                 RETURN(ERR_PTR(-ENOMEM));
371         }
372         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374         newdev->obd_type = type;
375         newdev->obd_minor = -1;
376
377         rwlock_init(&newdev->obd_pool_lock);
378         newdev->obd_pool_limit = 0;
379         newdev->obd_pool_slv = 0;
380
381         INIT_LIST_HEAD(&newdev->obd_exports);
382         newdev->obd_num_exports = 0;
383         newdev->obd_grant_check_threshold = 100;
384         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386         INIT_LIST_HEAD(&newdev->obd_exports_timed);
387         INIT_LIST_HEAD(&newdev->obd_nid_stats);
388         spin_lock_init(&newdev->obd_nid_lock);
389         spin_lock_init(&newdev->obd_dev_lock);
390         mutex_init(&newdev->obd_dev_mutex);
391         spin_lock_init(&newdev->obd_osfs_lock);
392         /* newdev->obd_osfs_age must be set to a value in the distant
393          * past to guarantee a fresh statfs is fetched on mount. */
394         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395
396         /* XXX belongs in setup not attach  */
397         init_rwsem(&newdev->obd_observer_link_sem);
398         /* recovery data */
399         spin_lock_init(&newdev->obd_recovery_task_lock);
400         init_waitqueue_head(&newdev->obd_next_transno_waitq);
401         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405         INIT_LIST_HEAD(&newdev->obd_evict_list);
406         INIT_LIST_HEAD(&newdev->obd_lwp_list);
407
408         llog_group_init(&newdev->obd_olg);
409         /* Detach drops this */
410         atomic_set(&newdev->obd_refcount, 1);
411         lu_ref_init(&newdev->obd_reference);
412         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413
414         newdev->obd_conn_inprogress = 0;
415
416         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417
418         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419                newdev->obd_name, newdev);
420
421         return newdev;
422 }
423
424 /**
425  * Free obd device.
426  *
427  * \param[in] obd obd_device to be freed
428  *
429  * \retval none
430  */
431 void class_free_dev(struct obd_device *obd)
432 {
433         struct obd_type *obd_type = obd->obd_type;
434
435         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438                  "obd %p != obd_devs[%d] %p\n",
439                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441                  "obd_refcount should be 0, not %d\n",
442                  atomic_read(&obd->obd_refcount));
443         LASSERT(obd_type != NULL);
444
445         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446                obd->obd_name, obd->obd_type->typ_name);
447
448         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449                          obd->obd_name, obd->obd_uuid.uuid);
450         if (obd->obd_stopping) {
451                 int err;
452
453                 /* If we're not stopping, we were never set up */
454                 err = obd_cleanup(obd);
455                 if (err)
456                         CERROR("Cleanup %s returned %d\n",
457                                 obd->obd_name, err);
458         }
459
460         obd_device_free(obd);
461
462         class_put_type(obd_type);
463 }
464
465 /**
466  * Unregister obd device.
467  *
468  * Free slot in obd_dev[] used by \a obd.
469  *
470  * \param[in] new_obd obd_device to be unregistered
471  *
472  * \retval none
473  */
474 void class_unregister_device(struct obd_device *obd)
475 {
476         write_lock(&obd_dev_lock);
477         if (obd->obd_minor >= 0) {
478                 LASSERT(obd_devs[obd->obd_minor] == obd);
479                 obd_devs[obd->obd_minor] = NULL;
480                 obd->obd_minor = -1;
481         }
482         write_unlock(&obd_dev_lock);
483 }
484
485 /**
486  * Register obd device.
487  *
488  * Find free slot in obd_devs[], fills it with \a new_obd.
489  *
490  * \param[in] new_obd obd_device to be registered
491  *
492  * \retval 0          success
493  * \retval -EEXIST    device with this name is registered
494  * \retval -EOVERFLOW obd_devs[] is full
495  */
496 int class_register_device(struct obd_device *new_obd)
497 {
498         int ret = 0;
499         int i;
500         int new_obd_minor = 0;
501         bool minor_assign = false;
502         bool retried = false;
503
504 again:
505         write_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd != NULL &&
510                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
511
512                         if (!retried) {
513                                 write_unlock(&obd_dev_lock);
514
515                                 /* the obd_device could be waited to be
516                                  * destroyed by the "obd_zombie_impexp_thread".
517                                  */
518                                 obd_zombie_barrier();
519                                 retried = true;
520                                 goto again;
521                         }
522
523                         CERROR("%s: already exists, won't add\n",
524                                obd->obd_name);
525                         /* in case we found a free slot before duplicate */
526                         minor_assign = false;
527                         ret = -EEXIST;
528                         break;
529                 }
530                 if (!minor_assign && obd == NULL) {
531                         new_obd_minor = i;
532                         minor_assign = true;
533                 }
534         }
535
536         if (minor_assign) {
537                 new_obd->obd_minor = new_obd_minor;
538                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540                 obd_devs[new_obd_minor] = new_obd;
541         } else {
542                 if (ret == 0) {
543                         ret = -EOVERFLOW;
544                         CERROR("%s: all %u/%u devices used, increase "
545                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546                                i, class_devno_max(), ret);
547                 }
548         }
549         write_unlock(&obd_dev_lock);
550
551         RETURN(ret);
552 }
553
554 static int class_name2dev_nolock(const char *name)
555 {
556         int i;
557
558         if (!name)
559                 return -1;
560
561         for (i = 0; i < class_devno_max(); i++) {
562                 struct obd_device *obd = class_num2obd(i);
563
564                 if (obd && strcmp(name, obd->obd_name) == 0) {
565                         /* Make sure we finished attaching before we give
566                            out any references */
567                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568                         if (obd->obd_attached) {
569                                 return i;
570                         }
571                         break;
572                 }
573         }
574
575         return -1;
576 }
577
578 int class_name2dev(const char *name)
579 {
580         int i;
581
582         if (!name)
583                 return -1;
584
585         read_lock(&obd_dev_lock);
586         i = class_name2dev_nolock(name);
587         read_unlock(&obd_dev_lock);
588
589         return i;
590 }
591 EXPORT_SYMBOL(class_name2dev);
592
593 struct obd_device *class_name2obd(const char *name)
594 {
595         int dev = class_name2dev(name);
596
597         if (dev < 0 || dev > class_devno_max())
598                 return NULL;
599         return class_num2obd(dev);
600 }
601 EXPORT_SYMBOL(class_name2obd);
602
603 int class_uuid2dev_nolock(struct obd_uuid *uuid)
604 {
605         int i;
606
607         for (i = 0; i < class_devno_max(); i++) {
608                 struct obd_device *obd = class_num2obd(i);
609
610                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
612                         return i;
613                 }
614         }
615
616         return -1;
617 }
618
619 int class_uuid2dev(struct obd_uuid *uuid)
620 {
621         int i;
622
623         read_lock(&obd_dev_lock);
624         i = class_uuid2dev_nolock(uuid);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_uuid2dev);
630
631 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
632 {
633         int dev = class_uuid2dev(uuid);
634         if (dev < 0)
635                 return NULL;
636         return class_num2obd(dev);
637 }
638 EXPORT_SYMBOL(class_uuid2obd);
639
640 /**
641  * Get obd device from ::obd_devs[]
642  *
643  * \param num [in] array index
644  *
645  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646  *         otherwise return the obd device there.
647  */
648 struct obd_device *class_num2obd(int num)
649 {
650         struct obd_device *obd = NULL;
651
652         if (num < class_devno_max()) {
653                 obd = obd_devs[num];
654                 if (obd == NULL)
655                         return NULL;
656
657                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658                          "%p obd_magic %08x != %08x\n",
659                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660                 LASSERTF(obd->obd_minor == num,
661                          "%p obd_minor %0d != %0d\n",
662                          obd, obd->obd_minor, num);
663         }
664
665         return obd;
666 }
667 EXPORT_SYMBOL(class_num2obd);
668
669 /**
670  * Find obd in obd_dev[] by name or uuid.
671  *
672  * Increment obd's refcount if found.
673  *
674  * \param[in] str obd name or uuid
675  *
676  * \retval NULL    if not found
677  * \retval target  pointer to found obd_device
678  */
679 struct obd_device *class_dev_by_str(const char *str)
680 {
681         struct obd_device *target = NULL;
682         struct obd_uuid tgtuuid;
683         int rc;
684
685         obd_str2uuid(&tgtuuid, str);
686
687         read_lock(&obd_dev_lock);
688         rc = class_uuid2dev_nolock(&tgtuuid);
689         if (rc < 0)
690                 rc = class_name2dev_nolock(str);
691
692         if (rc >= 0)
693                 target = class_num2obd(rc);
694
695         if (target != NULL)
696                 class_incref(target, "find", current);
697         read_unlock(&obd_dev_lock);
698
699         RETURN(target);
700 }
701 EXPORT_SYMBOL(class_dev_by_str);
702
703 /**
704  * Get obd devices count. Device in any
705  *    state are counted
706  * \retval obd device count
707  */
708 int get_devices_count(void)
709 {
710         int index, max_index = class_devno_max(), dev_count = 0;
711
712         read_lock(&obd_dev_lock);
713         for (index = 0; index <= max_index; index++) {
714                 struct obd_device *obd = class_num2obd(index);
715                 if (obd != NULL)
716                         dev_count++;
717         }
718         read_unlock(&obd_dev_lock);
719
720         return dev_count;
721 }
722 EXPORT_SYMBOL(get_devices_count);
723
724 void class_obd_list(void)
725 {
726         char *status;
727         int i;
728
729         read_lock(&obd_dev_lock);
730         for (i = 0; i < class_devno_max(); i++) {
731                 struct obd_device *obd = class_num2obd(i);
732
733                 if (obd == NULL)
734                         continue;
735                 if (obd->obd_stopping)
736                         status = "ST";
737                 else if (obd->obd_set_up)
738                         status = "UP";
739                 else if (obd->obd_attached)
740                         status = "AT";
741                 else
742                         status = "--";
743                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744                          i, status, obd->obd_type->typ_name,
745                          obd->obd_name, obd->obd_uuid.uuid,
746                          atomic_read(&obd->obd_refcount));
747         }
748         read_unlock(&obd_dev_lock);
749 }
750
751 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
752  * specified, then only the client with that uuid is returned,
753  * otherwise any client connected to the tgt is returned.
754  */
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756                                          const char *type_name,
757                                          struct obd_uuid *grp_uuid)
758 {
759         int i;
760
761         read_lock(&obd_dev_lock);
762         for (i = 0; i < class_devno_max(); i++) {
763                 struct obd_device *obd = class_num2obd(i);
764
765                 if (obd == NULL)
766                         continue;
767                 if ((strncmp(obd->obd_type->typ_name, type_name,
768                              strlen(type_name)) == 0)) {
769                         if (obd_uuid_equals(tgt_uuid,
770                                             &obd->u.cli.cl_target_uuid) &&
771                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
772                                                          &obd->obd_uuid) : 1)) {
773                                 read_unlock(&obd_dev_lock);
774                                 return obd;
775                         }
776                 }
777         }
778         read_unlock(&obd_dev_lock);
779
780         return NULL;
781 }
782 EXPORT_SYMBOL(class_find_client_obd);
783
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785  * searching at *next, and if a device is found, the next index to look
786  * at is saved in *next. If next is NULL, then the first matching device
787  * will always be returned.
788  */
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
790 {
791         int i;
792
793         if (next == NULL)
794                 i = 0;
795         else if (*next >= 0 && *next < class_devno_max())
796                 i = *next;
797         else
798                 return NULL;
799
800         read_lock(&obd_dev_lock);
801         for (; i < class_devno_max(); i++) {
802                 struct obd_device *obd = class_num2obd(i);
803
804                 if (obd == NULL)
805                         continue;
806                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807                         if (next != NULL)
808                                 *next = i+1;
809                         read_unlock(&obd_dev_lock);
810                         return obd;
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_devices_in_group);
818
819 /**
820  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821  * adjust sptlrpc settings accordingly.
822  */
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 {
825         struct obd_device  *obd;
826         const char         *type;
827         int                 i, rc = 0, rc2;
828
829         LASSERT(namelen > 0);
830
831         read_lock(&obd_dev_lock);
832         for (i = 0; i < class_devno_max(); i++) {
833                 obd = class_num2obd(i);
834
835                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836                         continue;
837
838                 /* only notify mdc, osc, osp, lwp, mdt, ost
839                  * because only these have a -sptlrpc llog */
840                 type = obd->obd_type->typ_name;
841                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846                     strcmp(type, LUSTRE_OST_NAME) != 0)
847                         continue;
848
849                 if (strncmp(obd->obd_name, fsname, namelen))
850                         continue;
851
852                 class_incref(obd, __FUNCTION__, obd);
853                 read_unlock(&obd_dev_lock);
854                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855                                          sizeof(KEY_SPTLRPC_CONF),
856                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
857                 rc = rc ? rc : rc2;
858                 class_decref(obd, __FUNCTION__, obd);
859                 read_lock(&obd_dev_lock);
860         }
861         read_unlock(&obd_dev_lock);
862         return rc;
863 }
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
865
866 void obd_cleanup_caches(void)
867 {
868         ENTRY;
869         if (obd_device_cachep) {
870                 kmem_cache_destroy(obd_device_cachep);
871                 obd_device_cachep = NULL;
872         }
873
874         EXIT;
875 }
876
877 int obd_init_caches(void)
878 {
879         int rc;
880         ENTRY;
881
882         LASSERT(obd_device_cachep == NULL);
883         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884                                 sizeof(struct obd_device),
885                                 0, 0, 0, sizeof(struct obd_device), NULL);
886         if (!obd_device_cachep)
887                 GOTO(out, rc = -ENOMEM);
888
889         RETURN(0);
890 out:
891         obd_cleanup_caches();
892         RETURN(rc);
893 }
894
895 static const char export_handle_owner[] = "export";
896
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 {
900         struct obd_export *export;
901         ENTRY;
902
903         if (!conn) {
904                 CDEBUG(D_CACHE, "looking for null handle\n");
905                 RETURN(NULL);
906         }
907
908         if (conn->cookie == -1) {  /* this means assign a new connection */
909                 CDEBUG(D_CACHE, "want a new connection\n");
910                 RETURN(NULL);
911         }
912
913         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914         export = class_handle2object(conn->cookie, export_handle_owner);
915         RETURN(export);
916 }
917 EXPORT_SYMBOL(class_conn2export);
918
919 struct obd_device *class_exp2obd(struct obd_export *exp)
920 {
921         if (exp)
922                 return exp->exp_obd;
923         return NULL;
924 }
925 EXPORT_SYMBOL(class_exp2obd);
926
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         if (obd == NULL)
931                 return NULL;
932         return obd->u.cli.cl_import;
933 }
934 EXPORT_SYMBOL(class_exp2cliimp);
935
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
938 {
939         struct obd_device *obd = exp->exp_obd;
940         ENTRY;
941
942         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943         LASSERT(obd != NULL);
944
945         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946                exp->exp_client_uuid.uuid, obd->obd_name);
947
948         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949         ptlrpc_connection_put(exp->exp_connection);
950
951         LASSERT(list_empty(&exp->exp_outstanding_replies));
952         LASSERT(list_empty(&exp->exp_uncommitted_replies));
953         LASSERT(list_empty(&exp->exp_req_replay_queue));
954         LASSERT(list_empty(&exp->exp_hp_rpcs));
955         obd_destroy_export(exp);
956         /* self export doesn't hold a reference to an obd, although it
957          * exists until freeing of the obd */
958         if (exp != obd->obd_self_export)
959                 class_decref(obd, "export", exp);
960
961         OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
962         kfree_rcu(exp, exp_handle.h_rcu);
963         EXIT;
964 }
965
966 struct obd_export *class_export_get(struct obd_export *exp)
967 {
968         refcount_inc(&exp->exp_handle.h_ref);
969         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970                refcount_read(&exp->exp_handle.h_ref));
971         return exp;
972 }
973 EXPORT_SYMBOL(class_export_get);
974
975 void class_export_put(struct obd_export *exp)
976 {
977         LASSERT(exp != NULL);
978         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
979         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981                refcount_read(&exp->exp_handle.h_ref) - 1);
982
983         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984                 struct obd_device *obd = exp->exp_obd;
985
986                 CDEBUG(D_IOCTL, "final put %p/%s\n",
987                        exp, exp->exp_client_uuid.uuid);
988
989                 /* release nid stat refererence */
990                 lprocfs_exp_cleanup(exp);
991
992                 if (exp == obd->obd_self_export) {
993                         /* self export should be destroyed without
994                          * zombie thread as it doesn't hold a
995                          * reference to obd and doesn't hold any
996                          * resources */
997                         class_export_destroy(exp);
998                         /* self export is destroyed, no class
999                          * references exist and it is safe to free
1000                          * obd */
1001                         class_free_dev(obd);
1002                 } else {
1003                         LASSERT(!list_empty(&exp->exp_obd_chain));
1004                         obd_zombie_export_add(exp);
1005                 }
1006
1007         }
1008 }
1009 EXPORT_SYMBOL(class_export_put);
1010
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 {
1013         struct obd_export *export;
1014
1015         export = container_of(ws, struct obd_export, exp_zombie_work);
1016         class_export_destroy(export);
1017 }
1018
1019 /* Creates a new export, adds it to the hash table, and returns a
1020  * pointer to it. The refcount is 2: one for the hash reference, and
1021  * one for the pointer returned by this function. */
1022 struct obd_export *__class_new_export(struct obd_device *obd,
1023                                       struct obd_uuid *cluuid, bool is_self)
1024 {
1025         struct obd_export *export;
1026         int rc = 0;
1027         ENTRY;
1028
1029         OBD_ALLOC_PTR(export);
1030         if (!export)
1031                 return ERR_PTR(-ENOMEM);
1032
1033         export->exp_conn_cnt = 0;
1034         export->exp_lock_hash = NULL;
1035         export->exp_flock_hash = NULL;
1036         /* 2 = class_handle_hash + last */
1037         refcount_set(&export->exp_handle.h_ref, 2);
1038         atomic_set(&export->exp_rpc_count, 0);
1039         atomic_set(&export->exp_cb_count, 0);
1040         atomic_set(&export->exp_locks_count, 0);
1041 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1042         INIT_LIST_HEAD(&export->exp_locks_list);
1043         spin_lock_init(&export->exp_locks_list_guard);
1044 #endif
1045         atomic_set(&export->exp_replay_count, 0);
1046         export->exp_obd = obd;
1047         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1048         spin_lock_init(&export->exp_uncommitted_replies_lock);
1049         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1050         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1051         INIT_HLIST_NODE(&export->exp_handle.h_link);
1052         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1053         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1054         class_handle_hash(&export->exp_handle, export_handle_owner);
1055         export->exp_last_request_time = ktime_get_real_seconds();
1056         spin_lock_init(&export->exp_lock);
1057         spin_lock_init(&export->exp_rpc_lock);
1058         INIT_HLIST_NODE(&export->exp_gen_hash);
1059         spin_lock_init(&export->exp_bl_list_lock);
1060         INIT_LIST_HEAD(&export->exp_bl_list);
1061         INIT_LIST_HEAD(&export->exp_stale_list);
1062         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1063
1064         export->exp_sp_peer = LUSTRE_SP_ANY;
1065         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066         export->exp_client_uuid = *cluuid;
1067         obd_init_export(export);
1068
1069         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1070
1071         spin_lock(&obd->obd_dev_lock);
1072         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1073                 /* shouldn't happen, but might race */
1074                 if (obd->obd_stopping)
1075                         GOTO(exit_unlock, rc = -ENODEV);
1076
1077                 rc = obd_uuid_add(obd, export);
1078                 if (rc != 0) {
1079                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1080                                       obd->obd_name, cluuid->uuid, rc);
1081                         GOTO(exit_unlock, rc = -EALREADY);
1082                 }
1083         }
1084
1085         if (!is_self) {
1086                 class_incref(obd, "export", export);
1087                 list_add_tail(&export->exp_obd_chain_timed,
1088                               &obd->obd_exports_timed);
1089                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1090                 obd->obd_num_exports++;
1091         } else {
1092                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1093                 INIT_LIST_HEAD(&export->exp_obd_chain);
1094         }
1095         spin_unlock(&obd->obd_dev_lock);
1096         RETURN(export);
1097
1098 exit_unlock:
1099         spin_unlock(&obd->obd_dev_lock);
1100         class_handle_unhash(&export->exp_handle);
1101         obd_destroy_export(export);
1102         OBD_FREE_PTR(export);
1103         return ERR_PTR(rc);
1104 }
1105
1106 struct obd_export *class_new_export(struct obd_device *obd,
1107                                     struct obd_uuid *uuid)
1108 {
1109         return __class_new_export(obd, uuid, false);
1110 }
1111 EXPORT_SYMBOL(class_new_export);
1112
1113 struct obd_export *class_new_export_self(struct obd_device *obd,
1114                                          struct obd_uuid *uuid)
1115 {
1116         return __class_new_export(obd, uuid, true);
1117 }
1118
1119 void class_unlink_export(struct obd_export *exp)
1120 {
1121         class_handle_unhash(&exp->exp_handle);
1122
1123         if (exp->exp_obd->obd_self_export == exp) {
1124                 class_export_put(exp);
1125                 return;
1126         }
1127
1128         spin_lock(&exp->exp_obd->obd_dev_lock);
1129         /* delete an uuid-export hashitem from hashtables */
1130         if (exp != exp->exp_obd->obd_self_export)
1131                 obd_uuid_del(exp->exp_obd, exp);
1132
1133 #ifdef HAVE_SERVER_SUPPORT
1134         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1135                 struct tg_export_data   *ted = &exp->exp_target_data;
1136                 struct cfs_hash         *hash;
1137
1138                 /* Because obd_gen_hash will not be released until
1139                  * class_cleanup(), so hash should never be NULL here */
1140                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1141                 LASSERT(hash != NULL);
1142                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1143                              &exp->exp_gen_hash);
1144                 cfs_hash_putref(hash);
1145         }
1146 #endif /* HAVE_SERVER_SUPPORT */
1147
1148         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1149         list_del_init(&exp->exp_obd_chain_timed);
1150         exp->exp_obd->obd_num_exports--;
1151         spin_unlock(&exp->exp_obd->obd_dev_lock);
1152         atomic_inc(&obd_stale_export_num);
1153
1154         /* A reference is kept by obd_stale_exports list */
1155         obd_stale_export_put(exp);
1156 }
1157 EXPORT_SYMBOL(class_unlink_export);
1158
1159 /* Import management functions */
1160 static void obd_zombie_import_free(struct obd_import *imp)
1161 {
1162         struct obd_import_conn *imp_conn;
1163
1164         ENTRY;
1165         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1166                imp->imp_obd->obd_name);
1167
1168         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1169
1170         ptlrpc_connection_put(imp->imp_connection);
1171
1172         while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1173                                                     struct obd_import_conn,
1174                                                     oic_item)) != NULL) {
1175                 list_del_init(&imp_conn->oic_item);
1176                 ptlrpc_connection_put(imp_conn->oic_conn);
1177                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1178         }
1179
1180         LASSERT(imp->imp_sec == NULL);
1181         LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1182                  imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1183         class_decref(imp->imp_obd, "import", imp);
1184         OBD_FREE_PTR(imp);
1185         EXIT;
1186 }
1187
1188 struct obd_import *class_import_get(struct obd_import *import)
1189 {
1190         refcount_inc(&import->imp_refcount);
1191         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1192                refcount_read(&import->imp_refcount),
1193                import->imp_obd->obd_name);
1194         return import;
1195 }
1196 EXPORT_SYMBOL(class_import_get);
1197
1198 void class_import_put(struct obd_import *imp)
1199 {
1200         ENTRY;
1201
1202         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1203
1204         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1205                refcount_read(&imp->imp_refcount) - 1,
1206                imp->imp_obd->obd_name);
1207
1208         if (refcount_dec_and_test(&imp->imp_refcount)) {
1209                 CDEBUG(D_INFO, "final put import %p\n", imp);
1210                 obd_zombie_import_add(imp);
1211         }
1212
1213         EXIT;
1214 }
1215 EXPORT_SYMBOL(class_import_put);
1216
1217 static void init_imp_at(struct imp_at *at) {
1218         int i;
1219         at_init(&at->iat_net_latency, 0, 0);
1220         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1221                 /* max service estimates are tracked on the server side, so
1222                    don't use the AT history here, just use the last reported
1223                    val. (But keep hist for proc histogram, worst_ever) */
1224                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1225                         AT_FLG_NOHIST);
1226         }
1227 }
1228
1229 static void obd_zombie_imp_cull(struct work_struct *ws)
1230 {
1231         struct obd_import *import;
1232
1233         import = container_of(ws, struct obd_import, imp_zombie_work);
1234         obd_zombie_import_free(import);
1235 }
1236
1237 struct obd_import *class_new_import(struct obd_device *obd)
1238 {
1239         struct obd_import *imp;
1240         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1241
1242         OBD_ALLOC(imp, sizeof(*imp));
1243         if (imp == NULL)
1244                 return NULL;
1245
1246         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1247         INIT_LIST_HEAD(&imp->imp_replay_list);
1248         INIT_LIST_HEAD(&imp->imp_sending_list);
1249         INIT_LIST_HEAD(&imp->imp_delayed_list);
1250         INIT_LIST_HEAD(&imp->imp_committed_list);
1251         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1252         imp->imp_known_replied_xid = 0;
1253         imp->imp_replay_cursor = &imp->imp_committed_list;
1254         spin_lock_init(&imp->imp_lock);
1255         imp->imp_last_success_conn = 0;
1256         imp->imp_state = LUSTRE_IMP_NEW;
1257         imp->imp_obd = class_incref(obd, "import", imp);
1258         rwlock_init(&imp->imp_sec_lock);
1259         init_waitqueue_head(&imp->imp_recovery_waitq);
1260         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1261
1262         if (curr_pid_ns && curr_pid_ns->child_reaper)
1263                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1264         else
1265                 imp->imp_sec_refpid = 1;
1266
1267         refcount_set(&imp->imp_refcount, 2);
1268         atomic_set(&imp->imp_unregistering, 0);
1269         atomic_set(&imp->imp_reqs, 0);
1270         atomic_set(&imp->imp_inflight, 0);
1271         atomic_set(&imp->imp_replay_inflight, 0);
1272         init_waitqueue_head(&imp->imp_replay_waitq);
1273         atomic_set(&imp->imp_inval_count, 0);
1274         atomic_set(&imp->imp_waiting, 0);
1275         INIT_LIST_HEAD(&imp->imp_conn_list);
1276         init_imp_at(&imp->imp_at);
1277
1278         /* the default magic is V2, will be used in connect RPC, and
1279          * then adjusted according to the flags in request/reply. */
1280         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1281
1282         return imp;
1283 }
1284 EXPORT_SYMBOL(class_new_import);
1285
1286 void class_destroy_import(struct obd_import *import)
1287 {
1288         LASSERT(import != NULL);
1289         LASSERT(import != LP_POISON);
1290
1291         spin_lock(&import->imp_lock);
1292         import->imp_generation++;
1293         spin_unlock(&import->imp_lock);
1294         class_import_put(import);
1295 }
1296 EXPORT_SYMBOL(class_destroy_import);
1297
1298 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1299
1300 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1301 {
1302         spin_lock(&exp->exp_locks_list_guard);
1303
1304         LASSERT(lock->l_exp_refs_nr >= 0);
1305
1306         if (lock->l_exp_refs_target != NULL &&
1307             lock->l_exp_refs_target != exp) {
1308                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1309                               exp, lock, lock->l_exp_refs_target);
1310         }
1311         if ((lock->l_exp_refs_nr ++) == 0) {
1312                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1313                 lock->l_exp_refs_target = exp;
1314         }
1315         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1316                lock, exp, lock->l_exp_refs_nr);
1317         spin_unlock(&exp->exp_locks_list_guard);
1318 }
1319 EXPORT_SYMBOL(__class_export_add_lock_ref);
1320
1321 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1322 {
1323         spin_lock(&exp->exp_locks_list_guard);
1324         LASSERT(lock->l_exp_refs_nr > 0);
1325         if (lock->l_exp_refs_target != exp) {
1326                 LCONSOLE_WARN("lock %p, "
1327                               "mismatching export pointers: %p, %p\n",
1328                               lock, lock->l_exp_refs_target, exp);
1329         }
1330         if (-- lock->l_exp_refs_nr == 0) {
1331                 list_del_init(&lock->l_exp_refs_link);
1332                 lock->l_exp_refs_target = NULL;
1333         }
1334         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1335                lock, exp, lock->l_exp_refs_nr);
1336         spin_unlock(&exp->exp_locks_list_guard);
1337 }
1338 EXPORT_SYMBOL(__class_export_del_lock_ref);
1339 #endif
1340
1341 /* A connection defines an export context in which preallocation can
1342    be managed. This releases the export pointer reference, and returns
1343    the export handle, so the export refcount is 1 when this function
1344    returns. */
1345 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1346                   struct obd_uuid *cluuid)
1347 {
1348         struct obd_export *export;
1349         LASSERT(conn != NULL);
1350         LASSERT(obd != NULL);
1351         LASSERT(cluuid != NULL);
1352         ENTRY;
1353
1354         export = class_new_export(obd, cluuid);
1355         if (IS_ERR(export))
1356                 RETURN(PTR_ERR(export));
1357
1358         conn->cookie = export->exp_handle.h_cookie;
1359         class_export_put(export);
1360
1361         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1362                cluuid->uuid, conn->cookie);
1363         RETURN(0);
1364 }
1365 EXPORT_SYMBOL(class_connect);
1366
1367 /* if export is involved in recovery then clean up related things */
1368 static void class_export_recovery_cleanup(struct obd_export *exp)
1369 {
1370         struct obd_device *obd = exp->exp_obd;
1371
1372         spin_lock(&obd->obd_recovery_task_lock);
1373         if (obd->obd_recovering) {
1374                 if (exp->exp_in_recovery) {
1375                         spin_lock(&exp->exp_lock);
1376                         exp->exp_in_recovery = 0;
1377                         spin_unlock(&exp->exp_lock);
1378                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1379                         atomic_dec(&obd->obd_connected_clients);
1380                 }
1381
1382                 /* if called during recovery then should update
1383                  * obd_stale_clients counter,
1384                  * lightweight exports are not counted */
1385                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1386                         exp->exp_obd->obd_stale_clients++;
1387         }
1388         spin_unlock(&obd->obd_recovery_task_lock);
1389
1390         spin_lock(&exp->exp_lock);
1391         /** Cleanup req replay fields */
1392         if (exp->exp_req_replay_needed) {
1393                 exp->exp_req_replay_needed = 0;
1394
1395                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1396                 atomic_dec(&obd->obd_req_replay_clients);
1397         }
1398
1399         /** Cleanup lock replay data */
1400         if (exp->exp_lock_replay_needed) {
1401                 exp->exp_lock_replay_needed = 0;
1402
1403                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1404                 atomic_dec(&obd->obd_lock_replay_clients);
1405         }
1406         spin_unlock(&exp->exp_lock);
1407 }
1408
1409 /* This function removes 1-3 references from the export:
1410  * 1 - for export pointer passed
1411  * and if disconnect really need
1412  * 2 - removing from hash
1413  * 3 - in client_unlink_export
1414  * The export pointer passed to this function can destroyed */
1415 int class_disconnect(struct obd_export *export)
1416 {
1417         int already_disconnected;
1418         ENTRY;
1419
1420         if (export == NULL) {
1421                 CWARN("attempting to free NULL export %p\n", export);
1422                 RETURN(-EINVAL);
1423         }
1424
1425         spin_lock(&export->exp_lock);
1426         already_disconnected = export->exp_disconnected;
1427         export->exp_disconnected = 1;
1428 #ifdef HAVE_SERVER_SUPPORT
1429         /*  We hold references of export for uuid hash
1430          *  and nid_hash and export link at least. So
1431          *  it is safe to call rh*table_remove_fast in
1432          *  there.
1433          */
1434         obd_nid_del(export->exp_obd, export);
1435 #endif /* HAVE_SERVER_SUPPORT */
1436         spin_unlock(&export->exp_lock);
1437
1438         /* class_cleanup(), abort_recovery(), and class_fail_export()
1439          * all end up in here, and if any of them race we shouldn't
1440          * call extra class_export_puts(). */
1441         if (already_disconnected)
1442                 GOTO(no_disconn, already_disconnected);
1443
1444         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1445                export->exp_handle.h_cookie);
1446
1447         class_export_recovery_cleanup(export);
1448         class_unlink_export(export);
1449 no_disconn:
1450         class_export_put(export);
1451         RETURN(0);
1452 }
1453 EXPORT_SYMBOL(class_disconnect);
1454
1455 /* Return non-zero for a fully connected export */
1456 int class_connected_export(struct obd_export *exp)
1457 {
1458         int connected = 0;
1459
1460         if (exp) {
1461                 spin_lock(&exp->exp_lock);
1462                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1463                 spin_unlock(&exp->exp_lock);
1464         }
1465         return connected;
1466 }
1467 EXPORT_SYMBOL(class_connected_export);
1468
1469 static void class_disconnect_export_list(struct list_head *list,
1470                                          enum obd_option flags)
1471 {
1472         int rc;
1473         struct obd_export *exp;
1474         ENTRY;
1475
1476         /* It's possible that an export may disconnect itself, but
1477          * nothing else will be added to this list.
1478          */
1479         while ((exp = list_first_entry_or_null(list, struct obd_export,
1480                                                exp_obd_chain)) != NULL) {
1481                 /* need for safe call CDEBUG after obd_disconnect */
1482                 class_export_get(exp);
1483
1484                 spin_lock(&exp->exp_lock);
1485                 exp->exp_flags = flags;
1486                 spin_unlock(&exp->exp_lock);
1487
1488                 if (obd_uuid_equals(&exp->exp_client_uuid,
1489                                     &exp->exp_obd->obd_uuid)) {
1490                         CDEBUG(D_HA,
1491                                "exp %p export uuid == obd uuid, don't discon\n",
1492                                exp);
1493                         /* Need to delete this now so we don't end up pointing
1494                          * to work_list later when this export is cleaned up. */
1495                         list_del_init(&exp->exp_obd_chain);
1496                         class_export_put(exp);
1497                         continue;
1498                 }
1499
1500                 class_export_get(exp);
1501                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1502                        "last request at %lld\n",
1503                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1504                        exp, exp->exp_last_request_time);
1505                 /* release one export reference anyway */
1506                 rc = obd_disconnect(exp);
1507
1508                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1509                        obd_export_nid2str(exp), exp, rc);
1510                 class_export_put(exp);
1511         }
1512         EXIT;
1513 }
1514
1515 void class_disconnect_exports(struct obd_device *obd)
1516 {
1517         LIST_HEAD(work_list);
1518         ENTRY;
1519
1520         /* Move all of the exports from obd_exports to a work list, en masse. */
1521         spin_lock(&obd->obd_dev_lock);
1522         list_splice_init(&obd->obd_exports, &work_list);
1523         list_splice_init(&obd->obd_delayed_exports, &work_list);
1524         spin_unlock(&obd->obd_dev_lock);
1525
1526         if (!list_empty(&work_list)) {
1527                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1528                        "disconnecting them\n", obd->obd_minor, obd);
1529                 class_disconnect_export_list(&work_list,
1530                                              exp_flags_from_obd(obd));
1531         } else
1532                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1533                        obd->obd_minor, obd);
1534         EXIT;
1535 }
1536 EXPORT_SYMBOL(class_disconnect_exports);
1537
1538 /* Remove exports that have not completed recovery.
1539  */
1540 void class_disconnect_stale_exports(struct obd_device *obd,
1541                                     int (*test_export)(struct obd_export *))
1542 {
1543         LIST_HEAD(work_list);
1544         struct obd_export *exp, *n;
1545         int evicted = 0;
1546         ENTRY;
1547
1548         spin_lock(&obd->obd_dev_lock);
1549         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1550                                  exp_obd_chain) {
1551                 /* don't count self-export as client */
1552                 if (obd_uuid_equals(&exp->exp_client_uuid,
1553                                     &exp->exp_obd->obd_uuid))
1554                         continue;
1555
1556                 /* don't evict clients which have no slot in last_rcvd
1557                  * (e.g. lightweight connection) */
1558                 if (exp->exp_target_data.ted_lr_idx == -1)
1559                         continue;
1560
1561                 spin_lock(&exp->exp_lock);
1562                 if (exp->exp_failed || test_export(exp)) {
1563                         spin_unlock(&exp->exp_lock);
1564                         continue;
1565                 }
1566                 exp->exp_failed = 1;
1567                 atomic_inc(&exp->exp_obd->obd_eviction_count);
1568                 spin_unlock(&exp->exp_lock);
1569
1570                 list_move(&exp->exp_obd_chain, &work_list);
1571                 evicted++;
1572                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1573                        obd->obd_name, exp->exp_client_uuid.uuid,
1574                        obd_export_nid2str(exp));
1575                 print_export_data(exp, "EVICTING", 0, D_HA);
1576         }
1577         spin_unlock(&obd->obd_dev_lock);
1578
1579         if (evicted)
1580                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1581                               obd->obd_name, evicted);
1582
1583         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1584                                                  OBD_OPT_ABORT_RECOV);
1585         EXIT;
1586 }
1587 EXPORT_SYMBOL(class_disconnect_stale_exports);
1588
1589 void class_fail_export(struct obd_export *exp)
1590 {
1591         int rc, already_failed;
1592
1593         spin_lock(&exp->exp_lock);
1594         already_failed = exp->exp_failed;
1595         exp->exp_failed = 1;
1596         spin_unlock(&exp->exp_lock);
1597
1598         if (already_failed) {
1599                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1600                        exp, exp->exp_client_uuid.uuid);
1601                 return;
1602         }
1603
1604         atomic_inc(&exp->exp_obd->obd_eviction_count);
1605
1606         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1607                exp, exp->exp_client_uuid.uuid);
1608
1609         if (obd_dump_on_timeout)
1610                 libcfs_debug_dumplog();
1611
1612         /* need for safe call CDEBUG after obd_disconnect */
1613         class_export_get(exp);
1614
1615         /* Most callers into obd_disconnect are removing their own reference
1616          * (request, for example) in addition to the one from the hash table.
1617          * We don't have such a reference here, so make one. */
1618         class_export_get(exp);
1619         rc = obd_disconnect(exp);
1620         if (rc)
1621                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1622         else
1623                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1624                        exp, exp->exp_client_uuid.uuid);
1625         class_export_put(exp);
1626 }
1627 EXPORT_SYMBOL(class_fail_export);
1628
1629 #ifdef HAVE_SERVER_SUPPORT
1630
1631 static int take_first(struct obd_export *exp, void *data)
1632 {
1633         struct obd_export **expp = data;
1634
1635         if (*expp)
1636                 /* already have one */
1637                 return 0;
1638         if (exp->exp_failed)
1639                 /* Don't want this one */
1640                 return 0;
1641         if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1642                 /* Cannot get a ref on this one */
1643                 return 0;
1644         *expp = exp;
1645         return 1;
1646 }
1647
1648 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1649 {
1650         struct lnet_nid nid_key;
1651         struct obd_export *doomed_exp;
1652         int exports_evicted = 0;
1653
1654         libcfs_strnid(&nid_key, nid);
1655
1656         spin_lock(&obd->obd_dev_lock);
1657         /* umount has run already, so evict thread should leave
1658          * its task to umount thread now */
1659         if (obd->obd_stopping) {
1660                 spin_unlock(&obd->obd_dev_lock);
1661                 return exports_evicted;
1662         }
1663         spin_unlock(&obd->obd_dev_lock);
1664
1665         doomed_exp = NULL;
1666         while (obd_nid_export_for_each(obd, &nid_key,
1667                                        take_first, &doomed_exp) > 0) {
1668
1669                 LASSERTF(doomed_exp != obd->obd_self_export,
1670                          "self-export is hashed by NID?\n");
1671
1672                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1673                               obd->obd_name,
1674                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1675                               obd_export_nid2str(doomed_exp));
1676
1677                 class_fail_export(doomed_exp);
1678                 class_export_put(doomed_exp);
1679                 exports_evicted++;
1680                 doomed_exp = NULL;
1681         }
1682
1683         if (!exports_evicted)
1684                 CDEBUG(D_HA,
1685                        "%s: can't disconnect NID '%s': no exports found\n",
1686                        obd->obd_name, nid);
1687         return exports_evicted;
1688 }
1689 EXPORT_SYMBOL(obd_export_evict_by_nid);
1690
1691 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1692 {
1693         struct obd_export *doomed_exp = NULL;
1694         struct obd_uuid doomed_uuid;
1695         int exports_evicted = 0;
1696
1697         spin_lock(&obd->obd_dev_lock);
1698         if (obd->obd_stopping) {
1699                 spin_unlock(&obd->obd_dev_lock);
1700                 return exports_evicted;
1701         }
1702         spin_unlock(&obd->obd_dev_lock);
1703
1704         obd_str2uuid(&doomed_uuid, uuid);
1705         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1706                 CERROR("%s: can't evict myself\n", obd->obd_name);
1707                 return exports_evicted;
1708         }
1709
1710         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1711         if (doomed_exp == NULL) {
1712                 CERROR("%s: can't disconnect %s: no exports found\n",
1713                        obd->obd_name, uuid);
1714         } else {
1715                 CWARN("%s: evicting %s at adminstrative request\n",
1716                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1717                 class_fail_export(doomed_exp);
1718                 class_export_put(doomed_exp);
1719                 obd_uuid_del(obd, doomed_exp);
1720                 exports_evicted++;
1721         }
1722
1723         return exports_evicted;
1724 }
1725 #endif /* HAVE_SERVER_SUPPORT */
1726
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() calls it when
 * it is set and the caller asked for lock information.
 */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1731
1732 static void print_export_data(struct obd_export *exp, const char *status,
1733                               int locks, int debug_level)
1734 {
1735         struct ptlrpc_reply_state *rs;
1736         struct ptlrpc_reply_state *first_reply = NULL;
1737         int nreplies = 0;
1738
1739         spin_lock(&exp->exp_lock);
1740         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1741                             rs_exp_list) {
1742                 if (nreplies == 0)
1743                         first_reply = rs;
1744                 nreplies++;
1745         }
1746         spin_unlock(&exp->exp_lock);
1747
1748         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1749                "%p %s %llu stale:%d\n",
1750                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1751                obd_export_nid2str(exp),
1752                refcount_read(&exp->exp_handle.h_ref),
1753                atomic_read(&exp->exp_rpc_count),
1754                atomic_read(&exp->exp_cb_count),
1755                atomic_read(&exp->exp_locks_count),
1756                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1757                nreplies, first_reply, nreplies > 3 ? "..." : "",
1758                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1759 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1760         if (locks && class_export_dump_hook != NULL)
1761                 class_export_dump_hook(exp);
1762 #endif
1763 }
1764
1765 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1766 {
1767         struct obd_export *exp;
1768
1769         spin_lock(&obd->obd_dev_lock);
1770         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1771                 print_export_data(exp, "ACTIVE", locks, debug_level);
1772         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1773                 print_export_data(exp, "UNLINKED", locks, debug_level);
1774         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1775                 print_export_data(exp, "DELAYED", locks, debug_level);
1776         spin_unlock(&obd->obd_dev_lock);
1777 }
1778
1779 void obd_exports_barrier(struct obd_device *obd)
1780 {
1781         int waited = 2;
1782         LASSERT(list_empty(&obd->obd_exports));
1783         spin_lock(&obd->obd_dev_lock);
1784         while (!list_empty(&obd->obd_unlinked_exports)) {
1785                 spin_unlock(&obd->obd_dev_lock);
1786                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1787                 if (waited > 5 && is_power_of_2(waited)) {
1788                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1789                                       "more than %d seconds. "
1790                                       "The obd refcount = %d. Is it stuck?\n",
1791                                       obd->obd_name, waited,
1792                                       atomic_read(&obd->obd_refcount));
1793                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1794                 }
1795                 waited *= 2;
1796                 spin_lock(&obd->obd_dev_lock);
1797         }
1798         spin_unlock(&obd->obd_dev_lock);
1799 }
1800 EXPORT_SYMBOL(obd_exports_barrier);
1801
1802 /**
1803  * Add export to the obd_zombe thread and notify it.
1804  */
1805 static void obd_zombie_export_add(struct obd_export *exp) {
1806         atomic_dec(&obd_stale_export_num);
1807         spin_lock(&exp->exp_obd->obd_dev_lock);
1808         LASSERT(!list_empty(&exp->exp_obd_chain));
1809         list_del_init(&exp->exp_obd_chain);
1810         spin_unlock(&exp->exp_obd->obd_dev_lock);
1811
1812         queue_work(zombie_wq, &exp->exp_zombie_work);
1813 }
1814
1815 /**
1816  * Add import to the obd_zombe thread and notify it.
1817  */
1818 static void obd_zombie_import_add(struct obd_import *imp) {
1819         LASSERT(imp->imp_sec == NULL);
1820
1821         queue_work(zombie_wq, &imp->imp_zombie_work);
1822 }
1823
1824 /**
1825  * wait when obd_zombie import/export queues become empty
1826  */
1827 void obd_zombie_barrier(void)
1828 {
1829         flush_workqueue(zombie_wq);
1830 }
1831 EXPORT_SYMBOL(obd_zombie_barrier);
1832
1833
1834 struct obd_export *obd_stale_export_get(void)
1835 {
1836         struct obd_export *exp = NULL;
1837         ENTRY;
1838
1839         spin_lock(&obd_stale_export_lock);
1840         if (!list_empty(&obd_stale_exports)) {
1841                 exp = list_first_entry(&obd_stale_exports,
1842                                        struct obd_export, exp_stale_list);
1843                 list_del_init(&exp->exp_stale_list);
1844         }
1845         spin_unlock(&obd_stale_export_lock);
1846
1847         if (exp) {
1848                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1849                        atomic_read(&obd_stale_export_num));
1850         }
1851         RETURN(exp);
1852 }
1853 EXPORT_SYMBOL(obd_stale_export_get);
1854
1855 void obd_stale_export_put(struct obd_export *exp)
1856 {
1857         ENTRY;
1858
1859         LASSERT(list_empty(&exp->exp_stale_list));
1860         if (exp->exp_lock_hash &&
1861             atomic_read(&exp->exp_lock_hash->hs_count)) {
1862                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1863                        atomic_read(&obd_stale_export_num));
1864
1865                 spin_lock_bh(&exp->exp_bl_list_lock);
1866                 spin_lock(&obd_stale_export_lock);
1867                 /* Add to the tail if there is no blocked locks,
1868                  * to the head otherwise. */
1869                 if (list_empty(&exp->exp_bl_list))
1870                         list_add_tail(&exp->exp_stale_list,
1871                                       &obd_stale_exports);
1872                 else
1873                         list_add(&exp->exp_stale_list,
1874                                  &obd_stale_exports);
1875
1876                 spin_unlock(&obd_stale_export_lock);
1877                 spin_unlock_bh(&exp->exp_bl_list_lock);
1878         } else {
1879                 class_export_put(exp);
1880         }
1881         EXIT;
1882 }
1883 EXPORT_SYMBOL(obd_stale_export_put);
1884
1885 /**
1886  * Adjust the position of the export in the stale list,
1887  * i.e. move to the head of the list if is needed.
1888  **/
1889 void obd_stale_export_adjust(struct obd_export *exp)
1890 {
1891         LASSERT(exp != NULL);
1892         spin_lock_bh(&exp->exp_bl_list_lock);
1893         spin_lock(&obd_stale_export_lock);
1894
1895         if (!list_empty(&exp->exp_stale_list) &&
1896             !list_empty(&exp->exp_bl_list))
1897                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1898
1899         spin_unlock(&obd_stale_export_lock);
1900         spin_unlock_bh(&exp->exp_bl_list_lock);
1901 }
1902 EXPORT_SYMBOL(obd_stale_export_adjust);
1903
1904 /**
1905  * start destroy zombie import/export thread
1906  */
1907 int obd_zombie_impexp_init(void)
1908 {
1909         zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1910                                            0, CFS_CPT_ANY,
1911                                            cfs_cpt_number(cfs_cpt_tab));
1912
1913         return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1914 }
1915
1916 /**
1917  * stop destroy zombie import/export thread
1918  */
1919 void obd_zombie_impexp_stop(void)
1920 {
1921         destroy_workqueue(zombie_wq);
1922         LASSERT(list_empty(&obd_stale_exports));
1923 }
1924
1925 /***** Kernel-userspace comm helpers *******/
1926
1927 /* Get length of entire message, including header */
1928 int kuc_len(int payload_len)
1929 {
1930         return sizeof(struct kuc_hdr) + payload_len;
1931 }
1932 EXPORT_SYMBOL(kuc_len);
1933
1934 /* Get a pointer to kuc header, given a ptr to the payload
1935  * @param p Pointer to payload area
1936  * @returns Pointer to kuc header
1937  */
1938 struct kuc_hdr * kuc_ptr(void *p)
1939 {
1940         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1941         LASSERT(lh->kuc_magic == KUC_MAGIC);
1942         return lh;
1943 }
1944 EXPORT_SYMBOL(kuc_ptr);
1945
1946 /* Alloc space for a message, and fill in header
1947  * @return Pointer to payload area
1948  */
1949 void *kuc_alloc(int payload_len, int transport, int type)
1950 {
1951         struct kuc_hdr *lh;
1952         int len = kuc_len(payload_len);
1953
1954         OBD_ALLOC(lh, len);
1955         if (lh == NULL)
1956                 return ERR_PTR(-ENOMEM);
1957
1958         lh->kuc_magic = KUC_MAGIC;
1959         lh->kuc_transport = transport;
1960         lh->kuc_msgtype = type;
1961         lh->kuc_msglen = len;
1962
1963         return (void *)(lh + 1);
1964 }
1965 EXPORT_SYMBOL(kuc_alloc);
1966
/* Free a message obtained from kuc_alloc().  @p is the payload pointer and
 * @payload_len the payload length that was passed to kuc_alloc().
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
1973 EXPORT_SYMBOL(kuc_free);
1974
1975 struct obd_request_slot_waiter {
1976         struct list_head        orsw_entry;
1977         wait_queue_head_t       orsw_waitq;
1978         bool                    orsw_signaled;
1979 };
1980
1981 static bool obd_request_slot_avail(struct client_obd *cli,
1982                                    struct obd_request_slot_waiter *orsw)
1983 {
1984         bool avail;
1985
1986         spin_lock(&cli->cl_loi_list_lock);
1987         avail = !!list_empty(&orsw->orsw_entry);
1988         spin_unlock(&cli->cl_loi_list_lock);
1989
1990         return avail;
1991 };
1992
1993 /*
1994  * For network flow control, the RPC sponsor needs to acquire a credit
1995  * before sending the RPC. The credits count for a connection is defined
1996  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1997  * the subsequent RPC sponsors need to wait until others released their
1998  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1999  */
2000 int obd_get_request_slot(struct client_obd *cli)
2001 {
2002         struct obd_request_slot_waiter   orsw;
2003         int                              rc;
2004
2005         spin_lock(&cli->cl_loi_list_lock);
2006         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2007                 cli->cl_rpcs_in_flight++;
2008                 spin_unlock(&cli->cl_loi_list_lock);
2009                 return 0;
2010         }
2011
2012         init_waitqueue_head(&orsw.orsw_waitq);
2013         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2014         orsw.orsw_signaled = false;
2015         spin_unlock(&cli->cl_loi_list_lock);
2016
2017         rc = l_wait_event_abortable(orsw.orsw_waitq,
2018                                     obd_request_slot_avail(cli, &orsw) ||
2019                                     orsw.orsw_signaled);
2020
2021         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2022          * freed but other (such as obd_put_request_slot) is using it. */
2023         spin_lock(&cli->cl_loi_list_lock);
2024         if (rc != 0) {
2025                 if (!orsw.orsw_signaled) {
2026                         if (list_empty(&orsw.orsw_entry))
2027                                 cli->cl_rpcs_in_flight--;
2028                         else
2029                                 list_del(&orsw.orsw_entry);
2030                 }
2031                 rc = -EINTR;
2032         }
2033
2034         if (orsw.orsw_signaled) {
2035                 LASSERT(list_empty(&orsw.orsw_entry));
2036
2037                 rc = -EINTR;
2038         }
2039         spin_unlock(&cli->cl_loi_list_lock);
2040
2041         return rc;
2042 }
2043 EXPORT_SYMBOL(obd_get_request_slot);
2044
2045 void obd_put_request_slot(struct client_obd *cli)
2046 {
2047         struct obd_request_slot_waiter *orsw;
2048
2049         spin_lock(&cli->cl_loi_list_lock);
2050         cli->cl_rpcs_in_flight--;
2051
2052         /* If there is free slot, wakeup the first waiter. */
2053         if (!list_empty(&cli->cl_flight_waiters) &&
2054             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2055                 orsw = list_first_entry(&cli->cl_flight_waiters,
2056                                         struct obd_request_slot_waiter,
2057                                         orsw_entry);
2058                 list_del_init(&orsw->orsw_entry);
2059                 cli->cl_rpcs_in_flight++;
2060                 wake_up(&orsw->orsw_waitq);
2061         }
2062         spin_unlock(&cli->cl_loi_list_lock);
2063 }
2064 EXPORT_SYMBOL(obd_put_request_slot);
2065
2066 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2067 {
2068         return cli->cl_max_rpcs_in_flight;
2069 }
2070 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2071
2072 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2073 {
2074         struct obd_request_slot_waiter *orsw;
2075         __u32                           old;
2076         int                             diff;
2077         int                             i;
2078         int                             rc;
2079
2080         if (max > OBD_MAX_RIF_MAX || max < 1)
2081                 return -ERANGE;
2082
2083         CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2084                cli->cl_import->imp_obd->obd_name, max,
2085                cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2086
2087         if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2088                    LUSTRE_MDC_NAME) == 0) {
2089                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2090                  * strictly lower that max_rpcs_in_flight */
2091                 if (max < 2) {
2092                         CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2093                                cli->cl_import->imp_obd->obd_name);
2094                         return -ERANGE;
2095                 }
2096                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2097                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2098                         if (rc != 0)
2099                                 return rc;
2100                 }
2101         }
2102
2103         spin_lock(&cli->cl_loi_list_lock);
2104         old = cli->cl_max_rpcs_in_flight;
2105         cli->cl_max_rpcs_in_flight = max;
2106         client_adjust_max_dirty(cli);
2107
2108         diff = max - old;
2109
2110         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2111         for (i = 0; i < diff; i++) {
2112                 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2113                                                 struct obd_request_slot_waiter,
2114                                                 orsw_entry);
2115                 if (!orsw)
2116                         break;
2117
2118                 list_del_init(&orsw->orsw_entry);
2119                 cli->cl_rpcs_in_flight++;
2120                 wake_up(&orsw->orsw_waitq);
2121         }
2122         spin_unlock(&cli->cl_loi_list_lock);
2123
2124         return 0;
2125 }
2126 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2127
2128 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2129 {
2130         return cli->cl_max_mod_rpcs_in_flight;
2131 }
2132 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2133
2134 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2135 {
2136         struct obd_connect_data *ocd;
2137         __u16 maxmodrpcs;
2138         __u16 prev;
2139
2140         if (max > OBD_MAX_RIF_MAX || max < 1)
2141                 return -ERANGE;
2142
2143         ocd = &cli->cl_import->imp_connect_data;
2144         CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2145                cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2146                ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2147
2148         if (max == OBD_MAX_RIF_MAX)
2149                 max = OBD_MAX_RIF_MAX - 1;
2150
2151         /* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
2152          * increase this value, also bump up max_rpcs_in_flight to match.
2153          */
2154         if (max >= cli->cl_max_rpcs_in_flight) {
2155                 CDEBUG(D_INFO,
2156                        "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2157                        cli->cl_import->imp_obd->obd_name, max + 1, max);
2158                 obd_set_max_rpcs_in_flight(cli, max + 1);
2159         }
2160
2161         /* cannot exceed max modify RPCs in flight supported by the server,
2162          * but verify ocd_connect_flags is at least initialized first.  If
2163          * not, allow it and fix value later in ptlrpc_connect_set_flags().
2164          */
2165         if (!ocd->ocd_connect_flags) {
2166                 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2167         } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2168                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2169                 if (maxmodrpcs == 0) { /* connection not finished yet */
2170                         maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2171                         CDEBUG(D_INFO,
2172                                "%s: partial connect, assume maxmodrpcs=%hu\n",
2173                                cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2174                 }
2175         } else {
2176                 maxmodrpcs = 1;
2177         }
2178         if (max > maxmodrpcs) {
2179                 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2180                        cli->cl_import->imp_obd->obd_name,
2181                        max, maxmodrpcs);
2182                 return -ERANGE;
2183         }
2184
2185         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2186
2187         prev = cli->cl_max_mod_rpcs_in_flight;
2188         cli->cl_max_mod_rpcs_in_flight = max;
2189
2190         /* wakeup waiters if limit has been increased */
2191         if (cli->cl_max_mod_rpcs_in_flight > prev)
2192                 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2193
2194         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2195
2196         return 0;
2197 }
2198 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2199
2200 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2201                                struct seq_file *seq)
2202 {
2203         unsigned long mod_tot = 0, mod_cum;
2204         int i;
2205
2206         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2207         lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2208                              ":", true, "");
2209         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2210                    cli->cl_mod_rpcs_in_flight);
2211
2212         seq_printf(seq, "\n\t\t\tmodify\n");
2213         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2214
2215         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2216
2217         mod_cum = 0;
2218         for (i = 0; i < OBD_HIST_MAX; i++) {
2219                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2220
2221                 mod_cum += mod;
2222                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2223                            i, mod, pct(mod, mod_tot),
2224                            pct(mod_cum, mod_tot));
2225                 if (mod_cum == mod_tot)
2226                         break;
2227         }
2228
2229         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2230
2231         return 0;
2232 }
2233 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2234
2235 /* The number of modify RPCs sent in parallel is limited
2236  * because the server has a finite number of slots per client to
2237  * store request result and ensure reply reconstruction when needed.
2238  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2239  * that takes into account server limit and cl_max_rpcs_in_flight
2240  * value.
2241  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2242  * one close request is allowed above the maximum.
2243  */
2244 struct mod_waiter {
2245         struct client_obd *cli;
2246         bool close_req;
2247         wait_queue_entry_t wqe;
2248 };
2249 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2250                                   unsigned int mode, int flags, void *key)
2251 {
2252         struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2253         struct client_obd *cli = w->cli;
2254         bool close_req = w->close_req;
2255         bool avail;
2256         int ret;
2257
2258         /* As woken_wake_function() doesn't remove us from the wait_queue,
2259          * we could get called twice for the same thread - take care.
2260          */
2261         if (wq_entry->flags & WQ_FLAG_WOKEN)
2262                 /* Already woke this thread, don't try again */
2263                 return 0;
2264
2265         /* A slot is available if
2266          * - number of modify RPCs in flight is less than the max
2267          * - it's a close RPC and no other close request is in flight
2268          */
2269         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2270                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2271         if (avail) {
2272                 cli->cl_mod_rpcs_in_flight++;
2273                 if (w->close_req)
2274                         cli->cl_close_rpcs_in_flight++;
2275                 ret = woken_wake_function(wq_entry, mode, flags, key);
2276         } else if (cli->cl_close_rpcs_in_flight)
2277                 /* No other waiter could be woken */
2278                 ret = -1;
2279         else if (key == NULL)
2280                 /* This was not a wakeup from a close completion, so there is no
2281                  * point seeing if there are close waiters to be woken
2282                  */
2283                 ret = -1;
2284         else
2285                 /* There might be be a close we could wake, keep looking */
2286                 ret = 0;
2287         return ret;
2288 }
2289
2290 /* Get a modify RPC slot from the obd client @cli according
2291  * to the kind of operation @opc that is going to be sent
2292  * and the intent @it of the operation if it applies.
2293  * If the maximum number of modify RPCs in flight is reached
2294  * the thread is put to sleep.
2295  * Returns the tag to be set in the request message. Tag 0
2296  * is reserved for non-modifying requests.
2297  */
2298 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2299 {
2300         struct mod_waiter wait = {
2301                 .cli = cli,
2302                 .close_req = (opc == MDS_CLOSE),
2303         };
2304         __u16                   i, max;
2305
2306         init_wait(&wait.wqe);
2307         wait.wqe.func = claim_mod_rpc_function;
2308
2309         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2310         __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2311         /* This wakeup will only succeed if the maximums haven't
2312          * been reached.  If that happens, WQ_FLAG_WOKEN will be cleared
2313          * and there will be no need to wait.
2314          */
2315         wake_up_locked(&cli->cl_mod_rpcs_waitq);
2316         if (!(wait.wqe.flags & WQ_FLAG_WOKEN)) {
2317                 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2318                 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2319                            MAX_SCHEDULE_TIMEOUT);
2320                 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2321         }
2322         __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2323
2324         max = cli->cl_max_mod_rpcs_in_flight;
2325         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2326                          cli->cl_mod_rpcs_in_flight);
2327         /* find a free tag */
2328         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2329                                 max + 1);
2330         LASSERT(i < OBD_MAX_RIF_MAX);
2331         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2332         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2333         /* tag 0 is reserved for non-modify RPCs */
2334
2335         CDEBUG(D_RPCTRACE,
2336                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2337                cli->cl_import->imp_obd->obd_name,
2338                i + 1, opc, max);
2339
2340         return i + 1;
2341 }
2342 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2343
2344 /* Put a modify RPC slot from the obd client @cli according
2345  * to the kind of operation @opc that has been sent.
2346  */
2347 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2348 {
2349         bool                    close_req = false;
2350
2351         if (tag == 0)
2352                 return;
2353
2354         if (opc == MDS_CLOSE)
2355                 close_req = true;
2356
2357         spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2358         cli->cl_mod_rpcs_in_flight--;
2359         if (close_req)
2360                 cli->cl_close_rpcs_in_flight--;
2361         /* release the tag in the bitmap */
2362         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2363         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2364         __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2365                              (void *)close_req);
2366         spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2367 }
2368 EXPORT_SYMBOL(obd_put_mod_rpc_slot);