Zip stream processing for single thread

apache_commons_compress
Terekhin Alexander 5 years ago
parent a7b5c8c36c
commit ca5d3b0d03
  1. 11
      src/main/java/me/bearns/fias/exceptions/UnzipException.java
  2. 39
      src/main/java/me/bearns/fias/service/FiasUpdater.java
  3. 5
      src/main/java/me/bearns/fias/service/Updater.java
  4. 43
      src/main/java/me/bearns/fias/util/DownlodableUpdate.java
  5. 164
      src/main/java/me/bearns/fias/util/ZipReader.java

@ -0,0 +1,11 @@
package me.bearns.fias.exceptions;
public class UnzipException extends Exception {
public UnzipException(Exception e){
super(e);
}
public UnzipException(String s) {
super(s);
}
}

@ -2,19 +2,19 @@ package me.bearns.fias.service;
import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.exceptions.UnzipException;
import me.bearns.fias.repository.FiasVersionRepository;
import me.bearns.fias.util.DownlodableUpdate;
import me.bearns.fias.util.ZipReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
public class FiasUpdater implements Updater {
@ -29,7 +29,7 @@ public class FiasUpdater implements Updater {
OnlineVersion clientStub;
@Override
public void update() throws DownloadException {
public void update() throws DownloadException, UnzipException {
//Max value
FiasVersion dbVersionObj = versions.findTopByOrderByVersionIdDesc();
@ -55,7 +55,7 @@ public class FiasUpdater implements Updater {
}
@Override
public void reload(Long... regions) throws DownloadException {
public void reload(Long... regions) throws DownloadException, UnzipException {
final List<FiasVersion> lastVersion = clientStub.getLastVersion();
if(lastVersion != null) {
@ -65,7 +65,7 @@ public class FiasUpdater implements Updater {
}
//transaction from here
private void processUpdates(List<FiasVersion> updates, boolean reloadFlag, Long ... regions) throws DownloadException {
private void processUpdates(List<FiasVersion> updates, boolean reloadFlag, Long ... regions) throws DownloadException, UnzipException {
if(reloadFlag) {
//todo reload flag impl
@ -100,11 +100,34 @@ public class FiasUpdater implements Updater {
e.printStackTrace();
//todo log
throw new DownloadException(e);
} catch (FileNotFoundException e) {
throw new UnzipException(e);
}
}
}
private void processArchive(File file, Long ... regions) {
private void processArchive(File file, Long ... regions) throws FileNotFoundException, UnzipException {
try(final ZipReader reader = new ZipReader(new FileInputStream(file))){
final Set<String> list = reader.list();
list.forEach(s -> {
try(final InputStream inputStream = reader.read(s)) {
//todo
} catch (UnzipException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (IOException e) {
throw new UnzipException(e);
}
}
}

@ -1,6 +1,7 @@
package me.bearns.fias.service;
import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.exceptions.UnzipException;
public interface Updater {
@ -8,11 +9,11 @@ public interface Updater {
/**
* Run online update for fias database
*/
public void update() throws DownloadException;
public void update() throws DownloadException, UnzipException;
/**
* Load or reload database for selected regions
* @param regions
*/
public void reload(Long ... regions) throws DownloadException;
public void reload(Long ... regions) throws DownloadException, UnzipException;
}

@ -1,43 +0,0 @@
package me.bearns.fias.util;
import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.service.Downloader;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class DownlodableUpdate {
@Autowired
private static Downloader service;
private final FiasVersion update;
private final Future<File> file;
private static DownlodableUpdate process(FiasVersion ver, boolean reload) {
return new DownlodableUpdate(ver, service.download(reload ? ver.getFiasCompleteXmlUrl() : ver.getFiasDeltaXmlUrl()));
}
public static DownlodableUpdate processReload(FiasVersion ver){
return process(ver, true);
}
public static DownlodableUpdate processUpdate(FiasVersion ver){
return process(ver, false);
}
private DownlodableUpdate(FiasVersion update, Future<File> file) {
this.update = update;
this.file = file;
}
public File getFile() throws ExecutionException, InterruptedException {
return file.get();
}
public FiasVersion getVersion() {
return update;
}
}

@ -0,0 +1,164 @@
package me.bearns.fias.util;
import me.bearns.fias.exceptions.UnzipException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class ZipReader implements Closeable {
private final InputStream in;
private final Set<String> set = new HashSet<>();
private final ReentrantLock mutex = new ReentrantLock();
public ZipReader(InputStream in){
this.in = in;
}
public Set<String> list() throws UnzipException {
if(set.isEmpty()) init();
return set;
}
public InputStream read(String file) throws UnzipException {
if(file == null) throw new UnzipException("Filename in archive cannot be null");
mutex.lock();
InputStream retval = null;
ZipInputStream zis = null;
try{
zis = new ZipInputStream(in);
ZipEntry zipEntry;
while ((zipEntry = zis.getNextEntry()) != null) {
if(file.equals(zipEntry.getName())) {
//TODO
retval = new FileStream(zis);
}
zis.closeEntry();
}
} catch (IOException e) {
throw new UnzipException(e);
} finally {
if(retval == null) {
mutex.unlock();
try {
if(zis!=null) zis.close();
} catch (IOException e) {
//log
}
throw new UnzipException("File not found in archive");
}
}
return retval;
}
private synchronized void init() throws UnzipException {
mutex.lock();
try(ZipInputStream zis = new ZipInputStream(in)){
ZipEntry zipEntry;
while ((zipEntry = zis.getNextEntry()) != null) {
set.add(zipEntry.getName());
zis.closeEntry();
}
} catch (IOException e) {
throw new UnzipException(e);
} finally {
mutex.unlock();
}
}
@Override
public void close() throws IOException {
mutex.unlock();
if(in!=null) {
in.close();
}
}
public class FileStream extends InputStream{
private ZipInputStream zis;
public FileStream(ZipInputStream zis) {
this.zis = zis;
}
@Override
public int read() throws IOException {
return zis.read();
}
@Override
public int read(byte[] b) throws IOException {
return zis.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return zis.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
return zis.skip(n);
}
@Override
public int available() throws IOException {
return zis.available();
}
@Override
public void close() throws IOException {
try {
zis.closeEntry();
zis.close();
} finally {
mutex.unlock();
}
}
@Override
public synchronized void mark(int readlimit) {
zis.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
zis.reset();
}
@Override
public boolean markSupported() {
return zis.markSupported();
}
}
}
Loading…
Cancel
Save