Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
index f4005de..a386eef 100644 (file)
 
 #include "qswnal.h"
 
+ptl_handle_ni_t                kqswnal_ni;
 nal_t                  kqswnal_api;
 kqswnal_data_t         kqswnal_data;
-ptl_handle_ni_t         kqswnal_ni;
-kqswnal_tunables_t      kqswnal_tunables;
 
 kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
@@ -44,7 +43,10 @@ kpr_nal_interface_t kqswnal_router_interface = {
 
 static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
-        &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
+        &kqswnal_data.kqn_optimized_gets, sizeof (int),
+        0644, NULL, &proc_dointvec},
+       {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
+        &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
 };
@@ -99,6 +101,15 @@ kqswnal_unlock(nal_t *nal, unsigned long *flags)
 }
 
 static int
+kqswnal_shutdown(nal_t *nal, int ni)
+{
+       CDEBUG (D_NET, "shutdown\n");
+
+       LASSERT (nal == &kqswnal_api);
+       return (0);
+}
+
+static int
 kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
 {
        /* NB called holding statelock */
@@ -108,7 +119,7 @@ kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
        CDEBUG (D_NET, "yield\n");
 
        if (milliseconds == 0) {
-               if (need_resched())
+               if (current->need_resched)
                        schedule();
                return 0;
        }
@@ -137,6 +148,20 @@ kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
        return (milliseconds);
 }
 
+static nal_t *
+kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
+            ptl_pid_t requested_pid)
+{
+       ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
+       int       nnids = kqswnal_data.kqn_nnodes;
+
+        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);
+
+       lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);
+
+       return (&kqswnal_api);
+}
+
 int
 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
 {
@@ -194,32 +219,26 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private)
        }
 }
 
-static void
-kqswnal_shutdown(nal_t *nal)
+void __exit
+kqswnal_finalise (void)
 {
-       unsigned long flags;
-       int           do_lib_fini = 0;
-
-       /* NB The first ref was this module! */
-       if (nal->nal_refct != 0) {
-               PORTAL_MODULE_UNUSE;
-               return;
-       }
-
-       CDEBUG (D_NET, "shutdown\n");
-       LASSERT (nal == &kqswnal_api);
-
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);
 
        case KQN_INIT_ALL:
-                libcfs_nal_cmd_unregister(QSWNAL);
+#if CONFIG_SYSCTL
+                if (kqswnal_data.kqn_sysctl != NULL)
+                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
+#endif         
+               PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
+                kportal_nal_unregister(QSWNAL);
                /* fall through */
 
-       case KQN_INIT_LIB:
-               do_lib_fini = 1;
+       case KQN_INIT_PTL:
+               PtlNIFini (kqswnal_ni);
+               lib_fini (&kqswnal_lib);
                /* fall through */
 
        case KQN_INIT_DATA:
@@ -230,24 +249,18 @@ kqswnal_shutdown(nal_t *nal)
        }
 
        /**********************************************************************/
-       /* Tell router we're shutting down.  Any router calls my threads
-        * make will now fail immediately and the router will stop calling
-        * into me. */
+       /* Make router stop her calling me and fail any more call-ins */
        kpr_shutdown (&kqswnal_data.kqn_router);
-       
+
        /**********************************************************************/
-       /* Signal the start of shutdown... */
-       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
-       kqswnal_data.kqn_shuttingdown = 1;
-       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
+       /* flag threads we've started to terminate and wait for all to ack */
 
-       wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
+       kqswnal_data.kqn_shuttingdown = 1;
+       wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
-       /**********************************************************************/
-       /* wait for sends that have allocated a tx desc to launch or give up */
-       while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
-               CDEBUG(D_NET, "waiting for %d pending sends\n",
-                      atomic_read (&kqswnal_data.kqn_pending_txs));
+       while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
+               CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
+                      atomic_read (&kqswnal_data.kqn_nthreads_running));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }
@@ -255,27 +268,18 @@ kqswnal_shutdown(nal_t *nal)
        /**********************************************************************/
        /* close elan comms */
 #if MULTIRAIL_EKC
-       /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);
 
-       /* NB ep_free_rcvr() returns only after we've freed off all receive
-        * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
-        * means we must have completed any messages we passed to
-        * lib_parse() or kpr_fwd_start(). */
-
        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);
 
