#define DEBUG_SUBSYSTEM S_LLITE
-#include <linux/lustre_mds.h>
-#include <linux/lustre_lite.h>
-#include <linux/lustre_gs.h>
+#include <lustre_lite.h>
#include "llite_internal.h"
/* record that a write is in flight */
void llap_write_pending(struct inode *inode, struct ll_async_page *llap)
{
struct ll_inode_info *lli = ll_i2info(inode);
- struct page *page = llap->llap_page;
spin_lock(&lli->lli_lock);
- CDEBUG(D_INODE, "track page 0x%p/%lu %s\n",
- page, (unsigned long) page->index,
- !list_empty(&llap->llap_pending_write) ? "(already)" : "");
- if (list_empty(&llap->llap_pending_write))
- list_add(&llap->llap_pending_write,
- &lli->lli_pending_write_llaps);
+ list_add(&llap->llap_pending_write, &lli->lli_pending_write_llaps);
spin_unlock(&lli->lli_lock);
}
{
struct ll_inode_info *lli = ll_i2info(inode);
spin_lock(&lli->lli_lock);
- if (!list_empty(&llap->llap_pending_write))
- list_del_init(&llap->llap_pending_write);
- if (list_empty(&lli->lli_pending_write_llaps))
- wake_up(&lli->lli_dirty_wait);
+ list_del_init(&llap->llap_pending_write);
spin_unlock(&lli->lli_lock);
}
{
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
- int added = 0;
spin_lock(&lli->lli_lock);
if (lli->lli_send_done_writing &&
list_empty(&lli->lli_pending_write_llaps)) {
+
spin_lock(&lcq->lcq_lock);
if (list_empty(&lli->lli_close_item)) {
CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
inode->i_ino, inode->i_generation);
+ igrab(inode);
list_add_tail(&lli->lli_close_item, &lcq->lcq_list);
wake_up(&lcq->lcq_waitq);
- added = 1;
}
spin_unlock(&lcq->lcq_lock);
}
spin_unlock(&lli->lli_lock);
-
- /*
- * we can't grab inode under lli_lock, because:
- * ll_try_done_writing: ll_prep_inode:
- * spin_lock(&lli_lock) spin_lock(&inode_lock)
- * igrab() ll_update_inode()
- * spin_lock(&inode_lock) spin_lock(&lli_lock)
- */
- if (added)
- LASSERT(igrab(inode) == inode);
}
/* The MDS needs us to get the real file attributes, then send a DONE_WRITING */
struct ll_inode_info *lli = ll_i2info(inode);
ENTRY;
- CDEBUG(D_INODE, "queue closing for %lu/%u\n",
- inode->i_ino, inode->i_generation);
spin_lock(&lli->lli_lock);
lli->lli_send_done_writing = 1;
spin_unlock(&lli->lli_lock);
EXIT;
}
+#if 0
/* If we know the file size and have the cookies:
* - send a DONE_WRITING rpc
*
* - get the authoritative size and all cookies with GETATTRs
* - send a DONE_WRITING rpc
*/
-static void ll_try_to_close(struct inode *inode)
+static void ll_close_done_writing(struct inode *inode)
{
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- ll_md_real_close(sbi->ll_md_exp, inode, FMODE_WRITE | FMODE_SYNC);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF } };
+ struct lustre_handle lockh = { 0 };
+ struct obdo obdo;
+ obd_flag valid;
+ int rc, ast_flags = 0;
+ ENTRY;
+
+ memset(&obdo, 0, sizeof(obdo));
+ if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
+ goto rpc;
+
+ rc = ll_extent_lock(NULL, inode, lli->lli_smd, LCK_PW, &policy, &lockh,
+ ast_flags);
+ if (rc != 0) {
+ CERROR("lock acquisition failed (%d): unable to send "
+ "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino,
+ inode->i_generation);
+ GOTO(out, rc);
+ }
+
+ rc = ll_lsm_getattr(ll_i2obdexp(inode), lli->lli_smd, &obdo);
+ if (rc) {
+ CERROR("inode_getattr failed (%d): unable to send DONE_WRITING "
+ "for inode %lu/%u\n", rc, inode->i_ino,
+ inode->i_generation);
+ ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
+ GOTO(out, rc);
+ }
+
+ obdo_refresh_inode(inode, &obdo, valid);
+
+ CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
+ lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks,
+ 1<<inode->i_blkbits);
+
+ set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
+
+ rc = ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
+ if (rc != ELDLM_OK)
+ CERROR("unlock failed (%d)? proceeding anyways...\n", rc);
+
+ rpc:
+ obdo.o_id = inode->i_ino;
+ obdo.o_size = inode->i_size;
+ obdo.o_blocks = inode->i_blocks;
+ obdo.o_valid = OBD_MD_FLID | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+
+ rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &obdo);
+ out:
}
+#endif
static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
{
spin_lock(&lcq->lcq_lock);
- /* first, check for queued request. otherwise, we would
- * leak them upon umount */
- if (!list_empty(&lcq->lcq_list)) {
+ if (lcq->lcq_list.next == NULL)
+ lli = ERR_PTR(-1);
+ else if (!list_empty(&lcq->lcq_list)) {
lli = list_entry(lcq->lcq_list.next, struct ll_inode_info,
lli_close_item);
- list_del_init(&lli->lli_close_item);
- } else if (lcq->lcq_stop != 0) {
- lli = ERR_PTR(-1);
+ list_del(&lli->lli_close_item);
}
spin_unlock(&lcq->lcq_lock);
struct ll_close_queue *lcq = arg;
ENTRY;
- /* XXX boiler-plate */
{
- char name[sizeof(current->comm)];
- unsigned long flags;
+ char name[CFS_CURPROC_COMM_MAX];
snprintf(name, sizeof(name) - 1, "ll_close");
- kportal_daemonize(name);
- SIGNAL_MASK_LOCK(current, flags);
- sigfillset(¤t->blocked);
- RECALC_SIGPENDING;
- SIGNAL_MASK_UNLOCK(current, flags);
+ cfs_daemonize(name);
}
-
+
complete(&lcq->lcq_comp);
while (1) {
struct l_wait_info lwi = { 0 };
struct ll_inode_info *lli;
- struct inode *inode;
+ //struct inode *inode;
l_wait_event_exclusive(lcq->lcq_waitq,
(lli = ll_close_next_lli(lcq)) != NULL,
if (IS_ERR(lli))
break;
- inode = ll_info2i(lli);
- ll_try_to_close(inode);
- iput(inode);
+ //inode = ll_info2i(lli);
+ //ll_close_done_writing(inode);
+ //iput(inode);
}
- EXIT;
-
- /* SMF-safe way to finish threads */
- complete_and_exit(&lcq->lcq_comp, 0);
+ complete(&lcq->lcq_comp);
+ RETURN(0);
}
int ll_close_thread_start(struct ll_close_queue **lcq_ret)
if (lcq == NULL)
return -ENOMEM;
- lcq->lcq_stop = 0;
spin_lock_init(&lcq->lcq_lock);
INIT_LIST_HEAD(&lcq->lcq_list);
init_waitqueue_head(&lcq->lcq_waitq);
return 0;
}
-void ll_close_thread_stop(struct ll_close_queue *lcq)
+void ll_close_thread_shutdown(struct ll_close_queue *lcq)
{
init_completion(&lcq->lcq_comp);
- lcq->lcq_stop = 1;
+ lcq->lcq_list.next = NULL;
wake_up(&lcq->lcq_waitq);
wait_for_completion(&lcq->lcq_comp);
OBD_FREE(lcq, sizeof(*lcq));