From ca5d3b0d030e4e660d1b454ca6e113688dddf993 Mon Sep 17 00:00:00 2001 From: Terekhin Alexander Date: Wed, 12 Aug 2020 18:46:10 +0300 Subject: [PATCH] Zip stream processing for single thread --- .../fias/exceptions/UnzipException.java | 11 ++ .../me/bearns/fias/service/FiasUpdater.java | 39 ++++- .../java/me/bearns/fias/service/Updater.java | 5 +- .../bearns/fias/util/DownlodableUpdate.java | 43 ----- .../java/me/bearns/fias/util/ZipReader.java | 164 ++++++++++++++++++ 5 files changed, 209 insertions(+), 53 deletions(-) create mode 100644 src/main/java/me/bearns/fias/exceptions/UnzipException.java delete mode 100644 src/main/java/me/bearns/fias/util/DownlodableUpdate.java create mode 100644 src/main/java/me/bearns/fias/util/ZipReader.java diff --git a/src/main/java/me/bearns/fias/exceptions/UnzipException.java b/src/main/java/me/bearns/fias/exceptions/UnzipException.java new file mode 100644 index 0000000..52f775d --- /dev/null +++ b/src/main/java/me/bearns/fias/exceptions/UnzipException.java @@ -0,0 +1,11 @@ +package me.bearns.fias.exceptions; + +public class UnzipException extends Exception { + public UnzipException(Exception e){ + super(e); + } + + public UnzipException(String s) { + super(s); + } +} diff --git a/src/main/java/me/bearns/fias/service/FiasUpdater.java b/src/main/java/me/bearns/fias/service/FiasUpdater.java index f6c73fb..d3da8f7 100644 --- a/src/main/java/me/bearns/fias/service/FiasUpdater.java +++ b/src/main/java/me/bearns/fias/service/FiasUpdater.java @@ -2,19 +2,19 @@ package me.bearns.fias.service; import me.bearns.fias.domain.FiasVersion; import me.bearns.fias.exceptions.DownloadException; +import me.bearns.fias.exceptions.UnzipException; import me.bearns.fias.repository.FiasVersionRepository; -import me.bearns.fias.util.DownlodableUpdate; +import me.bearns.fias.util.ZipReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.File; +import java.io.*; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; @Service public class FiasUpdater implements Updater { @@ -29,7 +29,7 @@ public class FiasUpdater implements Updater { OnlineVersion clientStub; @Override - public void update() throws DownloadException { + public void update() throws DownloadException, UnzipException { //Max value FiasVersion dbVersionObj = versions.findTopByOrderByVersionIdDesc(); @@ -55,7 +55,7 @@ public class FiasUpdater implements Updater { } @Override - public void reload(Long... regions) throws DownloadException { + public void reload(Long... regions) throws DownloadException, UnzipException { final List lastVersion = clientStub.getLastVersion(); if(lastVersion != null) { @@ -65,7 +65,7 @@ public class FiasUpdater implements Updater { } //transaction from here - private void processUpdates(List updates, boolean reloadFlag, Long ... regions) throws DownloadException { + private void processUpdates(List updates, boolean reloadFlag, Long ... regions) throws DownloadException, UnzipException { if(reloadFlag) { //todo reload flag impl @@ -100,11 +100,34 @@ public class FiasUpdater implements Updater { e.printStackTrace(); //todo log throw new DownloadException(e); + } catch (FileNotFoundException e) { + throw new UnzipException(e); } } } - private void processArchive(File file, Long ... regions) { + private void processArchive(File file, Long ... regions) throws FileNotFoundException, UnzipException { + + try(final ZipReader reader = new ZipReader(new FileInputStream(file))){ + + final Set list = reader.list(); + + list.forEach(s -> { + try(final InputStream inputStream = reader.read(s)) { + + //todo + + + } catch (UnzipException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + + } catch (IOException e) { + throw new UnzipException(e); + } } } \ No newline at end of file diff --git a/src/main/java/me/bearns/fias/service/Updater.java b/src/main/java/me/bearns/fias/service/Updater.java index 1ff616f..23b20b0 100644 --- a/src/main/java/me/bearns/fias/service/Updater.java +++ b/src/main/java/me/bearns/fias/service/Updater.java @@ -1,6 +1,7 @@ package me.bearns.fias.service; import me.bearns.fias.exceptions.DownloadException; +import me.bearns.fias.exceptions.UnzipException; public interface Updater { @@ -8,11 +9,11 @@ public interface Updater { /** * Run online update for fias database */ - public void update() throws DownloadException; + public void update() throws DownloadException, UnzipException; /** * Load or reload database for selected regions * @param regions */ - public void reload(Long ... regions) throws DownloadException; + public void reload(Long ... regions) throws DownloadException, UnzipException; } diff --git a/src/main/java/me/bearns/fias/util/DownlodableUpdate.java b/src/main/java/me/bearns/fias/util/DownlodableUpdate.java deleted file mode 100644 index 27c53a2..0000000 --- a/src/main/java/me/bearns/fias/util/DownlodableUpdate.java +++ /dev/null @@ -1,43 +0,0 @@ -package me.bearns.fias.util; - -import me.bearns.fias.domain.FiasVersion; -import me.bearns.fias.service.Downloader; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.File; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -public class DownlodableUpdate { - - @Autowired - private static Downloader service; - - private final FiasVersion update; - private final Future file; - - private static DownlodableUpdate process(FiasVersion ver, boolean reload) { - return new DownlodableUpdate(ver, service.download(reload ? ver.getFiasCompleteXmlUrl() : ver.getFiasDeltaXmlUrl())); - } - - public static DownlodableUpdate processReload(FiasVersion ver){ - return process(ver, true); - } - - public static DownlodableUpdate processUpdate(FiasVersion ver){ - return process(ver, false); - } - - private DownlodableUpdate(FiasVersion update, Future file) { - this.update = update; - this.file = file; - } - - public File getFile() throws ExecutionException, InterruptedException { - return file.get(); - } - - public FiasVersion getVersion() { - return update; - } -} diff --git a/src/main/java/me/bearns/fias/util/ZipReader.java b/src/main/java/me/bearns/fias/util/ZipReader.java new file mode 100644 index 0000000..1cebfe6 --- /dev/null +++ b/src/main/java/me/bearns/fias/util/ZipReader.java @@ -0,0 +1,164 @@ +package me.bearns.fias.util; + +import me.bearns.fias.exceptions.UnzipException; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class ZipReader implements Closeable { + + private final InputStream in; + private final Set set = new HashSet<>(); + private final ReentrantLock mutex = new ReentrantLock(); + + public ZipReader(InputStream in){ + this.in = in; + } + + public Set list() throws UnzipException { + + if(set.isEmpty()) init(); + + return set; + + } + + public InputStream read(String file) throws UnzipException { + + if(file == null) throw new UnzipException("Filename in archive cannot be null"); + + mutex.lock(); + + InputStream retval = null; + ZipInputStream zis = null; + + try{ + zis = new ZipInputStream(in); + + ZipEntry zipEntry; + while ((zipEntry = zis.getNextEntry()) != null) { + + if(file.equals(zipEntry.getName())) { + //TODO + retval = new FileStream(zis); + + } + zis.closeEntry(); + } + + } catch (IOException e) { + throw new UnzipException(e); + } finally { + if(retval == null) { + + mutex.unlock(); + + try { + if(zis!=null) zis.close(); + } catch (IOException e) { + //log + } + + throw new UnzipException("File not found in archive"); + } + } + + return retval; + + } + + private synchronized void init() throws UnzipException { + + mutex.lock(); + + try(ZipInputStream zis = new ZipInputStream(in)){ + + ZipEntry zipEntry; + while ((zipEntry = zis.getNextEntry()) != null) { + + set.add(zipEntry.getName()); + zis.closeEntry(); + } + + } catch (IOException e) { + throw new UnzipException(e); + } finally { + mutex.unlock(); + } + } + + @Override + public void close() throws IOException { + + mutex.unlock(); + + if(in!=null) { + in.close(); + } + } + + public class FileStream extends InputStream{ + + private ZipInputStream zis; + + public FileStream(ZipInputStream zis) { + this.zis = zis; + } + + @Override + public int read() throws IOException { + return zis.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return zis.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return zis.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return zis.skip(n); + } + + @Override + public int available() throws IOException { + return zis.available(); + } + + @Override + public void close() throws IOException { + try { + zis.closeEntry(); + zis.close(); + } finally { + mutex.unlock(); + } + } + + @Override + public synchronized void mark(int readlimit) { + zis.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + zis.reset(); + } + + @Override + public boolean markSupported() { + return zis.markSupported(); + } + } +}