#include <linux/lustre_mds.h>
#include <linux/lustre_lite.h>
+#include <linux/lustre_gs.h>
#include "llite_internal.h"
/* record that a write is in flight */
void llap_write_pending(struct inode *inode, struct ll_async_page *llap)
{
struct ll_inode_info *lli = ll_i2info(inode);
+ struct page *page = llap->llap_page;
spin_lock(&lli->lli_lock);
- list_add(&llap->llap_pending_write, &lli->lli_pending_write_llaps);
+ CDEBUG(D_INODE, "track page 0x%p/%lu %s\n",
+ page, (unsigned long) page->index,
+ !list_empty(&llap->llap_pending_write) ? "(already)" : "");
+ if (list_empty(&llap->llap_pending_write))
+ list_add(&llap->llap_pending_write,
+ &lli->lli_pending_write_llaps);
spin_unlock(&lli->lli_lock);
}
spin_lock(&lli->lli_lock);
if (!list_empty(&llap->llap_pending_write))
list_del_init(&llap->llap_pending_write);
+ if (list_empty(&lli->lli_pending_write_llaps))
+ wake_up(&lli->lli_dirty_wait);
spin_unlock(&lli->lli_lock);
}
{
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
+ int added = 0;
spin_lock(&lli->lli_lock);
if (lli->lli_send_done_writing &&
list_empty(&lli->lli_pending_write_llaps)) {
-
spin_lock(&lcq->lcq_lock);
if (list_empty(&lli->lli_close_item)) {
CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
inode->i_ino, inode->i_generation);
- LASSERT(igrab(inode) == inode);
list_add_tail(&lli->lli_close_item, &lcq->lcq_list);
wake_up(&lcq->lcq_waitq);
+ added = 1;
}
spin_unlock(&lcq->lcq_lock);
}
spin_unlock(&lli->lli_lock);
+
+ /*
+ * we can't grab inode under lli_lock, because:
+ * ll_try_done_writing: ll_prep_inode:
+ * spin_lock(&lli_lock) spin_lock(&inode_lock)
+ * igrab() ll_update_inode()
+ * spin_lock(&inode_lock) spin_lock(&lli_lock)
+ */
+ if (added)
+ LASSERT(igrab(inode) == inode);
}
/* The MDS needs us to get the real file attributes, then send a DONE_WRITING */
struct ll_inode_info *lli = ll_i2info(inode);
ENTRY;
+ CDEBUG(D_INODE, "queue closing for %lu/%u\n",
+ inode->i_ino, inode->i_generation);
spin_lock(&lli->lli_lock);
lli->lli_send_done_writing = 1;
spin_unlock(&lli->lli_lock);
EXIT;
}
-#if 0
/* If we know the file size and have the cookies:
* - send a DONE_WRITING rpc
*
* - get the authoritative size and all cookies with GETATTRs
* - send a DONE_WRITING rpc
*/
-static void ll_close_done_writing(struct inode *inode)
+static void ll_try_to_close(struct inode *inode)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF } };
- struct lustre_handle lockh = { 0 };
- struct obdo obdo;
- obd_flag valid;
- int rc, ast_flags = 0;
- ENTRY;
-
- memset(&obdo, 0, sizeof(obdo));
- if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
- goto rpc;
-
- rc = ll_extent_lock(NULL, inode, lli->lli_smd, LCK_PW, &policy, &lockh,
- ast_flags);
- if (rc != ELDLM_OK) {
- CERROR("lock acquisition failed (%d): unable to send "
- "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino,
- inode->i_generation);
- GOTO(out, rc);
- }
-
- rc = ll_lsm_getattr(ll_i2obdexp(inode), lli->lli_smd, &obdo);
- if (rc) {
- CERROR("inode_getattr failed (%d): unable to send DONE_WRITING "
- "for inode %lu/%u\n", rc, inode->i_ino,
- inode->i_generation);
- ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
- GOTO(out, rc);
- }
-
- obdo_refresh_inode(inode, &obdo, valid);
-
- CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
- lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks,
- inode->i_blksize);
-
- set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
-
- rc = ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
- if (rc != ELDLM_OK)
- CERROR("unlock failed (%d)? proceeding anyways...\n", rc);
-
- rpc:
- obdo.o_id = inode->i_ino;
- obdo.o_size = inode->i_size;
- obdo.o_blocks = inode->i_blocks;
- obdo.o_valid = OBD_MD_FLID | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-
- rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &obdo);
- out:
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ ll_md_real_close(sbi->ll_md_exp, inode, FMODE_WRITE | FMODE_SYNC);
}
-#endif
static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
{
spin_lock(&lcq->lcq_lock);
- if (lcq->lcq_list.next == NULL)
- lli = ERR_PTR(-1);
- else if (!list_empty(&lcq->lcq_list)) {
+ /* first, check for queued request. otherwise, we would
+ * leak them upon umount */
+ if (!list_empty(&lcq->lcq_list)) {
lli = list_entry(lcq->lcq_list.next, struct ll_inode_info,
lli_close_item);
- list_del(&lli->lli_close_item);
+ list_del_init(&lli->lli_close_item);
+ } else if (lcq->lcq_stop != 0) {
+ lli = ERR_PTR(-1);
}
spin_unlock(&lcq->lcq_lock);
while (1) {
struct l_wait_info lwi = { 0 };
struct ll_inode_info *lli;
- //struct inode *inode;
+ struct inode *inode;
l_wait_event_exclusive(lcq->lcq_waitq,
(lli = ll_close_next_lli(lcq)) != NULL,
if (IS_ERR(lli))
break;
- //inode = ll_info2i(lli);
- //ll_close_done_writing(inode);
- //iput(inode);
+ inode = ll_info2i(lli);
+ ll_try_to_close(inode);
+ iput(inode);
}
- complete(&lcq->lcq_comp);
- RETURN(0);
+ EXIT;
+
+ /* SMF-safe way to finish threads */
+ complete_and_exit(&lcq->lcq_comp, 0);
}
int ll_close_thread_start(struct ll_close_queue **lcq_ret)
if (lcq == NULL)
return -ENOMEM;
+ lcq->lcq_stop = 0;
spin_lock_init(&lcq->lcq_lock);
INIT_LIST_HEAD(&lcq->lcq_list);
init_waitqueue_head(&lcq->lcq_waitq);
return 0;
}
-void ll_close_thread_shutdown(struct ll_close_queue *lcq)
+void ll_close_thread_stop(struct ll_close_queue *lcq)
{
init_completion(&lcq->lcq_comp);
- lcq->lcq_list.next = NULL;
+ lcq->lcq_stop = 1;
wake_up(&lcq->lcq_waitq);
wait_for_completion(&lcq->lcq_comp);
OBD_FREE(lcq, sizeof(*lcq));