for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
}
+ LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
+ INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
/* # pages in a large message fwd buffer */
#define SOCKNAL_RESCHED 100 /* # scheduler loops before reschedule */
+#define SOCKNAL_ENOMEM_RETRY 1 /* jiffies between retries */
#define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
struct list_head ksnd_deathrow_conns; /* conns to be closed */
struct list_head ksnd_zombie_conns; /* conns to be freed */
- wait_queue_head_t ksnd_reaper_waitq; /* reaper sleep here */
+ struct list_head ksnd_enomem_conns; /* conns to be retried */
+ wait_queue_head_t ksnd_reaper_waitq; /* reaper sleeps here */
+ unsigned long ksnd_reaper_waketime; /* when reaper will wake */
spinlock_t ksnd_reaper_lock; /* serialise */
+ int ksnd_enomem_tx; /* test ENOMEM sender */
int ksnd_stall_tx; /* test sluggish sender */
int ksnd_stall_rx; /* test sluggish receiver */
set_fs (oldmm);
}
- if (rc <= 0)
- return (rc);
-
- tx->tx_resid -= rc;
-
- if (rc < iov->iov_len) {
- /* didn't send whole iov entry... */
- iov->iov_base = (void *)(vaddr + rc);
- iov->iov_len -= rc;
- /* ...but did we send everything we tried to send? */
- return ((rc == fragsize) ? 1 : -EAGAIN);
+ if (rc > 0) {
+ tx->tx_resid -= rc;
+
+ if (rc < iov->iov_len) {
+ /* didn't send whole iov entry... */
+ iov->iov_base = (void *)(vaddr + rc);
+ iov->iov_len -= rc;
+ } else {
+ tx->tx_iov++;
+ tx->tx_niov--;
+ }
}
-
- tx->tx_iov++;
- tx->tx_niov--;
- return (1);
+
+ return (rc);
}
int
kunmap (page);
}
- if (rc <= 0)
- return (rc);
-
- tx->tx_resid -= rc;
+ if (rc > 0) {
+ tx->tx_resid -= rc;
- if (rc < fragsize) {
- /* didn't send whole frag */
- kiov->kiov_offset = offset + rc;
- kiov->kiov_len = fragsize - rc;
- return (-EAGAIN);
+ if (rc < fragsize) {
+ kiov->kiov_offset = offset + rc;
+ kiov->kiov_len = fragsize - rc;
+ } else {
+ tx->tx_kiov++;
+ tx->tx_nkiov--;
+ }
}
- /* everything went */
- LASSERT (rc == fragsize);
- tx->tx_kiov++;
- tx->tx_nkiov--;
- return (1);
+ return (rc);
}
int
ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
{
- /* Return 0 on success, < 0 on error.
- * caller checks tx_resid to determine progress/completion */
int rc;
- ENTRY;
if (ksocknal_data.ksnd_stall_tx != 0) {
set_current_state (TASK_UNINTERRUPTIBLE);
schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
}
+ LASSERT (tx->tx_resid != 0);
+
rc = ksocknal_getconnsock (conn);
if (rc != 0) {
LASSERT (conn->ksnc_closing);
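+                /* conn is closing: return a definite -ESHUTDOWN instead of the raw rc */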
- return (rc);
+ return (-ESHUTDOWN);
}
- for (;;) {
- LASSERT (tx->tx_resid != 0);
-
- if (tx->tx_niov != 0)
+ do {
+ if (ksocknal_data.ksnd_enomem_tx > 0) {
+ /* testing... */
+ ksocknal_data.ksnd_enomem_tx--;
+ rc = -EAGAIN;
+ } else if (tx->tx_niov != 0) {
rc = ksocknal_send_iov (conn, tx);
- else
+ } else {
rc = ksocknal_send_kiov (conn, tx);
+ }
- if (rc <= 0) { /* error or socket full? */
- /* NB: rc == 0 and rc == -EAGAIN both mean try
+ if (rc <= 0) {
+ /* Didn't write anything.
+ *
+ * NB: rc == 0 and rc == -EAGAIN both mean try
* again later (linux stack returns -EAGAIN for
- * this, but Adaptech TOE returns 0) */
- if (rc == -EAGAIN)
- rc = 0;
+ * this, but Adaptech TOE returns 0).
+ *
+ * Also, sends never fail with -ENOMEM, just
+ * -EAGAIN, but with the added bonus that we can't
+ * expect write_space() to call us back to tell us
+ * when to try sending again. We use the
+ * SOCK_NOSPACE flag to diagnose... */
+
+ LASSERT(rc != -ENOMEM);
+
+ if (rc == 0 || rc == -EAGAIN) {
+ if (test_bit(SOCK_NOSPACE,
+ &conn->ksnc_sock->flags)) {
+ rc = -EAGAIN;
+ } else {
+ static int counter;
+
+ counter++;
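+                                        /* warn only when counter is a power of two, to rate-limit the message */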
+ if ((counter & (-counter)) == counter)
+ CWARN("%d ENOMEM tx %p\n",
+ counter, conn);
+ rc = -ENOMEM;
+ }
+ }
break;
}
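+                /* made progress: clear rc and keep looping until tx_resid drains */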
+ rc = 0;
+
/* Consider the connection alive since we managed to chuck
* more data into it. Really, we'd like to consider it
* alive only when the peer ACKs something, but
ksocknal_data.ksnd_io_timeout * HZ;
conn->ksnc_peer->ksnp_last_alive = jiffies;
- if (tx->tx_resid == 0) { /* sent everything */
- rc = 0;
- break;
- }
- }
+ } while (tx->tx_resid != 0);
ksocknal_putconnsock (conn);
- RETURN (rc);
+ return (rc);
}
void
rc = ksocknal_getconnsock (conn);
if (rc != 0) {
LASSERT (conn->ksnc_closing);
- return (rc);
+ return (-ESHUTDOWN);
}
for (;;) {
int
ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
{
+ unsigned long flags;
int rc;
rc = ksocknal_transmit (conn, tx);
CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
- LASSERT (rc != -EAGAIN);
- if (rc == 0) {
- /* no errors */
- if (tx->tx_resid != 0) {
- /* didn't send everything */
- return (-EAGAIN);
- }
-
+ if (tx->tx_resid == 0) {
+ /* Sent everything OK */
+ LASSERT (rc == 0);
+
ksocknal_tx_launched (tx);
return (0);
}
+ if (rc == -EAGAIN)
+ return (rc);
+
+ if (rc == -ENOMEM) {
+ /* Queue on ksnd_enomem_conns for retry after a timeout */
+ spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
+
+ /* enomem list takes over scheduler's ref... */
+ LASSERT (conn->ksnc_tx_scheduled);
+ list_add_tail(&conn->ksnc_tx_list,
+ &ksocknal_data.ksnd_enomem_conns);
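+                /* wake the reaper early only if it isn't already due to wake by the retry deadline */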
+ if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
+ ksocknal_data.ksnd_reaper_waketime))
+ wake_up (&ksocknal_data.ksnd_reaper_waitq);
+
+ spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
+ return (rc);
+ }
+
+ /* Actual error */
+ LASSERT (rc < 0);
+
if (!conn->ksnc_closing)
- CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
- conn, rc, conn->ksnc_peer->ksnp_nid,
+ CERROR ("[%p] Error %d on write to "LPX64
+                        " ip %08x:%d\n", conn, rc,
+ conn->ksnc_peer->ksnp_nid,
conn->ksnc_ipaddr, conn->ksnc_port);
ksocknal_close_conn_and_siblings (conn, rc);
ksocknal_tx_launched (tx);
-
+
return (rc);
}
spin_lock_irqsave (&sched->kss_lock, flags);
- if (rc != -EAGAIN) {
- /* error or everything went: assume more can go */
- conn->ksnc_tx_ready = 1;
- } else {
- /* back onto HEAD of tx_queue */
+ if (rc == -ENOMEM || rc == -EAGAIN) {
+ /* Incomplete send: replace tx on HEAD of tx_queue */
list_add (&tx->tx_list, &conn->ksnc_tx_queue);
+ } else {
+ /* Complete send; assume space for more */
+ conn->ksnc_tx_ready = 1;
}
-
- if (conn->ksnc_tx_ready &&
- !list_empty (&conn->ksnc_tx_queue)) {
+
+ if (rc == -ENOMEM) {
+ /* Do nothing; after a short timeout, this
+ * conn will be reposted on kss_tx_conns. */
+ } else if (conn->ksnc_tx_ready &&
+ !list_empty (&conn->ksnc_tx_queue)) {
/* reschedule for tx */
list_add_tail (&conn->ksnc_tx_list,
&sched->kss_tx_conns);
wait_queue_t wait;
unsigned long flags;
ksock_conn_t *conn;
+ ksock_sched_t *sched;
+ struct list_head enomem_conns;
+ int nenomem_conns;
int timeout;
int i;
int peer_index = 0;
kportal_daemonize ("ksocknal_reaper");
kportal_blockallsigs ();
+ INIT_LIST_HEAD(&enomem_conns);
init_waitqueue_entry (&wait, current);
current->flags |= PF_MEMALLOC;
spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
continue;
}
-
+
+ if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
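+                        /* open-coded splice: chain the whole global list onto the local head */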
+ list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
+ list_del_init(&ksocknal_data.ksnd_enomem_conns);
+ }
+
spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+ /* reschedule all the connections that stalled with ENOMEM... */
+ nenomem_conns = 0;
+ while (!list_empty (&enomem_conns)) {
+ conn = list_entry (enomem_conns.next,
+ ksock_conn_t, ksnc_tx_list);
+ list_del (&conn->ksnc_tx_list);
+
+ sched = conn->ksnc_scheduler;
+
+ spin_lock_irqsave (&sched->kss_lock, flags);
+
+ LASSERT (conn->ksnc_tx_scheduled);
+ conn->ksnc_tx_ready = 1;
+ list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
+ wake_up (&sched->kss_waitq);
+
+ spin_unlock_irqrestore (&sched->kss_lock, flags);
+ nenomem_conns++;
+ }
+
/* careful with the jiffy wrap... */
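+                /* subtract in unsigned long first, then truncate: safe across the wrap */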
- while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
+ while ((timeout = (int)(deadline - jiffies)) <= 0) {
const int n = 4;
const int p = 1;
int chunk = ksocknal_data.ksnd_peer_hash_size;
deadline += p * HZ;
}
+ if (nenomem_conns != 0) {
+ /* Reduce my timeout if I rescheduled ENOMEM conns.
+ * This also prevents me getting woken immediately
+ * if any go back on my enomem list. */
+ timeout = SOCKNAL_ENOMEM_RETRY;
+ }
+ ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
+
add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
set_current_state (TASK_INTERRUPTIBLE);