Split updeter functional for enable transaction support

apache_commons_compress
Terekhin Alexander 5 years ago
parent 8a8ed24cfa
commit 1344b50a62
  1. 2
      src/main/java/me/bearns/fias/helpers/Catalog.java
  2. 2
      src/main/java/me/bearns/fias/helpers/RegionFilter.java
  3. 2
      src/main/java/me/bearns/fias/helpers/UnmarshallerParameters.java
  4. 10
      src/main/java/me/bearns/fias/helpers/UpdateHelper.java
  5. 118
      src/main/java/me/bearns/fias/helpers/UpdaterImpl.java
  6. 92
      src/main/java/me/bearns/fias/service/FiasUpdater.java
  7. 3
      src/main/java/me/bearns/fias/service/StreamSaver.java
  8. 2
      src/main/java/me/bearns/fias/service/StreamSaverImpl.java
  9. 13
      src/test/java/me/bearns/fias/TransactionsTest.java

@ -1,4 +1,4 @@
package me.bearns.fias.util; package me.bearns.fias.helpers;
import me.bearns.fias.domain.AddressObjects; import me.bearns.fias.domain.AddressObjects;
import me.bearns.fias.domain.Addrobj; import me.bearns.fias.domain.Addrobj;

@ -1,4 +1,4 @@
package me.bearns.fias.util; package me.bearns.fias.helpers;
import me.bearns.fias.domain.Addrobj; import me.bearns.fias.domain.Addrobj;
import me.bearns.fias.domain.House; import me.bearns.fias.domain.House;

@ -1,4 +1,4 @@
package me.bearns.fias.util; package me.bearns.fias.helpers;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;

@ -0,0 +1,10 @@
package me.bearns.fias.helpers;
import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.exceptions.CommonException;
import java.util.List;
public interface UpdateHelper {
public void processUpdates(List<FiasVersion> updates, boolean reloadFlag) throws CommonException;
}

@ -0,0 +1,118 @@
package me.bearns.fias.helpers;
import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.exceptions.CommonException;
import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.exceptions.UnzipException;
import me.bearns.fias.repository.FiasVersionRepository;
import me.bearns.fias.service.Downloader;
import me.bearns.fias.service.StreamSaver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipFile;
@Component
public class UpdaterImpl implements UpdateHelper {
@Autowired
private static Downloader fileService;
@Autowired
private static FiasVersionRepository versions;
@Autowired
private static Catalog catalog;
@Autowired
private static StreamSaver unmarshaller;
//transaction from here
@Transactional(rollbackFor = Exception.class)
public void processUpdates(List<FiasVersion> updates, boolean reloadFlag) throws CommonException {
if(reloadFlag) {
//todo reload flag impl
versions.deleteAll();
}
updates.sort((o1, o2) -> Math.toIntExact(o1.getVersionId() - o2.getVersionId()));
Map<FiasVersion, Future<File>> map = new HashMap<>();
//start download
updates.forEach(u -> map.put(u, fileService.download(reloadFlag ? u.getFiasCompleteXmlUrl() : u.getFiasDeltaXmlUrl())));
//strict order
for (FiasVersion item : updates) {
//set region filter
final String regions = item.getRegions();
RegionFilter filter = null;
if(regions != null) {
filter = new RegionFilter(regions);
}
final File file;
try {
//wait for downloading
file = map.get(item).get();
//process update
processArchive(file, filter);
//apply this version
versions.save(item);
} catch (InterruptedException e) {
e.printStackTrace();
//todo log
throw new DownloadException(e);
} catch (ExecutionException e) {
e.printStackTrace();
//todo log
throw new DownloadException(e);
}
}
}
private void processArchive(File file, Predicate filter) throws CommonException {
try(final ZipFile zipFile = new ZipFile(file)){
final Enumeration<? extends ZipEntry> entries = zipFile.entries();
while (entries.hasMoreElements()){
final ZipEntry entry = entries.nextElement();
final UnmarshallerParameters config = catalog.getByPrefix(entry.getName());
if(config!=null) { //todo
try (InputStream is = zipFile.getInputStream(entry)) {
unmarshaller.process(is, config, filter);
}
}
}
} catch (ZipException e) {
throw new UnzipException(e);
} catch (IOException e) {
throw new UnzipException(e);
}
}
}