-       /* NB ep_free_xmtr() returns only after all outstanding transmits
-        * have called their callback... */
+       /* freeing the xmtr completes all txs pdq */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
 #else
-       /* "Old" EKC just pretends to shutdown cleanly but actually
-        * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
 
@@ -294,6 +298,7 @@ kqswnal_shutdown(nal_t *nal)
 #endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
+
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
@@ -311,12 +316,10 @@ kqswnal_shutdown(nal_t *nal)
 
 #if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
-       LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
-       LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
 #endif
 
        /**********************************************************************/
-       /* Complete any blocked forwarding packets, with error
+       /* Complete any blocked forwarding packets with error
         */
 
        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
@@ -324,16 +327,23 @@ kqswnal_shutdown(nal_t *nal)
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
+               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+       }
+
+       while (!list_empty (&kqswnal_data.kqn_delayedfwds))
+       {
+               kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
+                                                 kpr_fwd_desc_t, kprfd_list);
+               list_del (&fwd->kprfd_list);
+               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }
 
        /**********************************************************************/
-       /* finalise router and portals lib */
+       /* Wait for router to complete any packets I sent her
+        */
 
        kpr_deregister (&kqswnal_data.kqn_router);
 
-       if (do_lib_fini)
-               lib_fini (&kqswnal_lib);
 
        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
@@ -454,9 +464,7 @@ kqswnal_shutdown(nal_t *nal)
 }
 
 static int __init
-kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
-                ptl_ni_limits_t *requested_limits, 
-                ptl_ni_limits_t *actual_limits)
+kqswnal_initialise (void)
 {
 #if MULTIRAIL_EKC
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
@@ -466,21 +474,22 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        int               rc;
        int               i;
        int               elan_page_idx;
-       ptl_process_id_t  my_process_id;
        int               pkmem = atomic_read(&portal_kmemory);
 
-       if (nal->nal_refct != 0) {
-               if (actual_limits != NULL)
-                       *actual_limits = kqswnal_lib.ni.actual_limits;
-               /* This module got the first ref */
-               PORTAL_MODULE_USE;
-               return (PTL_OK);
-       }
-
        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
 
        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
 
+       kqswnal_api.forward  = kqswnal_forward;
+       kqswnal_api.shutdown = kqswnal_shutdown;
+       kqswnal_api.yield    = kqswnal_yield;
+       kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
+       kqswnal_api.lock     = kqswnal_lock;
+       kqswnal_api.unlock   = kqswnal_unlock;
+       kqswnal_api.nal_data = &kqswnal_data;
+
+       kqswnal_lib.nal_data = &kqswnal_data;
+
        memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
        memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
 #if MULTIRAIL_EKC
@@ -491,6 +500,9 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));
 
+       kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
+       kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;
+
        kqswnal_data.kqn_cb = &kqswnal_lib;
 
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
@@ -512,19 +524,18 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
 
        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
-       
+
 #if MULTIRAIL_EKC
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
-               kqswnal_shutdown(&kqswnal_api);
-               return (PTL_IFACE_INVALID);
+               return (-ENODEV);
        }
 
        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
-               kqswnal_shutdown(&kqswnal_api);
-               return (PTL_IFACE_INVALID);
+               kqswnal_finalise();
+               return (-ENODEV);
        }
 #else
        /**********************************************************************/
@@ -534,8 +545,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
-               kqswnal_shutdown(&kqswnal_api);
-               return (PTL_IFACE_INVALID);
+               return (-ENODEV);
        }
 #endif
 
@@ -550,8 +560,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 
        /**********************************************************************/
@@ -563,8 +573,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 
        kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
@@ -573,8 +583,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 
        /**********************************************************************/
@@ -588,8 +598,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
-               kqswnal_shutdown(&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise();
+               return (-ENOMEM);
        }
 #else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
@@ -603,8 +613,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 #endif
        /**********************************************************************/
@@ -617,8 +627,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
-               kqswnal_shutdown(&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise();
+               return (-ENOMEM);
        }
 #else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
@@ -633,8 +643,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 #endif
        /**********************************************************************/
@@ -644,8 +654,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 
        /* clear flags, null pointers etc */
@@ -660,8 +670,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
-                       kqswnal_shutdown (&kqswnal_api);
-                       return (PTL_NO_SPACE);
+                       kqswnal_finalise ();
+                       return (-ENOMEM);
                }
 
                /* Map pre-allocated buffer NOW, to save latency on transmit */
