network.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015-2020
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. import contextlib
  22. from distutils import spawn
  23. import asyncio
  24. import subprocess
  25. import sys
  26. import time
  27. import unittest
  28. import qubes.tests
  29. import qubes.firewall
  30. import qubes.vm.qubesvm
  31. import qubes.vm.appvm
  32. # noinspection PyAttributeOutsideInit,PyPep8Naming
  33. class VmNetworkingMixin(object):
  34. test_ip = '192.168.123.45'
  35. test_name = 'test.example.com'
  36. ping_cmd = 'ping -W 1 -n -c 1 {target}'
  37. ping_ip = ping_cmd.format(target=test_ip)
  38. ping_name = ping_cmd.format(target=test_name)
  39. # filled by load_tests
  40. template = None
  41. def run_cmd(self, vm, cmd, user="root"):
  42. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  43. :type self: qubes.tests.SystemTestCase | VmNetworkingMixin
  44. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  45. :param str cmd: command to execute
  46. :param std user: user to execute command as
  47. :return int: command exit code
  48. '''
  49. try:
  50. self.loop.run_until_complete(vm.run_for_stdio(cmd, user=user))
  51. except subprocess.CalledProcessError as e:
  52. return e.returncode
  53. return 0
  54. def setUp(self):
  55. '''
  56. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  57. '''
  58. super(VmNetworkingMixin, self).setUp()
  59. if self.template.startswith('whonix-'):
  60. self.skipTest("Test not supported here - Whonix uses its own "
  61. "firewall settings")
  62. if self.template.endswith('-minimal'):
  63. self.skipTest(
  64. "Test not supported here - minimal template don't have "
  65. "networking packages by default")
  66. self.init_default_template(self.template)
  67. self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  68. name=self.make_vm_name('netvm1'),
  69. label='red')
  70. self.loop.run_until_complete(self.testnetvm.create_on_disk())
  71. self.testnetvm.provides_network = True
  72. self.testnetvm.netvm = None
  73. self.testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  74. name=self.make_vm_name('vm1'),
  75. label='red')
  76. self.loop.run_until_complete(self.testvm1.create_on_disk())
  77. self.testvm1.netvm = self.testnetvm
  78. self.app.save()
  79. self.configure_netvm()
  80. def _run_cmd_and_log_output(self, vm, cmd):
  81. """Used in tearDown to collect more info"""
  82. if not vm.is_running():
  83. return
  84. with contextlib.suppress(subprocess.CalledProcessError):
  85. output = self.loop.run_until_complete(
  86. self.testnetvm.run_for_stdio(cmd, user='root'))
  87. self.log.error('{}: {}: {}'.format(vm.name, cmd, output))
  88. def tearDown(self):
  89. # collect more info on failure
  90. if self._outcome and not self._outcome.success:
  91. for vm in (self.testnetvm, self.testvm1, getattr(self, 'proxy', None)):
  92. if vm is None:
  93. continue
  94. self._run_cmd_and_log_output(vm, 'ip a')
  95. self._run_cmd_and_log_output(vm, 'ip r')
  96. self._run_cmd_and_log_output(vm, 'iptables -vnL')
  97. self._run_cmd_and_log_output(vm, 'iptables -vnL -t nat')
  98. self._run_cmd_and_log_output(vm, 'nft list table qubes-firewall')
  99. self._run_cmd_and_log_output(vm, 'systemctl --no-pager status qubes-firewall')
  100. self._run_cmd_and_log_output(vm, 'systemctl --no-pager status qubes-iptables')
  101. self._run_cmd_and_log_output(vm, 'systemctl --no-pager status xendriverdomain')
  102. super(VmNetworkingMixin, self).tearDown()
  103. def configure_netvm(self):
  104. '''
  105. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  106. '''
  107. def run_netvm_cmd(cmd):
  108. try:
  109. self.loop.run_until_complete(
  110. self.testnetvm.run_for_stdio(cmd, user='root'))
  111. except subprocess.CalledProcessError as e:
  112. self.fail("Command '%s' failed: %s%s" %
  113. (cmd, e.stdout.decode(), e.stderr.decode()))
  114. if not self.testnetvm.is_running():
  115. self.loop.run_until_complete(self.testnetvm.start())
  116. # Ensure that dnsmasq is installed:
  117. try:
  118. self.loop.run_until_complete(self.testnetvm.run_for_stdio(
  119. 'dnsmasq --version', user='root'))
  120. except subprocess.CalledProcessError:
  121. self.skipTest("dnsmasq not installed")
  122. run_netvm_cmd("ip link add test0 type dummy")
  123. run_netvm_cmd("ip link set test0 up")
  124. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  125. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT --wait".format(
  126. self.test_ip))
  127. # ignore failure
  128. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  129. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  130. ip=self.test_ip, name=self.test_name))
  131. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  132. self.test_ip))
  133. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  134. def test_000_simple_networking(self):
  135. '''
  136. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  137. '''
  138. self.loop.run_until_complete(self.testvm1.start())
  139. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  140. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  141. def test_010_simple_proxyvm(self):
  142. '''
  143. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  144. '''
  145. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  146. name=self.make_vm_name('proxy'),
  147. label='red')
  148. self.proxy.provides_network = True
  149. self.proxy.netvm = self.testnetvm
  150. self.loop.run_until_complete(self.proxy.create_on_disk())
  151. self.testvm1.netvm = self.proxy
  152. self.app.save()
  153. self.loop.run_until_complete(self.testvm1.start())
  154. self.assertTrue(self.proxy.is_running())
  155. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  156. "Ping by IP from ProxyVM failed")
  157. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  158. "Ping by name from ProxyVM failed")
  159. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  160. "Ping by IP from AppVM failed")
  161. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  162. "Ping by IP from AppVM failed")
  163. @qubes.tests.expectedFailureIfTemplate('debian-7')
  164. @unittest.skipUnless(spawn.find_executable('xdotool'),
  165. "xdotool not installed")
  166. def test_020_simple_proxyvm_nm(self):
  167. '''
  168. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  169. '''
  170. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  171. name=self.make_vm_name('proxy'),
  172. label='red')
  173. self.proxy.provides_network = True
  174. self.loop.run_until_complete(self.proxy.create_on_disk())
  175. self.proxy.netvm = self.testnetvm
  176. self.proxy.features['service.network-manager'] = True
  177. self.testvm1.netvm = self.proxy
  178. self.app.save()
  179. self.loop.run_until_complete(self.testvm1.start())
  180. self.assertTrue(self.proxy.is_running())
  181. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  182. "Ping by IP failed")
  183. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  184. "Ping by name failed")
  185. # reconnect to make sure that device was configured by NM
  186. self.assertEqual(
  187. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  188. user="user"),
  189. 0, "Failed to disconnect eth0 using nmcli")
  190. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  191. "Network should be disabled, but apparently it isn't")
  192. self.assertEqual(
  193. self.run_cmd(self.proxy,
  194. 'nmcli connection up "VM uplink eth0" ifname eth0',
  195. user="user"),
  196. 0, "Failed to connect eth0 using nmcli")
  197. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  198. "Failed to wait for NM connection")
  199. # check for nm-applet presence
  200. self.assertEqual(subprocess.call([
  201. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  202. self.proxy.name)],
  203. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  204. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  205. "Ping by IP failed (after NM reconnection")
  206. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  207. "Ping by name failed (after NM reconnection)")
  208. def test_030_firewallvm_firewall(self):
  209. '''
  210. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  211. '''
  212. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  213. name=self.make_vm_name('proxy'),
  214. label='red')
  215. self.proxy.provides_network = True
  216. self.loop.run_until_complete(self.proxy.create_on_disk())
  217. self.proxy.netvm = self.testnetvm
  218. self.testvm1.netvm = self.proxy
  219. self.app.save()
  220. # block all for first
  221. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  222. self.testvm1.firewall.save()
  223. self.loop.run_until_complete(self.testvm1.start())
  224. self.assertTrue(self.proxy.is_running())
  225. server = self.loop.run_until_complete(self.testnetvm.run(
  226. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  227. try:
  228. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  229. "Ping by IP from ProxyVM failed")
  230. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  231. "Ping by name from ProxyVM failed")
  232. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  233. "Ping by IP should be blocked")
  234. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  235. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  236. "TCP connection should be blocked")
  237. # block all except ICMP
  238. self.testvm1.firewall.rules = [(
  239. qubes.firewall.Rule(None, action='accept', proto='icmp')
  240. )]
  241. self.testvm1.firewall.save()
  242. # Ugly hack b/c there is no feedback when the rules are actually
  243. # applied
  244. time.sleep(3)
  245. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  246. "Ping by IP failed (should be allowed now)")
  247. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  248. "Ping by name should be blocked")
  249. # all TCP still blocked
  250. self.testvm1.firewall.rules = [
  251. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  252. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  253. ]
  254. self.testvm1.firewall.save()
  255. # Ugly hack b/c there is no feedback when the rules are actually
  256. # applied
  257. time.sleep(3)
  258. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  259. "Ping by name failed (should be allowed now)")
  260. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  261. "TCP connection should be blocked")
  262. # block all except target
  263. self.testvm1.firewall.rules = [
  264. qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
  265. proto='tcp', dstports=1234),
  266. ]
  267. self.testvm1.firewall.save()
  268. # Ugly hack b/c there is no feedback when the rules are actually
  269. # applied
  270. time.sleep(3)
  271. self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  272. "TCP connection failed (should be allowed now)")
  273. # allow all except target
  274. self.testvm1.firewall.rules = [
  275. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
  276. proto='tcp', dstports=1234),
  277. qubes.firewall.Rule(action='accept'),
  278. ]
  279. self.testvm1.firewall.save()
  280. # Ugly hack b/c there is no feedback when the rules are actually
  281. # applied
  282. time.sleep(3)
  283. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  284. "TCP connection should be blocked")
  285. finally:
  286. server.terminate()
  287. self.loop.run_until_complete(server.wait())
  288. def test_040_inter_vm(self):
  289. '''
  290. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  291. '''
  292. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  293. name=self.make_vm_name('proxy'),
  294. label='red')
  295. self.loop.run_until_complete(self.proxy.create_on_disk())
  296. self.proxy.provides_network = True
  297. self.proxy.netvm = self.testnetvm
  298. self.testvm1.netvm = self.proxy
  299. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  300. name=self.make_vm_name('vm2'),
  301. label='red')
  302. self.loop.run_until_complete(self.testvm2.create_on_disk())
  303. self.testvm2.netvm = self.proxy
  304. self.app.save()
  305. self.loop.run_until_complete(asyncio.wait([
  306. self.testvm1.start(),
  307. self.testvm2.start()]))
  308. self.assertNotEqual(self.run_cmd(self.testvm1,
  309. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  310. self.testvm2.netvm = self.testnetvm
  311. self.assertNotEqual(self.run_cmd(self.testvm1,
  312. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  313. self.assertNotEqual(self.run_cmd(self.testvm2,
  314. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  315. self.testvm1.netvm = self.testnetvm
  316. self.assertNotEqual(self.run_cmd(self.testvm1,
  317. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  318. self.assertNotEqual(self.run_cmd(self.testvm2,
  319. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  320. def test_050_spoof_ip(self):
  321. '''Test if VM IP spoofing is blocked
  322. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  323. '''
  324. self.loop.run_until_complete(self.testvm1.start())
  325. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  326. self.assertEqual(self.run_cmd(self.testnetvm,
  327. 'iptables -I INPUT -i vif+ ! -s {} -p icmp -j LOG'.format(
  328. self.testvm1.ip)), 0)
  329. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  330. 'ip addr flush dev eth0 && '
  331. 'ip addr add 10.137.1.128/24 dev eth0 && '
  332. 'ip route add default dev eth0',
  333. user='root'))
  334. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  335. "Spoofed ping should be blocked")
  336. try:
  337. (output, _) = self.loop.run_until_complete(
  338. self.testnetvm.run_for_stdio('iptables -nxvL INPUT',
  339. user='root'))
  340. except subprocess.CalledProcessError:
  341. self.fail('iptables -nxvL INPUT failed')
  342. output = output.decode().splitlines()
  343. packets = output[2].lstrip().split()[0]
  344. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  345. def test_100_late_xldevd_startup(self):
  346. '''Regression test for #1990
  347. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  348. '''
  349. # Simulater late xl devd startup
  350. cmd = "systemctl stop xendriverdomain"
  351. if self.run_cmd(self.testnetvm, cmd) != 0:
  352. self.fail("Command '%s' failed" % cmd)
  353. self.loop.run_until_complete(self.testvm1.start())
  354. cmd = "systemctl start xendriverdomain"
  355. if self.run_cmd(self.testnetvm, cmd) != 0:
  356. self.fail("Command '%s' failed" % cmd)
  357. # let it initialize the interface(s)
  358. time.sleep(1)
  359. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  360. def test_110_dynamic_attach(self):
  361. self.testvm1.netvm = None
  362. self.loop.run_until_complete(self.testvm1.start())
  363. self.testvm1.netvm = self.testnetvm
  364. # wait for it to settle down
  365. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  366. 'udevadm settle'))
  367. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  368. def test_111_dynamic_detach_attach(self):
  369. self.loop.run_until_complete(self.testvm1.start())
  370. self.testvm1.netvm = None
  371. # wait for it to settle down
  372. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  373. 'udevadm settle'))
  374. self.testvm1.netvm = self.testnetvm
  375. # wait for it to settle down
  376. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  377. 'udevadm settle'))
  378. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  379. def test_112_reattach_after_provider_shutdown(self):
  380. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  381. name=self.make_vm_name('proxy'),
  382. label='red')
  383. self.proxy.provides_network = True
  384. self.proxy.netvm = self.testnetvm
  385. self.loop.run_until_complete(self.proxy.create_on_disk())
  386. self.testvm1.netvm = self.proxy
  387. self.loop.run_until_complete(self.testvm1.start())
  388. self.loop.run_until_complete(self.proxy.shutdown(force=True, wait=True))
  389. self.loop.run_until_complete(self.proxy.start())
  390. # wait for it to settle down
  391. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  392. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  393. def test_113_reattach_after_provider_kill(self):
  394. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  395. name=self.make_vm_name('proxy'),
  396. label='red')
  397. self.proxy.provides_network = True
  398. self.proxy.netvm = self.testnetvm
  399. self.loop.run_until_complete(self.proxy.create_on_disk())
  400. self.testvm1.netvm = self.proxy
  401. self.loop.run_until_complete(self.testvm1.start())
  402. self.loop.run_until_complete(self.proxy.kill())
  403. self.loop.run_until_complete(self.proxy.start())
  404. # wait for it to settle down
  405. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  406. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  407. def test_114_reattach_after_provider_crash(self):
  408. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  409. name=self.make_vm_name('proxy'),
  410. label='red')
  411. self.proxy.provides_network = True
  412. self.proxy.netvm = self.testnetvm
  413. self.loop.run_until_complete(self.proxy.create_on_disk())
  414. self.testvm1.netvm = self.proxy
  415. self.loop.run_until_complete(self.testvm1.start())
  416. p = self.loop.run_until_complete(self.proxy.run(
  417. 'echo c > /proc/sysrq-trigger', user='root'))
  418. self.loop.run_until_complete(p.wait())
  419. timeout = 10
  420. while self.proxy.is_running():
  421. self.loop.run_until_complete(asyncio.sleep(1))
  422. timeout -= 1
  423. self.assertGreater(timeout, 0,
  424. 'timeout waiting for crash cleanup')
  425. self.loop.run_until_complete(self.proxy.start())
  426. # wait for it to settle down
  427. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  428. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  429. def test_200_fake_ip_simple(self):
  430. '''Test hiding VM real IP
  431. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  432. '''
  433. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  434. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  435. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  436. self.app.save()
  437. self.loop.run_until_complete(self.testvm1.start())
  438. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  439. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  440. try:
  441. (output, _) = self.loop.run_until_complete(
  442. self.testvm1.run_for_stdio(
  443. 'ip addr show dev eth0', user='root'))
  444. except subprocess.CalledProcessError:
  445. self.fail('ip addr show dev eth0 failed')
  446. output = output.decode()
  447. self.assertIn('192.168.1.128', output)
  448. self.assertNotIn(str(self.testvm1.ip), output)
  449. try:
  450. (output, _) = self.loop.run_until_complete(
  451. self.testvm1.run_for_stdio('ip route show', user='root'))
  452. except subprocess.CalledProcessError:
  453. self.fail('ip route show failed')
  454. output = output.decode()
  455. self.assertIn('192.168.1.1', output)
  456. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  457. def test_201_fake_ip_without_gw(self):
  458. '''Test hiding VM real IP
  459. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  460. '''
  461. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  462. self.app.save()
  463. self.loop.run_until_complete(self.testvm1.start())
  464. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  465. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  466. try:
  467. (output, _) = self.loop.run_until_complete(
  468. self.testvm1.run_for_stdio('ip addr show dev eth0',
  469. user='root'))
  470. except subprocess.CalledProcessError:
  471. self.fail('ip addr show dev eth0 failed')
  472. output = output.decode()
  473. self.assertIn('192.168.1.128', output)
  474. self.assertNotIn(str(self.testvm1.ip), output)
  475. def test_202_fake_ip_firewall(self):
  476. '''Test hiding VM real IP, firewall
  477. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  478. '''
  479. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  480. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  481. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  482. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  483. name=self.make_vm_name('proxy'),
  484. label='red')
  485. self.proxy.provides_network = True
  486. self.loop.run_until_complete(self.proxy.create_on_disk())
  487. self.proxy.netvm = self.testnetvm
  488. self.testvm1.netvm = self.proxy
  489. self.app.save()
  490. # block all but ICMP and DNS
  491. self.testvm1.firewall.rules = [
  492. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  493. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  494. ]
  495. self.testvm1.firewall.save()
  496. self.loop.run_until_complete(self.testvm1.start())
  497. self.assertTrue(self.proxy.is_running())
  498. server = self.loop.run_until_complete(self.testnetvm.run(
  499. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  500. try:
  501. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  502. "Ping by IP from ProxyVM failed")
  503. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  504. "Ping by name from ProxyVM failed")
  505. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  506. "Ping by IP should be allowed")
  507. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  508. "Ping by name should be allowed")
  509. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  510. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  511. "TCP connection should be blocked")
  512. finally:
  513. server.terminate()
  514. self.loop.run_until_complete(server.wait())
  515. def test_203_fake_ip_inter_vm_allow(self):
  516. '''Access VM with "fake IP" from other VM (when firewall allows)
  517. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  518. '''
  519. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  520. name=self.make_vm_name('proxy'),
  521. label='red')
  522. self.loop.run_until_complete(self.proxy.create_on_disk())
  523. self.proxy.provides_network = True
  524. self.proxy.netvm = self.testnetvm
  525. self.testvm1.netvm = self.proxy
  526. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  527. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  528. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  529. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  530. name=self.make_vm_name('vm2'),
  531. label='red')
  532. self.loop.run_until_complete(self.testvm2.create_on_disk())
  533. self.testvm2.netvm = self.proxy
  534. self.app.save()
  535. self.loop.run_until_complete(self.testvm1.start())
  536. self.loop.run_until_complete(self.testvm2.start())
  537. cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
  538. self.testvm2.ip, self.testvm1.ip)
  539. try:
  540. self.loop.run_until_complete(self.proxy.run_for_stdio(
  541. cmd, user='root'))
  542. except subprocess.CalledProcessError as e:
  543. raise AssertionError(
  544. '{} failed with: {}'.format(cmd, e.returncode)) from None
  545. try:
  546. cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(self.testvm2.ip)
  547. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  548. cmd, user='root'))
  549. except subprocess.CalledProcessError as e:
  550. raise AssertionError(
  551. '{} failed with: {}'.format(cmd, e.returncode)) from None
  552. self.assertEqual(self.run_cmd(self.testvm2,
  553. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  554. try:
  555. cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
  556. (stdout, _) = self.loop.run_until_complete(
  557. self.testvm1.run_for_stdio(cmd, user='root'))
  558. except subprocess.CalledProcessError as e:
  559. raise AssertionError(
  560. '{} failed with {}'.format(cmd, e.returncode)) from None
  561. self.assertNotEqual(stdout.decode().split()[0], '0',
  562. 'Packets didn\'t managed to the VM')
  563. def test_204_fake_ip_proxy(self):
  564. '''Test hiding VM real IP
  565. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  566. '''
  567. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  568. name=self.make_vm_name('proxy'),
  569. label='red')
  570. self.loop.run_until_complete(self.proxy.create_on_disk())
  571. self.proxy.provides_network = True
  572. self.proxy.netvm = self.testnetvm
  573. self.proxy.features['net.fake-ip'] = '192.168.1.128'
  574. self.proxy.features['net.fake-gateway'] = '192.168.1.1'
  575. self.proxy.features['net.fake-netmask'] = '255.255.255.0'
  576. self.testvm1.netvm = self.proxy
  577. self.app.save()
  578. self.loop.run_until_complete(self.testvm1.start())
  579. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
  580. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
  581. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  582. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  583. try:
  584. (output, _) = self.loop.run_until_complete(
  585. self.proxy.run_for_stdio(
  586. 'ip addr show dev eth0', user='root'))
  587. except subprocess.CalledProcessError:
  588. self.fail('ip addr show dev eth0 failed')
  589. output = output.decode()
  590. self.assertIn('192.168.1.128', output)
  591. self.assertNotIn(str(self.testvm1.ip), output)
  592. try:
  593. (output, _) = self.loop.run_until_complete(
  594. self.proxy.run_for_stdio(
  595. 'ip route show', user='root'))
  596. except subprocess.CalledProcessError:
  597. self.fail('ip route show failed')
  598. output = output.decode()
  599. self.assertIn('192.168.1.1', output)
  600. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  601. try:
  602. (output, _) = self.loop.run_until_complete(
  603. self.testvm1.run_for_stdio(
  604. 'ip addr show dev eth0', user='root'))
  605. except subprocess.CalledProcessError:
  606. self.fail('ip addr show dev eth0 failed')
  607. output = output.decode()
  608. self.assertNotIn('192.168.1.128', output)
  609. self.assertIn(str(self.testvm1.ip), output)
  610. try:
  611. (output, _) = self.loop.run_until_complete(
  612. self.testvm1.run_for_stdio(
  613. 'ip route show', user='root'))
  614. except subprocess.CalledProcessError:
  615. self.fail('ip route show failed')
  616. output = output.decode()
  617. self.assertIn('192.168.1.128', output)
  618. self.assertNotIn(str(self.proxy.ip), output)
  619. def test_210_custom_ip_simple(self):
  620. '''Custom AppVM IP
  621. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  622. '''
  623. self.testvm1.ip = '192.168.1.1'
  624. self.app.save()
  625. self.loop.run_until_complete(self.testvm1.start())
  626. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  627. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  628. def test_211_custom_ip_proxy(self):
  629. '''Custom ProxyVM IP
  630. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  631. '''
  632. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  633. name=self.make_vm_name('proxy'),
  634. label='red')
  635. self.loop.run_until_complete(self.proxy.create_on_disk())
  636. self.proxy.provides_network = True
  637. self.proxy.netvm = self.testnetvm
  638. self.proxy.ip = '192.168.1.1'
  639. self.testvm1.netvm = self.proxy
  640. self.app.save()
  641. self.loop.run_until_complete(self.testvm1.start())
  642. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  643. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  644. def test_212_custom_ip_firewall(self):
  645. '''Custom VM IP and firewall
  646. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  647. '''
  648. self.testvm1.ip = '192.168.1.1'
  649. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  650. name=self.make_vm_name('proxy'),
  651. label='red')
  652. self.proxy.provides_network = True
  653. self.loop.run_until_complete(self.proxy.create_on_disk())
  654. self.proxy.netvm = self.testnetvm
  655. self.testvm1.netvm = self.proxy
  656. self.app.save()
  657. # block all but ICMP and DNS
  658. self.testvm1.firewall.rules = [
  659. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  660. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  661. ]
  662. self.testvm1.firewall.save()
  663. self.loop.run_until_complete(self.testvm1.start())
  664. self.assertTrue(self.proxy.is_running())
  665. server = self.loop.run_until_complete(self.testnetvm.run(
  666. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  667. try:
  668. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  669. "Ping by IP from ProxyVM failed")
  670. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  671. "Ping by name from ProxyVM failed")
  672. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  673. "Ping by IP should be allowed")
  674. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  675. "Ping by name should be allowed")
  676. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  677. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  678. "TCP connection should be blocked")
  679. finally:
  680. server.terminate()
  681. self.loop.run_until_complete(server.wait())
  682. def create_testcases_for_templates():
  683. yield from qubes.tests.create_testcases_for_templates('VmNetworking',
  684. VmNetworkingMixin, qubes.tests.SystemTestCase,
  685. module=sys.modules[__name__])
  686. def load_tests(loader, tests, pattern):
  687. tests.addTests(loader.loadTestsFromNames(
  688. create_testcases_for_templates()))
  689. return tests
  690. qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)