@ -4,10 +4,11 @@ import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.exceptions.CommonException; import me.bearns.fias.exceptions.CommonException;
import me.bearns.fias.exceptions.DownloadException; import me.bearns.fias.exceptions.DownloadException;
import me.bearns.fias.exceptions.UnzipException; import me.bearns.fias.exceptions.UnzipException;
import me.bearns.fias.helpers.UpdateHelper;
import me.bearns.fias.repository.FiasVersionRepository; import me.bearns.fias.repository.FiasVersionRepository;
import me.bearns.fias.util.Catalog; import me.bearns.fias.helpers.Catalog;
import me.bearns.fias.util.RegionFilter; import me.bearns.fias.helpers.RegionFilter;
import me.bearns.fias.util.UnmarshallerParameters; import me.bearns.fias.helpers.UnmarshallerParameters;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -27,17 +28,11 @@ public class FiasUpdater implements Updater {
@Autowired @Autowired
private static FiasVersionRepository versions; private static FiasVersionRepository versions;
@Autowired
private static StreamSaver unmarshaller;
@Autowired
private static Downloader fileService;
@Autowired @Autowired
private static OnlineVersion clientStub; private static OnlineVersion clientStub;
@Autowired @Autowired
private static Catalog catalog; private static UpdateHelper helper;
@Override @Override
public void update() throws CommonException { public void update() throws CommonException {
@ -61,7 +56,7 @@ public class FiasUpdater implements Updater {
if(regions != null) updates.forEach(u -> u.setRegions(regions)); if(regions != null) updates.forEach(u -> u.setRegions(regions));
if(updates != null && !updates.isEmpty()) { if(updates != null && !updates.isEmpty()) {
processUpdates(updates, false); helper.processUpdates(updates, false);
} }
} }
} }
@ -79,83 +74,10 @@ public class FiasUpdater implements Updater {
//set region filter //set region filter
lastVersion.get(0).setRegions(regions); lastVersion.get(0).setRegions(regions);
processUpdates(lastVersion, true); helper.processUpdates(lastVersion, true);
}
}
//transaction from here
@Transactional
private void processUpdates(List<FiasVersion> updates, boolean reloadFlag) throws CommonException {
if(reloadFlag) {
//todo reload flag impl
versions.deleteAll();
} }
updates.sort((o1, o2) -> Math.toIntExact(o1.getVersionId() - o2.getVersionId()));
Map<FiasVersion, Future<File>> map = new HashMap<>();
//start download
updates.forEach(u -> map.put(u, fileService.download(reloadFlag ? u.getFiasCompleteXmlUrl() : u.getFiasDeltaXmlUrl())));
//strict order
for (FiasVersion item : updates) {
//set region filter
final String regions = item.getRegions();
RegionFilter filter = null;
if(regions != null) {
filter = new RegionFilter(regions);
}
final File file;
try {
//wait for downloading
file = map.get(item).get();
//process update
processArchive(file, filter);
//apply this version
versions.save(item);
} catch (InterruptedException e) {
e.printStackTrace();
//todo log
throw new DownloadException(e);
} catch (ExecutionException e) {
e.printStackTrace();
//todo log
throw new DownloadException(e);
}
}
} }
private void processArchive(File file, Predicate filter) throws CommonException {
try(final ZipFile zipFile = new ZipFile(file)){
final Enumeration<? extends ZipEntry> entries = zipFile.entries();
while (entries.hasMoreElements()){
final ZipEntry entry = entries.nextElement();
final UnmarshallerParameters config = catalog.getByPrefix(entry.getName());
if(config!=null) { //todo
try (InputStream is = zipFile.getInputStream(entry)) {
unmarshaller.process(is, config, filter);
}
}
}
} catch (ZipException e) {
throw new UnzipException(e);
} catch (IOException e) {
throw new UnzipException(e);
}
}
} }

@ -1,8 +1,7 @@
package me.bearns.fias.service; package me.bearns.fias.service;
import me.bearns.fias.exceptions.CommonException; import me.bearns.fias.exceptions.CommonException;
import me.bearns.fias.exceptions.UnmarshallingException; import me.bearns.fias.helpers.UnmarshallerParameters;
import me.bearns.fias.util.UnmarshallerParameters;
import java.io.InputStream; import java.io.InputStream;
import java.util.function.Predicate; import java.util.function.Predicate;

@ -2,7 +2,7 @@ package me.bearns.fias.service;
import me.bearns.fias.exceptions.CommonException; import me.bearns.fias.exceptions.CommonException;
import me.bearns.fias.exceptions.UnmarshallingException; import me.bearns.fias.exceptions.UnmarshallingException;
import me.bearns.fias.util.UnmarshallerParameters; import me.bearns.fias.helpers.UnmarshallerParameters;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBContext;

@ -1,15 +1,16 @@
package me.bearns.fias; package me.bearns.fias;
import me.bearns.fias.domain.FiasVersion; import me.bearns.fias.domain.FiasVersion;
import me.bearns.fias.exceptions.UnmarshallingException;
import me.bearns.fias.helper.TransactionalSaveHelper; import me.bearns.fias.helper.TransactionalSaveHelper;
import me.bearns.fias.repository.FiasVersionRepository; import me.bearns.fias.repository.FiasVersionRepository;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -17,7 +18,6 @@ import java.util.List;
import static me.bearns.fias.helper.TransactionalSaveHelper.FIRST; import static me.bearns.fias.helper.TransactionalSaveHelper.FIRST;
import static me.bearns.fias.helper.TransactionalSaveHelper.NEXT; import static me.bearns.fias.helper.TransactionalSaveHelper.NEXT;
import static org.springframework.transaction.annotation.Isolation.READ_COMMITTED;
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest @SpringBootTest
@ -42,13 +42,9 @@ public class TransactionsTest {
try { try {
helper.addNext(); helper.addNext();
assert false; assert false;
} catch (Exception e) { } catch (Exception e) {
//e.printStackTrace();
//just as plained //just as plained
assert true; assert true;
} }
@ -62,5 +58,10 @@ public class TransactionsTest {
} }
@After
public void clear(){
repository.deleteAll();
}
} }

Loading…
Cancel
Save