@@ -697,8 +707,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_NO_SPACE);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 
        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
@@ -732,8 +742,8 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                        struct page *page = alloc_page(GFP_KERNEL);
                        
                        if (page == NULL) {
-                               kqswnal_shutdown (&kqswnal_api);
-                               return (PTL_NO_SPACE);
+                               kqswnal_finalise ();
+                               return (-ENOMEM);
                        }
 
                        krx->krx_kiov[j].kiov_page = page;
@@ -777,19 +787,15 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        /**********************************************************************/
        /* Network interface ready to initialise */
 
-       my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
-       my_process_id.pid = 0;
-
-       rc = lib_init(&kqswnal_lib, my_process_id,
-                     requested_limits, actual_limits);
-        if (rc != PTL_OK)
+        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
+        if (rc != 0)
        {
-               CERROR ("lib_init failed %d\n", rc);
-               kqswnal_shutdown (&kqswnal_api);
-               return (rc);
+               CERROR ("PtlNIInit failed %d\n", rc);
+               kqswnal_finalise ();
+               return (-ENOMEM);
        }
 
-       kqswnal_data.kqn_init = KQN_INIT_LIB;
+       kqswnal_data.kqn_init = KQN_INIT_PTL;
 
        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */
@@ -810,20 +816,21 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
-                       kqswnal_shutdown (&kqswnal_api);
-                       return (PTL_FAIL);
+                       kqswnal_finalise ();
+                       return (-ENOMEM);
                }
        }
 
        /**********************************************************************/
        /* Spawn scheduling threads */
-       for (i = 0; i < num_online_cpus(); i++) {
+       for (i = 0; i < smp_num_cpus; i++)
+       {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
-                       kqswnal_shutdown (&kqswnal_api);
-                       return (PTL_FAIL);
+                       kqswnal_finalise ();
+                       return (rc);
                }
        }
 
@@ -832,13 +839,19 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
 
-       rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
+       rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
-               kqswnal_shutdown (&kqswnal_api);
-               return (PTL_FAIL);
+               kqswnal_finalise ();
+               return (rc);
        }
 
+#if CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
+#endif
+
+       PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;
 
        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
@@ -847,65 +860,15 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);
 
-       return (PTL_OK);
-}
-
-void __exit
-kqswnal_finalise (void)
-{
-#if CONFIG_SYSCTL
-       if (kqswnal_tunables.kqn_sysctl != NULL)
-               unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
-#endif
-       PtlNIFini(kqswnal_ni);
-
-       ptl_unregister_nal(QSWNAL);
-}
-
-static int __init
-kqswnal_initialise (void)
-{
-       int   rc;
-
-       kqswnal_api.startup  = kqswnal_startup;
-       kqswnal_api.shutdown = kqswnal_shutdown;
-       kqswnal_api.forward  = kqswnal_forward;
-       kqswnal_api.yield    = kqswnal_yield;
-       kqswnal_api.lock     = kqswnal_lock;
-       kqswnal_api.unlock   = kqswnal_unlock;
-       kqswnal_api.nal_data = &kqswnal_data;
-
-       kqswnal_lib.nal_data = &kqswnal_data;
-
-       /* Initialise dynamic tunables to defaults once only */
-       kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
-       
-       rc = ptl_register_nal(QSWNAL, &kqswnal_api);
-       if (rc != PTL_OK) {
-               CERROR("Can't register QSWNAL: %d\n", rc);
-               return (-ENOMEM);               /* or something... */
-       }
-
-       /* Pure gateways, and the workaround for 'EKC blocks forever until
-        * the service is active' want the NAL started up at module load
-        * time... */
-       rc = PtlNIInit(QSWNAL, 0, NULL, NULL, &kqswnal_ni);
-       if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
-               ptl_unregister_nal(QSWNAL);
-               return (-ENODEV);
-       }
-
-#if CONFIG_SYSCTL
-        /* Press on regardless even if registering sysctl doesn't work */
-        kqswnal_tunables.kqn_sysctl = 
-               register_sysctl_table (kqswnal_top_ctl_table, 0);
-#endif
        return (0);
 }
 
+
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
 MODULE_LICENSE("GPL");
 
 module_init (kqswnal_initialise);
 module_exit (kqswnal_finalise);
+
+EXPORT_SYMBOL (kqswnal_ni);